• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15959600311

29 Jun 2025 09:33PM UTC coverage: 67.577% (-0.03%) from 67.606%
15959600311

Pull #8825

github

web-flow
Merge b3542eca4 into 6290edf14
Pull Request #8825: lnd: use persisted node announcement settings across restarts

44 of 49 new or added lines in 1 file covered. (89.8%)

92 existing lines in 17 files now uncovered.

135081 of 199891 relevant lines covered (67.58%)

21854.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.67
/graph/db/notifications.go
1
package graphdb
2

3
import (
4
        "fmt"
5
        "image/color"
6
        "net"
7
        "strconv"
8
        "sync"
9
        "sync/atomic"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/btcutil"
13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/go-errors/errors"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/graph/db/models"
17
        "github.com/lightningnetwork/lnd/lnutils"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
)
20

21
// topologyManager holds all the fields required to manage the network topology
// subscriptions and notifications.
type topologyManager struct {
	// ntfnClientCounter is an atomic counter that's used to assign unique
	// notification client IDs to new clients.
	ntfnClientCounter atomic.Uint64

	// topologyUpdate is a channel that carries new topology update
	// messages from outside the ChannelGraph to be processed by the
	// networkHandler.
	topologyUpdate chan any

	// topologyClients maps a client's unique notification ID to a
	// topologyClient that contains its notification dispatch channel.
	topologyClients *lnutils.SyncMap[uint64, *topologyClient]

	// ntfnClientUpdates is a channel that's used to send new updates
	// about topology notification clients to the ChannelGraph. Updates
	// either add a new notification client, or cancel notifications for
	// an existing client.
	ntfnClientUpdates chan *topologyClientUpdate
}
44

45
// newTopologyManager creates a new instance of the topologyManager.
46
func newTopologyManager() *topologyManager {
173✔
47
        return &topologyManager{
173✔
48
                topologyUpdate:    make(chan any),
173✔
49
                topologyClients:   &lnutils.SyncMap[uint64, *topologyClient]{},
173✔
50
                ntfnClientUpdates: make(chan *topologyClientUpdate),
173✔
51
        }
173✔
52
}
173✔
53

54
// TopologyClient represents an intent to receive notifications from the
// channel router regarding changes to the topology of the channel graph. New
// updates to the channel graph are sent over the TopologyChanges channel in
// real-time as they're encountered.
type TopologyClient struct {
	// TopologyChanges is a receive-only channel that new channel graph
	// updates will be sent over.
	//
	// TODO(roasbeef): chan for each update type instead?
	TopologyChanges <-chan *TopologyChange

	// Cancel is a function closure that should be executed when the client
	// wishes to cancel their notification intent. Doing so allows the
	// ChannelRouter to free up resources.
	Cancel func()
}
70

71
// topologyClientUpdate is a message sent to the channel router to either
// register a new topology client or cancel the notifications of an existing
// client.
type topologyClientUpdate struct {
	// cancel indicates if the update to the client is cancelling an
	// existing client's notifications. If not, then this update
	// registers a new set of notifications.
	cancel bool

	// clientID is the unique identifier for this client. Any further
	// updates (deleting or adding) to this notification client will be
	// dispatched according to the target clientID.
	clientID uint64

	// ntfnChan is a *send-only* channel over which notifications should
	// be sent from router -> client.
	ntfnChan chan<- *TopologyChange
}
88

89
// SubscribeTopology returns a new topology client which can be used by the
90
// caller to receive notifications whenever a change in the channel graph
91
// topology occurs. Changes that will be sent at notifications include: new
92
// nodes appearing, node updating their attributes, new channels, channels
93
// closing, and updates in the routing policies of a channel's directed edges.
94
func (c *ChannelGraph) SubscribeTopology() (*TopologyClient, error) {
7✔
95
        // If the router is not yet started, return an error to avoid a
7✔
96
        // deadlock waiting for it to handle the subscription request.
7✔
97
        if !c.started.Load() {
7✔
98
                return nil, fmt.Errorf("router not started")
×
99
        }
×
100

101
        // We'll first atomically obtain the next ID for this client from the
102
        // incrementing client ID counter.
103
        clientID := c.ntfnClientCounter.Add(1)
7✔
104

7✔
105
        log.Debugf("New graph topology client subscription, client %v",
7✔
106
                clientID)
7✔
107

7✔
108
        ntfnChan := make(chan *TopologyChange, 10)
7✔
109

7✔
110
        select {
7✔
111
        case c.ntfnClientUpdates <- &topologyClientUpdate{
112
                cancel:   false,
113
                clientID: clientID,
114
                ntfnChan: ntfnChan,
115
        }:
7✔
116
        case <-c.quit:
×
117
                return nil, errors.New("ChannelRouter shutting down")
×
118
        }
119

120
        return &TopologyClient{
7✔
121
                TopologyChanges: ntfnChan,
7✔
122
                Cancel: func() {
11✔
123
                        select {
4✔
124
                        case c.ntfnClientUpdates <- &topologyClientUpdate{
125
                                cancel:   true,
126
                                clientID: clientID,
127
                        }:
4✔
128
                        case <-c.quit:
×
129
                                return
×
130
                        }
131
                },
132
        }, nil
133
}
134

135
// topologyClient is a data-structure used by the channel router to couple the
// client's notification channel along with a special "exit" channel that can
// be used to cancel all lingering goroutines blocked on a send to the
// notification channel.
type topologyClient struct {
	// ntfnChan is a send-only channel that's used to propagate
	// notifications from the channel router to an instance of a
	// topologyClient client.
	ntfnChan chan<- *TopologyChange

	// exit is a channel that is used internally by the channel router to
	// cancel any active un-consumed goroutine notifications.
	exit chan struct{}

	// wg tracks the notification goroutines that notifyTopologyChange
	// launches for this client: it is incremented before each goroutine
	// starts and decremented when that goroutine returns.
	wg sync.WaitGroup
}
151

152
// notifyTopologyChange notifies all registered clients of a new change in
// graph topology in a non-blocking manner: every client is served by its own
// goroutine, so a slow consumer does not hold up this call.
//
// NOTE: this should only ever be called from a call-stack originating from the
// handleTopologySubscriptions handler.
func (c *ChannelGraph) notifyTopologyChange(topologyDiff *TopologyChange) {
	// notifyClient is a helper closure that will send topology updates to
	// the given client.
	notifyClient := func(clientID uint64, client *topologyClient) bool {
		// Account for the goroutine before it is launched so that a
		// wait on the client's WaitGroup cannot miss it.
		client.wg.Add(1)

		log.Tracef("Sending topology notification to client=%v, "+
			"NodeUpdates=%v, ChannelEdgeUpdates=%v, "+
			"ClosedChannels=%v", clientID,
			len(topologyDiff.NodeUpdates),
			len(topologyDiff.ChannelEdgeUpdates),
			len(topologyDiff.ClosedChannels))

		go func(t *topologyClient) {
			defer t.wg.Done()

			select {

			// In this case we'll try to send the notification
			// directly to the upstream client consumer.
			case t.ntfnChan <- topologyDiff:

			// If the client cancels the notifications, then we'll
			// exit early.
			case <-t.exit:

			// Similarly, if the ChannelRouter itself exits early,
			// then we'll also exit ourselves.
			case <-c.quit:
			}
		}(client)

		// Always return true here so the following Range will iterate
		// all clients.
		return true
	}

	// Range over the set of active clients, and attempt to send the
	// topology updates.
	c.topologyClients.Range(notifyClient)
}
198

199
// handleTopologyUpdate is responsible for sending any topology changes
200
// notifications to registered clients.
201
//
202
// NOTE: must be run inside goroutine and must only ever be called from within
203
// handleTopologySubscriptions.
204
func (c *ChannelGraph) handleTopologyUpdate(update any) {
4,893✔
205
        defer c.wg.Done()
4,893✔
206

4,893✔
207
        topChange := &TopologyChange{}
4,893✔
208
        err := c.addToTopologyChange(topChange, update)
4,893✔
209
        if err != nil {
4,893✔
UNCOV
210
                log.Errorf("unable to update topology change notification: %v",
×
UNCOV
211
                        err)
×
UNCOV
212
                return
×
UNCOV
213
        }
×
214

215
        if topChange.isEmpty() {
6,390✔
216
                return
1,497✔
217
        }
1,497✔
218

219
        c.notifyTopologyChange(topChange)
3,399✔
220
}
221

222
// TopologyChange represents a new set of modifications to the channel graph.
// Topology changes will be dispatched in real-time as the ChannelGraph
// validates and processes modifications to the authenticated channel graph.
type TopologyChange struct {
	// NodeUpdates is a slice of nodes which are either new to the channel
	// graph, or have had their attributes updated in an authenticated
	// manner.
	NodeUpdates []*NetworkNodeUpdate

	// ChannelEdgeUpdates is a slice of channel edges which are either
	// newly opened and authenticated, or have had their routing policies
	// updated.
	ChannelEdgeUpdates []*ChannelEdgeUpdate

	// ClosedChannels contains a slice of closed channel summaries which
	// describe the block a channel was closed at, and also carry
	// supplemental information such as the capacity of the former channel.
	ClosedChannels []*ClosedChanSummary
}
241

242
// isEmpty returns true if the TopologyChange is empty. A TopologyChange is
243
// considered empty, if it contains no *new* updates of any type.
244
func (t *TopologyChange) isEmpty() bool {
4,893✔
245
        return len(t.NodeUpdates) == 0 && len(t.ChannelEdgeUpdates) == 0 &&
4,893✔
246
                len(t.ClosedChannels) == 0
4,893✔
247
}
4,893✔
248

249
// ClosedChanSummary is a summary of a channel that was detected as being
// closed by monitoring the blockchain. Once a channel's funding point has been
// spent, the channel will automatically be marked as closed by the
// ChainNotifier.
//
// TODO(roasbeef): add nodes involved?
type ClosedChanSummary struct {
	// ChanID is the short channel ID which uniquely identifies the
	// channel.
	ChanID uint64

	// Capacity was the total capacity of the channel before it was closed.
	Capacity btcutil.Amount

	// ClosedHeight is the height in the chain that the channel was closed
	// at.
	ClosedHeight uint32

	// ChanPoint is the funding point, or the multi-sig utxo which
	// previously represented the channel.
	ChanPoint wire.OutPoint
}
271

272
// createCloseSummaries takes in a slice of channels closed at the target block
273
// height and creates a slice of summaries which of each channel closure.
274
func createCloseSummaries(blockHeight uint32,
275
        closedChans ...*models.ChannelEdgeInfo) []*ClosedChanSummary {
25✔
276

25✔
277
        closeSummaries := make([]*ClosedChanSummary, len(closedChans))
25✔
278
        for i, closedChan := range closedChans {
53✔
279
                closeSummaries[i] = &ClosedChanSummary{
28✔
280
                        ChanID:       closedChan.ChannelID,
28✔
281
                        Capacity:     closedChan.Capacity,
28✔
282
                        ClosedHeight: blockHeight,
28✔
283
                        ChanPoint:    closedChan.ChannelPoint,
28✔
284
                }
28✔
285
        }
28✔
286

287
        return closeSummaries
25✔
288
}
289

290
// NetworkNodeUpdate is an update for a node within the Lightning Network. A
// NetworkNodeUpdate is sent out either when a new node joins the network, or a
// node broadcasts a new update with a newer time stamp that supersedes its
// old update. All updates are properly authenticated.
type NetworkNodeUpdate struct {
	// Addresses is a slice of all the node's known addresses.
	Addresses []net.Addr

	// IdentityKey is the identity public key of the target node. This is
	// used to encrypt onion blobs as well as to authenticate any new
	// updates.
	IdentityKey *btcec.PublicKey

	// Alias is the alias or nickname of the node.
	Alias string

	// Color is the node's color in hex code format.
	Color string

	// Features holds the set of features the node supports.
	Features *lnwire.FeatureVector
}
312

313
// ChannelEdgeUpdate is an update for a new channel within the ChannelGraph.
// This update is sent out once a new authenticated channel edge is discovered
// within the network. These updates are directional, so if a channel is fully
// public, then there will be two updates sent out: one for each direction
// within the channel. Each update will carry that particular routing edge
// policy for the channel direction.
//
// An edge is a channel in the direction of AdvertisingNode -> ConnectingNode.
type ChannelEdgeUpdate struct {
	// ChanID is the unique short channel ID for the channel. This encodes
	// where in the blockchain the channel's funding transaction was
	// originally confirmed.
	ChanID uint64

	// ChanPoint is the outpoint which represents the multi-sig funding
	// output for the channel.
	ChanPoint wire.OutPoint

	// Capacity is the capacity of the newly created channel.
	Capacity btcutil.Amount

	// MinHTLC is the minimum HTLC amount that this channel will forward.
	MinHTLC lnwire.MilliSatoshi

	// MaxHTLC is the maximum HTLC amount that this channel will forward.
	MaxHTLC lnwire.MilliSatoshi

	// BaseFee is the base fee that will be charged for all HTLCs
	// forwarded across this channel direction.
	BaseFee lnwire.MilliSatoshi

	// FeeRate is the proportional fee rate that applies to all HTLCs
	// forwarded across this channel direction. It is populated from the
	// edge policy's FeeProportionalMillionths value.
	FeeRate lnwire.MilliSatoshi

	// TimeLockDelta is the time-lock expressed in blocks that will be
	// added to outgoing HTLCs from incoming HTLCs. This value is the
	// difference of the incoming and outgoing HTLCs' time-locks routed
	// through this hop.
	TimeLockDelta uint16

	// AdvertisingNode is the node that's advertising this edge.
	AdvertisingNode *btcec.PublicKey

	// ConnectingNode is the node that the advertising node connects to.
	ConnectingNode *btcec.PublicKey

	// Disabled, if true, signals that the channel is unavailable to relay
	// payments.
	Disabled bool

	// InboundFee is the fee that must be paid for incoming HTLCs.
	InboundFee fn.Option[lnwire.Fee]

	// ExtraOpaqueData is the set of data that was appended to this message
	// to fill out the full maximum transport message size. These fields can
	// be used to specify optional data such as custom TLV fields.
	ExtraOpaqueData lnwire.ExtraOpaqueData
}
372

373
// appendTopologyChange appends the passed update message to the passed
374
// TopologyChange, properly identifying which type of update the message
375
// constitutes. This function will also fetch any required auxiliary
376
// information required to create the topology change update from the graph
377
// database.
378
func (c *ChannelGraph) addToTopologyChange(update *TopologyChange,
379
        msg any) error {
4,893✔
380

4,893✔
381
        switch m := msg.(type) {
4,893✔
382

383
        // Any node announcement maps directly to a NetworkNodeUpdate struct.
384
        // No further data munging or db queries are required.
385
        case *models.LightningNode:
715✔
386
                pubKey, err := m.PubKey()
715✔
387
                if err != nil {
715✔
388
                        return err
×
389
                }
×
390

391
                nodeUpdate := &NetworkNodeUpdate{
715✔
392
                        Addresses:   m.Addresses,
715✔
393
                        IdentityKey: pubKey,
715✔
394
                        Alias:       m.Alias,
715✔
395
                        Color:       EncodeHexColor(m.Color),
715✔
396
                        Features:    m.Features.Clone(),
715✔
397
                }
715✔
398

715✔
399
                update.NodeUpdates = append(update.NodeUpdates, nodeUpdate)
715✔
400
                return nil
715✔
401

402
        // We ignore initial channel announcements as we'll only send out
403
        // updates once the individual edges themselves have been updated.
404
        case *models.ChannelEdgeInfo:
1,497✔
405
                return nil
1,497✔
406

407
        // Any new ChannelUpdateAnnouncements will generate a corresponding
408
        // ChannelEdgeUpdate notification.
409
        case *models.ChannelEdgePolicy:
2,665✔
410
                // We'll need to fetch the edge's information from the database
2,665✔
411
                // in order to get the information concerning which nodes are
2,665✔
412
                // being connected.
2,665✔
413
                edgeInfo, _, _, err := c.FetchChannelEdgesByID(m.ChannelID)
2,665✔
414
                if err != nil {
2,665✔
UNCOV
415
                        return errors.Errorf("unable fetch channel edge: %v",
×
UNCOV
416
                                err)
×
UNCOV
417
                }
×
418

419
                // If the flag is one, then the advertising node is actually
420
                // the second node.
421
                sourceNode := edgeInfo.NodeKey1
2,665✔
422
                connectingNode := edgeInfo.NodeKey2
2,665✔
423
                if m.ChannelFlags&lnwire.ChanUpdateDirection == 1 {
3,991✔
424
                        sourceNode = edgeInfo.NodeKey2
1,326✔
425
                        connectingNode = edgeInfo.NodeKey1
1,326✔
426
                }
1,326✔
427

428
                aNode, err := sourceNode()
2,665✔
429
                if err != nil {
2,665✔
430
                        return err
×
431
                }
×
432
                cNode, err := connectingNode()
2,665✔
433
                if err != nil {
2,665✔
434
                        return err
×
435
                }
×
436

437
                edgeUpdate := &ChannelEdgeUpdate{
2,665✔
438
                        ChanID:          m.ChannelID,
2,665✔
439
                        ChanPoint:       edgeInfo.ChannelPoint,
2,665✔
440
                        TimeLockDelta:   m.TimeLockDelta,
2,665✔
441
                        Capacity:        edgeInfo.Capacity,
2,665✔
442
                        MinHTLC:         m.MinHTLC,
2,665✔
443
                        MaxHTLC:         m.MaxHTLC,
2,665✔
444
                        BaseFee:         m.FeeBaseMSat,
2,665✔
445
                        FeeRate:         m.FeeProportionalMillionths,
2,665✔
446
                        AdvertisingNode: aNode,
2,665✔
447
                        ConnectingNode:  cNode,
2,665✔
448
                        Disabled:        m.ChannelFlags&lnwire.ChanUpdateDisabled != 0,
2,665✔
449
                        InboundFee:      m.InboundFee,
2,665✔
450
                        ExtraOpaqueData: m.ExtraOpaqueData,
2,665✔
451
                }
2,665✔
452

2,665✔
453
                // TODO(roasbeef): add bit to toggle
2,665✔
454
                update.ChannelEdgeUpdates = append(update.ChannelEdgeUpdates,
2,665✔
455
                        edgeUpdate)
2,665✔
456
                return nil
2,665✔
457

458
        case []*ClosedChanSummary:
25✔
459
                update.ClosedChannels = append(update.ClosedChannels, m...)
25✔
460
                return nil
25✔
461

462
        default:
×
463
                return fmt.Errorf("unable to add to topology change, "+
×
464
                        "unknown message type %T", msg)
×
465
        }
466
}
467

468
// EncodeHexColor takes a color and returns it in hex code format "#rrggbb".
// Only the red, green and blue channels are encoded, each as two lowercase
// hex digits; the alpha channel is not part of the output.
func EncodeHexColor(c color.RGBA) string {
	// The parameter is deliberately not named "color" so that it does not
	// shadow the imported image/color package inside the function.
	return fmt.Sprintf("#%02x%02x%02x", c.R, c.G, c.B)
}
724✔
472

473
// DecodeHexColor parses a hex color string of the exact form "#rrggbb" into a
// color.RGBA. The alpha channel of the result is left at zero.
//
// An error is returned if the string is not seven bytes long, does not start
// with '#', or if any of the three two-digit components is not valid
// hexadecimal.
func DecodeHexColor(hex string) (color.RGBA, error) {
	const encodedLen = len("#rrggbb")

	if len(hex) != encodedLen || hex[0] != '#' {
		return color.RGBA{}, fmt.Errorf("invalid hex color string: %s",
			hex)
	}

	// Each channel occupies two hex digits, in red, green, blue order,
	// directly after the leading '#'.
	channelNames := [...]string{"red", "green", "blue"}

	var channels [len(channelNames)]uint8
	for i, name := range channelNames {
		start := 1 + 2*i

		v, err := strconv.ParseUint(hex[start:start+2], 16, 8)
		if err != nil {
			return color.RGBA{}, fmt.Errorf("invalid %s "+
				"component: %w", name, err)
		}

		channels[i] = uint8(v)
	}

	return color.RGBA{
		R: channels[0],
		G: channels[1],
		B: channels[2],
	}, nil
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc