• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12041760086

27 Nov 2024 01:02AM UTC coverage: 59.001% (+0.002%) from 58.999%
12041760086

Pull #9242

github

aakselrod
github workflow: save postgres log to zip file
Pull Request #9242: Reapply #8644

8 of 39 new or added lines in 3 files covered. (20.51%)

82 existing lines in 18 files now uncovered.

133176 of 225719 relevant lines covered (59.0%)

19559.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.62
/graph/notifications.go
1
package graph
2

3
import (
4
        "fmt"
5
        "image/color"
6
        "net"
7
        "sync"
8

9
        "github.com/btcsuite/btcd/btcec/v2"
10
        "github.com/btcsuite/btcd/btcutil"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/go-errors/errors"
13
        "github.com/lightningnetwork/lnd/channeldb"
14
        "github.com/lightningnetwork/lnd/channeldb/models"
15
        "github.com/lightningnetwork/lnd/lnwire"
16
)
17

18
// TopologyClient represents an intent to receive notifications from the
19
// channel router regarding changes to the topology of the channel graph. New
20
// updates to the channel graph will be sent over the TopologyChanges channel
21
// in real-time as they're encountered.
22
type TopologyClient struct {
23
        // TopologyChanges is a receive only channel that new channel graph
24
        // updates will be sent over.
25
        //
26
        // TODO(roasbeef): chan for each update type instead?
27
        TopologyChanges <-chan *TopologyChange
28

29
        // Cancel is a function closure that should be executed when the client
30
        // wishes to cancel their notification intent. Doing so allows the
31
        // ChannelRouter to free up resources.
32
        Cancel func()
33
}
34

35
// topologyClientUpdate is a message sent to the channel router to either
36
// register a new topology client or cancel an existing client.
37
type topologyClientUpdate struct {
38
        // cancel indicates if the update to the client is cancelling an
39
        // existing client's notifications. If not then this update will be to
40
        // register a new set of notifications.
41
        cancel bool
42

43
        // clientID is the unique identifier for this client. Any further
44
        // updates (deleting or adding) to this notification client will be
45
        // dispatched according to the target clientID.
46
        clientID uint64
47

48
        // ntfnChan is a *send-only* channel in which notifications should be
49
        // sent over from router -> client.
50
        ntfnChan chan<- *TopologyChange
51
}
52

53
// SubscribeTopology returns a new topology client which can be used by the
54
// caller to receive notifications whenever a change in the channel graph
55
// topology occurs. Changes that will be sent as notifications include: new
56
// nodes appearing, nodes updating their attributes, new channels, channels
57
// closing, and updates in the routing policies of a channel's directed edges.
58
func (b *Builder) SubscribeTopology() (*TopologyClient, error) {
8✔
59
        // If the router is not yet started, return an error to avoid a
8✔
60
        // deadlock waiting for it to handle the subscription request.
8✔
61
        if !b.started.Load() {
8✔
62
                return nil, fmt.Errorf("router not started")
×
63
        }
×
64

65
        // We'll first atomically obtain the next ID for this client from the
66
        // incrementing client ID counter.
67
        clientID := b.ntfnClientCounter.Add(1)
8✔
68

8✔
69
        log.Debugf("New graph topology client subscription, client %v",
8✔
70
                clientID)
8✔
71

8✔
72
        ntfnChan := make(chan *TopologyChange, 10)
8✔
73

8✔
74
        select {
8✔
75
        case b.ntfnClientUpdates <- &topologyClientUpdate{
76
                cancel:   false,
77
                clientID: clientID,
78
                ntfnChan: ntfnChan,
79
        }:
8✔
80
        case <-b.quit:
×
81
                return nil, errors.New("ChannelRouter shutting down")
×
82
        }
83

84
        return &TopologyClient{
8✔
85
                TopologyChanges: ntfnChan,
8✔
86
                Cancel: func() {
13✔
87
                        select {
5✔
88
                        case b.ntfnClientUpdates <- &topologyClientUpdate{
89
                                cancel:   true,
90
                                clientID: clientID,
91
                        }:
5✔
UNCOV
92
                        case <-b.quit:
×
UNCOV
93
                                return
×
94
                        }
95
                },
96
        }, nil
97
}
98

99
// topologyClient is a data-structure used by the channel router to couple the
100
// client's notification channel along with a special "exit" channel that can
101
// be used to cancel all lingering goroutines blocked on a send to the
102
// notification channel.
103
type topologyClient struct {
104
        // ntfnChan is a send-only channel that's used to propagate
105
        // notifications from the channel router to an instance of a
106
        // topologyClient client.
107
        ntfnChan chan<- *TopologyChange
108

109
        // exit is a channel that is used internally by the channel router to
110
        // cancel any active un-consumed goroutine notifications.
111
        exit chan struct{}
112

113
        wg sync.WaitGroup
114
}
115

116
// notifyTopologyChange notifies all registered clients of a new change in
117
// graph topology in a non-blocking manner.
118
func (b *Builder) notifyTopologyChange(topologyDiff *TopologyChange) {
16✔
119
        // notifyClient is a helper closure that will send topology updates to
16✔
120
        // the given client.
16✔
121
        notifyClient := func(clientID uint64, client *topologyClient) bool {
26✔
122
                client.wg.Add(1)
10✔
123

10✔
124
                log.Tracef("Sending topology notification to client=%v, "+
10✔
125
                        "NodeUpdates=%v, ChannelEdgeUpdates=%v, "+
10✔
126
                        "ClosedChannels=%v", clientID,
10✔
127
                        len(topologyDiff.NodeUpdates),
10✔
128
                        len(topologyDiff.ChannelEdgeUpdates),
10✔
129
                        len(topologyDiff.ClosedChannels))
10✔
130

10✔
131
                go func(c *topologyClient) {
20✔
132
                        defer c.wg.Done()
10✔
133

10✔
134
                        select {
10✔
135

136
                        // In this case we'll try to send the notification
137
                        // directly to the upstream client consumer.
138
                        case c.ntfnChan <- topologyDiff:
10✔
139

140
                        // If the client cancels the notifications, then we'll
141
                        // exit early.
142
                        case <-c.exit:
×
143

144
                        // Similarly, if the ChannelRouter itself exits early,
145
                        // then we'll also exit ourselves.
146
                        case <-b.quit:
×
147

148
                        }
149
                }(client)
150

151
                // Always return true here so the following Range will iterate
152
                // all clients.
153
                return true
10✔
154
        }
155

156
        // Range over the set of active clients, and attempt to send the
157
        // topology updates.
158
        b.topologyClients.Range(notifyClient)
16✔
159
}
160

161
// TopologyChange represents a new set of modifications to the channel graph.
162
// Topology changes will be dispatched in real-time as the ChannelGraph
163
// validates and processes modifications to the authenticated channel graph.
164
type TopologyChange struct {
165
        // NodeUpdates is a slice of nodes which are either new to the channel
166
        // graph, or have had their attributes updated in an authenticated
167
        // manner.
168
        NodeUpdates []*NetworkNodeUpdate
169

170
        // ChannelEdgeUpdates is a slice of channel edges which are either newly
171
        // opened and authenticated, or have had their routing policies
172
        // updated.
173
        ChannelEdgeUpdates []*ChannelEdgeUpdate
174

175
        // ClosedChannels contains a slice of close channel summaries which
176
        // describe which block a channel was closed at, and also carry
177
        // supplemental information such as the capacity of the former channel.
178
        ClosedChannels []*ClosedChanSummary
179
}
180

181
// isEmpty returns true if the TopologyChange is empty. A TopologyChange is
182
// considered empty, if it contains no *new* updates of any type.
183
func (t *TopologyChange) isEmpty() bool {
29✔
184
        return len(t.NodeUpdates) == 0 && len(t.ChannelEdgeUpdates) == 0 &&
29✔
185
                len(t.ClosedChannels) == 0
29✔
186
}
29✔
187

188
// ClosedChanSummary is a summary of a channel that was detected as being
189
// closed by monitoring the blockchain. Once a channel's funding point has been
190
// spent, the channel will automatically be marked as closed by the
191
// ChainNotifier.
192
//
193
// TODO(roasbeef): add nodes involved?
194
type ClosedChanSummary struct {
195
        // ChanID is the short-channel ID which uniquely identifies the
196
        // channel.
197
        ChanID uint64
198

199
        // Capacity was the total capacity of the channel before it was closed.
200
        Capacity btcutil.Amount
201

202
        // ClosedHeight is the height in the chain that the channel was closed
203
        // at.
204
        ClosedHeight uint32
205

206
        // ChanPoint is the funding point, or the multi-sig utxo which
207
        // previously represented the channel.
208
        ChanPoint wire.OutPoint
209
}
210

211
// createCloseSummaries takes in a slice of channels closed at the target block
212
// height and creates a slice of summaries, one for each channel closure.
213
func createCloseSummaries(blockHeight uint32,
214
        closedChans ...*models.ChannelEdgeInfo) []*ClosedChanSummary {
5✔
215

5✔
216
        closeSummaries := make([]*ClosedChanSummary, len(closedChans))
5✔
217
        for i, closedChan := range closedChans {
10✔
218
                closeSummaries[i] = &ClosedChanSummary{
5✔
219
                        ChanID:       closedChan.ChannelID,
5✔
220
                        Capacity:     closedChan.Capacity,
5✔
221
                        ClosedHeight: blockHeight,
5✔
222
                        ChanPoint:    closedChan.ChannelPoint,
5✔
223
                }
5✔
224
        }
5✔
225

226
        return closeSummaries
5✔
227
}
228

229
// NetworkNodeUpdate is an update for a node within the Lightning Network. A
230
// NetworkNodeUpdate is sent out either when a new node joins the network, or a
231
// node broadcasts a new update with a newer time stamp that supersedes its
232
// old update. All updates are properly authenticated.
233
type NetworkNodeUpdate struct {
234
        // Addresses is a slice of all the node's known addresses.
235
        Addresses []net.Addr
236

237
        // IdentityKey is the identity public key of the target node. This is
238
        // used to encrypt onion blobs as well as to authenticate any new
239
        // updates.
240
        IdentityKey *btcec.PublicKey
241

242
        // Alias is the alias or nick name of the node.
243
        Alias string
244

245
        // Color is the node's color in hex code format.
246
        Color string
247

248
        // Features holds the set of features the node supports.
249
        Features *lnwire.FeatureVector
250
}
251

252
// ChannelEdgeUpdate is an update for a new channel within the ChannelGraph.
253
// This update is sent out once a new authenticated channel edge is discovered
254
// within the network. These updates are directional, so if a channel is fully
255
// public, then there will be two updates sent out: one for each direction
256
// within the channel. Each update will carry that particular routing edge
257
// policy for the channel direction.
258
//
259
// An edge is a channel in the direction of AdvertisingNode -> ConnectingNode.
260
type ChannelEdgeUpdate struct {
261
        // ChanID is the unique short channel ID for the channel. This encodes
262
        // where in the blockchain the channel's funding transaction was
263
        // originally confirmed.
264
        ChanID uint64
265

266
        // ChanPoint is the outpoint which represents the multi-sig funding
267
        // output for the channel.
268
        ChanPoint wire.OutPoint
269

270
        // Capacity is the capacity of the newly created channel.
271
        Capacity btcutil.Amount
272

273
        // MinHTLC is the minimum HTLC amount that this channel will forward.
274
        MinHTLC lnwire.MilliSatoshi
275

276
        // MaxHTLC is the maximum HTLC amount that this channel will forward.
277
        MaxHTLC lnwire.MilliSatoshi
278

279
        // BaseFee is the base fee that will be charged for all HTLC's forwarded
280
        // across this channel direction.
281
        BaseFee lnwire.MilliSatoshi
282

283
        // FeeRate is the fee rate that will be charged for all HTLC's forwarded
284
        // across this channel direction.
285
        FeeRate lnwire.MilliSatoshi
286

287
        // TimeLockDelta is the time-lock expressed in blocks that will be
288
        // added to outgoing HTLC's from incoming HTLC's. This value is the
289
        // difference of the incoming and outgoing HTLC's time-locks routed
290
        // through this hop.
291
        TimeLockDelta uint16
292

293
        // AdvertisingNode is the node that's advertising this edge.
294
        AdvertisingNode *btcec.PublicKey
295

296
        // ConnectingNode is the node that the advertising node connects to.
297
        ConnectingNode *btcec.PublicKey
298

299
        // Disabled, if true, signals that the channel is unavailable to relay
300
        // payments.
301
        Disabled bool
302

303
        // ExtraOpaqueData is the set of data that was appended to this message
304
        // to fill out the full maximum transport message size. These fields can
305
        // be used to specify optional data such as custom TLV fields.
306
        ExtraOpaqueData lnwire.ExtraOpaqueData
307
}
308

309
// addToTopologyChange appends the passed update message to the passed
310
// TopologyChange, properly identifying which type of update the message
311
// constitutes. This function will also fetch any required auxiliary
312
// information required to create the topology change update from the graph
313
// database.
314
func addToTopologyChange(graph DB, update *TopologyChange,
315
        msg interface{}) error {
29✔
316

29✔
317
        switch m := msg.(type) {
29✔
318

319
        // Any node announcement maps directly to a NetworkNodeUpdate struct.
320
        // No further data munging or db queries are required.
321
        case *channeldb.LightningNode:
10✔
322
                pubKey, err := m.PubKey()
10✔
323
                if err != nil {
10✔
324
                        return err
×
325
                }
×
326

327
                nodeUpdate := &NetworkNodeUpdate{
10✔
328
                        Addresses:   m.Addresses,
10✔
329
                        IdentityKey: pubKey,
10✔
330
                        Alias:       m.Alias,
10✔
331
                        Color:       EncodeHexColor(m.Color),
10✔
332
                        Features:    m.Features.Clone(),
10✔
333
                }
10✔
334

10✔
335
                update.NodeUpdates = append(update.NodeUpdates, nodeUpdate)
10✔
336
                return nil
10✔
337

338
        // We ignore initial channel announcements as we'll only send out
339
        // updates once the individual edges themselves have been updated.
340
        case *models.ChannelEdgeInfo:
18✔
341
                return nil
18✔
342

343
        // Any new ChannelUpdateAnnouncements will generate a corresponding
344
        // ChannelEdgeUpdate notification.
345
        case *models.ChannelEdgePolicy:
9✔
346
                // We'll need to fetch the edge's information from the database
9✔
347
                // in order to get the information concerning which nodes are
9✔
348
                // being connected.
9✔
349
                edgeInfo, _, _, err := graph.FetchChannelEdgesByID(m.ChannelID)
9✔
350
                if err != nil {
9✔
351
                        return errors.Errorf("unable fetch channel edge: %v",
×
352
                                err)
×
353
                }
×
354

355
                // If the flag is one, then the advertising node is actually
356
                // the second node.
357
                sourceNode := edgeInfo.NodeKey1
9✔
358
                connectingNode := edgeInfo.NodeKey2
9✔
359
                if m.ChannelFlags&lnwire.ChanUpdateDirection == 1 {
15✔
360
                        sourceNode = edgeInfo.NodeKey2
6✔
361
                        connectingNode = edgeInfo.NodeKey1
6✔
362
                }
6✔
363

364
                aNode, err := sourceNode()
9✔
365
                if err != nil {
9✔
366
                        return err
×
367
                }
×
368
                cNode, err := connectingNode()
9✔
369
                if err != nil {
9✔
370
                        return err
×
371
                }
×
372

373
                edgeUpdate := &ChannelEdgeUpdate{
9✔
374
                        ChanID:          m.ChannelID,
9✔
375
                        ChanPoint:       edgeInfo.ChannelPoint,
9✔
376
                        TimeLockDelta:   m.TimeLockDelta,
9✔
377
                        Capacity:        edgeInfo.Capacity,
9✔
378
                        MinHTLC:         m.MinHTLC,
9✔
379
                        MaxHTLC:         m.MaxHTLC,
9✔
380
                        BaseFee:         m.FeeBaseMSat,
9✔
381
                        FeeRate:         m.FeeProportionalMillionths,
9✔
382
                        AdvertisingNode: aNode,
9✔
383
                        ConnectingNode:  cNode,
9✔
384
                        Disabled:        m.ChannelFlags&lnwire.ChanUpdateDisabled != 0,
9✔
385
                        ExtraOpaqueData: m.ExtraOpaqueData,
9✔
386
                }
9✔
387

9✔
388
                // TODO(roasbeef): add bit to toggle
9✔
389
                update.ChannelEdgeUpdates = append(update.ChannelEdgeUpdates,
9✔
390
                        edgeUpdate)
9✔
391
                return nil
9✔
392

393
        default:
×
394
                return fmt.Errorf("unable to add to topology change, "+
×
395
                        "unknown message type %T", msg)
×
396
        }
397
}
398

399
// EncodeHexColor takes a color and returns it in hex code format.
400
func EncodeHexColor(color color.RGBA) string {
19✔
401
        return fmt.Sprintf("#%02x%02x%02x", color.R, color.G, color.B)
19✔
402
}
19✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc