• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12352028755

16 Dec 2024 11:50AM UTC coverage: 58.632% (-0.004%) from 58.636%
12352028755

push

github

web-flow
Merge pull request #9357 from GeorgeTsagk/onchain-htlc-replay-wire-records

contractcourt: include custom records on replayed htlc

2 of 2 new or added lines in 1 file covered. (100.0%)

67 existing lines in 14 files now uncovered.

134431 of 229278 relevant lines covered (58.63%)

19301.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.62
/graph/notifications.go
1
package graph
2

3
import (
4
        "fmt"
5
        "image/color"
6
        "net"
7
        "sync"
8

9
        "github.com/btcsuite/btcd/btcec/v2"
10
        "github.com/btcsuite/btcd/btcutil"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/go-errors/errors"
13
        "github.com/lightningnetwork/lnd/graph/db/models"
14
        "github.com/lightningnetwork/lnd/lnwire"
15
)
16

17
// TopologyClient represents an intent to receive notifications from the
18
// channel router regarding changes to the topology of the channel graph. The
19
// TopologyChanges channel will be sent upon with new updates to the channel
20
// graph in real-time as they're encountered.
21
type TopologyClient struct {
22
        // TopologyChanges is a receive only channel that new channel graph
23
        // updates will be sent over.
24
        //
25
        // TODO(roasbeef): chan for each update type instead?
26
        TopologyChanges <-chan *TopologyChange
27

28
        // Cancel is a function closure that should be executed when the client
29
        // wishes to cancel their notification intent. Doing so allows the
30
        // ChannelRouter to free up resources.
31
        Cancel func()
32
}
33

34
// topologyClientUpdate is a message sent to the channel router to either
35
// register a new topology client or re-register an existing client.
36
type topologyClientUpdate struct {
37
        // cancel indicates if the update to the client is cancelling an
38
        // existing client's notifications. If not then this update will be to
39
        // register a new set of notifications.
40
        cancel bool
41

42
        // clientID is the unique identifier for this client. Any further
43
        // updates (deleting or adding) to this notification client will be
44
        // dispatched according to the target clientID.
45
        clientID uint64
46

47
        // ntfnChan is a *send-only* channel in which notifications should be
48
        // sent over from router -> client.
49
        ntfnChan chan<- *TopologyChange
50
}
51

52
// SubscribeTopology returns a new topology client which can be used by the
53
// caller to receive notifications whenever a change in the channel graph
54
// topology occurs. Changes that will be sent at notifications include: new
55
// nodes appearing, node updating their attributes, new channels, channels
56
// closing, and updates in the routing policies of a channel's directed edges.
57
func (b *Builder) SubscribeTopology() (*TopologyClient, error) {
8✔
58
        // If the router is not yet started, return an error to avoid a
8✔
59
        // deadlock waiting for it to handle the subscription request.
8✔
60
        if !b.started.Load() {
8✔
61
                return nil, fmt.Errorf("router not started")
×
62
        }
×
63

64
        // We'll first atomically obtain the next ID for this client from the
65
        // incrementing client ID counter.
66
        clientID := b.ntfnClientCounter.Add(1)
8✔
67

8✔
68
        log.Debugf("New graph topology client subscription, client %v",
8✔
69
                clientID)
8✔
70

8✔
71
        ntfnChan := make(chan *TopologyChange, 10)
8✔
72

8✔
73
        select {
8✔
74
        case b.ntfnClientUpdates <- &topologyClientUpdate{
75
                cancel:   false,
76
                clientID: clientID,
77
                ntfnChan: ntfnChan,
78
        }:
8✔
79
        case <-b.quit:
×
80
                return nil, errors.New("ChannelRouter shutting down")
×
81
        }
82

83
        return &TopologyClient{
8✔
84
                TopologyChanges: ntfnChan,
8✔
85
                Cancel: func() {
13✔
86
                        select {
5✔
87
                        case b.ntfnClientUpdates <- &topologyClientUpdate{
88
                                cancel:   true,
89
                                clientID: clientID,
90
                        }:
5✔
UNCOV
91
                        case <-b.quit:
×
UNCOV
92
                                return
×
93
                        }
94
                },
95
        }, nil
96
}
97

98
// topologyClient is a data-structure use by the channel router to couple the
99
// client's notification channel along with a special "exit" channel that can
100
// be used to cancel all lingering goroutines blocked on a send to the
101
// notification channel.
102
type topologyClient struct {
103
        // ntfnChan is a send-only channel that's used to propagate
104
        // notification s from the channel router to an instance of a
105
        // topologyClient client.
106
        ntfnChan chan<- *TopologyChange
107

108
        // exit is a channel that is used internally by the channel router to
109
        // cancel any active un-consumed goroutine notifications.
110
        exit chan struct{}
111

112
        wg sync.WaitGroup
113
}
114

115
// notifyTopologyChange notifies all registered clients of a new change in
116
// graph topology in a non-blocking.
117
func (b *Builder) notifyTopologyChange(topologyDiff *TopologyChange) {
16✔
118
        // notifyClient is a helper closure that will send topology updates to
16✔
119
        // the given client.
16✔
120
        notifyClient := func(clientID uint64, client *topologyClient) bool {
26✔
121
                client.wg.Add(1)
10✔
122

10✔
123
                log.Tracef("Sending topology notification to client=%v, "+
10✔
124
                        "NodeUpdates=%v, ChannelEdgeUpdates=%v, "+
10✔
125
                        "ClosedChannels=%v", clientID,
10✔
126
                        len(topologyDiff.NodeUpdates),
10✔
127
                        len(topologyDiff.ChannelEdgeUpdates),
10✔
128
                        len(topologyDiff.ClosedChannels))
10✔
129

10✔
130
                go func(c *topologyClient) {
20✔
131
                        defer c.wg.Done()
10✔
132

10✔
133
                        select {
10✔
134

135
                        // In this case we'll try to send the notification
136
                        // directly to the upstream client consumer.
137
                        case c.ntfnChan <- topologyDiff:
10✔
138

139
                        // If the client cancels the notifications, then we'll
140
                        // exit early.
141
                        case <-c.exit:
×
142

143
                        // Similarly, if the ChannelRouter itself exists early,
144
                        // then we'll also exit ourselves.
145
                        case <-b.quit:
×
146

147
                        }
148
                }(client)
149

150
                // Always return true here so the following Range will iterate
151
                // all clients.
152
                return true
10✔
153
        }
154

155
        // Range over the set of active clients, and attempt to send the
156
        // topology updates.
157
        b.topologyClients.Range(notifyClient)
16✔
158
}
159

160
// TopologyChange represents a new set of modifications to the channel graph.
161
// Topology changes will be dispatched in real-time as the ChannelGraph
162
// validates and process modifications to the authenticated channel graph.
163
type TopologyChange struct {
164
        // NodeUpdates is a slice of nodes which are either new to the channel
165
        // graph, or have had their attributes updated in an authenticated
166
        // manner.
167
        NodeUpdates []*NetworkNodeUpdate
168

169
        // ChanelEdgeUpdates is a slice of channel edges which are either newly
170
        // opened and authenticated, or have had their routing policies
171
        // updated.
172
        ChannelEdgeUpdates []*ChannelEdgeUpdate
173

174
        // ClosedChannels contains a slice of close channel summaries which
175
        // described which block a channel was closed at, and also carry
176
        // supplemental information such as the capacity of the former channel.
177
        ClosedChannels []*ClosedChanSummary
178
}
179

180
// isEmpty returns true if the TopologyChange is empty. A TopologyChange is
181
// considered empty, if it contains no *new* updates of any type.
182
func (t *TopologyChange) isEmpty() bool {
29✔
183
        return len(t.NodeUpdates) == 0 && len(t.ChannelEdgeUpdates) == 0 &&
29✔
184
                len(t.ClosedChannels) == 0
29✔
185
}
29✔
186

187
// ClosedChanSummary is a summary of a channel that was detected as being
188
// closed by monitoring the blockchain. Once a channel's funding point has been
189
// spent, the channel will automatically be marked as closed by the
190
// ChainNotifier.
191
//
192
// TODO(roasbeef): add nodes involved?
193
type ClosedChanSummary struct {
194
        // ChanID is the short-channel ID which uniquely identifies the
195
        // channel.
196
        ChanID uint64
197

198
        // Capacity was the total capacity of the channel before it was closed.
199
        Capacity btcutil.Amount
200

201
        // ClosedHeight is the height in the chain that the channel was closed
202
        // at.
203
        ClosedHeight uint32
204

205
        // ChanPoint is the funding point, or the multi-sig utxo which
206
        // previously represented the channel.
207
        ChanPoint wire.OutPoint
208
}
209

210
// createCloseSummaries takes in a slice of channels closed at the target block
211
// height and creates a slice of summaries which of each channel closure.
212
func createCloseSummaries(blockHeight uint32,
213
        closedChans ...*models.ChannelEdgeInfo) []*ClosedChanSummary {
5✔
214

5✔
215
        closeSummaries := make([]*ClosedChanSummary, len(closedChans))
5✔
216
        for i, closedChan := range closedChans {
10✔
217
                closeSummaries[i] = &ClosedChanSummary{
5✔
218
                        ChanID:       closedChan.ChannelID,
5✔
219
                        Capacity:     closedChan.Capacity,
5✔
220
                        ClosedHeight: blockHeight,
5✔
221
                        ChanPoint:    closedChan.ChannelPoint,
5✔
222
                }
5✔
223
        }
5✔
224

225
        return closeSummaries
5✔
226
}
227

228
// NetworkNodeUpdate is an update for a  node within the Lightning Network. A
229
// NetworkNodeUpdate is sent out either when a new node joins the network, or a
230
// node broadcasts a new update with a newer time stamp that supersedes its
231
// old update. All updates are properly authenticated.
232
type NetworkNodeUpdate struct {
233
        // Addresses is a slice of all the node's known addresses.
234
        Addresses []net.Addr
235

236
        // IdentityKey is the identity public key of the target node. This is
237
        // used to encrypt onion blobs as well as to authenticate any new
238
        // updates.
239
        IdentityKey *btcec.PublicKey
240

241
        // Alias is the alias or nick name of the node.
242
        Alias string
243

244
        // Color is the node's color in hex code format.
245
        Color string
246

247
        // Features holds the set of features the node supports.
248
        Features *lnwire.FeatureVector
249
}
250

251
// ChannelEdgeUpdate is an update for a new channel within the ChannelGraph.
252
// This update is sent out once a new authenticated channel edge is discovered
253
// within the network. These updates are directional, so if a channel is fully
254
// public, then there will be two updates sent out: one for each direction
255
// within the channel. Each update will carry that particular routing edge
256
// policy for the channel direction.
257
//
258
// An edge is a channel in the direction of AdvertisingNode -> ConnectingNode.
259
type ChannelEdgeUpdate struct {
260
        // ChanID is the unique short channel ID for the channel. This encodes
261
        // where in the blockchain the channel's funding transaction was
262
        // originally confirmed.
263
        ChanID uint64
264

265
        // ChanPoint is the outpoint which represents the multi-sig funding
266
        // output for the channel.
267
        ChanPoint wire.OutPoint
268

269
        // Capacity is the capacity of the newly created channel.
270
        Capacity btcutil.Amount
271

272
        // MinHTLC is the minimum HTLC amount that this channel will forward.
273
        MinHTLC lnwire.MilliSatoshi
274

275
        // MaxHTLC is the maximum HTLC amount that this channel will forward.
276
        MaxHTLC lnwire.MilliSatoshi
277

278
        // BaseFee is the base fee that will charged for all HTLC's forwarded
279
        // across the this channel direction.
280
        BaseFee lnwire.MilliSatoshi
281

282
        // FeeRate is the fee rate that will be shared for all HTLC's forwarded
283
        // across this channel direction.
284
        FeeRate lnwire.MilliSatoshi
285

286
        // TimeLockDelta is the time-lock expressed in blocks that will be
287
        // added to outgoing HTLC's from incoming HTLC's. This value is the
288
        // difference of the incoming and outgoing HTLC's time-locks routed
289
        // through this hop.
290
        TimeLockDelta uint16
291

292
        // AdvertisingNode is the node that's advertising this edge.
293
        AdvertisingNode *btcec.PublicKey
294

295
        // ConnectingNode is the node that the advertising node connects to.
296
        ConnectingNode *btcec.PublicKey
297

298
        // Disabled, if true, signals that the channel is unavailable to relay
299
        // payments.
300
        Disabled bool
301

302
        // ExtraOpaqueData is the set of data that was appended to this message
303
        // to fill out the full maximum transport message size. These fields can
304
        // be used to specify optional data such as custom TLV fields.
305
        ExtraOpaqueData lnwire.ExtraOpaqueData
306
}
307

308
// appendTopologyChange appends the passed update message to the passed
309
// TopologyChange, properly identifying which type of update the message
310
// constitutes. This function will also fetch any required auxiliary
311
// information required to create the topology change update from the graph
312
// database.
313
func addToTopologyChange(graph DB, update *TopologyChange,
314
        msg interface{}) error {
29✔
315

29✔
316
        switch m := msg.(type) {
29✔
317

318
        // Any node announcement maps directly to a NetworkNodeUpdate struct.
319
        // No further data munging or db queries are required.
320
        case *models.LightningNode:
10✔
321
                pubKey, err := m.PubKey()
10✔
322
                if err != nil {
10✔
323
                        return err
×
324
                }
×
325

326
                nodeUpdate := &NetworkNodeUpdate{
10✔
327
                        Addresses:   m.Addresses,
10✔
328
                        IdentityKey: pubKey,
10✔
329
                        Alias:       m.Alias,
10✔
330
                        Color:       EncodeHexColor(m.Color),
10✔
331
                        Features:    m.Features.Clone(),
10✔
332
                }
10✔
333

10✔
334
                update.NodeUpdates = append(update.NodeUpdates, nodeUpdate)
10✔
335
                return nil
10✔
336

337
        // We ignore initial channel announcements as we'll only send out
338
        // updates once the individual edges themselves have been updated.
339
        case *models.ChannelEdgeInfo:
18✔
340
                return nil
18✔
341

342
        // Any new ChannelUpdateAnnouncements will generate a corresponding
343
        // ChannelEdgeUpdate notification.
344
        case *models.ChannelEdgePolicy:
9✔
345
                // We'll need to fetch the edge's information from the database
9✔
346
                // in order to get the information concerning which nodes are
9✔
347
                // being connected.
9✔
348
                edgeInfo, _, _, err := graph.FetchChannelEdgesByID(m.ChannelID)
9✔
349
                if err != nil {
9✔
350
                        return errors.Errorf("unable fetch channel edge: %v",
×
351
                                err)
×
352
                }
×
353

354
                // If the flag is one, then the advertising node is actually
355
                // the second node.
356
                sourceNode := edgeInfo.NodeKey1
9✔
357
                connectingNode := edgeInfo.NodeKey2
9✔
358
                if m.ChannelFlags&lnwire.ChanUpdateDirection == 1 {
15✔
359
                        sourceNode = edgeInfo.NodeKey2
6✔
360
                        connectingNode = edgeInfo.NodeKey1
6✔
361
                }
6✔
362

363
                aNode, err := sourceNode()
9✔
364
                if err != nil {
9✔
365
                        return err
×
366
                }
×
367
                cNode, err := connectingNode()
9✔
368
                if err != nil {
9✔
369
                        return err
×
370
                }
×
371

372
                edgeUpdate := &ChannelEdgeUpdate{
9✔
373
                        ChanID:          m.ChannelID,
9✔
374
                        ChanPoint:       edgeInfo.ChannelPoint,
9✔
375
                        TimeLockDelta:   m.TimeLockDelta,
9✔
376
                        Capacity:        edgeInfo.Capacity,
9✔
377
                        MinHTLC:         m.MinHTLC,
9✔
378
                        MaxHTLC:         m.MaxHTLC,
9✔
379
                        BaseFee:         m.FeeBaseMSat,
9✔
380
                        FeeRate:         m.FeeProportionalMillionths,
9✔
381
                        AdvertisingNode: aNode,
9✔
382
                        ConnectingNode:  cNode,
9✔
383
                        Disabled:        m.ChannelFlags&lnwire.ChanUpdateDisabled != 0,
9✔
384
                        ExtraOpaqueData: m.ExtraOpaqueData,
9✔
385
                }
9✔
386

9✔
387
                // TODO(roasbeef): add bit to toggle
9✔
388
                update.ChannelEdgeUpdates = append(update.ChannelEdgeUpdates,
9✔
389
                        edgeUpdate)
9✔
390
                return nil
9✔
391

392
        default:
×
393
                return fmt.Errorf("unable to add to topology change, "+
×
394
                        "unknown message type %T", msg)
×
395
        }
396
}
397

398
// EncodeHexColor takes a color and returns it in hex code format: a leading
// '#' followed by the red, green and blue components, each rendered as two
// lowercase hexadecimal digits. The alpha component is not included.
func EncodeHexColor(c color.RGBA) string {
	return fmt.Sprintf("#%02x%02x%02x", c.R, c.G, c.B)
}
19✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc