• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12430728843

20 Dec 2024 11:36AM UTC coverage: 61.336% (+2.6%) from 58.716%
12430728843

Pull #8777

github

ziggie1984
channeldb: fix typo.
Pull Request #8777: multi: make reassignment of alias channel edge atomic

161 of 213 new or added lines in 7 files covered. (75.59%)

70 existing lines in 17 files now uncovered.

23369 of 38100 relevant lines covered (61.34%)

115813.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.21
/routing/localchans/manager.go
1
package localchans
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcec/v2"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/lightningnetwork/lnd/channeldb"
13
        "github.com/lightningnetwork/lnd/discovery"
14
        "github.com/lightningnetwork/lnd/fn/v2"
15
        "github.com/lightningnetwork/lnd/graph/db/models"
16
        "github.com/lightningnetwork/lnd/lnrpc"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
        "github.com/lightningnetwork/lnd/routing"
19
)
20

21
// Manager manages the node's local channels. The only operation that is
22
// currently implemented is updating forwarding policies.
23
type Manager struct {
24
        // SelfPub contains the public key of the local node.
25
        SelfPub *btcec.PublicKey
26

27
        // DefaultRoutingPolicy is the default routing policy.
28
        DefaultRoutingPolicy models.ForwardingPolicy
29

30
        // UpdateForwardingPolicies is used by the manager to update active
31
        // links with a new policy.
32
        UpdateForwardingPolicies func(
33
                chanPolicies map[wire.OutPoint]models.ForwardingPolicy)
34

35
        // PropagateChanPolicyUpdate is called to persist a new policy to disk
36
        // and broadcast it to the network.
37
        PropagateChanPolicyUpdate func(
38
                edgesToUpdate []discovery.EdgeWithInfo) error
39

40
        // ForAllOutgoingChannels is required to iterate over all our local
41
        // channels. The ChannelEdgePolicy parameter may be nil.
42
        ForAllOutgoingChannels func(cb func(*models.ChannelEdgeInfo,
43
                *models.ChannelEdgePolicy) error) error
44

45
        // FetchChannel is used to query local channel parameters. Optionally an
46
        // existing db tx can be supplied.
47
        FetchChannel func(chanPoint wire.OutPoint) (*channeldb.OpenChannel,
48
                error)
49

50
        // AddEdge is used to add edge/channel to the topology of the router.
51
        AddEdge func(edge *models.ChannelEdgeInfo) error
52

53
        // policyUpdateLock ensures that the database and the link do not fall
54
        // out of sync if there are concurrent fee update calls. Without it,
55
        // there is a chance that policy A updates the database, then policy B
56
        // updates the database, then policy B updates the link, then policy A
57
        // updates the link.
58
        policyUpdateLock sync.Mutex
59
}
60

61
// UpdatePolicy updates the policy for the specified channels on disk and in
62
// the active links.
63
func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
64
        createMissingEdge bool, chanPoints ...wire.OutPoint) (
65
        []*lnrpc.FailedUpdate, error) {
10✔
66

10✔
67
        r.policyUpdateLock.Lock()
10✔
68
        defer r.policyUpdateLock.Unlock()
10✔
69

10✔
70
        // First, we'll construct a set of all the channels that we are
10✔
71
        // trying to update.
10✔
72
        unprocessedChans := make(map[wire.OutPoint]struct{})
10✔
73
        for _, chanPoint := range chanPoints {
19✔
74
                unprocessedChans[chanPoint] = struct{}{}
9✔
75
        }
9✔
76

77
        haveChanFilter := len(unprocessedChans) != 0
10✔
78

10✔
79
        var failedUpdates []*lnrpc.FailedUpdate
10✔
80
        var edgesToUpdate []discovery.EdgeWithInfo
10✔
81
        policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy)
10✔
82

10✔
83
        // NOTE: edge may be nil when this function is called.
10✔
84
        processChan := func(info *models.ChannelEdgeInfo,
10✔
85
                edge *models.ChannelEdgePolicy) error {
19✔
86

9✔
87
                // If we have a channel filter, and this channel isn't a part
9✔
88
                // of it, then we'll skip it.
9✔
89
                _, ok := unprocessedChans[info.ChannelPoint]
9✔
90
                if !ok && haveChanFilter {
14✔
91
                        return nil
5✔
92
                }
5✔
93

94
                // Mark this channel as found by removing it. unprocessedChans
95
                // will be used to report invalid channels later on.
96
                delete(unprocessedChans, info.ChannelPoint)
8✔
97

8✔
98
                if edge == nil {
8✔
99
                        log.Errorf("Got nil channel edge policy when updating "+
×
100
                                "a channel. Channel point: %v",
×
101
                                info.ChannelPoint.String())
×
102

×
103
                        failedUpdates = append(failedUpdates, makeFailureItem(
×
104
                                info.ChannelPoint,
×
105
                                lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
×
106
                                "edge policy not found",
×
107
                        ))
×
108

×
109
                        return nil
×
110
                }
×
111

112
                // Apply the new policy to the edge.
113
                err := r.updateEdge(info.ChannelPoint, edge, newSchema)
8✔
114
                if err != nil {
8✔
UNCOV
115
                        failedUpdates = append(failedUpdates,
×
UNCOV
116
                                makeFailureItem(info.ChannelPoint,
×
UNCOV
117
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
UNCOV
118
                                        err.Error(),
×
UNCOV
119
                                ))
×
UNCOV
120

×
UNCOV
121
                        return nil
×
UNCOV
122
                }
×
123

124
                // Add updated edge to list of edges to send to gossiper.
125
                edgesToUpdate = append(edgesToUpdate, discovery.EdgeWithInfo{
8✔
126
                        Info: info,
8✔
127
                        Edge: edge,
8✔
128
                })
8✔
129

8✔
130
                // Extract inbound fees from the ExtraOpaqueData.
8✔
131
                var inboundWireFee lnwire.Fee
8✔
132
                _, err = edge.ExtraOpaqueData.ExtractRecords(&inboundWireFee)
8✔
133
                if err != nil {
8✔
134
                        return err
×
135
                }
×
136
                inboundFee := models.NewInboundFeeFromWire(inboundWireFee)
8✔
137

8✔
138
                // Add updated policy to list of policies to send to switch.
8✔
139
                policiesToUpdate[info.ChannelPoint] = models.ForwardingPolicy{
8✔
140
                        BaseFee:       edge.FeeBaseMSat,
8✔
141
                        FeeRate:       edge.FeeProportionalMillionths,
8✔
142
                        TimeLockDelta: uint32(edge.TimeLockDelta),
8✔
143
                        MinHTLCOut:    edge.MinHTLC,
8✔
144
                        MaxHTLC:       edge.MaxHTLC,
8✔
145
                        InboundFee:    inboundFee,
8✔
146
                }
8✔
147

8✔
148
                return nil
8✔
149
        }
150

151
        // Next, we'll loop over all the outgoing channels the router knows of.
152
        // If we have a filter then we'll only collect those channels, otherwise
153
        // we'll collect them all.
154
        err := r.ForAllOutgoingChannels(processChan)
10✔
155
        if err != nil {
10✔
156
                return nil, err
×
157
        }
×
158

159
        // Construct a list of failed policy updates.
160
        for chanPoint := range unprocessedChans {
13✔
161
                channel, err := r.FetchChannel(chanPoint)
3✔
162
                switch {
3✔
163
                case errors.Is(err, channeldb.ErrChannelNotFound):
1✔
164
                        failedUpdates = append(failedUpdates,
1✔
165
                                makeFailureItem(chanPoint,
1✔
166
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
1✔
167
                                        "not found",
1✔
168
                                ))
1✔
169

170
                case err != nil:
×
171
                        failedUpdates = append(failedUpdates,
×
172
                                makeFailureItem(chanPoint,
×
173
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INTERNAL_ERR,
×
174
                                        err.Error(),
×
175
                                ))
×
176

177
                case channel.IsPending:
×
178
                        failedUpdates = append(failedUpdates,
×
179
                                makeFailureItem(chanPoint,
×
180
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_PENDING,
×
181
                                        "not yet confirmed",
×
182
                                ))
×
183

184
                // If the edge was not found, but the channel is found, that
185
                // means the edge is missing in the graph database and should be
186
                // recreated. The edge and policy are created in-memory. The
187
                // edge is inserted in createEdge below and the policy will be
188
                // added to the graph in the PropagateChanPolicyUpdate call
189
                // below.
190
                case createMissingEdge:
1✔
191
                        log.Warnf("Missing edge for active channel (%s) "+
1✔
192
                                "during policy update. Recreating edge with "+
1✔
193
                                "default policy.",
1✔
194
                                channel.FundingOutpoint.String())
1✔
195

1✔
196
                        info, edge, failedUpdate := r.createMissingEdge(
1✔
197
                                channel, newSchema,
1✔
198
                        )
1✔
199
                        if failedUpdate == nil {
2✔
200
                                err = processChan(info, edge)
1✔
201
                                if err != nil {
1✔
202
                                        return nil, err
×
203
                                }
×
204
                        } else {
×
205
                                failedUpdates = append(
×
206
                                        failedUpdates, failedUpdate,
×
207
                                )
×
208
                        }
×
209

210
                default:
1✔
211
                        log.Warnf("Missing edge for active channel (%s) "+
1✔
212
                                "during policy update. Could not update "+
1✔
213
                                "policy.", channel.FundingOutpoint.String())
1✔
214

1✔
215
                        failedUpdates = append(failedUpdates,
1✔
216
                                makeFailureItem(chanPoint,
1✔
217
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
1✔
218
                                        "could not update policies",
1✔
219
                                ))
1✔
220
                }
221
        }
222

223
        // Commit the policy updates to disk and broadcast to the network. We
224
        // validated the new policy above, so we expect no validation errors. If
225
        // this would happen because of a bug, the link policy will be
226
        // desynchronized. It is currently not possible to atomically commit
227
        // multiple edge updates.
228
        err = r.PropagateChanPolicyUpdate(edgesToUpdate)
10✔
229
        if err != nil {
10✔
230
                return nil, err
×
231
        }
×
232

233
        // Update active links.
234
        r.UpdateForwardingPolicies(policiesToUpdate)
10✔
235

10✔
236
        return failedUpdates, nil
10✔
237
}
238

239
func (r *Manager) createMissingEdge(channel *channeldb.OpenChannel,
240
        newSchema routing.ChannelPolicy) (*models.ChannelEdgeInfo,
241
        *models.ChannelEdgePolicy, *lnrpc.FailedUpdate) {
1✔
242

1✔
243
        info, edge, err := r.createEdge(channel, time.Now())
1✔
244
        if err != nil {
1✔
245
                log.Errorf("Failed to recreate missing edge "+
×
246
                        "for channel (%s): %v",
×
247
                        channel.FundingOutpoint.String(), err)
×
248

×
249
                return nil, nil, makeFailureItem(
×
250
                        channel.FundingOutpoint,
×
251
                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
252
                        "could not update policies",
×
253
                )
×
254
        }
×
255

256
        // Validate the newly created edge policy with the user defined new
257
        // schema before adding the edge to the database.
258
        err = r.updateEdge(channel.FundingOutpoint, edge, newSchema)
1✔
259
        if err != nil {
1✔
260
                return nil, nil, makeFailureItem(
×
261
                        info.ChannelPoint,
×
262
                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
263
                        err.Error(),
×
264
                )
×
265
        }
×
266

267
        // Insert the edge into the database to avoid `edge not
268
        // found` errors during policy update propagation.
269
        err = r.AddEdge(info)
1✔
270
        if err != nil {
1✔
271
                log.Errorf("Attempt to add missing edge for "+
×
272
                        "channel (%s) errored with: %v",
×
273
                        channel.FundingOutpoint.String(), err)
×
274

×
275
                return nil, nil, makeFailureItem(
×
276
                        channel.FundingOutpoint,
×
277
                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
278
                        "could not add edge",
×
279
                )
×
280
        }
×
281

282
        return info, edge, nil
1✔
283
}
284

285
// createEdge recreates an edge and policy from an open channel in-memory.
286
func (r *Manager) createEdge(channel *channeldb.OpenChannel,
287
        timestamp time.Time) (*models.ChannelEdgeInfo,
288
        *models.ChannelEdgePolicy, error) {
3✔
289

3✔
290
        nodeKey1Bytes := r.SelfPub.SerializeCompressed()
3✔
291
        nodeKey2Bytes := channel.IdentityPub.SerializeCompressed()
3✔
292
        bitcoinKey1Bytes := channel.LocalChanCfg.MultiSigKey.PubKey.
3✔
293
                SerializeCompressed()
3✔
294
        bitcoinKey2Bytes := channel.RemoteChanCfg.MultiSigKey.PubKey.
3✔
295
                SerializeCompressed()
3✔
296
        channelFlags := lnwire.ChanUpdateChanFlags(0)
3✔
297

3✔
298
        // Make it such that node_id_1 is the lexicographically-lesser of the
3✔
299
        // two compressed keys sorted in ascending lexicographic order.
3✔
300
        if bytes.Compare(nodeKey2Bytes, nodeKey1Bytes) < 0 {
4✔
301
                nodeKey1Bytes, nodeKey2Bytes = nodeKey2Bytes, nodeKey1Bytes
1✔
302
                bitcoinKey1Bytes, bitcoinKey2Bytes = bitcoinKey2Bytes,
1✔
303
                        bitcoinKey1Bytes
1✔
304
                channelFlags = 1
1✔
305
        }
1✔
306

307
        var featureBuf bytes.Buffer
3✔
308
        err := lnwire.NewRawFeatureVector().Encode(&featureBuf)
3✔
309
        if err != nil {
3✔
310
                return nil, nil, fmt.Errorf("unable to encode features: %w",
×
311
                        err)
×
312
        }
×
313

314
        info := &models.ChannelEdgeInfo{
3✔
315
                ChannelID:    channel.ShortChanID().ToUint64(),
3✔
316
                ChainHash:    channel.ChainHash,
3✔
317
                Features:     featureBuf.Bytes(),
3✔
318
                Capacity:     channel.Capacity,
3✔
319
                ChannelPoint: channel.FundingOutpoint,
3✔
320
        }
3✔
321

3✔
322
        copy(info.NodeKey1Bytes[:], nodeKey1Bytes)
3✔
323
        copy(info.NodeKey2Bytes[:], nodeKey2Bytes)
3✔
324
        copy(info.BitcoinKey1Bytes[:], bitcoinKey1Bytes)
3✔
325
        copy(info.BitcoinKey2Bytes[:], bitcoinKey2Bytes)
3✔
326

3✔
327
        // Construct a dummy channel edge policy with default values that will
3✔
328
        // be updated with the new values in the call to processChan below.
3✔
329
        timeLockDelta := uint16(r.DefaultRoutingPolicy.TimeLockDelta)
3✔
330
        edge := &models.ChannelEdgePolicy{
3✔
331
                ChannelID:                 channel.ShortChanID().ToUint64(),
3✔
332
                LastUpdate:                timestamp,
3✔
333
                TimeLockDelta:             timeLockDelta,
3✔
334
                ChannelFlags:              channelFlags,
3✔
335
                MessageFlags:              lnwire.ChanUpdateRequiredMaxHtlc,
3✔
336
                FeeBaseMSat:               r.DefaultRoutingPolicy.BaseFee,
3✔
337
                FeeProportionalMillionths: r.DefaultRoutingPolicy.FeeRate,
3✔
338
                MinHTLC:                   r.DefaultRoutingPolicy.MinHTLCOut,
3✔
339
                MaxHTLC:                   r.DefaultRoutingPolicy.MaxHTLC,
3✔
340
        }
3✔
341

3✔
342
        copy(edge.ToNode[:], channel.IdentityPub.SerializeCompressed())
3✔
343

3✔
344
        return info, edge, nil
3✔
345
}
346

347
// updateEdge updates the given edge with the new schema.
348
func (r *Manager) updateEdge(chanPoint wire.OutPoint,
349
        edge *models.ChannelEdgePolicy,
350
        newSchema routing.ChannelPolicy) error {
9✔
351

9✔
352
        channel, err := r.FetchChannel(chanPoint)
9✔
353
        if err != nil {
9✔
UNCOV
354
                return err
×
UNCOV
355
        }
×
356

357
        // Update forwarding fee scheme and required time lock delta.
358
        edge.FeeBaseMSat = newSchema.BaseFee
9✔
359
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
9✔
360
                newSchema.FeeRate,
9✔
361
        )
9✔
362

9✔
363
        // If inbound fees are set, we update the edge with them.
9✔
364
        err = fn.MapOptionZ(newSchema.InboundFee,
9✔
365
                func(f models.InboundFee) error {
13✔
366
                        inboundWireFee := f.ToWire()
4✔
367
                        return edge.ExtraOpaqueData.PackRecords(
4✔
368
                                &inboundWireFee,
4✔
369
                        )
4✔
370
                })
4✔
371
        if err != nil {
9✔
372
                return err
×
373
        }
×
374

375
        edge.TimeLockDelta = uint16(newSchema.TimeLockDelta)
9✔
376

9✔
377
        // Retrieve negotiated channel htlc amt limits.
9✔
378
        amtMin, amtMax, err := r.getHtlcAmtLimits(channel)
9✔
379
        if err != nil {
9✔
380
                return err
×
381
        }
×
382

383
        // We now update the edge max htlc value.
384
        switch {
9✔
385
        // If a non-zero max htlc was specified, use it to update the edge.
386
        // Otherwise keep the value unchanged.
387
        case newSchema.MaxHTLC != 0:
8✔
388
                edge.MaxHTLC = newSchema.MaxHTLC
8✔
389

390
        // If this edge still doesn't have a max htlc set, set it to the max.
391
        // This is an on-the-fly migration.
392
        case !edge.MessageFlags.HasMaxHtlc():
×
393
                edge.MaxHTLC = amtMax
×
394

395
        // If this edge has a max htlc that exceeds what the channel can
396
        // actually carry, correct it now. This can happen, because we
397
        // previously set the max htlc to the channel capacity.
398
        case edge.MaxHTLC > amtMax:
×
399
                edge.MaxHTLC = amtMax
×
400
        }
401

402
        // If a new min htlc is specified, update the edge.
403
        if newSchema.MinHTLC != nil {
9✔
404
                edge.MinHTLC = *newSchema.MinHTLC
×
405
        }
×
406

407
        // If the MaxHtlc flag wasn't already set, we can set it now.
408
        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
9✔
409

9✔
410
        // Validate htlc amount constraints.
9✔
411
        switch {
9✔
412
        case edge.MinHTLC < amtMin:
×
413
                return fmt.Errorf(
×
414
                        "min htlc amount of %v is below min htlc parameter of %v",
×
415
                        edge.MinHTLC, amtMin,
×
416
                )
×
417

418
        case edge.MaxHTLC > amtMax:
×
419
                return fmt.Errorf(
×
420
                        "max htlc size of %v is above max pending amount of %v",
×
421
                        edge.MaxHTLC, amtMax,
×
422
                )
×
423

424
        case edge.MinHTLC > edge.MaxHTLC:
×
425
                return fmt.Errorf(
×
426
                        "min_htlc %v greater than max_htlc %v",
×
427
                        edge.MinHTLC, edge.MaxHTLC,
×
428
                )
×
429
        }
430

431
        // Clear signature to help prevent usage of the previous signature.
432
        edge.SetSigBytes(nil)
9✔
433

9✔
434
        return nil
9✔
435
}
436

437
// getHtlcAmtLimits retrieves the negotiated channel min and max htlc amount
438
// constraints.
439
func (r *Manager) getHtlcAmtLimits(ch *channeldb.OpenChannel) (
440
        lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) {
9✔
441

9✔
442
        // The max htlc policy field must be less than or equal to the channel
9✔
443
        // capacity AND less than or equal to the max in-flight HTLC value.
9✔
444
        // Since the latter is always less than or equal to the former, just
9✔
445
        // return the max in-flight value.
9✔
446
        maxAmt := ch.LocalChanCfg.ChannelStateBounds.MaxPendingAmount
9✔
447

9✔
448
        return ch.LocalChanCfg.MinHTLC, maxAmt, nil
9✔
449
}
9✔
450

451
// makeFailureItem creates a lnrpc.FailedUpdate object.
452
func makeFailureItem(outPoint wire.OutPoint, updateFailure lnrpc.UpdateFailure,
453
        errStr string) *lnrpc.FailedUpdate {
2✔
454

2✔
455
        outpoint := lnrpc.MarshalOutPoint(&outPoint)
2✔
456

2✔
457
        return &lnrpc.FailedUpdate{
2✔
458
                Outpoint:    outpoint,
2✔
459
                Reason:      updateFailure,
2✔
460
                UpdateError: errStr,
2✔
461
        }
2✔
462
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc