• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13416360603

19 Feb 2025 03:34PM UTC coverage: 58.686% (-0.1%) from 58.794%
13416360603

Pull #9529

github

ellemouton
graph/db: move Topology client management to ChannelGraph
Pull Request #9529: [Concept ACK 🙏 ] graph: move graph cache out of CRUD layer & move topology change subscription

2732 of 3486 new or added lines in 9 files covered. (78.37%)

358 existing lines in 29 files now uncovered.

135967 of 231686 relevant lines covered (58.69%)

19334.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.19
/graph/db/notifications.go
1
package graphdb
2

3
import (
4
        "fmt"
5
        "image/color"
6
        "net"
7
        "sync"
8

9
        "github.com/btcsuite/btcd/btcec/v2"
10
        "github.com/btcsuite/btcd/btcutil"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/go-errors/errors"
13
        "github.com/lightningnetwork/lnd/graph/db/models"
14
        "github.com/lightningnetwork/lnd/lnwire"
15
)
16

17
// TopologyClient represents an intent to receive notifications from the
18
// channel router regarding changes to the topology of the channel graph. The
19
// TopologyChanges channel will be sent upon with new updates to the channel
20
// graph in real-time as they're encountered.
21
type TopologyClient struct {
22
        // TopologyChanges is a receive only channel that new channel graph
23
        // updates will be sent over.
24
        //
25
        // TODO(roasbeef): chan for each update type instead?
26
        TopologyChanges <-chan *TopologyChange
27

28
        // Cancel is a function closure that should be executed when the client
29
        // wishes to cancel their notification intent. Doing so allows the
30
        // ChannelRouter to free up resources.
31
        Cancel func()
32
}
33

34
// topologyClientUpdate is a message sent to the channel router to either
35
// register a new topology client or re-register an existing client.
36
type topologyClientUpdate struct {
37
        // cancel indicates if the update to the client is cancelling an
38
        // existing client's notifications. If not then this update will be to
39
        // register a new set of notifications.
40
        cancel bool
41

42
        // clientID is the unique identifier for this client. Any further
43
        // updates (deleting or adding) to this notification client will be
44
        // dispatched according to the target clientID.
45
        clientID uint64
46

47
        // ntfnChan is a *send-only* channel in which notifications should be
48
        // sent over from router -> client.
49
        ntfnChan chan<- *TopologyChange
50
}
51

52
// SubscribeTopology returns a new topology client which can be used by the
53
// caller to receive notifications whenever a change in the channel graph
54
// topology occurs. Changes that will be sent at notifications include: new
55
// nodes appearing, node updating their attributes, new channels, channels
56
// closing, and updates in the routing policies of a channel's directed edges.
57
func (c *ChannelGraph) SubscribeTopology() (*TopologyClient, error) {
6✔
58
        // If the router is not yet started, return an error to avoid a
6✔
59
        // deadlock waiting for it to handle the subscription request.
6✔
60
        if !c.started.Load() {
6✔
61
                return nil, fmt.Errorf("router not started")
×
62
        }
×
63

64
        // We'll first atomically obtain the next ID for this client from the
65
        // incrementing client ID counter.
66
        clientID := c.ntfnClientCounter.Add(1)
6✔
67

6✔
68
        log.Debugf("New graph topology client subscription, client %v",
6✔
69
                clientID)
6✔
70

6✔
71
        ntfnChan := make(chan *TopologyChange, 10)
6✔
72

6✔
73
        select {
6✔
74
        case c.ntfnClientUpdates <- &topologyClientUpdate{
75
                cancel:   false,
76
                clientID: clientID,
77
                ntfnChan: ntfnChan,
78
        }:
6✔
NEW
79
        case <-c.quit:
×
80
                return nil, errors.New("ChannelRouter shutting down")
×
81
        }
82

83
        return &TopologyClient{
6✔
84
                TopologyChanges: ntfnChan,
6✔
85
                Cancel: func() {
9✔
86
                        select {
3✔
87
                        case c.ntfnClientUpdates <- &topologyClientUpdate{
88
                                cancel:   true,
89
                                clientID: clientID,
90
                        }:
3✔
NEW
91
                        case <-c.quit:
×
92
                                return
×
93
                        }
94
                },
95
        }, nil
96
}
97

98
// topologyClient is a data-structure use by the channel router to couple the
99
// client's notification channel along with a special "exit" channel that can
100
// be used to cancel all lingering goroutines blocked on a send to the
101
// notification channel.
102
type topologyClient struct {
103
        // ntfnChan is a send-only channel that's used to propagate
104
        // notification s from the channel router to an instance of a
105
        // topologyClient client.
106
        ntfnChan chan<- *TopologyChange
107

108
        // exit is a channel that is used internally by the channel router to
109
        // cancel any active un-consumed goroutine notifications.
110
        exit chan struct{}
111

112
        wg sync.WaitGroup
113
}
114

115
// notifyTopologyChange notifies all registered clients of a new change in
116
// graph topology in a non-blocking.
117
func (c *ChannelGraph) notifyTopologyChange(topologyDiff *TopologyChange) {
3,473✔
118
        // notifyClient is a helper closure that will send topology updates to
3,473✔
119
        // the given client.
3,473✔
120
        notifyClient := func(clientID uint64, client *topologyClient) bool {
3,481✔
121
                client.wg.Add(1)
8✔
122

8✔
123
                log.Tracef("Sending topology notification to client=%v, "+
8✔
124
                        "NodeUpdates=%v, ChannelEdgeUpdates=%v, "+
8✔
125
                        "ClosedChannels=%v", clientID,
8✔
126
                        len(topologyDiff.NodeUpdates),
8✔
127
                        len(topologyDiff.ChannelEdgeUpdates),
8✔
128
                        len(topologyDiff.ClosedChannels))
8✔
129

8✔
130
                go func(t *topologyClient) {
16✔
131
                        defer t.wg.Done()
8✔
132

8✔
133
                        select {
8✔
134

135
                        // In this case we'll try to send the notification
136
                        // directly to the upstream client consumer.
137
                        case t.ntfnChan <- topologyDiff:
8✔
138

139
                        // If the client cancels the notifications, then we'll
140
                        // exit early.
NEW
141
                        case <-t.exit:
×
142

143
                        // Similarly, if the ChannelRouter itself exists early,
144
                        // then we'll also exit ourselves.
NEW
145
                        case <-c.quit:
×
146
                        }
147
                }(client)
148

149
                // Always return true here so the following Range will iterate
150
                // all clients.
151
                return true
8✔
152
        }
153

154
        // Range over the set of active clients, and attempt to send the
155
        // topology updates.
156
        c.topologyClients.Range(notifyClient)
3,473✔
157
}
158

159
// handleTopologyUpdate is responsible for sending any topology changes
160
// notifications to registered clients.
161
//
162
// NOTE: must be run inside goroutine.
163
func (c *ChannelGraph) handleTopologyUpdate(update any) {
4,941✔
164
        defer c.wg.Done()
4,941✔
165

4,941✔
166
        topChange := &TopologyChange{}
4,941✔
167
        err := c.addToTopologyChange(topChange, update)
4,941✔
168
        if err != nil {
4,946✔
169
                log.Errorf("unable to update topology change notification: %v",
5✔
170
                        err)
5✔
171
                return
5✔
172
        }
5✔
173

174
        if topChange.isEmpty() {
6,418✔
175
                return
1,482✔
176
        }
1,482✔
177

178
        c.notifyTopologyChange(topChange)
3,456✔
179
}
180

181
// TopologyChange represents a new set of modifications to the channel graph.
182
// Topology changes will be dispatched in real-time as the ChannelGraph
183
// validates and process modifications to the authenticated channel graph.
184
type TopologyChange struct {
185
        // NodeUpdates is a slice of nodes which are either new to the channel
186
        // graph, or have had their attributes updated in an authenticated
187
        // manner.
188
        NodeUpdates []*NetworkNodeUpdate
189

190
        // ChanelEdgeUpdates is a slice of channel edges which are either newly
191
        // opened and authenticated, or have had their routing policies
192
        // updated.
193
        ChannelEdgeUpdates []*ChannelEdgeUpdate
194

195
        // ClosedChannels contains a slice of close channel summaries which
196
        // described which block a channel was closed at, and also carry
197
        // supplemental information such as the capacity of the former channel.
198
        ClosedChannels []*ClosedChanSummary
199
}
200

201
// isEmpty returns true if the TopologyChange is empty. A TopologyChange is
202
// considered empty, if it contains no *new* updates of any type.
203
func (t *TopologyChange) isEmpty() bool {
4,936✔
204
        return len(t.NodeUpdates) == 0 && len(t.ChannelEdgeUpdates) == 0 &&
4,936✔
205
                len(t.ClosedChannels) == 0
4,936✔
206
}
4,936✔
207

208
// ClosedChanSummary is a summary of a channel that was detected as being
209
// closed by monitoring the blockchain. Once a channel's funding point has been
210
// spent, the channel will automatically be marked as closed by the
211
// ChainNotifier.
212
//
213
// TODO(roasbeef): add nodes involved?
214
type ClosedChanSummary struct {
215
        // ChanID is the short-channel ID which uniquely identifies the
216
        // channel.
217
        ChanID uint64
218

219
        // Capacity was the total capacity of the channel before it was closed.
220
        Capacity btcutil.Amount
221

222
        // ClosedHeight is the height in the chain that the channel was closed
223
        // at.
224
        ClosedHeight uint32
225

226
        // ChanPoint is the funding point, or the multi-sig utxo which
227
        // previously represented the channel.
228
        ChanPoint wire.OutPoint
229
}
230

231
// createCloseSummaries takes in a slice of channels closed at the target block
232
// height and creates a slice of summaries which of each channel closure.
233
func createCloseSummaries(blockHeight uint32,
234
        closedChans ...*models.ChannelEdgeInfo) []*ClosedChanSummary {
19✔
235

19✔
236
        closeSummaries := make([]*ClosedChanSummary, len(closedChans))
19✔
237
        for i, closedChan := range closedChans {
41✔
238
                closeSummaries[i] = &ClosedChanSummary{
22✔
239
                        ChanID:       closedChan.ChannelID,
22✔
240
                        Capacity:     closedChan.Capacity,
22✔
241
                        ClosedHeight: blockHeight,
22✔
242
                        ChanPoint:    closedChan.ChannelPoint,
22✔
243
                }
22✔
244
        }
22✔
245

246
        return closeSummaries
19✔
247
}
248

249
// NetworkNodeUpdate is an update for a  node within the Lightning Network. A
250
// NetworkNodeUpdate is sent out either when a new node joins the network, or a
251
// node broadcasts a new update with a newer time stamp that supersedes its
252
// old update. All updates are properly authenticated.
253
type NetworkNodeUpdate struct {
254
        // Addresses is a slice of all the node's known addresses.
255
        Addresses []net.Addr
256

257
        // IdentityKey is the identity public key of the target node. This is
258
        // used to encrypt onion blobs as well as to authenticate any new
259
        // updates.
260
        IdentityKey *btcec.PublicKey
261

262
        // Alias is the alias or nick name of the node.
263
        Alias string
264

265
        // Color is the node's color in hex code format.
266
        Color string
267

268
        // Features holds the set of features the node supports.
269
        Features *lnwire.FeatureVector
270
}
271

272
// ChannelEdgeUpdate is an update for a new channel within the ChannelGraph.
273
// This update is sent out once a new authenticated channel edge is discovered
274
// within the network. These updates are directional, so if a channel is fully
275
// public, then there will be two updates sent out: one for each direction
276
// within the channel. Each update will carry that particular routing edge
277
// policy for the channel direction.
278
//
279
// An edge is a channel in the direction of AdvertisingNode -> ConnectingNode.
280
type ChannelEdgeUpdate struct {
281
        // ChanID is the unique short channel ID for the channel. This encodes
282
        // where in the blockchain the channel's funding transaction was
283
        // originally confirmed.
284
        ChanID uint64
285

286
        // ChanPoint is the outpoint which represents the multi-sig funding
287
        // output for the channel.
288
        ChanPoint wire.OutPoint
289

290
        // Capacity is the capacity of the newly created channel.
291
        Capacity btcutil.Amount
292

293
        // MinHTLC is the minimum HTLC amount that this channel will forward.
294
        MinHTLC lnwire.MilliSatoshi
295

296
        // MaxHTLC is the maximum HTLC amount that this channel will forward.
297
        MaxHTLC lnwire.MilliSatoshi
298

299
        // BaseFee is the base fee that will charged for all HTLC's forwarded
300
        // across the this channel direction.
301
        BaseFee lnwire.MilliSatoshi
302

303
        // FeeRate is the fee rate that will be shared for all HTLC's forwarded
304
        // across this channel direction.
305
        FeeRate lnwire.MilliSatoshi
306

307
        // TimeLockDelta is the time-lock expressed in blocks that will be
308
        // added to outgoing HTLC's from incoming HTLC's. This value is the
309
        // difference of the incoming and outgoing HTLC's time-locks routed
310
        // through this hop.
311
        TimeLockDelta uint16
312

313
        // AdvertisingNode is the node that's advertising this edge.
314
        AdvertisingNode *btcec.PublicKey
315

316
        // ConnectingNode is the node that the advertising node connects to.
317
        ConnectingNode *btcec.PublicKey
318

319
        // Disabled, if true, signals that the channel is unavailable to relay
320
        // payments.
321
        Disabled bool
322

323
        // ExtraOpaqueData is the set of data that was appended to this message
324
        // to fill out the full maximum transport message size. These fields can
325
        // be used to specify optional data such as custom TLV fields.
326
        ExtraOpaqueData lnwire.ExtraOpaqueData
327
}
328

329
// appendTopologyChange appends the passed update message to the passed
330
// TopologyChange, properly identifying which type of update the message
331
// constitutes. This function will also fetch any required auxiliary
332
// information required to create the topology change update from the graph
333
// database.
334
func (c *ChannelGraph) addToTopologyChange(update *TopologyChange,
335
        msg any) error {
4,941✔
336

4,941✔
337
        switch m := msg.(type) {
4,941✔
338

339
        // Any node announcement maps directly to a NetworkNodeUpdate struct.
340
        // No further data munging or db queries are required.
341
        case *models.LightningNode:
801✔
342
                pubKey, err := m.PubKey()
801✔
343
                if err != nil {
801✔
344
                        return err
×
345
                }
×
346

347
                nodeUpdate := &NetworkNodeUpdate{
801✔
348
                        Addresses:   m.Addresses,
801✔
349
                        IdentityKey: pubKey,
801✔
350
                        Alias:       m.Alias,
801✔
351
                        Color:       EncodeHexColor(m.Color),
801✔
352
                        Features:    m.Features.Clone(),
801✔
353
                }
801✔
354

801✔
355
                update.NodeUpdates = append(update.NodeUpdates, nodeUpdate)
801✔
356
                return nil
801✔
357

358
        // We ignore initial channel announcements as we'll only send out
359
        // updates once the individual edges themselves have been updated.
360
        case *models.ChannelEdgeInfo:
1,482✔
361
                return nil
1,482✔
362

363
        // Any new ChannelUpdateAnnouncements will generate a corresponding
364
        // ChannelEdgeUpdate notification.
365
        case *models.ChannelEdgePolicy:
2,662✔
366
                // We'll need to fetch the edge's information from the database
2,662✔
367
                // in order to get the information concerning which nodes are
2,662✔
368
                // being connected.
2,662✔
369
                edgeInfo, _, _, err := c.FetchChannelEdgesByID(m.ChannelID)
2,662✔
370
                if err != nil {
2,667✔
371
                        return errors.Errorf("unable fetch channel edge: %v",
5✔
372
                                err)
5✔
373
                }
5✔
374

375
                // If the flag is one, then the advertising node is actually
376
                // the second node.
377
                sourceNode := edgeInfo.NodeKey1
2,657✔
378
                connectingNode := edgeInfo.NodeKey2
2,657✔
379
                if m.ChannelFlags&lnwire.ChanUpdateDirection == 1 {
3,984✔
380
                        sourceNode = edgeInfo.NodeKey2
1,327✔
381
                        connectingNode = edgeInfo.NodeKey1
1,327✔
382
                }
1,327✔
383

384
                aNode, err := sourceNode()
2,657✔
385
                if err != nil {
2,657✔
386
                        return err
×
387
                }
×
388
                cNode, err := connectingNode()
2,657✔
389
                if err != nil {
2,657✔
390
                        return err
×
391
                }
×
392

393
                edgeUpdate := &ChannelEdgeUpdate{
2,657✔
394
                        ChanID:          m.ChannelID,
2,657✔
395
                        ChanPoint:       edgeInfo.ChannelPoint,
2,657✔
396
                        TimeLockDelta:   m.TimeLockDelta,
2,657✔
397
                        Capacity:        edgeInfo.Capacity,
2,657✔
398
                        MinHTLC:         m.MinHTLC,
2,657✔
399
                        MaxHTLC:         m.MaxHTLC,
2,657✔
400
                        BaseFee:         m.FeeBaseMSat,
2,657✔
401
                        FeeRate:         m.FeeProportionalMillionths,
2,657✔
402
                        AdvertisingNode: aNode,
2,657✔
403
                        ConnectingNode:  cNode,
2,657✔
404
                        Disabled:        m.ChannelFlags&lnwire.ChanUpdateDisabled != 0,
2,657✔
405
                        ExtraOpaqueData: m.ExtraOpaqueData,
2,657✔
406
                }
2,657✔
407

2,657✔
408
                // TODO(roasbeef): add bit to toggle
2,657✔
409
                update.ChannelEdgeUpdates = append(update.ChannelEdgeUpdates,
2,657✔
410
                        edgeUpdate)
2,657✔
411
                return nil
2,657✔
412

413
        default:
×
414
                return fmt.Errorf("unable to add to topology change, "+
×
415
                        "unknown message type %T", msg)
×
416
        }
417
}
418

419
// EncodeHexColor takes a color and returns it in hex code format.
420
func EncodeHexColor(color color.RGBA) string {
810✔
421
        return fmt.Sprintf("#%02x%02x%02x", color.R, color.G, color.B)
810✔
422
}
810✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc