• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%
11216766535

Pull #9148

github

ProofOfKeags
lnwire: remove kickoff feerate from propose/commit
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

571 of 879 new or added lines in 16 files covered. (64.96%)

23253 existing lines in 251 files now uncovered.

99022 of 171268 relevant lines covered (57.82%)

38420.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.84
/routing/localchans/manager.go
1
package localchans
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7

8
        "github.com/btcsuite/btcd/wire"
9
        "github.com/lightningnetwork/lnd/channeldb"
10
        "github.com/lightningnetwork/lnd/channeldb/models"
11
        "github.com/lightningnetwork/lnd/discovery"
12
        "github.com/lightningnetwork/lnd/fn"
13
        "github.com/lightningnetwork/lnd/kvdb"
14
        "github.com/lightningnetwork/lnd/lnrpc"
15
        "github.com/lightningnetwork/lnd/lnwire"
16
        "github.com/lightningnetwork/lnd/routing"
17
)
18

19
// Manager manages the node's local channels. The only operation that is
20
// currently implemented is updating forwarding policies.
21
type Manager struct {
22
        // UpdateForwardingPolicies is used by the manager to update active
23
        // links with a new policy.
24
        UpdateForwardingPolicies func(
25
                chanPolicies map[wire.OutPoint]models.ForwardingPolicy)
26

27
        // PropagateChanPolicyUpdate is called to persist a new policy to disk
28
        // and broadcast it to the network.
29
        PropagateChanPolicyUpdate func(
30
                edgesToUpdate []discovery.EdgeWithInfo) error
31

32
        // ForAllOutgoingChannels is required to iterate over all our local
33
        // channels.
34
        ForAllOutgoingChannels func(cb func(kvdb.RTx,
35
                *models.ChannelEdgeInfo,
36
                *models.ChannelEdgePolicy) error) error
37

38
        // FetchChannel is used to query local channel parameters. Optionally an
39
        // existing db tx can be supplied.
40
        FetchChannel func(tx kvdb.RTx, chanPoint wire.OutPoint) (
41
                *channeldb.OpenChannel, error)
42

43
        // policyUpdateLock ensures that the database and the link do not fall
44
        // out of sync if there are concurrent fee update calls. Without it,
45
        // there is a chance that policy A updates the database, then policy B
46
        // updates the database, then policy B updates the link, then policy A
47
        // updates the link.
48
        policyUpdateLock sync.Mutex
49
}
50

51
// UpdatePolicy updates the policy for the specified channels on disk and in
52
// the active links.
53
func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
54
        chanPoints ...wire.OutPoint) ([]*lnrpc.FailedUpdate, error) {
4✔
55

4✔
56
        r.policyUpdateLock.Lock()
4✔
57
        defer r.policyUpdateLock.Unlock()
4✔
58

4✔
59
        // First, we'll construct a set of all the channels that we are
4✔
60
        // trying to update.
4✔
61
        unprocessedChans := make(map[wire.OutPoint]struct{})
4✔
62
        for _, chanPoint := range chanPoints {
7✔
63
                unprocessedChans[chanPoint] = struct{}{}
3✔
64
        }
3✔
65

66
        haveChanFilter := len(unprocessedChans) != 0
4✔
67

4✔
68
        var failedUpdates []*lnrpc.FailedUpdate
4✔
69
        var edgesToUpdate []discovery.EdgeWithInfo
4✔
70
        policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy)
4✔
71

4✔
72
        // Next, we'll loop over all the outgoing channels the router knows of.
4✔
73
        // If we have a filter then we'll only collected those channels,
4✔
74
        // otherwise we'll collect them all.
4✔
75
        err := r.ForAllOutgoingChannels(func(
4✔
76
                tx kvdb.RTx,
4✔
77
                info *models.ChannelEdgeInfo,
4✔
78
                edge *models.ChannelEdgePolicy) error {
8✔
79

4✔
80
                // If we have a channel filter, and this channel isn't a part
4✔
81
                // of it, then we'll skip it.
4✔
82
                _, ok := unprocessedChans[info.ChannelPoint]
4✔
83
                if !ok && haveChanFilter {
5✔
84
                        return nil
1✔
85
                }
1✔
86

87
                // Mark this channel as found by removing it. unprocessedChans
88
                // will be used to report invalid channels later on.
89
                delete(unprocessedChans, info.ChannelPoint)
3✔
90

3✔
91
                // Apply the new policy to the edge.
3✔
92
                err := r.updateEdge(tx, info.ChannelPoint, edge, newSchema)
3✔
93
                if err != nil {
3✔
UNCOV
94
                        failedUpdates = append(failedUpdates,
×
UNCOV
95
                                makeFailureItem(info.ChannelPoint,
×
UNCOV
96
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
×
UNCOV
97
                                        err.Error(),
×
UNCOV
98
                                ))
×
UNCOV
99

×
UNCOV
100
                        return nil
×
UNCOV
101
                }
×
102

103
                // Add updated edge to list of edges to send to gossiper.
104
                edgesToUpdate = append(edgesToUpdate, discovery.EdgeWithInfo{
3✔
105
                        Info: info,
3✔
106
                        Edge: edge,
3✔
107
                })
3✔
108

3✔
109
                // Extract inbound fees from the ExtraOpaqueData.
3✔
110
                var inboundWireFee lnwire.Fee
3✔
111
                _, err = edge.ExtraOpaqueData.ExtractRecords(&inboundWireFee)
3✔
112
                if err != nil {
3✔
113
                        return err
×
114
                }
×
115
                inboundFee := models.NewInboundFeeFromWire(inboundWireFee)
3✔
116

3✔
117
                // Add updated policy to list of policies to send to switch.
3✔
118
                policiesToUpdate[info.ChannelPoint] = models.ForwardingPolicy{
3✔
119
                        BaseFee:       edge.FeeBaseMSat,
3✔
120
                        FeeRate:       edge.FeeProportionalMillionths,
3✔
121
                        TimeLockDelta: uint32(edge.TimeLockDelta),
3✔
122
                        MinHTLCOut:    edge.MinHTLC,
3✔
123
                        MaxHTLC:       edge.MaxHTLC,
3✔
124
                        InboundFee:    inboundFee,
3✔
125
                }
3✔
126

3✔
127
                return nil
3✔
128
        })
129
        if err != nil {
4✔
130
                return nil, err
×
131
        }
×
132

133
        // Construct a list of failed policy updates.
134
        for chanPoint := range unprocessedChans {
5✔
135
                channel, err := r.FetchChannel(nil, chanPoint)
1✔
136
                switch {
1✔
137
                case errors.Is(err, channeldb.ErrChannelNotFound):
1✔
138
                        failedUpdates = append(failedUpdates,
1✔
139
                                makeFailureItem(chanPoint,
1✔
140
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
1✔
141
                                        "not found",
1✔
142
                                ))
1✔
143

144
                case err != nil:
×
145
                        failedUpdates = append(failedUpdates,
×
146
                                makeFailureItem(chanPoint,
×
147
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_INTERNAL_ERR,
×
148
                                        err.Error(),
×
149
                                ))
×
150

151
                case channel.IsPending:
×
152
                        failedUpdates = append(failedUpdates,
×
153
                                makeFailureItem(chanPoint,
×
154
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_PENDING,
×
155
                                        "not yet confirmed",
×
156
                                ))
×
157

158
                default:
×
159
                        failedUpdates = append(failedUpdates,
×
160
                                makeFailureItem(chanPoint,
×
161
                                        lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
×
162
                                        "could not update policies",
×
163
                                ))
×
164
                }
165
        }
166

167
        // Commit the policy updates to disk and broadcast to the network. We
168
        // validated the new policy above, so we expect no validation errors. If
169
        // this would happen because of a bug, the link policy will be
170
        // desynchronized. It is currently not possible to atomically commit
171
        // multiple edge updates.
172
        err = r.PropagateChanPolicyUpdate(edgesToUpdate)
4✔
173
        if err != nil {
4✔
174
                return nil, err
×
175
        }
×
176

177
        // Update active links.
178
        r.UpdateForwardingPolicies(policiesToUpdate)
4✔
179

4✔
180
        return failedUpdates, nil
4✔
181
}
182

183
// updateEdge updates the given edge with the new schema.
184
func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint,
185
        edge *models.ChannelEdgePolicy,
186
        newSchema routing.ChannelPolicy) error {
3✔
187

3✔
188
        // Update forwarding fee scheme and required time lock delta.
3✔
189
        edge.FeeBaseMSat = newSchema.BaseFee
3✔
190
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
3✔
191
                newSchema.FeeRate,
3✔
192
        )
3✔
193

3✔
194
        // If inbound fees are set, we update the edge with them.
3✔
195
        err := fn.MapOptionZ(newSchema.InboundFee,
3✔
196
                func(f models.InboundFee) error {
3✔
UNCOV
197
                        inboundWireFee := f.ToWire()
×
UNCOV
198
                        return edge.ExtraOpaqueData.PackRecords(
×
UNCOV
199
                                &inboundWireFee,
×
UNCOV
200
                        )
×
UNCOV
201
                })
×
202
        if err != nil {
3✔
203
                return err
×
204
        }
×
205

206
        edge.TimeLockDelta = uint16(newSchema.TimeLockDelta)
3✔
207

3✔
208
        // Retrieve negotiated channel htlc amt limits.
3✔
209
        amtMin, amtMax, err := r.getHtlcAmtLimits(tx, chanPoint)
3✔
210
        if err != nil {
3✔
UNCOV
211
                return err
×
UNCOV
212
        }
×
213

214
        // We now update the edge max htlc value.
215
        switch {
3✔
216
        // If a non-zero max htlc was specified, use it to update the edge.
217
        // Otherwise keep the value unchanged.
218
        case newSchema.MaxHTLC != 0:
2✔
219
                edge.MaxHTLC = newSchema.MaxHTLC
2✔
220

221
        // If this edge still doesn't have a max htlc set, set it to the max.
222
        // This is an on-the-fly migration.
223
        case !edge.MessageFlags.HasMaxHtlc():
×
224
                edge.MaxHTLC = amtMax
×
225

226
        // If this edge has a max htlc that exceeds what the channel can
227
        // actually carry, correct it now. This can happen, because we
228
        // previously set the max htlc to the channel capacity.
229
        case edge.MaxHTLC > amtMax:
×
230
                edge.MaxHTLC = amtMax
×
231
        }
232

233
        // If a new min htlc is specified, update the edge.
234
        if newSchema.MinHTLC != nil {
3✔
235
                edge.MinHTLC = *newSchema.MinHTLC
×
236
        }
×
237

238
        // If the MaxHtlc flag wasn't already set, we can set it now.
239
        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
3✔
240

3✔
241
        // Validate htlc amount constraints.
3✔
242
        switch {
3✔
243
        case edge.MinHTLC < amtMin:
×
244
                return fmt.Errorf(
×
245
                        "min htlc amount of %v is below min htlc parameter of %v",
×
246
                        edge.MinHTLC, amtMin,
×
247
                )
×
248

249
        case edge.MaxHTLC > amtMax:
×
250
                return fmt.Errorf(
×
251
                        "max htlc size of %v is above max pending amount of %v",
×
252
                        edge.MaxHTLC, amtMax,
×
253
                )
×
254

255
        case edge.MinHTLC > edge.MaxHTLC:
×
256
                return fmt.Errorf(
×
257
                        "min_htlc %v greater than max_htlc %v",
×
258
                        edge.MinHTLC, edge.MaxHTLC,
×
259
                )
×
260
        }
261

262
        // Clear signature to help prevent usage of the previous signature.
263
        edge.SetSigBytes(nil)
3✔
264

3✔
265
        return nil
3✔
266
}
267

268
// getHtlcAmtLimits retrieves the negotiated channel min and max htlc amount
269
// constraints.
270
func (r *Manager) getHtlcAmtLimits(tx kvdb.RTx, chanPoint wire.OutPoint) (
271
        lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) {
3✔
272

3✔
273
        ch, err := r.FetchChannel(tx, chanPoint)
3✔
274
        if err != nil {
3✔
UNCOV
275
                return 0, 0, err
×
UNCOV
276
        }
×
277

278
        // The max htlc policy field must be less than or equal to the channel
279
        // capacity AND less than or equal to the max in-flight HTLC value.
280
        // Since the latter is always less than or equal to the former, just
281
        // return the max in-flight value.
282
        maxAmt := ch.LocalChanCfg.ChannelStateBounds.MaxPendingAmount
3✔
283

3✔
284
        return ch.LocalChanCfg.MinHTLC, maxAmt, nil
3✔
285
}
286

287
// makeFailureItem creates a lnrpc.FailedUpdate object.
288
func makeFailureItem(outPoint wire.OutPoint, updateFailure lnrpc.UpdateFailure,
289
        errStr string) *lnrpc.FailedUpdate {
1✔
290

1✔
291
        outpoint := lnrpc.MarshalOutPoint(&outPoint)
1✔
292

1✔
293
        return &lnrpc.FailedUpdate{
1✔
294
                Outpoint:    outpoint,
1✔
295
                Reason:      updateFailure,
1✔
296
                UpdateError: errStr,
1✔
297
        }
1✔
298
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc