• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15561477203

10 Jun 2025 01:54PM UTC coverage: 58.351% (-10.1%) from 68.487%
15561477203

Pull #9356

github

web-flow
Merge 6440b25db into c6d6d4c0b
Pull Request #9356: lnrpc: add incoming/outgoing channel ids filter to forwarding history request

33 of 36 new or added lines in 2 files covered. (91.67%)

28366 existing lines in 455 files now uncovered.

97715 of 167461 relevant lines covered (58.35%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.29
/graph/db/notifications.go
1
package graphdb
2

3
import (
4
        "fmt"
5
        "image/color"
6
        "net"
7
        "strconv"
8
        "sync"
9
        "sync/atomic"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/btcutil"
13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/go-errors/errors"
15
        "github.com/lightningnetwork/lnd/graph/db/models"
16
        "github.com/lightningnetwork/lnd/lnutils"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
)
19

20
// topologyManager holds all the fields required to manage the network topology
21
// subscriptions and notifications.
22
type topologyManager struct {
23
        // ntfnClientCounter is an atomic counter that's used to assign unique
24
        // notification client IDs to new clients.
25
        ntfnClientCounter atomic.Uint64
26

27
        // topologyUpdate is a channel that carries new topology updates
28
        // messages from outside the ChannelGraph to be processed by the
29
        // networkHandler.
30
        topologyUpdate chan any
31

32
        // topologyClients maps a client's unique notification ID to a
33
        // topologyClient client that contains its notification dispatch
34
        // channel.
35
        topologyClients *lnutils.SyncMap[uint64, *topologyClient]
36

37
        // ntfnClientUpdates is a channel that's used to send new updates to
38
        // topology notification clients to the ChannelGraph. Updates either
39
        // add a new notification client, or cancel notifications for an
40
        // existing client.
41
        ntfnClientUpdates chan *topologyClientUpdate
42
}
43

44
// newTopologyManager creates a new instance of the topologyManager.
45
func newTopologyManager() *topologyManager {
3✔
46
        return &topologyManager{
3✔
47
                topologyUpdate:    make(chan any),
3✔
48
                topologyClients:   &lnutils.SyncMap[uint64, *topologyClient]{},
3✔
49
                ntfnClientUpdates: make(chan *topologyClientUpdate),
3✔
50
        }
3✔
51
}
3✔
52

53
// TopologyClient represents an intent to receive notifications from the
54
// channel router regarding changes to the topology of the channel graph. The
55
// TopologyChanges channel will be sent upon with new updates to the channel
56
// graph in real-time as they're encountered.
57
type TopologyClient struct {
58
        // TopologyChanges is a receive only channel that new channel graph
59
        // updates will be sent over.
60
        //
61
        // TODO(roasbeef): chan for each update type instead?
62
        TopologyChanges <-chan *TopologyChange
63

64
        // Cancel is a function closure that should be executed when the client
65
        // wishes to cancel their notification intent. Doing so allows the
66
        // ChannelRouter to free up resources.
67
        Cancel func()
68
}
69

70
// topologyClientUpdate is a message sent to the channel router to either
71
// register a new topology client or re-register an existing client.
72
type topologyClientUpdate struct {
73
        // cancel indicates if the update to the client is cancelling an
74
        // existing client's notifications. If not then this update will be to
75
        // register a new set of notifications.
76
        cancel bool
77

78
        // clientID is the unique identifier for this client. Any further
79
        // updates (deleting or adding) to this notification client will be
80
        // dispatched according to the target clientID.
81
        clientID uint64
82

83
        // ntfnChan is a *send-only* channel in which notifications should be
84
        // sent over from router -> client.
85
        ntfnChan chan<- *TopologyChange
86
}
87

88
// SubscribeTopology returns a new topology client which can be used by the
89
// caller to receive notifications whenever a change in the channel graph
90
// topology occurs. Changes that will be sent at notifications include: new
91
// nodes appearing, node updating their attributes, new channels, channels
92
// closing, and updates in the routing policies of a channel's directed edges.
93
func (c *ChannelGraph) SubscribeTopology() (*TopologyClient, error) {
3✔
94
        // If the router is not yet started, return an error to avoid a
3✔
95
        // deadlock waiting for it to handle the subscription request.
3✔
96
        if !c.started.Load() {
3✔
97
                return nil, fmt.Errorf("router not started")
×
98
        }
×
99

100
        // We'll first atomically obtain the next ID for this client from the
101
        // incrementing client ID counter.
102
        clientID := c.ntfnClientCounter.Add(1)
3✔
103

3✔
104
        log.Debugf("New graph topology client subscription, client %v",
3✔
105
                clientID)
3✔
106

3✔
107
        ntfnChan := make(chan *TopologyChange, 10)
3✔
108

3✔
109
        select {
3✔
110
        case c.ntfnClientUpdates <- &topologyClientUpdate{
111
                cancel:   false,
112
                clientID: clientID,
113
                ntfnChan: ntfnChan,
114
        }:
3✔
115
        case <-c.quit:
×
116
                return nil, errors.New("ChannelRouter shutting down")
×
117
        }
118

119
        return &TopologyClient{
3✔
120
                TopologyChanges: ntfnChan,
3✔
121
                Cancel: func() {
6✔
122
                        select {
3✔
123
                        case c.ntfnClientUpdates <- &topologyClientUpdate{
124
                                cancel:   true,
125
                                clientID: clientID,
126
                        }:
3✔
UNCOV
127
                        case <-c.quit:
×
UNCOV
128
                                return
×
129
                        }
130
                },
131
        }, nil
132
}
133

134
// topologyClient is a data-structure use by the channel router to couple the
135
// client's notification channel along with a special "exit" channel that can
136
// be used to cancel all lingering goroutines blocked on a send to the
137
// notification channel.
138
type topologyClient struct {
139
        // ntfnChan is a send-only channel that's used to propagate
140
        // notification s from the channel router to an instance of a
141
        // topologyClient client.
142
        ntfnChan chan<- *TopologyChange
143

144
        // exit is a channel that is used internally by the channel router to
145
        // cancel any active un-consumed goroutine notifications.
146
        exit chan struct{}
147

148
        wg sync.WaitGroup
149
}
150

151
// notifyTopologyChange notifies all registered clients of a new change in
152
// graph topology in a non-blocking.
153
//
154
// NOTE: this should only ever be called from a call-stack originating from the
155
// handleTopologySubscriptions handler.
156
func (c *ChannelGraph) notifyTopologyChange(topologyDiff *TopologyChange) {
3✔
157
        // notifyClient is a helper closure that will send topology updates to
3✔
158
        // the given client.
3✔
159
        notifyClient := func(clientID uint64, client *topologyClient) bool {
6✔
160
                client.wg.Add(1)
3✔
161

3✔
162
                log.Tracef("Sending topology notification to client=%v, "+
3✔
163
                        "NodeUpdates=%v, ChannelEdgeUpdates=%v, "+
3✔
164
                        "ClosedChannels=%v", clientID,
3✔
165
                        len(topologyDiff.NodeUpdates),
3✔
166
                        len(topologyDiff.ChannelEdgeUpdates),
3✔
167
                        len(topologyDiff.ClosedChannels))
3✔
168

3✔
169
                go func(t *topologyClient) {
6✔
170
                        defer t.wg.Done()
3✔
171

3✔
172
                        select {
3✔
173

174
                        // In this case we'll try to send the notification
175
                        // directly to the upstream client consumer.
176
                        case t.ntfnChan <- topologyDiff:
3✔
177

178
                        // If the client cancels the notifications, then we'll
179
                        // exit early.
180
                        case <-t.exit:
×
181

182
                        // Similarly, if the ChannelRouter itself exists early,
183
                        // then we'll also exit ourselves.
184
                        case <-c.quit:
×
185
                        }
186
                }(client)
187

188
                // Always return true here so the following Range will iterate
189
                // all clients.
190
                return true
3✔
191
        }
192

193
        // Range over the set of active clients, and attempt to send the
194
        // topology updates.
195
        c.topologyClients.Range(notifyClient)
3✔
196
}
197

198
// handleTopologyUpdate is responsible for sending any topology changes
199
// notifications to registered clients.
200
//
201
// NOTE: must be run inside goroutine and must only ever be called from within
202
// handleTopologySubscriptions.
203
func (c *ChannelGraph) handleTopologyUpdate(update any) {
3✔
204
        defer c.wg.Done()
3✔
205

3✔
206
        topChange := &TopologyChange{}
3✔
207
        err := c.addToTopologyChange(topChange, update)
3✔
208
        if err != nil {
3✔
209
                log.Errorf("unable to update topology change notification: %v",
×
210
                        err)
×
211
                return
×
212
        }
×
213

214
        if topChange.isEmpty() {
6✔
215
                return
3✔
216
        }
3✔
217

218
        c.notifyTopologyChange(topChange)
3✔
219
}
220

221
// TopologyChange represents a new set of modifications to the channel graph.
222
// Topology changes will be dispatched in real-time as the ChannelGraph
223
// validates and process modifications to the authenticated channel graph.
224
type TopologyChange struct {
225
        // NodeUpdates is a slice of nodes which are either new to the channel
226
        // graph, or have had their attributes updated in an authenticated
227
        // manner.
228
        NodeUpdates []*NetworkNodeUpdate
229

230
        // ChanelEdgeUpdates is a slice of channel edges which are either newly
231
        // opened and authenticated, or have had their routing policies
232
        // updated.
233
        ChannelEdgeUpdates []*ChannelEdgeUpdate
234

235
        // ClosedChannels contains a slice of close channel summaries which
236
        // described which block a channel was closed at, and also carry
237
        // supplemental information such as the capacity of the former channel.
238
        ClosedChannels []*ClosedChanSummary
239
}
240

241
// isEmpty returns true if the TopologyChange is empty. A TopologyChange is
242
// considered empty, if it contains no *new* updates of any type.
243
func (t *TopologyChange) isEmpty() bool {
3✔
244
        return len(t.NodeUpdates) == 0 && len(t.ChannelEdgeUpdates) == 0 &&
3✔
245
                len(t.ClosedChannels) == 0
3✔
246
}
3✔
247

248
// ClosedChanSummary is a summary of a channel that was detected as being
249
// closed by monitoring the blockchain. Once a channel's funding point has been
250
// spent, the channel will automatically be marked as closed by the
251
// ChainNotifier.
252
//
253
// TODO(roasbeef): add nodes involved?
254
type ClosedChanSummary struct {
255
        // ChanID is the short-channel ID which uniquely identifies the
256
        // channel.
257
        ChanID uint64
258

259
        // Capacity was the total capacity of the channel before it was closed.
260
        Capacity btcutil.Amount
261

262
        // ClosedHeight is the height in the chain that the channel was closed
263
        // at.
264
        ClosedHeight uint32
265

266
        // ChanPoint is the funding point, or the multi-sig utxo which
267
        // previously represented the channel.
268
        ChanPoint wire.OutPoint
269
}
270

271
// createCloseSummaries takes in a slice of channels closed at the target block
272
// height and creates a slice of summaries which of each channel closure.
273
func createCloseSummaries(blockHeight uint32,
274
        closedChans ...*models.ChannelEdgeInfo) []*ClosedChanSummary {
3✔
275

3✔
276
        closeSummaries := make([]*ClosedChanSummary, len(closedChans))
3✔
277
        for i, closedChan := range closedChans {
6✔
278
                closeSummaries[i] = &ClosedChanSummary{
3✔
279
                        ChanID:       closedChan.ChannelID,
3✔
280
                        Capacity:     closedChan.Capacity,
3✔
281
                        ClosedHeight: blockHeight,
3✔
282
                        ChanPoint:    closedChan.ChannelPoint,
3✔
283
                }
3✔
284
        }
3✔
285

286
        return closeSummaries
3✔
287
}
288

289
// NetworkNodeUpdate is an update for a  node within the Lightning Network. A
290
// NetworkNodeUpdate is sent out either when a new node joins the network, or a
291
// node broadcasts a new update with a newer time stamp that supersedes its
292
// old update. All updates are properly authenticated.
293
type NetworkNodeUpdate struct {
294
        // Addresses is a slice of all the node's known addresses.
295
        Addresses []net.Addr
296

297
        // IdentityKey is the identity public key of the target node. This is
298
        // used to encrypt onion blobs as well as to authenticate any new
299
        // updates.
300
        IdentityKey *btcec.PublicKey
301

302
        // Alias is the alias or nick name of the node.
303
        Alias string
304

305
        // Color is the node's color in hex code format.
306
        Color string
307

308
        // Features holds the set of features the node supports.
309
        Features *lnwire.FeatureVector
310
}
311

312
// ChannelEdgeUpdate is an update for a new channel within the ChannelGraph.
313
// This update is sent out once a new authenticated channel edge is discovered
314
// within the network. These updates are directional, so if a channel is fully
315
// public, then there will be two updates sent out: one for each direction
316
// within the channel. Each update will carry that particular routing edge
317
// policy for the channel direction.
318
//
319
// An edge is a channel in the direction of AdvertisingNode -> ConnectingNode.
320
type ChannelEdgeUpdate struct {
321
        // ChanID is the unique short channel ID for the channel. This encodes
322
        // where in the blockchain the channel's funding transaction was
323
        // originally confirmed.
324
        ChanID uint64
325

326
        // ChanPoint is the outpoint which represents the multi-sig funding
327
        // output for the channel.
328
        ChanPoint wire.OutPoint
329

330
        // Capacity is the capacity of the newly created channel.
331
        Capacity btcutil.Amount
332

333
        // MinHTLC is the minimum HTLC amount that this channel will forward.
334
        MinHTLC lnwire.MilliSatoshi
335

336
        // MaxHTLC is the maximum HTLC amount that this channel will forward.
337
        MaxHTLC lnwire.MilliSatoshi
338

339
        // BaseFee is the base fee that will charged for all HTLC's forwarded
340
        // across the this channel direction.
341
        BaseFee lnwire.MilliSatoshi
342

343
        // FeeRate is the fee rate that will be shared for all HTLC's forwarded
344
        // across this channel direction.
345
        FeeRate lnwire.MilliSatoshi
346

347
        // TimeLockDelta is the time-lock expressed in blocks that will be
348
        // added to outgoing HTLC's from incoming HTLC's. This value is the
349
        // difference of the incoming and outgoing HTLC's time-locks routed
350
        // through this hop.
351
        TimeLockDelta uint16
352

353
        // AdvertisingNode is the node that's advertising this edge.
354
        AdvertisingNode *btcec.PublicKey
355

356
        // ConnectingNode is the node that the advertising node connects to.
357
        ConnectingNode *btcec.PublicKey
358

359
        // Disabled, if true, signals that the channel is unavailable to relay
360
        // payments.
361
        Disabled bool
362

363
        // ExtraOpaqueData is the set of data that was appended to this message
364
        // to fill out the full maximum transport message size. These fields can
365
        // be used to specify optional data such as custom TLV fields.
366
        ExtraOpaqueData lnwire.ExtraOpaqueData
367
}
368

369
// appendTopologyChange appends the passed update message to the passed
370
// TopologyChange, properly identifying which type of update the message
371
// constitutes. This function will also fetch any required auxiliary
372
// information required to create the topology change update from the graph
373
// database.
374
func (c *ChannelGraph) addToTopologyChange(update *TopologyChange,
375
        msg any) error {
3✔
376

3✔
377
        switch m := msg.(type) {
3✔
378

379
        // Any node announcement maps directly to a NetworkNodeUpdate struct.
380
        // No further data munging or db queries are required.
381
        case *models.LightningNode:
3✔
382
                pubKey, err := m.PubKey()
3✔
383
                if err != nil {
3✔
384
                        return err
×
385
                }
×
386

387
                nodeUpdate := &NetworkNodeUpdate{
3✔
388
                        Addresses:   m.Addresses,
3✔
389
                        IdentityKey: pubKey,
3✔
390
                        Alias:       m.Alias,
3✔
391
                        Color:       EncodeHexColor(m.Color),
3✔
392
                        Features:    m.Features.Clone(),
3✔
393
                }
3✔
394

3✔
395
                update.NodeUpdates = append(update.NodeUpdates, nodeUpdate)
3✔
396
                return nil
3✔
397

398
        // We ignore initial channel announcements as we'll only send out
399
        // updates once the individual edges themselves have been updated.
400
        case *models.ChannelEdgeInfo:
3✔
401
                return nil
3✔
402

403
        // Any new ChannelUpdateAnnouncements will generate a corresponding
404
        // ChannelEdgeUpdate notification.
405
        case *models.ChannelEdgePolicy:
3✔
406
                // We'll need to fetch the edge's information from the database
3✔
407
                // in order to get the information concerning which nodes are
3✔
408
                // being connected.
3✔
409
                edgeInfo, _, _, err := c.FetchChannelEdgesByID(m.ChannelID)
3✔
410
                if err != nil {
3✔
411
                        return errors.Errorf("unable fetch channel edge: %v",
×
412
                                err)
×
413
                }
×
414

415
                // If the flag is one, then the advertising node is actually
416
                // the second node.
417
                sourceNode := edgeInfo.NodeKey1
3✔
418
                connectingNode := edgeInfo.NodeKey2
3✔
419
                if m.ChannelFlags&lnwire.ChanUpdateDirection == 1 {
6✔
420
                        sourceNode = edgeInfo.NodeKey2
3✔
421
                        connectingNode = edgeInfo.NodeKey1
3✔
422
                }
3✔
423

424
                aNode, err := sourceNode()
3✔
425
                if err != nil {
3✔
426
                        return err
×
427
                }
×
428
                cNode, err := connectingNode()
3✔
429
                if err != nil {
3✔
430
                        return err
×
431
                }
×
432

433
                edgeUpdate := &ChannelEdgeUpdate{
3✔
434
                        ChanID:          m.ChannelID,
3✔
435
                        ChanPoint:       edgeInfo.ChannelPoint,
3✔
436
                        TimeLockDelta:   m.TimeLockDelta,
3✔
437
                        Capacity:        edgeInfo.Capacity,
3✔
438
                        MinHTLC:         m.MinHTLC,
3✔
439
                        MaxHTLC:         m.MaxHTLC,
3✔
440
                        BaseFee:         m.FeeBaseMSat,
3✔
441
                        FeeRate:         m.FeeProportionalMillionths,
3✔
442
                        AdvertisingNode: aNode,
3✔
443
                        ConnectingNode:  cNode,
3✔
444
                        Disabled:        m.ChannelFlags&lnwire.ChanUpdateDisabled != 0,
3✔
445
                        ExtraOpaqueData: m.ExtraOpaqueData,
3✔
446
                }
3✔
447

3✔
448
                // TODO(roasbeef): add bit to toggle
3✔
449
                update.ChannelEdgeUpdates = append(update.ChannelEdgeUpdates,
3✔
450
                        edgeUpdate)
3✔
451
                return nil
3✔
452

453
        case []*ClosedChanSummary:
3✔
454
                update.ClosedChannels = append(update.ClosedChannels, m...)
3✔
455
                return nil
3✔
456

457
        default:
×
458
                return fmt.Errorf("unable to add to topology change, "+
×
459
                        "unknown message type %T", msg)
×
460
        }
461
}
462

463
// EncodeHexColor renders the red, green and blue channels of the passed color
// as a lowercase "#rrggbb" hex code. The alpha channel is not encoded.
func EncodeHexColor(color color.RGBA) string {
	red, green, blue := color.R, color.G, color.B

	return fmt.Sprintf("#%02x%02x%02x", red, green, blue)
}
3✔
467

468
// DecodeHexColor takes a hex color string of the exact form "#rrggbb" and
// returns the corresponding color.RGBA. Only the red, green and blue channels
// are populated; the alpha channel of the result is left at zero.
//
// An error is returned if the string is not exactly seven bytes long, does not
// start with '#', or if any of the three components is not a valid two-digit
// hexadecimal number.
func DecodeHexColor(hex string) (color.RGBA, error) {
	// Validate the overall shape up front. Slicing a shorter string below
	// would otherwise panic instead of surfacing an error to the caller.
	if len(hex) != 7 || hex[0] != '#' {
		return color.RGBA{}, fmt.Errorf("invalid hex color string: "+
			"%q", hex)
	}

	r, err := strconv.ParseUint(hex[1:3], 16, 8)
	if err != nil {
		return color.RGBA{}, fmt.Errorf("invalid red component: %w",
			err)
	}

	g, err := strconv.ParseUint(hex[3:5], 16, 8)
	if err != nil {
		return color.RGBA{}, fmt.Errorf("invalid green component: %w",
			err)
	}

	b, err := strconv.ParseUint(hex[5:7], 16, 8)
	if err != nil {
		return color.RGBA{}, fmt.Errorf("invalid blue component: %w",
			err)
	}

	// ParseUint was called with a bit size of 8, so each value is
	// guaranteed to fit in a uint8.
	return color.RGBA{
		R: uint8(r),
		G: uint8(g),
		B: uint8(b),
	}, nil
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc