• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14923997239

09 May 2025 07:40AM UTC coverage: 58.58% (-0.01%) from 58.59%
14923997239

Pull #9798

github

web-flow
Merge 863e269ea into ee25c228e
Pull Request #9798: graph/db: synchronous topology client subscriptions/cancellations

20 of 22 new or added lines in 2 files covered. (90.91%)

58 existing lines in 10 files now uncovered.

97408 of 166283 relevant lines covered (58.58%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.37
/graph/db/notifications.go
1
package graphdb
2

3
import (
4
        "fmt"
5
        "image/color"
6
        "net"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcec/v2"
11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/go-errors/errors"
14
        "github.com/lightningnetwork/lnd/graph/db/models"
15
        "github.com/lightningnetwork/lnd/lnutils"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
)
18

19
// topologyManager holds all the fields required to manage the network topology
20
// subscriptions and notifications.
21
type topologyManager struct {
22
        // ntfnClientCounter is an atomic counter that's used to assign unique
23
        // notification client IDs to new clients.
24
        ntfnClientCounter atomic.Uint64
25

26
        // topologyUpdate is a channel that carries new topology updates
27
        // messages from outside the ChannelGraph to be processed by the
28
        // networkHandler.
29
        topologyUpdate chan any
30

31
        // topologyClients maps a client's unique notification ID to a
32
        // topologyClient client that contains its notification dispatch
33
        // channel.
34
        topologyClients *lnutils.SyncMap[uint64, *topologyClient]
35

36
        // ntfnClientUpdates is a channel that's used to send new updates to
37
        // topology notification clients to the ChannelGraph. Updates either
38
        // add a new notification client, or cancel notifications for an
39
        // existing client.
40
        ntfnClientUpdates chan *topologyClientUpdate
41
}
42

43
// newTopologyManager creates a new instance of the topologyManager.
44
func newTopologyManager() *topologyManager {
3✔
45
        return &topologyManager{
3✔
46
                topologyUpdate:    make(chan any),
3✔
47
                topologyClients:   &lnutils.SyncMap[uint64, *topologyClient]{},
3✔
48
                ntfnClientUpdates: make(chan *topologyClientUpdate),
3✔
49
        }
3✔
50
}
3✔
51

52
// TopologyClient represents an intent to receive notifications from the
53
// channel router regarding changes to the topology of the channel graph. The
54
// TopologyChanges channel will be sent upon with new updates to the channel
55
// graph in real-time as they're encountered.
56
type TopologyClient struct {
57
        // TopologyChanges is a receive only channel that new channel graph
58
        // updates will be sent over.
59
        //
60
        // TODO(roasbeef): chan for each update type instead?
61
        TopologyChanges <-chan *TopologyChange
62

63
        // Cancel is a function closure that should be executed when the client
64
        // wishes to cancel their notification intent. Doing so allows the
65
        // ChannelRouter to free up resources.
66
        Cancel func()
67
}
68

69
// topologyClientUpdate is a message sent to the channel router to either
70
// register a new topology client or re-register an existing client.
71
type topologyClientUpdate struct {
72
        // cancel indicates if the update to the client is cancelling an
73
        // existing client's notifications. If not then this update will be to
74
        // register a new set of notifications.
75
        cancel bool
76

77
        // clientID is the unique identifier for this client. Any further
78
        // updates (deleting or adding) to this notification client will be
79
        // dispatched according to the target clientID.
80
        clientID uint64
81

82
        // ntfnChan is a *send-only* channel in which notifications should be
83
        // sent over from router -> client.
84
        ntfnChan chan<- *TopologyChange
85

86
        // updateHandled is closed once the given update has been handled by the
87
        // topology client.
88
        updateHandled chan struct{}
89
}
90

91
// SubscribeTopology returns a new topology client which can be used by the
92
// caller to receive notifications whenever a change in the channel graph
93
// topology occurs. Changes that will be sent at notifications include: new
94
// nodes appearing, node updating their attributes, new channels, channels
95
// closing, and updates in the routing policies of a channel's directed edges.
96
func (c *ChannelGraph) SubscribeTopology() (*TopologyClient, error) {
3✔
97
        // If the router is not yet started, return an error to avoid a
3✔
98
        // deadlock waiting for it to handle the subscription request.
3✔
99
        if !c.started.Load() {
3✔
100
                return nil, fmt.Errorf("router not started")
×
101
        }
×
102

103
        // We'll first atomically obtain the next ID for this client from the
104
        // incrementing client ID counter.
105
        clientID := c.ntfnClientCounter.Add(1)
3✔
106

3✔
107
        log.Debugf("New graph topology client subscription, client %v",
3✔
108
                clientID)
3✔
109

3✔
110
        var (
3✔
111
                ntfnChan   = make(chan *TopologyChange, 10)
3✔
112
                subscribed = make(chan struct{})
3✔
113
        )
3✔
114

3✔
115
        select {
3✔
116
        case c.ntfnClientUpdates <- &topologyClientUpdate{
117
                cancel:        false,
118
                clientID:      clientID,
119
                ntfnChan:      ntfnChan,
120
                updateHandled: subscribed,
121
        }:
3✔
122
        case <-c.quit:
×
123
                return nil, errors.New("ChannelRouter shutting down")
×
124
        }
125

126
        // Wait for the synchronous signal that the subscription has been
127
        // handled.
128
        select {
3✔
129
        case <-subscribed:
3✔
NEW
130
        case <-c.quit:
×
NEW
131
                return nil, errors.New("ChannelRouter shutting down")
×
132
        }
133

134
        return &TopologyClient{
3✔
135
                TopologyChanges: ntfnChan,
3✔
136
                Cancel: func() {
6✔
137
                        cancelled := make(chan struct{})
3✔
138

3✔
139
                        select {
3✔
140
                        case c.ntfnClientUpdates <- &topologyClientUpdate{
141
                                cancel:        true,
142
                                clientID:      clientID,
143
                                updateHandled: cancelled,
144
                        }:
3✔
145
                        case <-c.quit:
1✔
146
                                return
1✔
147
                        }
148

149
                        // Wait for the synchronous signal that the subscription
150
                        // has been cancelled.
151
                        select {
3✔
152
                        case <-cancelled:
3✔
153
                        case <-c.quit:
1✔
154
                                return
1✔
155
                        }
156
                },
157
        }, nil
158
}
159

160
// topologyClient is a data-structure use by the channel router to couple the
161
// client's notification channel along with a special "exit" channel that can
162
// be used to cancel all lingering goroutines blocked on a send to the
163
// notification channel.
164
type topologyClient struct {
165
        // ntfnChan is a send-only channel that's used to propagate
166
        // notification s from the channel router to an instance of a
167
        // topologyClient client.
168
        ntfnChan chan<- *TopologyChange
169

170
        // exit is a channel that is used internally by the channel router to
171
        // cancel any active un-consumed goroutine notifications.
172
        exit chan struct{}
173

174
        wg sync.WaitGroup
175
}
176

177
// notifyTopologyChange notifies all registered clients of a new change in
178
// graph topology in a non-blocking.
179
func (c *ChannelGraph) notifyTopologyChange(topologyDiff *TopologyChange) {
3✔
180
        // notifyClient is a helper closure that will send topology updates to
3✔
181
        // the given client.
3✔
182
        notifyClient := func(clientID uint64, client *topologyClient) bool {
6✔
183
                client.wg.Add(1)
3✔
184

3✔
185
                log.Tracef("Sending topology notification to client=%v, "+
3✔
186
                        "NodeUpdates=%v, ChannelEdgeUpdates=%v, "+
3✔
187
                        "ClosedChannels=%v", clientID,
3✔
188
                        len(topologyDiff.NodeUpdates),
3✔
189
                        len(topologyDiff.ChannelEdgeUpdates),
3✔
190
                        len(topologyDiff.ClosedChannels))
3✔
191

3✔
192
                go func(t *topologyClient) {
6✔
193
                        defer t.wg.Done()
3✔
194

3✔
195
                        select {
3✔
196

197
                        // In this case we'll try to send the notification
198
                        // directly to the upstream client consumer.
199
                        case t.ntfnChan <- topologyDiff:
3✔
200

201
                        // If the client cancels the notifications, then we'll
202
                        // exit early.
203
                        case <-t.exit:
×
204

205
                        // Similarly, if the ChannelRouter itself exists early,
206
                        // then we'll also exit ourselves.
207
                        case <-c.quit:
×
208
                        }
209
                }(client)
210

211
                // Always return true here so the following Range will iterate
212
                // all clients.
213
                return true
3✔
214
        }
215

216
        // Range over the set of active clients, and attempt to send the
217
        // topology updates.
218
        c.topologyClients.Range(notifyClient)
3✔
219
}
220

221
// handleTopologyUpdate is responsible for sending any topology changes
222
// notifications to registered clients.
223
//
224
// NOTE: must be run inside goroutine.
225
func (c *ChannelGraph) handleTopologyUpdate(update any) {
3✔
226
        defer c.wg.Done()
3✔
227

3✔
228
        topChange := &TopologyChange{}
3✔
229
        err := c.addToTopologyChange(topChange, update)
3✔
230
        if err != nil {
3✔
231
                log.Errorf("unable to update topology change notification: %v",
×
232
                        err)
×
233
                return
×
234
        }
×
235

236
        if topChange.isEmpty() {
6✔
237
                return
3✔
238
        }
3✔
239

240
        c.notifyTopologyChange(topChange)
3✔
241
}
242

243
// TopologyChange represents a new set of modifications to the channel graph.
244
// Topology changes will be dispatched in real-time as the ChannelGraph
245
// validates and process modifications to the authenticated channel graph.
246
type TopologyChange struct {
247
        // NodeUpdates is a slice of nodes which are either new to the channel
248
        // graph, or have had their attributes updated in an authenticated
249
        // manner.
250
        NodeUpdates []*NetworkNodeUpdate
251

252
        // ChanelEdgeUpdates is a slice of channel edges which are either newly
253
        // opened and authenticated, or have had their routing policies
254
        // updated.
255
        ChannelEdgeUpdates []*ChannelEdgeUpdate
256

257
        // ClosedChannels contains a slice of close channel summaries which
258
        // described which block a channel was closed at, and also carry
259
        // supplemental information such as the capacity of the former channel.
260
        ClosedChannels []*ClosedChanSummary
261
}
262

263
// isEmpty returns true if the TopologyChange is empty. A TopologyChange is
264
// considered empty, if it contains no *new* updates of any type.
265
func (t *TopologyChange) isEmpty() bool {
3✔
266
        return len(t.NodeUpdates) == 0 && len(t.ChannelEdgeUpdates) == 0 &&
3✔
267
                len(t.ClosedChannels) == 0
3✔
268
}
3✔
269

270
// ClosedChanSummary is a summary of a channel that was detected as being
271
// closed by monitoring the blockchain. Once a channel's funding point has been
272
// spent, the channel will automatically be marked as closed by the
273
// ChainNotifier.
274
//
275
// TODO(roasbeef): add nodes involved?
276
type ClosedChanSummary struct {
277
        // ChanID is the short-channel ID which uniquely identifies the
278
        // channel.
279
        ChanID uint64
280

281
        // Capacity was the total capacity of the channel before it was closed.
282
        Capacity btcutil.Amount
283

284
        // ClosedHeight is the height in the chain that the channel was closed
285
        // at.
286
        ClosedHeight uint32
287

288
        // ChanPoint is the funding point, or the multi-sig utxo which
289
        // previously represented the channel.
290
        ChanPoint wire.OutPoint
291
}
292

293
// createCloseSummaries takes in a slice of channels closed at the target block
294
// height and creates a slice of summaries which of each channel closure.
295
func createCloseSummaries(blockHeight uint32,
296
        closedChans ...*models.ChannelEdgeInfo) []*ClosedChanSummary {
3✔
297

3✔
298
        closeSummaries := make([]*ClosedChanSummary, len(closedChans))
3✔
299
        for i, closedChan := range closedChans {
6✔
300
                closeSummaries[i] = &ClosedChanSummary{
3✔
301
                        ChanID:       closedChan.ChannelID,
3✔
302
                        Capacity:     closedChan.Capacity,
3✔
303
                        ClosedHeight: blockHeight,
3✔
304
                        ChanPoint:    closedChan.ChannelPoint,
3✔
305
                }
3✔
306
        }
3✔
307

308
        return closeSummaries
3✔
309
}
310

311
// NetworkNodeUpdate is an update for a  node within the Lightning Network. A
312
// NetworkNodeUpdate is sent out either when a new node joins the network, or a
313
// node broadcasts a new update with a newer time stamp that supersedes its
314
// old update. All updates are properly authenticated.
315
type NetworkNodeUpdate struct {
316
        // Addresses is a slice of all the node's known addresses.
317
        Addresses []net.Addr
318

319
        // IdentityKey is the identity public key of the target node. This is
320
        // used to encrypt onion blobs as well as to authenticate any new
321
        // updates.
322
        IdentityKey *btcec.PublicKey
323

324
        // Alias is the alias or nick name of the node.
325
        Alias string
326

327
        // Color is the node's color in hex code format.
328
        Color string
329

330
        // Features holds the set of features the node supports.
331
        Features *lnwire.FeatureVector
332
}
333

334
// ChannelEdgeUpdate is an update for a new channel within the ChannelGraph.
335
// This update is sent out once a new authenticated channel edge is discovered
336
// within the network. These updates are directional, so if a channel is fully
337
// public, then there will be two updates sent out: one for each direction
338
// within the channel. Each update will carry that particular routing edge
339
// policy for the channel direction.
340
//
341
// An edge is a channel in the direction of AdvertisingNode -> ConnectingNode.
342
type ChannelEdgeUpdate struct {
343
        // ChanID is the unique short channel ID for the channel. This encodes
344
        // where in the blockchain the channel's funding transaction was
345
        // originally confirmed.
346
        ChanID uint64
347

348
        // ChanPoint is the outpoint which represents the multi-sig funding
349
        // output for the channel.
350
        ChanPoint wire.OutPoint
351

352
        // Capacity is the capacity of the newly created channel.
353
        Capacity btcutil.Amount
354

355
        // MinHTLC is the minimum HTLC amount that this channel will forward.
356
        MinHTLC lnwire.MilliSatoshi
357

358
        // MaxHTLC is the maximum HTLC amount that this channel will forward.
359
        MaxHTLC lnwire.MilliSatoshi
360

361
        // BaseFee is the base fee that will charged for all HTLC's forwarded
362
        // across the this channel direction.
363
        BaseFee lnwire.MilliSatoshi
364

365
        // FeeRate is the fee rate that will be shared for all HTLC's forwarded
366
        // across this channel direction.
367
        FeeRate lnwire.MilliSatoshi
368

369
        // TimeLockDelta is the time-lock expressed in blocks that will be
370
        // added to outgoing HTLC's from incoming HTLC's. This value is the
371
        // difference of the incoming and outgoing HTLC's time-locks routed
372
        // through this hop.
373
        TimeLockDelta uint16
374

375
        // AdvertisingNode is the node that's advertising this edge.
376
        AdvertisingNode *btcec.PublicKey
377

378
        // ConnectingNode is the node that the advertising node connects to.
379
        ConnectingNode *btcec.PublicKey
380

381
        // Disabled, if true, signals that the channel is unavailable to relay
382
        // payments.
383
        Disabled bool
384

385
        // ExtraOpaqueData is the set of data that was appended to this message
386
        // to fill out the full maximum transport message size. These fields can
387
        // be used to specify optional data such as custom TLV fields.
388
        ExtraOpaqueData lnwire.ExtraOpaqueData
389
}
390

391
// appendTopologyChange appends the passed update message to the passed
392
// TopologyChange, properly identifying which type of update the message
393
// constitutes. This function will also fetch any required auxiliary
394
// information required to create the topology change update from the graph
395
// database.
396
func (c *ChannelGraph) addToTopologyChange(update *TopologyChange,
397
        msg any) error {
3✔
398

3✔
399
        switch m := msg.(type) {
3✔
400

401
        // Any node announcement maps directly to a NetworkNodeUpdate struct.
402
        // No further data munging or db queries are required.
403
        case *models.LightningNode:
3✔
404
                pubKey, err := m.PubKey()
3✔
405
                if err != nil {
3✔
406
                        return err
×
407
                }
×
408

409
                nodeUpdate := &NetworkNodeUpdate{
3✔
410
                        Addresses:   m.Addresses,
3✔
411
                        IdentityKey: pubKey,
3✔
412
                        Alias:       m.Alias,
3✔
413
                        Color:       EncodeHexColor(m.Color),
3✔
414
                        Features:    m.Features.Clone(),
3✔
415
                }
3✔
416

3✔
417
                update.NodeUpdates = append(update.NodeUpdates, nodeUpdate)
3✔
418
                return nil
3✔
419

420
        // We ignore initial channel announcements as we'll only send out
421
        // updates once the individual edges themselves have been updated.
422
        case *models.ChannelEdgeInfo:
3✔
423
                return nil
3✔
424

425
        // Any new ChannelUpdateAnnouncements will generate a corresponding
426
        // ChannelEdgeUpdate notification.
427
        case *models.ChannelEdgePolicy:
3✔
428
                // We'll need to fetch the edge's information from the database
3✔
429
                // in order to get the information concerning which nodes are
3✔
430
                // being connected.
3✔
431
                edgeInfo, _, _, err := c.FetchChannelEdgesByID(m.ChannelID)
3✔
432
                if err != nil {
3✔
433
                        return errors.Errorf("unable fetch channel edge: %v",
×
434
                                err)
×
435
                }
×
436

437
                // If the flag is one, then the advertising node is actually
438
                // the second node.
439
                sourceNode := edgeInfo.NodeKey1
3✔
440
                connectingNode := edgeInfo.NodeKey2
3✔
441
                if m.ChannelFlags&lnwire.ChanUpdateDirection == 1 {
6✔
442
                        sourceNode = edgeInfo.NodeKey2
3✔
443
                        connectingNode = edgeInfo.NodeKey1
3✔
444
                }
3✔
445

446
                aNode, err := sourceNode()
3✔
447
                if err != nil {
3✔
448
                        return err
×
449
                }
×
450
                cNode, err := connectingNode()
3✔
451
                if err != nil {
3✔
452
                        return err
×
453
                }
×
454

455
                edgeUpdate := &ChannelEdgeUpdate{
3✔
456
                        ChanID:          m.ChannelID,
3✔
457
                        ChanPoint:       edgeInfo.ChannelPoint,
3✔
458
                        TimeLockDelta:   m.TimeLockDelta,
3✔
459
                        Capacity:        edgeInfo.Capacity,
3✔
460
                        MinHTLC:         m.MinHTLC,
3✔
461
                        MaxHTLC:         m.MaxHTLC,
3✔
462
                        BaseFee:         m.FeeBaseMSat,
3✔
463
                        FeeRate:         m.FeeProportionalMillionths,
3✔
464
                        AdvertisingNode: aNode,
3✔
465
                        ConnectingNode:  cNode,
3✔
466
                        Disabled:        m.ChannelFlags&lnwire.ChanUpdateDisabled != 0,
3✔
467
                        ExtraOpaqueData: m.ExtraOpaqueData,
3✔
468
                }
3✔
469

3✔
470
                // TODO(roasbeef): add bit to toggle
3✔
471
                update.ChannelEdgeUpdates = append(update.ChannelEdgeUpdates,
3✔
472
                        edgeUpdate)
3✔
473
                return nil
3✔
474

475
        default:
×
476
                return fmt.Errorf("unable to add to topology change, "+
×
477
                        "unknown message type %T", msg)
×
478
        }
479
}
480

481
// EncodeHexColor takes a color and returns it in hex code format.
482
func EncodeHexColor(color color.RGBA) string {
3✔
483
        return fmt.Sprintf("#%02x%02x%02x", color.R, color.G, color.B)
3✔
484
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc