• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14994910928

13 May 2025 11:00AM UTC coverage: 68.991% (+10.4%) from 58.559%
14994910928

Pull #9752

github

web-flow
Merge ca77283bc into 1db6c31e2
Pull Request #9752: routerrpc: reject payment to invoice that don't have payment secret or blinded paths

6 of 7 new or added lines in 1 file covered. (85.71%)

959 existing lines in 12 files now uncovered.

133922 of 194116 relevant lines covered (68.99%)

22091.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.96
/graph/db/notifications.go
1
package graphdb
2

3
import (
4
        "fmt"
5
        "image/color"
6
        "net"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcec/v2"
11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/go-errors/errors"
14
        "github.com/lightningnetwork/lnd/graph/db/models"
15
        "github.com/lightningnetwork/lnd/lnutils"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
)
18

19
// topologyManager holds all the fields required to manage the network topology
20
// subscriptions and notifications.
21
type topologyManager struct {
22
        // ntfnClientCounter is an atomic counter that's used to assign unique
23
        // notification client IDs to new clients.
24
        ntfnClientCounter atomic.Uint64
25

26
        // topologyUpdate is a channel that carries new topology updates
27
        // messages from outside the ChannelGraph to be processed by the
28
        // networkHandler.
29
        topologyUpdate chan any
30

31
        // topologyClients maps a client's unique notification ID to a
32
        // topologyClient client that contains its notification dispatch
33
        // channel.
34
        topologyClients *lnutils.SyncMap[uint64, *topologyClient]
35

36
        // ntfnClientUpdates is a channel that's used to send new updates to
37
        // topology notification clients to the ChannelGraph. Updates either
38
        // add a new notification client, or cancel notifications for an
39
        // existing client.
40
        ntfnClientUpdates chan *topologyClientUpdate
41
}
42

43
// newTopologyManager creates a new instance of the topologyManager.
44
func newTopologyManager() *topologyManager {
177✔
45
        return &topologyManager{
177✔
46
                topologyUpdate:    make(chan any),
177✔
47
                topologyClients:   &lnutils.SyncMap[uint64, *topologyClient]{},
177✔
48
                ntfnClientUpdates: make(chan *topologyClientUpdate),
177✔
49
        }
177✔
50
}
177✔
51

52
// TopologyClient represents an intent to receive notifications from the
53
// channel router regarding changes to the topology of the channel graph. The
54
// TopologyChanges channel will be sent upon with new updates to the channel
55
// graph in real-time as they're encountered.
56
type TopologyClient struct {
57
        // TopologyChanges is a receive only channel that new channel graph
58
        // updates will be sent over.
59
        //
60
        // TODO(roasbeef): chan for each update type instead?
61
        TopologyChanges <-chan *TopologyChange
62

63
        // Cancel is a function closure that should be executed when the client
64
        // wishes to cancel their notification intent. Doing so allows the
65
        // ChannelRouter to free up resources.
66
        Cancel func()
67
}
68

69
// topologyClientUpdate is a message sent to the channel router to either
70
// register a new topology client or re-register an existing client.
71
type topologyClientUpdate struct {
72
        // cancel indicates if the update to the client is cancelling an
73
        // existing client's notifications. If not then this update will be to
74
        // register a new set of notifications.
75
        cancel bool
76

77
        // clientID is the unique identifier for this client. Any further
78
        // updates (deleting or adding) to this notification client will be
79
        // dispatched according to the target clientID.
80
        clientID uint64
81

82
        // ntfnChan is a *send-only* channel in which notifications should be
83
        // sent over from router -> client.
84
        ntfnChan chan<- *TopologyChange
85
}
86

87
// SubscribeTopology returns a new topology client which can be used by the
88
// caller to receive notifications whenever a change in the channel graph
89
// topology occurs. Changes that will be sent at notifications include: new
90
// nodes appearing, node updating their attributes, new channels, channels
91
// closing, and updates in the routing policies of a channel's directed edges.
92
func (c *ChannelGraph) SubscribeTopology() (*TopologyClient, error) {
7✔
93
        // If the router is not yet started, return an error to avoid a
7✔
94
        // deadlock waiting for it to handle the subscription request.
7✔
95
        if !c.started.Load() {
7✔
96
                return nil, fmt.Errorf("router not started")
×
97
        }
×
98

99
        // We'll first atomically obtain the next ID for this client from the
100
        // incrementing client ID counter.
101
        clientID := c.ntfnClientCounter.Add(1)
7✔
102

7✔
103
        log.Debugf("New graph topology client subscription, client %v",
7✔
104
                clientID)
7✔
105

7✔
106
        ntfnChan := make(chan *TopologyChange, 10)
7✔
107

7✔
108
        select {
7✔
109
        case c.ntfnClientUpdates <- &topologyClientUpdate{
110
                cancel:   false,
111
                clientID: clientID,
112
                ntfnChan: ntfnChan,
113
        }:
7✔
114
        case <-c.quit:
×
115
                return nil, errors.New("ChannelRouter shutting down")
×
116
        }
117

118
        return &TopologyClient{
7✔
119
                TopologyChanges: ntfnChan,
7✔
120
                Cancel: func() {
11✔
121
                        select {
4✔
122
                        case c.ntfnClientUpdates <- &topologyClientUpdate{
123
                                cancel:   true,
124
                                clientID: clientID,
125
                        }:
4✔
UNCOV
126
                        case <-c.quit:
×
UNCOV
127
                                return
×
128
                        }
129
                },
130
        }, nil
131
}
132

133
// topologyClient is a data-structure use by the channel router to couple the
134
// client's notification channel along with a special "exit" channel that can
135
// be used to cancel all lingering goroutines blocked on a send to the
136
// notification channel.
137
type topologyClient struct {
138
        // ntfnChan is a send-only channel that's used to propagate
139
        // notification s from the channel router to an instance of a
140
        // topologyClient client.
141
        ntfnChan chan<- *TopologyChange
142

143
        // exit is a channel that is used internally by the channel router to
144
        // cancel any active un-consumed goroutine notifications.
145
        exit chan struct{}
146

147
        wg sync.WaitGroup
148
}
149

150
// notifyTopologyChange notifies all registered clients of a new change in
151
// graph topology in a non-blocking.
152
//
153
// NOTE: this should only ever be called from a call-stack originating from the
154
// handleTopologySubscriptions handler.
155
func (c *ChannelGraph) notifyTopologyChange(topologyDiff *TopologyChange) {
3,472✔
156
        // notifyClient is a helper closure that will send topology updates to
3,472✔
157
        // the given client.
3,472✔
158
        notifyClient := func(clientID uint64, client *topologyClient) bool {
3,481✔
159
                client.wg.Add(1)
9✔
160

9✔
161
                log.Tracef("Sending topology notification to client=%v, "+
9✔
162
                        "NodeUpdates=%v, ChannelEdgeUpdates=%v, "+
9✔
163
                        "ClosedChannels=%v", clientID,
9✔
164
                        len(topologyDiff.NodeUpdates),
9✔
165
                        len(topologyDiff.ChannelEdgeUpdates),
9✔
166
                        len(topologyDiff.ClosedChannels))
9✔
167

9✔
168
                go func(t *topologyClient) {
18✔
169
                        defer t.wg.Done()
9✔
170

9✔
171
                        select {
9✔
172

173
                        // In this case we'll try to send the notification
174
                        // directly to the upstream client consumer.
175
                        case t.ntfnChan <- topologyDiff:
9✔
176

177
                        // If the client cancels the notifications, then we'll
178
                        // exit early.
179
                        case <-t.exit:
×
180

181
                        // Similarly, if the ChannelRouter itself exists early,
182
                        // then we'll also exit ourselves.
183
                        case <-c.quit:
×
184
                        }
185
                }(client)
186

187
                // Always return true here so the following Range will iterate
188
                // all clients.
189
                return true
9✔
190
        }
191

192
        // Range over the set of active clients, and attempt to send the
193
        // topology updates.
194
        c.topologyClients.Range(notifyClient)
3,472✔
195
}
196

197
// handleTopologyUpdate is responsible for sending any topology changes
198
// notifications to registered clients.
199
//
200
// NOTE: must be run inside goroutine and must only ever be called from within
201
// handleTopologySubscriptions.
202
func (c *ChannelGraph) handleTopologyUpdate(update any) {
4,958✔
203
        defer c.wg.Done()
4,958✔
204

4,958✔
205
        topChange := &TopologyChange{}
4,958✔
206
        err := c.addToTopologyChange(topChange, update)
4,958✔
207
        if err != nil {
4,965✔
208
                log.Errorf("unable to update topology change notification: %v",
7✔
209
                        err)
7✔
210
                return
7✔
211
        }
7✔
212

213
        if topChange.isEmpty() {
6,433✔
214
                return
1,482✔
215
        }
1,482✔
216

217
        c.notifyTopologyChange(topChange)
3,472✔
218
}
219

220
// TopologyChange represents a new set of modifications to the channel graph.
221
// Topology changes will be dispatched in real-time as the ChannelGraph
222
// validates and process modifications to the authenticated channel graph.
223
type TopologyChange struct {
224
        // NodeUpdates is a slice of nodes which are either new to the channel
225
        // graph, or have had their attributes updated in an authenticated
226
        // manner.
227
        NodeUpdates []*NetworkNodeUpdate
228

229
        // ChanelEdgeUpdates is a slice of channel edges which are either newly
230
        // opened and authenticated, or have had their routing policies
231
        // updated.
232
        ChannelEdgeUpdates []*ChannelEdgeUpdate
233

234
        // ClosedChannels contains a slice of close channel summaries which
235
        // described which block a channel was closed at, and also carry
236
        // supplemental information such as the capacity of the former channel.
237
        ClosedChannels []*ClosedChanSummary
238
}
239

240
// isEmpty returns true if the TopologyChange is empty. A TopologyChange is
241
// considered empty, if it contains no *new* updates of any type.
242
func (t *TopologyChange) isEmpty() bool {
4,951✔
243
        return len(t.NodeUpdates) == 0 && len(t.ChannelEdgeUpdates) == 0 &&
4,951✔
244
                len(t.ClosedChannels) == 0
4,951✔
245
}
4,951✔
246

247
// ClosedChanSummary is a summary of a channel that was detected as being
248
// closed by monitoring the blockchain. Once a channel's funding point has been
249
// spent, the channel will automatically be marked as closed by the
250
// ChainNotifier.
251
//
252
// TODO(roasbeef): add nodes involved?
253
type ClosedChanSummary struct {
254
        // ChanID is the short-channel ID which uniquely identifies the
255
        // channel.
256
        ChanID uint64
257

258
        // Capacity was the total capacity of the channel before it was closed.
259
        Capacity btcutil.Amount
260

261
        // ClosedHeight is the height in the chain that the channel was closed
262
        // at.
263
        ClosedHeight uint32
264

265
        // ChanPoint is the funding point, or the multi-sig utxo which
266
        // previously represented the channel.
267
        ChanPoint wire.OutPoint
268
}
269

270
// createCloseSummaries takes in a slice of channels closed at the target block
271
// height and creates a slice of summaries which of each channel closure.
272
func createCloseSummaries(blockHeight uint32,
273
        closedChans ...*models.ChannelEdgeInfo) []*ClosedChanSummary {
20✔
274

20✔
275
        closeSummaries := make([]*ClosedChanSummary, len(closedChans))
20✔
276
        for i, closedChan := range closedChans {
45✔
277
                closeSummaries[i] = &ClosedChanSummary{
25✔
278
                        ChanID:       closedChan.ChannelID,
25✔
279
                        Capacity:     closedChan.Capacity,
25✔
280
                        ClosedHeight: blockHeight,
25✔
281
                        ChanPoint:    closedChan.ChannelPoint,
25✔
282
                }
25✔
283
        }
25✔
284

285
        return closeSummaries
20✔
286
}
287

288
// NetworkNodeUpdate is an update for a  node within the Lightning Network. A
289
// NetworkNodeUpdate is sent out either when a new node joins the network, or a
290
// node broadcasts a new update with a newer time stamp that supersedes its
291
// old update. All updates are properly authenticated.
292
type NetworkNodeUpdate struct {
293
        // Addresses is a slice of all the node's known addresses.
294
        Addresses []net.Addr
295

296
        // IdentityKey is the identity public key of the target node. This is
297
        // used to encrypt onion blobs as well as to authenticate any new
298
        // updates.
299
        IdentityKey *btcec.PublicKey
300

301
        // Alias is the alias or nick name of the node.
302
        Alias string
303

304
        // Color is the node's color in hex code format.
305
        Color string
306

307
        // Features holds the set of features the node supports.
308
        Features *lnwire.FeatureVector
309
}
310

311
// ChannelEdgeUpdate is an update for a new channel within the ChannelGraph.
312
// This update is sent out once a new authenticated channel edge is discovered
313
// within the network. These updates are directional, so if a channel is fully
314
// public, then there will be two updates sent out: one for each direction
315
// within the channel. Each update will carry that particular routing edge
316
// policy for the channel direction.
317
//
318
// An edge is a channel in the direction of AdvertisingNode -> ConnectingNode.
319
type ChannelEdgeUpdate struct {
320
        // ChanID is the unique short channel ID for the channel. This encodes
321
        // where in the blockchain the channel's funding transaction was
322
        // originally confirmed.
323
        ChanID uint64
324

325
        // ChanPoint is the outpoint which represents the multi-sig funding
326
        // output for the channel.
327
        ChanPoint wire.OutPoint
328

329
        // Capacity is the capacity of the newly created channel.
330
        Capacity btcutil.Amount
331

332
        // MinHTLC is the minimum HTLC amount that this channel will forward.
333
        MinHTLC lnwire.MilliSatoshi
334

335
        // MaxHTLC is the maximum HTLC amount that this channel will forward.
336
        MaxHTLC lnwire.MilliSatoshi
337

338
        // BaseFee is the base fee that will charged for all HTLC's forwarded
339
        // across the this channel direction.
340
        BaseFee lnwire.MilliSatoshi
341

342
        // FeeRate is the fee rate that will be shared for all HTLC's forwarded
343
        // across this channel direction.
344
        FeeRate lnwire.MilliSatoshi
345

346
        // TimeLockDelta is the time-lock expressed in blocks that will be
347
        // added to outgoing HTLC's from incoming HTLC's. This value is the
348
        // difference of the incoming and outgoing HTLC's time-locks routed
349
        // through this hop.
350
        TimeLockDelta uint16
351

352
        // AdvertisingNode is the node that's advertising this edge.
353
        AdvertisingNode *btcec.PublicKey
354

355
        // ConnectingNode is the node that the advertising node connects to.
356
        ConnectingNode *btcec.PublicKey
357

358
        // Disabled, if true, signals that the channel is unavailable to relay
359
        // payments.
360
        Disabled bool
361

362
        // ExtraOpaqueData is the set of data that was appended to this message
363
        // to fill out the full maximum transport message size. These fields can
364
        // be used to specify optional data such as custom TLV fields.
365
        ExtraOpaqueData lnwire.ExtraOpaqueData
366
}
367

368
// appendTopologyChange appends the passed update message to the passed
369
// TopologyChange, properly identifying which type of update the message
370
// constitutes. This function will also fetch any required auxiliary
371
// information required to create the topology change update from the graph
372
// database.
373
func (c *ChannelGraph) addToTopologyChange(update *TopologyChange,
374
        msg any) error {
4,958✔
375

4,958✔
376
        switch m := msg.(type) {
4,958✔
377

378
        // Any node announcement maps directly to a NetworkNodeUpdate struct.
379
        // No further data munging or db queries are required.
380
        case *models.LightningNode:
802✔
381
                pubKey, err := m.PubKey()
802✔
382
                if err != nil {
802✔
383
                        return err
×
384
                }
×
385

386
                nodeUpdate := &NetworkNodeUpdate{
802✔
387
                        Addresses:   m.Addresses,
802✔
388
                        IdentityKey: pubKey,
802✔
389
                        Alias:       m.Alias,
802✔
390
                        Color:       EncodeHexColor(m.Color),
802✔
391
                        Features:    m.Features.Clone(),
802✔
392
                }
802✔
393

802✔
394
                update.NodeUpdates = append(update.NodeUpdates, nodeUpdate)
802✔
395
                return nil
802✔
396

397
        // We ignore initial channel announcements as we'll only send out
398
        // updates once the individual edges themselves have been updated.
399
        case *models.ChannelEdgeInfo:
1,482✔
400
                return nil
1,482✔
401

402
        // Any new ChannelUpdateAnnouncements will generate a corresponding
403
        // ChannelEdgeUpdate notification.
404
        case *models.ChannelEdgePolicy:
2,663✔
405
                // We'll need to fetch the edge's information from the database
2,663✔
406
                // in order to get the information concerning which nodes are
2,663✔
407
                // being connected.
2,663✔
408
                edgeInfo, _, _, err := c.FetchChannelEdgesByID(m.ChannelID)
2,663✔
409
                if err != nil {
2,670✔
410
                        return errors.Errorf("unable fetch channel edge: %v",
7✔
411
                                err)
7✔
412
                }
7✔
413

414
                // If the flag is one, then the advertising node is actually
415
                // the second node.
416
                sourceNode := edgeInfo.NodeKey1
2,656✔
417
                connectingNode := edgeInfo.NodeKey2
2,656✔
418
                if m.ChannelFlags&lnwire.ChanUpdateDirection == 1 {
3,981✔
419
                        sourceNode = edgeInfo.NodeKey2
1,325✔
420
                        connectingNode = edgeInfo.NodeKey1
1,325✔
421
                }
1,325✔
422

423
                aNode, err := sourceNode()
2,656✔
424
                if err != nil {
2,656✔
425
                        return err
×
426
                }
×
427
                cNode, err := connectingNode()
2,656✔
428
                if err != nil {
2,656✔
429
                        return err
×
430
                }
×
431

432
                edgeUpdate := &ChannelEdgeUpdate{
2,656✔
433
                        ChanID:          m.ChannelID,
2,656✔
434
                        ChanPoint:       edgeInfo.ChannelPoint,
2,656✔
435
                        TimeLockDelta:   m.TimeLockDelta,
2,656✔
436
                        Capacity:        edgeInfo.Capacity,
2,656✔
437
                        MinHTLC:         m.MinHTLC,
2,656✔
438
                        MaxHTLC:         m.MaxHTLC,
2,656✔
439
                        BaseFee:         m.FeeBaseMSat,
2,656✔
440
                        FeeRate:         m.FeeProportionalMillionths,
2,656✔
441
                        AdvertisingNode: aNode,
2,656✔
442
                        ConnectingNode:  cNode,
2,656✔
443
                        Disabled:        m.ChannelFlags&lnwire.ChanUpdateDisabled != 0,
2,656✔
444
                        ExtraOpaqueData: m.ExtraOpaqueData,
2,656✔
445
                }
2,656✔
446

2,656✔
447
                // TODO(roasbeef): add bit to toggle
2,656✔
448
                update.ChannelEdgeUpdates = append(update.ChannelEdgeUpdates,
2,656✔
449
                        edgeUpdate)
2,656✔
450
                return nil
2,656✔
451

452
        case []*ClosedChanSummary:
20✔
453
                update.ClosedChannels = append(update.ClosedChannels, m...)
20✔
454
                return nil
20✔
455

456
        default:
×
457
                return fmt.Errorf("unable to add to topology change, "+
×
458
                        "unknown message type %T", msg)
×
459
        }
460
}
461

462
// EncodeHexColor takes a color and returns it in hex code format.
463
func EncodeHexColor(color color.RGBA) string {
811✔
464
        return fmt.Sprintf("#%02x%02x%02x", color.R, color.G, color.B)
811✔
465
}
811✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc