• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11170835610

03 Oct 2024 10:41PM UTC coverage: 49.188% (-9.6%) from 58.738%
11170835610

push

github

web-flow
Merge pull request #9154 from ziggie1984/master

multi: bump btcd version.

3 of 6 new or added lines in 6 files covered. (50.0%)

26110 existing lines in 428 files now uncovered.

97359 of 197934 relevant lines covered (49.19%)

1.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.78
/routing/localchans/manager.go
1
package localchans
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7

8
        "github.com/btcsuite/btcd/wire"
9
        "github.com/lightningnetwork/lnd/channeldb"
10
        "github.com/lightningnetwork/lnd/channeldb/models"
11
        "github.com/lightningnetwork/lnd/discovery"
12
        "github.com/lightningnetwork/lnd/fn"
13
        "github.com/lightningnetwork/lnd/kvdb"
14
        "github.com/lightningnetwork/lnd/lnrpc"
15
        "github.com/lightningnetwork/lnd/lnwire"
16
        "github.com/lightningnetwork/lnd/routing"
17
)
18

19
// Manager manages the node's local channels. The only operation that is
20
// currently implemented is updating forwarding policies.
21
type Manager struct {
22
        // UpdateForwardingPolicies is used by the manager to update active
23
        // links with a new policy.
24
        UpdateForwardingPolicies func(
25
                chanPolicies map[wire.OutPoint]models.ForwardingPolicy)
26

27
        // PropagateChanPolicyUpdate is called to persist a new policy to disk
28
        // and broadcast it to the network.
29
        PropagateChanPolicyUpdate func(
30
                edgesToUpdate []discovery.EdgeWithInfo) error
31

32
        // ForAllOutgoingChannels is required to iterate over all our local
33
        // channels.
34
        ForAllOutgoingChannels func(cb func(kvdb.RTx,
35
                *models.ChannelEdgeInfo,
36
                *models.ChannelEdgePolicy) error) error
37

38
        // FetchChannel is used to query local channel parameters. Optionally an
39
        // existing db tx can be supplied.
40
        FetchChannel func(tx kvdb.RTx, chanPoint wire.OutPoint) (
41
                *channeldb.OpenChannel, error)
42

43
        // policyUpdateLock ensures that the database and the link do not fall
44
        // out of sync if there are concurrent fee update calls. Without it,
45
        // there is a chance that policy A updates the database, then policy B
46
        // updates the database, then policy B updates the link, then policy A
47
        // updates the link.
48
        policyUpdateLock sync.Mutex
49
}
50

51
// UpdatePolicy updates the policy for the specified channels on disk and in
52
// the active links.
53
func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
54
        chanPoints ...wire.OutPoint) ([]*lnrpc.FailedUpdate, error) {
2✔
55

2✔
56
        r.policyUpdateLock.Lock()
2✔
57
        defer r.policyUpdateLock.Unlock()
2✔
58

2✔
59
        // First, we'll construct a set of all the channels that we are
2✔
60
        // trying to update.
2✔
61
        unprocessedChans := make(map[wire.OutPoint]struct{})
2✔
62
        for _, chanPoint := range chanPoints {
4✔
63
                unprocessedChans[chanPoint] = struct{}{}
2✔
64
        }
2✔
65

66
        haveChanFilter := len(unprocessedChans) != 0
2✔
67

2✔
68
        var failedUpdates []*lnrpc.FailedUpdate
2✔
69
        var edgesToUpdate []discovery.EdgeWithInfo
2✔
70
        policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy)
2✔
71

2✔
72
        // Next, we'll loop over all the outgoing channels the router knows of.
2✔
73
        // If we have a filter then we'll only collected those channels,
2✔
74
        // otherwise we'll collect them all.
2✔
75
        err := r.ForAllOutgoingChannels(func(
2✔
76
                tx kvdb.RTx,
2✔
77
                info *models.ChannelEdgeInfo,
2✔
78
                edge *models.ChannelEdgePolicy) error {
4✔
79

2✔
80
                // If we have a channel filter, and this channel isn't a part
2✔
81
                // of it, then we'll skip it.
2✔
82
                _, ok := unprocessedChans[info.ChannelPoint]
2✔
83
                if !ok && haveChanFilter {
4✔
84
                        return nil
2✔
85
                }
2✔
86

87
                // Mark this channel as found by removing it. unprocessedChans
88
                // will be used to report invalid channels later on.
89
                delete(unprocessedChans, info.ChannelPoint)
2✔
90

2✔
91
                // Apply the new policy to the edge.
2✔
92
                err := r.updateEdge(tx, info.ChannelPoint, edge, newSchema)
2✔
93
                if err != nil {
4✔
94
                        failedUpdates = append(failedUpdates,
2✔
95
                                makeFailureItem(info.ChannelPoint,
2✔
96
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
2✔
97
                                        err.Error(),
2✔
98
                                ))
2✔
99

2✔
100
                        return nil
2✔
101
                }
2✔
102

103
                // Add updated edge to list of edges to send to gossiper.
104
                edgesToUpdate = append(edgesToUpdate, discovery.EdgeWithInfo{
2✔
105
                        Info: info,
2✔
106
                        Edge: edge,
2✔
107
                })
2✔
108

2✔
109
                // Extract inbound fees from the ExtraOpaqueData.
2✔
110
                var inboundWireFee lnwire.Fee
2✔
111
                _, err = edge.ExtraOpaqueData.ExtractRecords(&inboundWireFee)
2✔
112
                if err != nil {
2✔
113
                        return err
×
114
                }
×
115
                inboundFee := models.NewInboundFeeFromWire(inboundWireFee)
2✔
116

2✔
117
                // Add updated policy to list of policies to send to switch.
2✔
118
                policiesToUpdate[info.ChannelPoint] = models.ForwardingPolicy{
2✔
119
                        BaseFee:       edge.FeeBaseMSat,
2✔
120
                        FeeRate:       edge.FeeProportionalMillionths,
2✔
121
                        TimeLockDelta: uint32(edge.TimeLockDelta),
2✔
122
                        MinHTLCOut:    edge.MinHTLC,
2✔
123
                        MaxHTLC:       edge.MaxHTLC,
2✔
124
                        InboundFee:    inboundFee,
2✔
125
                }
2✔
126

2✔
127
                return nil
2✔
128
        })
129
        if err != nil {
2✔
130
                return nil, err
×
131
        }
×
132

133
        // Construct a list of failed policy updates.
134
        for chanPoint := range unprocessedChans {
2✔
UNCOV
135
                channel, err := r.FetchChannel(nil, chanPoint)
×
UNCOV
136
                switch {
×
UNCOV
137
                case errors.Is(err, channeldb.ErrChannelNotFound):
×
UNCOV
138
                        failedUpdates = append(failedUpdates,
×
UNCOV
139
                                makeFailureItem(chanPoint,
×
UNCOV
140
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
×
UNCOV
141
                                        "not found",
×
UNCOV
142
                                ))
×
143

144
                case err != nil:
×
145
                        failedUpdates = append(failedUpdates,
×
146
                                makeFailureItem(chanPoint,
×
147
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INTERNAL_ERR,
×
148
                                        err.Error(),
×
149
                                ))
×
150

151
                case channel.IsPending:
×
152
                        failedUpdates = append(failedUpdates,
×
153
                                makeFailureItem(chanPoint,
×
154
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_PENDING,
×
155
                                        "not yet confirmed",
×
156
                                ))
×
157

158
                default:
×
159
                        failedUpdates = append(failedUpdates,
×
160
                                makeFailureItem(chanPoint,
×
161
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
162
                                        "could not update policies",
×
163
                                ))
×
164
                }
165
        }
166

167
        // Commit the policy updates to disk and broadcast to the network. We
168
        // validated the new policy above, so we expect no validation errors. If
169
        // this would happen because of a bug, the link policy will be
170
        // desynchronized. It is currently not possible to atomically commit
171
        // multiple edge updates.
172
        err = r.PropagateChanPolicyUpdate(edgesToUpdate)
2✔
173
        if err != nil {
2✔
174
                return nil, err
×
175
        }
×
176

177
        // Update active links.
178
        r.UpdateForwardingPolicies(policiesToUpdate)
2✔
179

2✔
180
        return failedUpdates, nil
2✔
181
}
182

183
// updateEdge updates the given edge with the new schema.
184
func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint,
185
        edge *models.ChannelEdgePolicy,
186
        newSchema routing.ChannelPolicy) error {
2✔
187

2✔
188
        // Update forwarding fee scheme and required time lock delta.
2✔
189
        edge.FeeBaseMSat = newSchema.BaseFee
2✔
190
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
2✔
191
                newSchema.FeeRate,
2✔
192
        )
2✔
193

2✔
194
        // If inbound fees are set, we update the edge with them.
2✔
195
        err := fn.MapOptionZ(newSchema.InboundFee,
2✔
196
                func(f models.InboundFee) error {
4✔
197
                        inboundWireFee := f.ToWire()
2✔
198
                        return edge.ExtraOpaqueData.PackRecords(
2✔
199
                                &inboundWireFee,
2✔
200
                        )
2✔
201
                })
2✔
202
        if err != nil {
2✔
203
                return err
×
204
        }
×
205

206
        edge.TimeLockDelta = uint16(newSchema.TimeLockDelta)
2✔
207

2✔
208
        // Retrieve negotiated channel htlc amt limits.
2✔
209
        amtMin, amtMax, err := r.getHtlcAmtLimits(tx, chanPoint)
2✔
210
        if err != nil {
4✔
211
                return err
2✔
212
        }
2✔
213

214
        // We now update the edge max htlc value.
215
        switch {
2✔
216
        // If a non-zero max htlc was specified, use it to update the edge.
217
        // Otherwise keep the value unchanged.
218
        case newSchema.MaxHTLC != 0:
2✔
219
                edge.MaxHTLC = newSchema.MaxHTLC
2✔
220

221
        // If this edge still doesn't have a max htlc set, set it to the max.
222
        // This is an on-the-fly migration.
223
        case !edge.MessageFlags.HasMaxHtlc():
×
224
                edge.MaxHTLC = amtMax
×
225

226
        // If this edge has a max htlc that exceeds what the channel can
227
        // actually carry, correct it now. This can happen, because we
228
        // previously set the max htlc to the channel capacity.
229
        case edge.MaxHTLC > amtMax:
×
230
                edge.MaxHTLC = amtMax
×
231
        }
232

233
        // If a new min htlc is specified, update the edge.
234
        if newSchema.MinHTLC != nil {
2✔
235
                edge.MinHTLC = *newSchema.MinHTLC
×
236
        }
×
237

238
        // If the MaxHtlc flag wasn't already set, we can set it now.
239
        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
2✔
240

2✔
241
        // Validate htlc amount constraints.
2✔
242
        switch {
2✔
243
        case edge.MinHTLC < amtMin:
×
244
                return fmt.Errorf(
×
245
                        "min htlc amount of %v is below min htlc parameter of %v",
×
246
                        edge.MinHTLC, amtMin,
×
247
                )
×
248

249
        case edge.MaxHTLC > amtMax:
×
250
                return fmt.Errorf(
×
251
                        "max htlc size of %v is above max pending amount of %v",
×
252
                        edge.MaxHTLC, amtMax,
×
253
                )
×
254

255
        case edge.MinHTLC > edge.MaxHTLC:
×
256
                return fmt.Errorf(
×
257
                        "min_htlc %v greater than max_htlc %v",
×
258
                        edge.MinHTLC, edge.MaxHTLC,
×
259
                )
×
260
        }
261

262
        // Clear signature to help prevent usage of the previous signature.
263
        edge.SetSigBytes(nil)
2✔
264

2✔
265
        return nil
2✔
266
}
267

268
// getHtlcAmtLimits retrieves the negotiated channel min and max htlc amount
269
// constraints.
270
func (r *Manager) getHtlcAmtLimits(tx kvdb.RTx, chanPoint wire.OutPoint) (
271
        lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) {
2✔
272

2✔
273
        ch, err := r.FetchChannel(tx, chanPoint)
2✔
274
        if err != nil {
4✔
275
                return 0, 0, err
2✔
276
        }
2✔
277

278
        // The max htlc policy field must be less than or equal to the channel
279
        // capacity AND less than or equal to the max in-flight HTLC value.
280
        // Since the latter is always less than or equal to the former, just
281
        // return the max in-flight value.
282
        maxAmt := ch.LocalChanCfg.ChannelStateBounds.MaxPendingAmount
2✔
283

2✔
284
        return ch.LocalChanCfg.MinHTLC, maxAmt, nil
2✔
285
}
286

287
// makeFailureItem creates a lnrpc.FailedUpdate object.
288
func makeFailureItem(outPoint wire.OutPoint, updateFailure lnrpc.UpdateFailure,
289
        errStr string) *lnrpc.FailedUpdate {
2✔
290

2✔
291
        outpoint := lnrpc.MarshalOutPoint(&outPoint)
2✔
292

2✔
293
        return &lnrpc.FailedUpdate{
2✔
294
                Outpoint:    outpoint,
2✔
295
                Reason:      updateFailure,
2✔
296
                UpdateError: errStr,
2✔
297
        }
2✔
298
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc