• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.87
/routing/localchans/manager.go
1
package localchans
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcec/v2"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/lightningnetwork/lnd/channeldb"
13
        "github.com/lightningnetwork/lnd/discovery"
14
        "github.com/lightningnetwork/lnd/fn/v2"
15
        "github.com/lightningnetwork/lnd/graph/db/models"
16
        "github.com/lightningnetwork/lnd/lnrpc"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
        "github.com/lightningnetwork/lnd/routing"
19
)
20

21
// Manager manages the node's local channels. The only operation that is
22
// currently implemented is updating forwarding policies.
23
type Manager struct {
24
        // SelfPub contains the public key of the local node.
25
        SelfPub *btcec.PublicKey
26

27
        // DefaultRoutingPolicy is the default routing policy.
28
        DefaultRoutingPolicy models.ForwardingPolicy
29

30
        // UpdateForwardingPolicies is used by the manager to update active
31
        // links with a new policy.
32
        UpdateForwardingPolicies func(
33
                chanPolicies map[wire.OutPoint]models.ForwardingPolicy)
34

35
        // PropagateChanPolicyUpdate is called to persist a new policy to disk
36
        // and broadcast it to the network.
37
        PropagateChanPolicyUpdate func(
38
                edgesToUpdate []discovery.EdgeWithInfo) error
39

40
        // ForAllOutgoingChannels is required to iterate over all our local
41
        // channels. The ChannelEdgePolicy parameter may be nil.
42
        ForAllOutgoingChannels func(cb func(*models.ChannelEdgeInfo,
43
                *models.ChannelEdgePolicy) error) error
44

45
        // FetchChannel is used to query local channel parameters. Optionally an
46
        // existing db tx can be supplied.
47
        FetchChannel func(chanPoint wire.OutPoint) (*channeldb.OpenChannel,
48
                error)
49

50
        // AddEdge is used to add edge/channel to the topology of the router.
51
        AddEdge func(edge *models.ChannelEdgeInfo) error
52

53
        // policyUpdateLock ensures that the database and the link do not fall
54
        // out of sync if there are concurrent fee update calls. Without it,
55
        // there is a chance that policy A updates the database, then policy B
56
        // updates the database, then policy B updates the link, then policy A
57
        // updates the link.
58
        policyUpdateLock sync.Mutex
59
}
60

61
// UpdatePolicy updates the policy for the specified channels on disk and in
62
// the active links.
63
func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
64
        createMissingEdge bool, chanPoints ...wire.OutPoint) (
65
        []*lnrpc.FailedUpdate, error) {
3✔
66

3✔
67
        r.policyUpdateLock.Lock()
3✔
68
        defer r.policyUpdateLock.Unlock()
3✔
69

3✔
70
        // First, we'll construct a set of all the channels that we are
3✔
71
        // trying to update.
3✔
72
        unprocessedChans := make(map[wire.OutPoint]struct{})
3✔
73
        for _, chanPoint := range chanPoints {
6✔
74
                unprocessedChans[chanPoint] = struct{}{}
3✔
75
        }
3✔
76

77
        haveChanFilter := len(unprocessedChans) != 0
3✔
78

3✔
79
        var failedUpdates []*lnrpc.FailedUpdate
3✔
80
        var edgesToUpdate []discovery.EdgeWithInfo
3✔
81
        policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy)
3✔
82

3✔
83
        // NOTE: edge may be nil when this function is called.
3✔
84
        processChan := func(info *models.ChannelEdgeInfo,
3✔
85
                edge *models.ChannelEdgePolicy) error {
6✔
86

3✔
87
                // If we have a channel filter, and this channel isn't a part
3✔
88
                // of it, then we'll skip it.
3✔
89
                _, ok := unprocessedChans[info.ChannelPoint]
3✔
90
                if !ok && haveChanFilter {
6✔
91
                        return nil
3✔
92
                }
3✔
93

94
                // Mark this channel as found by removing it. unprocessedChans
95
                // will be used to report invalid channels later on.
96
                delete(unprocessedChans, info.ChannelPoint)
3✔
97

3✔
98
                if edge == nil {
3✔
99
                        log.Errorf("Got nil channel edge policy when updating "+
×
100
                                "a channel. Channel point: %v",
×
101
                                info.ChannelPoint.String())
×
102

×
103
                        failedUpdates = append(failedUpdates, makeFailureItem(
×
104
                                info.ChannelPoint,
×
105
                                lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
×
106
                                "edge policy not found",
×
107
                        ))
×
108

×
109
                        return nil
×
110
                }
×
111

112
                // Apply the new policy to the edge.
113
                err := r.updateEdge(info.ChannelPoint, edge, newSchema)
3✔
114
                if err != nil {
3✔
115
                        failedUpdates = append(failedUpdates,
×
116
                                makeFailureItem(info.ChannelPoint,
×
117
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
118
                                        err.Error(),
×
119
                                ))
×
120

×
121
                        return nil
×
122
                }
×
123

124
                // Add updated edge to list of edges to send to gossiper.
125
                edgesToUpdate = append(edgesToUpdate, discovery.EdgeWithInfo{
3✔
126
                        Info: info,
3✔
127
                        Edge: edge,
3✔
128
                })
3✔
129

3✔
130
                // Extract inbound fees from the ExtraOpaqueData.
3✔
131
                var inboundWireFee lnwire.Fee
3✔
132
                _, err = edge.ExtraOpaqueData.ExtractRecords(&inboundWireFee)
3✔
133
                if err != nil {
3✔
134
                        return err
×
135
                }
×
136
                inboundFee := models.NewInboundFeeFromWire(inboundWireFee)
3✔
137

3✔
138
                // Add updated policy to list of policies to send to switch.
3✔
139
                policiesToUpdate[info.ChannelPoint] = models.ForwardingPolicy{
3✔
140
                        BaseFee:       edge.FeeBaseMSat,
3✔
141
                        FeeRate:       edge.FeeProportionalMillionths,
3✔
142
                        TimeLockDelta: uint32(edge.TimeLockDelta),
3✔
143
                        MinHTLCOut:    edge.MinHTLC,
3✔
144
                        MaxHTLC:       edge.MaxHTLC,
3✔
145
                        InboundFee:    inboundFee,
3✔
146
                }
3✔
147

3✔
148
                return nil
3✔
149
        }
150

151
        // Next, we'll loop over all the outgoing channels the router knows of.
152
        // If we have a filter then we'll only collect those channels, otherwise
153
        // we'll collect them all.
154
        err := r.ForAllOutgoingChannels(processChan)
3✔
155
        if err != nil {
3✔
156
                return nil, err
×
157
        }
×
158

159
        // Construct a list of failed policy updates.
160
        for chanPoint := range unprocessedChans {
3✔
UNCOV
161
                channel, err := r.FetchChannel(chanPoint)
×
UNCOV
162
                switch {
×
UNCOV
163
                case errors.Is(err, channeldb.ErrChannelNotFound):
×
UNCOV
164
                        failedUpdates = append(failedUpdates,
×
UNCOV
165
                                makeFailureItem(chanPoint,
×
UNCOV
166
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
×
UNCOV
167
                                        "not found",
×
UNCOV
168
                                ))
×
169

170
                case err != nil:
×
171
                        failedUpdates = append(failedUpdates,
×
172
                                makeFailureItem(chanPoint,
×
173
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INTERNAL_ERR,
×
174
                                        err.Error(),
×
175
                                ))
×
176

177
                case channel.IsPending:
×
178
                        failedUpdates = append(failedUpdates,
×
179
                                makeFailureItem(chanPoint,
×
180
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_PENDING,
×
181
                                        "not yet confirmed",
×
182
                                ))
×
183

184
                // If the edge was not found, but the channel is found, that
185
                // means the edge is missing in the graph database and should be
186
                // recreated. The edge and policy are created in-memory. The
187
                // edge is inserted in createEdge below and the policy will be
188
                // added to the graph in the PropagateChanPolicyUpdate call
189
                // below.
UNCOV
190
                case createMissingEdge:
×
UNCOV
191
                        log.Warnf("Missing edge for active channel (%s) "+
×
UNCOV
192
                                "during policy update. Recreating edge with "+
×
UNCOV
193
                                "default policy.",
×
UNCOV
194
                                channel.FundingOutpoint.String())
×
UNCOV
195

×
UNCOV
196
                        info, edge, failedUpdate := r.createMissingEdge(
×
UNCOV
197
                                channel, newSchema,
×
UNCOV
198
                        )
×
UNCOV
199
                        if failedUpdate == nil {
×
UNCOV
200
                                err = processChan(info, edge)
×
UNCOV
201
                                if err != nil {
×
202
                                        return nil, err
×
203
                                }
×
204
                        } else {
×
205
                                failedUpdates = append(
×
206
                                        failedUpdates, failedUpdate,
×
207
                                )
×
208
                        }
×
209

UNCOV
210
                default:
×
UNCOV
211
                        log.Warnf("Missing edge for active channel (%s) "+
×
UNCOV
212
                                "during policy update. Could not update "+
×
UNCOV
213
                                "policy.", channel.FundingOutpoint.String())
×
UNCOV
214

×
UNCOV
215
                        failedUpdates = append(failedUpdates,
×
UNCOV
216
                                makeFailureItem(chanPoint,
×
UNCOV
217
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
UNCOV
218
                                        "could not update policies",
×
UNCOV
219
                                ))
×
220
                }
221
        }
222

223
        // Commit the policy updates to disk and broadcast to the network. We
224
        // validated the new policy above, so we expect no validation errors. If
225
        // this would happen because of a bug, the link policy will be
226
        // desynchronized. It is currently not possible to atomically commit
227
        // multiple edge updates.
228
        err = r.PropagateChanPolicyUpdate(edgesToUpdate)
3✔
229
        if err != nil {
3✔
230
                return nil, err
×
231
        }
×
232

233
        // Update active links.
234
        r.UpdateForwardingPolicies(policiesToUpdate)
3✔
235

3✔
236
        return failedUpdates, nil
3✔
237
}
238

239
func (r *Manager) createMissingEdge(channel *channeldb.OpenChannel,
240
        newSchema routing.ChannelPolicy) (*models.ChannelEdgeInfo,
UNCOV
241
        *models.ChannelEdgePolicy, *lnrpc.FailedUpdate) {
×
UNCOV
242

×
UNCOV
243
        info, edge, err := r.createEdge(channel, time.Now())
×
UNCOV
244
        if err != nil {
×
245
                log.Errorf("Failed to recreate missing edge "+
×
246
                        "for channel (%s): %v",
×
247
                        channel.FundingOutpoint.String(), err)
×
248

×
249
                return nil, nil, makeFailureItem(
×
250
                        channel.FundingOutpoint,
×
251
                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
252
                        "could not update policies",
×
253
                )
×
254
        }
×
255

256
        // Validate the newly created edge policy with the user defined new
257
        // schema before adding the edge to the database.
UNCOV
258
        err = r.updateEdge(channel.FundingOutpoint, edge, newSchema)
×
UNCOV
259
        if err != nil {
×
260
                return nil, nil, makeFailureItem(
×
261
                        info.ChannelPoint,
×
262
                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
263
                        err.Error(),
×
264
                )
×
265
        }
×
266

267
        // Insert the edge into the database to avoid `edge not
268
        // found` errors during policy update propagation.
UNCOV
269
        err = r.AddEdge(info)
×
UNCOV
270
        if err != nil {
×
271
                log.Errorf("Attempt to add missing edge for "+
×
272
                        "channel (%s) errored with: %v",
×
273
                        channel.FundingOutpoint.String(), err)
×
274

×
275
                return nil, nil, makeFailureItem(
×
276
                        channel.FundingOutpoint,
×
277
                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
278
                        "could not add edge",
×
279
                )
×
280
        }
×
281

UNCOV
282
        return info, edge, nil
×
283
}
284

285
// createEdge recreates an edge and policy from an open channel in-memory.
286
func (r *Manager) createEdge(channel *channeldb.OpenChannel,
287
        timestamp time.Time) (*models.ChannelEdgeInfo,
UNCOV
288
        *models.ChannelEdgePolicy, error) {
×
UNCOV
289

×
UNCOV
290
        nodeKey1Bytes := r.SelfPub.SerializeCompressed()
×
UNCOV
291
        nodeKey2Bytes := channel.IdentityPub.SerializeCompressed()
×
UNCOV
292
        bitcoinKey1Bytes := channel.LocalChanCfg.MultiSigKey.PubKey.
×
UNCOV
293
                SerializeCompressed()
×
UNCOV
294
        bitcoinKey2Bytes := channel.RemoteChanCfg.MultiSigKey.PubKey.
×
UNCOV
295
                SerializeCompressed()
×
UNCOV
296
        channelFlags := lnwire.ChanUpdateChanFlags(0)
×
UNCOV
297

×
UNCOV
298
        // Make it such that node_id_1 is the lexicographically-lesser of the
×
UNCOV
299
        // two compressed keys sorted in ascending lexicographic order.
×
UNCOV
300
        if bytes.Compare(nodeKey2Bytes, nodeKey1Bytes) < 0 {
×
UNCOV
301
                nodeKey1Bytes, nodeKey2Bytes = nodeKey2Bytes, nodeKey1Bytes
×
UNCOV
302
                bitcoinKey1Bytes, bitcoinKey2Bytes = bitcoinKey2Bytes,
×
UNCOV
303
                        bitcoinKey1Bytes
×
UNCOV
304
                channelFlags = 1
×
UNCOV
305
        }
×
306

UNCOV
307
        var featureBuf bytes.Buffer
×
UNCOV
308
        err := lnwire.NewRawFeatureVector().Encode(&featureBuf)
×
UNCOV
309
        if err != nil {
×
310
                return nil, nil, fmt.Errorf("unable to encode features: %w",
×
311
                        err)
×
312
        }
×
313

314
        // We need to make sure we use the real scid for public confirmed
315
        // zero-conf channels.
UNCOV
316
        shortChanID := channel.ShortChanID()
×
UNCOV
317
        isPublic := channel.ChannelFlags&lnwire.FFAnnounceChannel != 0
×
UNCOV
318
        if isPublic && channel.IsZeroConf() && channel.ZeroConfConfirmed() {
×
319
                shortChanID = channel.ZeroConfRealScid()
×
320
        }
×
321

UNCOV
322
        info := &models.ChannelEdgeInfo{
×
UNCOV
323
                ChannelID:    shortChanID.ToUint64(),
×
UNCOV
324
                ChainHash:    channel.ChainHash,
×
UNCOV
325
                Features:     featureBuf.Bytes(),
×
UNCOV
326
                Capacity:     channel.Capacity,
×
UNCOV
327
                ChannelPoint: channel.FundingOutpoint,
×
UNCOV
328
        }
×
UNCOV
329

×
UNCOV
330
        copy(info.NodeKey1Bytes[:], nodeKey1Bytes)
×
UNCOV
331
        copy(info.NodeKey2Bytes[:], nodeKey2Bytes)
×
UNCOV
332
        copy(info.BitcoinKey1Bytes[:], bitcoinKey1Bytes)
×
UNCOV
333
        copy(info.BitcoinKey2Bytes[:], bitcoinKey2Bytes)
×
UNCOV
334

×
UNCOV
335
        // Construct a dummy channel edge policy with default values that will
×
UNCOV
336
        // be updated with the new values in the call to processChan below.
×
UNCOV
337
        timeLockDelta := uint16(r.DefaultRoutingPolicy.TimeLockDelta)
×
UNCOV
338
        edge := &models.ChannelEdgePolicy{
×
UNCOV
339
                ChannelID:                 shortChanID.ToUint64(),
×
UNCOV
340
                LastUpdate:                timestamp,
×
UNCOV
341
                TimeLockDelta:             timeLockDelta,
×
UNCOV
342
                ChannelFlags:              channelFlags,
×
UNCOV
343
                MessageFlags:              lnwire.ChanUpdateRequiredMaxHtlc,
×
UNCOV
344
                FeeBaseMSat:               r.DefaultRoutingPolicy.BaseFee,
×
UNCOV
345
                FeeProportionalMillionths: r.DefaultRoutingPolicy.FeeRate,
×
UNCOV
346
                MinHTLC:                   r.DefaultRoutingPolicy.MinHTLCOut,
×
UNCOV
347
                MaxHTLC:                   r.DefaultRoutingPolicy.MaxHTLC,
×
UNCOV
348
        }
×
UNCOV
349

×
UNCOV
350
        copy(edge.ToNode[:], channel.IdentityPub.SerializeCompressed())
×
UNCOV
351

×
UNCOV
352
        return info, edge, nil
×
353
}
354

355
// updateEdge updates the given edge with the new schema.
356
func (r *Manager) updateEdge(chanPoint wire.OutPoint,
357
        edge *models.ChannelEdgePolicy,
358
        newSchema routing.ChannelPolicy) error {
3✔
359

3✔
360
        channel, err := r.FetchChannel(chanPoint)
3✔
361
        if err != nil {
3✔
362
                return err
×
363
        }
×
364

365
        // Update forwarding fee scheme and required time lock delta.
366
        edge.FeeBaseMSat = newSchema.BaseFee
3✔
367
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
3✔
368
                newSchema.FeeRate,
3✔
369
        )
3✔
370

3✔
371
        // If inbound fees are set, we update the edge with them.
3✔
372
        err = fn.MapOptionZ(newSchema.InboundFee,
3✔
373
                func(f models.InboundFee) error {
6✔
374
                        inboundWireFee := f.ToWire()
3✔
375
                        return edge.ExtraOpaqueData.PackRecords(
3✔
376
                                &inboundWireFee,
3✔
377
                        )
3✔
378
                })
3✔
379
        if err != nil {
3✔
380
                return err
×
381
        }
×
382

383
        edge.TimeLockDelta = uint16(newSchema.TimeLockDelta)
3✔
384

3✔
385
        // Retrieve negotiated channel htlc amt limits.
3✔
386
        amtMin, amtMax, err := r.getHtlcAmtLimits(channel)
3✔
387
        if err != nil {
3✔
388
                return err
×
389
        }
×
390

391
        // We now update the edge max htlc value.
392
        switch {
3✔
393
        // If a non-zero max htlc was specified, use it to update the edge.
394
        // Otherwise keep the value unchanged.
395
        case newSchema.MaxHTLC != 0:
3✔
396
                edge.MaxHTLC = newSchema.MaxHTLC
3✔
397

398
        // If this edge still doesn't have a max htlc set, set it to the max.
399
        // This is an on-the-fly migration.
400
        case !edge.MessageFlags.HasMaxHtlc():
×
401
                edge.MaxHTLC = amtMax
×
402

403
        // If this edge has a max htlc that exceeds what the channel can
404
        // actually carry, correct it now. This can happen, because we
405
        // previously set the max htlc to the channel capacity.
406
        case edge.MaxHTLC > amtMax:
×
407
                edge.MaxHTLC = amtMax
×
408
        }
409

410
        // If a new min htlc is specified, update the edge.
411
        if newSchema.MinHTLC != nil {
3✔
412
                edge.MinHTLC = *newSchema.MinHTLC
×
413
        }
×
414

415
        // If the MaxHtlc flag wasn't already set, we can set it now.
416
        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
3✔
417

3✔
418
        // Validate htlc amount constraints.
3✔
419
        switch {
3✔
420
        case edge.MinHTLC < amtMin:
×
421
                return fmt.Errorf(
×
422
                        "min htlc amount of %v is below min htlc parameter of %v",
×
423
                        edge.MinHTLC, amtMin,
×
424
                )
×
425

426
        case edge.MaxHTLC > amtMax:
×
427
                return fmt.Errorf(
×
428
                        "max htlc size of %v is above max pending amount of %v",
×
429
                        edge.MaxHTLC, amtMax,
×
430
                )
×
431

432
        case edge.MinHTLC > edge.MaxHTLC:
×
433
                return fmt.Errorf(
×
434
                        "min_htlc %v greater than max_htlc %v",
×
435
                        edge.MinHTLC, edge.MaxHTLC,
×
436
                )
×
437
        }
438

439
        // Clear signature to help prevent usage of the previous signature.
440
        edge.SetSigBytes(nil)
3✔
441

3✔
442
        return nil
3✔
443
}
444

445
// getHtlcAmtLimits retrieves the negotiated channel min and max htlc amount
446
// constraints.
447
func (r *Manager) getHtlcAmtLimits(ch *channeldb.OpenChannel) (
448
        lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) {
3✔
449

3✔
450
        // The max htlc policy field must be less than or equal to the channel
3✔
451
        // capacity AND less than or equal to the max in-flight HTLC value.
3✔
452
        // Since the latter is always less than or equal to the former, just
3✔
453
        // return the max in-flight value.
3✔
454
        maxAmt := ch.LocalChanCfg.ChannelStateBounds.MaxPendingAmount
3✔
455

3✔
456
        return ch.LocalChanCfg.MinHTLC, maxAmt, nil
3✔
457
}
3✔
458

459
// makeFailureItem creates a lnrpc.FailedUpdate object.
460
func makeFailureItem(outPoint wire.OutPoint, updateFailure lnrpc.UpdateFailure,
UNCOV
461
        errStr string) *lnrpc.FailedUpdate {
×
UNCOV
462

×
UNCOV
463
        outpoint := lnrpc.MarshalOutPoint(&outPoint)
×
UNCOV
464

×
UNCOV
465
        return &lnrpc.FailedUpdate{
×
UNCOV
466
                Outpoint:    outpoint,
×
UNCOV
467
                Reason:      updateFailure,
×
UNCOV
468
                UpdateError: errStr,
×
UNCOV
469
        }
×
UNCOV
470
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc