lightningnetwork / lnd / build 15293384865

28 May 2025 06:45AM UTC · coverage: 58.362% (-10.4%) from 68.776%

Build 15293384865 · push · github · web-flow
Merge pull request #9866 from ellemouton/graphSQL7-nodes-tables

sqldb+graph/db: add node related tables and implement some node CRUD

0 of 644 new or added lines in 2 files covered (0.0%).
28264 existing lines in 453 files now uncovered.
97492 of 167046 relevant lines covered (58.36%).
1.81 hits per line.

Source File

/graph/db/sql_store.go (coverage: 0.0%)

package graphdb

import (
        "bytes"
        "context"
        "database/sql"
        "encoding/hex"
        "errors"
        "fmt"
        "math"
        "net"
        "strconv"
        "sync"
        "time"

        "github.com/btcsuite/btcd/btcec/v2"
        "github.com/lightningnetwork/lnd/batch"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/sqldb"
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
        "github.com/lightningnetwork/lnd/tlv"
        "github.com/lightningnetwork/lnd/tor"
)

// ProtocolVersion is an enum that defines the gossip protocol version of a
// message.
type ProtocolVersion uint8

const (
        // ProtocolV1 is the gossip protocol version defined in BOLT #7.
        ProtocolV1 ProtocolVersion = 1
)

// String returns a string representation of the protocol version.
func (v ProtocolVersion) String() string {
        return fmt.Sprintf("V%d", v)
}

// SQLQueries is a subset of the sqlc.Querier interface that can be used to
// execute queries against the SQL graph tables.
//
//nolint:ll,interfacebloat
type SQLQueries interface {
        /*
                Node queries.
        */
        UpsertNode(ctx context.Context, arg sqlc.UpsertNodeParams) (int64, error)
        GetNodeByPubKey(ctx context.Context, arg sqlc.GetNodeByPubKeyParams) (sqlc.Node, error)
        GetNodesByLastUpdateRange(ctx context.Context, arg sqlc.GetNodesByLastUpdateRangeParams) ([]sqlc.Node, error)
        DeleteNodeByPubKey(ctx context.Context, arg sqlc.DeleteNodeByPubKeyParams) (sql.Result, error)

        GetExtraNodeTypes(ctx context.Context, nodeID int64) ([]sqlc.NodeExtraType, error)
        UpsertNodeExtraType(ctx context.Context, arg sqlc.UpsertNodeExtraTypeParams) error
        DeleteExtraNodeType(ctx context.Context, arg sqlc.DeleteExtraNodeTypeParams) error

        InsertNodeAddress(ctx context.Context, arg sqlc.InsertNodeAddressParams) error
        GetNodeAddressesByPubKey(ctx context.Context, arg sqlc.GetNodeAddressesByPubKeyParams) ([]sqlc.GetNodeAddressesByPubKeyRow, error)
        DeleteNodeAddresses(ctx context.Context, nodeID int64) error

        InsertNodeFeature(ctx context.Context, arg sqlc.InsertNodeFeatureParams) error
        GetNodeFeatures(ctx context.Context, nodeID int64) ([]sqlc.NodeFeature, error)
        GetNodeFeaturesByPubKey(ctx context.Context, arg sqlc.GetNodeFeaturesByPubKeyParams) ([]int32, error)
        DeleteNodeFeature(ctx context.Context, arg sqlc.DeleteNodeFeatureParams) error

        /*
                Source node queries.
        */
        AddSourceNode(ctx context.Context, nodeID int64) error
        GetSourceNodesByVersion(ctx context.Context, version int16) ([]sqlc.GetSourceNodesByVersionRow, error)
}

// BatchedSQLQueries is a version of SQLQueries that's capable of batched
// database operations.
type BatchedSQLQueries interface {
        SQLQueries
        sqldb.BatchedTx[SQLQueries]
}

// SQLStore is an implementation of the V1Store interface that uses a SQL
// database as the backend.
//
// NOTE: currently, this temporarily embeds the KVStore struct so that we can
// implement the V1Store interface incrementally. For any method not
// implemented, things will fall back to the KVStore. This is ONLY the case
// for the time being while this struct is purely used in unit tests only.
type SQLStore struct {
        db BatchedSQLQueries

        // cacheMu guards all caches (rejectCache and chanCache). If
        // this mutex will be acquired at the same time as the DB mutex then
        // the cacheMu MUST be acquired first to prevent deadlock.
        cacheMu     sync.RWMutex
        rejectCache *rejectCache
        chanCache   *channelCache

        chanScheduler batch.Scheduler[SQLQueries]
        nodeScheduler batch.Scheduler[SQLQueries]

        // Temporary fall-back to the KVStore so that we can implement the
        // interface incrementally.
        *KVStore
}

// A compile-time assertion to ensure that SQLStore implements the V1Store
// interface.
var _ V1Store = (*SQLStore)(nil)

// NewSQLStore creates a new SQLStore instance given an open BatchedSQLQueries
// storage backend.
func NewSQLStore(db BatchedSQLQueries, kvStore *KVStore,
        options ...StoreOptionModifier) (*SQLStore, error) {

        opts := DefaultOptions()
        for _, o := range options {
                o(opts)
        }

        if opts.NoMigration {
                return nil, fmt.Errorf("the NoMigration option is not yet " +
                        "supported for SQL stores")
        }

        s := &SQLStore{
                db:          db,
                KVStore:     kvStore,
                rejectCache: newRejectCache(opts.RejectCacheSize),
                chanCache:   newChannelCache(opts.ChannelCacheSize),
        }

        s.chanScheduler = batch.NewTimeScheduler(
                db, &s.cacheMu, opts.BatchCommitInterval,
        )
        s.nodeScheduler = batch.NewTimeScheduler(
                db, nil, opts.BatchCommitInterval,
        )

        return s, nil
}

// TxOptions defines the set of db txn options the SQLQueries
// understands.
type TxOptions struct {
        // readOnly governs if a read only transaction is needed or not.
        readOnly bool
}

// ReadOnly returns true if the transaction should be read only.
//
// NOTE: This implements the TxOptions.
func (a *TxOptions) ReadOnly() bool {
        return a.readOnly
}

// NewReadTx creates a new read transaction option set.
func NewReadTx() *TxOptions {
        return &TxOptions{
                readOnly: true,
        }
}

// AddLightningNode adds a vertex/node to the graph database. If the node is not
// in the database from before, this will add a new, unconnected one to the
// graph. If it is present from before, this will update that node's
// information.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) AddLightningNode(node *models.LightningNode,
        opts ...batch.SchedulerOption) error {

        ctx := context.TODO()

        r := &batch.Request[SQLQueries]{
                Opts: batch.NewSchedulerOptions(opts...),
                Do: func(queries SQLQueries) error {
                        _, err := upsertNode(ctx, queries, node)
                        return err
                },
        }

        return s.nodeScheduler.Execute(ctx, r)
}

// FetchLightningNode attempts to look up a target node by its identity public
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
// returned.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) FetchLightningNode(pubKey route.Vertex) (
        *models.LightningNode, error) {

        ctx := context.TODO()

        var (
                readTx = NewReadTx()
                node   *models.LightningNode
        )
        err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
                var err error
                _, node, err = getNodeByPubKey(ctx, db, pubKey)

                return err
        }, func() {})
        if err != nil {
                return nil, fmt.Errorf("unable to fetch node: %w", err)
        }

        return node, nil
}

// HasLightningNode determines if the graph has a vertex identified by the
// target node identity public key. If the node exists in the database, a
// timestamp of when the data for the node was last updated is returned along
// with a true boolean. Otherwise, an empty time.Time is returned with a false
// boolean.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) HasLightningNode(pubKey [33]byte) (time.Time, bool,
        error) {

        ctx := context.TODO()

        var (
                readTx     = NewReadTx()
                exists     bool
                lastUpdate time.Time
        )
        err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
                dbNode, err := db.GetNodeByPubKey(
                        ctx, sqlc.GetNodeByPubKeyParams{
                                Version: int16(ProtocolV1),
                                PubKey:  pubKey[:],
                        },
                )
                if errors.Is(err, sql.ErrNoRows) {
                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to fetch node: %w", err)
                }

                exists = true

                if dbNode.LastUpdate.Valid {
                        lastUpdate = time.Unix(dbNode.LastUpdate.Int64, 0)
                }

                return nil
        }, func() {})
        if err != nil {
                return time.Time{}, false,
                        fmt.Errorf("unable to fetch node: %w", err)
        }

        return lastUpdate, exists, nil
}

// AddrsForNode returns all known addresses for the target node public key
// that the graph DB is aware of. The returned boolean indicates if the
// given node is unknown to the graph DB or not.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr,
        error) {

        ctx := context.TODO()

        var (
                readTx    = NewReadTx()
                addresses []net.Addr
                known     bool
        )
        err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
                var err error
                known, addresses, err = getNodeAddresses(
                        ctx, db, nodePub.SerializeCompressed(),
                )
                if err != nil {
                        return fmt.Errorf("unable to fetch node addresses: %w",
                                err)
                }

                return nil
        }, func() {})
        if err != nil {
                return false, nil, fmt.Errorf("unable to get addresses for "+
                        "node(%x): %w", nodePub.SerializeCompressed(), err)
        }

        return known, addresses, nil
}

// DeleteLightningNode starts a new database transaction to remove a vertex/node
// from the database according to the node's public key.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) DeleteLightningNode(pubKey route.Vertex) error {
        ctx := context.TODO()

        var writeTxOpts TxOptions
        err := s.db.ExecTx(ctx, &writeTxOpts, func(db SQLQueries) error {
                res, err := db.DeleteNodeByPubKey(
                        ctx, sqlc.DeleteNodeByPubKeyParams{
                                Version: int16(ProtocolV1),
                                PubKey:  pubKey[:],
                        },
                )
                if err != nil {
                        return err
                }

                rows, err := res.RowsAffected()
                if err != nil {
                        return err
                }

                if rows == 0 {
                        return ErrGraphNodeNotFound
                } else if rows > 1 {
                        return fmt.Errorf("deleted %d rows, expected 1", rows)
                }

                return err
        }, func() {})
        if err != nil {
                return fmt.Errorf("unable to delete node: %w", err)
        }

        return nil
}

// FetchNodeFeatures returns the features of the given node. If no features are
// known for the node, an empty feature vector is returned.
//
// NOTE: this is part of the graphdb.NodeTraverser interface.
func (s *SQLStore) FetchNodeFeatures(nodePub route.Vertex) (
        *lnwire.FeatureVector, error) {

        ctx := context.TODO()

        return fetchNodeFeatures(ctx, s.db, nodePub)
}

// LookupAlias attempts to return the alias as advertised by the target node.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) LookupAlias(pub *btcec.PublicKey) (string, error) {
        var (
                ctx    = context.TODO()
                readTx = NewReadTx()
                alias  string
        )
        err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
                dbNode, err := db.GetNodeByPubKey(
                        ctx, sqlc.GetNodeByPubKeyParams{
                                Version: int16(ProtocolV1),
                                PubKey:  pub.SerializeCompressed(),
                        },
                )
                if errors.Is(err, sql.ErrNoRows) {
                        return ErrNodeAliasNotFound
                } else if err != nil {
                        return fmt.Errorf("unable to fetch node: %w", err)
                }

                if !dbNode.Alias.Valid {
                        return ErrNodeAliasNotFound
                }

                alias = dbNode.Alias.String

                return nil
        }, func() {})
        if err != nil {
                return "", fmt.Errorf("unable to look up alias: %w", err)
        }

        return alias, nil
}

// SourceNode returns the source node of the graph. The source node is treated
// as the center node within a star-graph. This method may be used to kick off
// a path finding algorithm in order to explore the reachability of another
// node based off the source node.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) SourceNode() (*models.LightningNode, error) {
        ctx := context.TODO()

        var (
                readTx = NewReadTx()
                node   *models.LightningNode
        )
        err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
                _, nodePub, err := getSourceNode(ctx, db, ProtocolV1)
                if err != nil {
                        return fmt.Errorf("unable to fetch V1 source node: %w",
                                err)
                }

                _, node, err = getNodeByPubKey(ctx, db, nodePub)

                return err
        }, func() {})
        if err != nil {
                return nil, fmt.Errorf("unable to fetch source node: %w", err)
        }

        return node, nil
}

// SetSourceNode sets the source node within the graph database. The source
// node is to be used as the center of a star-graph within path finding
// algorithms.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) SetSourceNode(node *models.LightningNode) error {
        ctx := context.TODO()
        var writeTxOpts TxOptions

        return s.db.ExecTx(ctx, &writeTxOpts, func(db SQLQueries) error {
                id, err := upsertNode(ctx, db, node)
                if err != nil {
                        return fmt.Errorf("unable to upsert source node: %w",
                                err)
                }

                // Make sure that if a source node for this version is already
                // set, then the ID is the same as the one we are about to set.
                dbSourceNodeID, _, err := getSourceNode(ctx, db, ProtocolV1)
                if err != nil && !errors.Is(err, ErrSourceNodeNotSet) {
                        return fmt.Errorf("unable to fetch source node: %w",
                                err)
                } else if err == nil {
                        if dbSourceNodeID != id {
                                return fmt.Errorf("v1 source node already "+
                                        "set to a different node: %d vs %d",
                                        dbSourceNodeID, id)
                        }

                        return nil
                }

                return db.AddSourceNode(ctx, id)
        }, func() {})
}

// NodeUpdatesInHorizon returns all the known lightning nodes which have an
// update timestamp within the passed range. This method can be used by two
// nodes to quickly determine if they have the same set of up to date node
// announcements.
//
// NOTE: This is part of the V1Store interface.
func (s *SQLStore) NodeUpdatesInHorizon(startTime,
        endTime time.Time) ([]models.LightningNode, error) {

        ctx := context.TODO()

        var (
                readTx = NewReadTx()
                nodes  []models.LightningNode
        )
        err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
                dbNodes, err := db.GetNodesByLastUpdateRange(
                        ctx, sqlc.GetNodesByLastUpdateRangeParams{
                                StartTime: sqldb.SQLInt64(startTime.Unix()),
                                EndTime:   sqldb.SQLInt64(endTime.Unix()),
                        },
                )
                if err != nil {
                        return fmt.Errorf("unable to fetch nodes: %w", err)
                }

                for _, dbNode := range dbNodes {
                        node, err := buildNode(ctx, db, &dbNode)
                        if err != nil {
                                return fmt.Errorf("unable to build node: %w",
                                        err)
                        }

                        nodes = append(nodes, *node)
                }

                return nil
        }, func() {})
        if err != nil {
                return nil, fmt.Errorf("unable to fetch nodes: %w", err)
        }

        return nodes, nil
}

// getNodeByPubKey attempts to look up a target node by its public key.
func getNodeByPubKey(ctx context.Context, db SQLQueries,
        pubKey route.Vertex) (int64, *models.LightningNode, error) {

        dbNode, err := db.GetNodeByPubKey(
                ctx, sqlc.GetNodeByPubKeyParams{
                        Version: int16(ProtocolV1),
                        PubKey:  pubKey[:],
                },
        )
        if errors.Is(err, sql.ErrNoRows) {
                return 0, nil, ErrGraphNodeNotFound
        } else if err != nil {
                return 0, nil, fmt.Errorf("unable to fetch node: %w", err)
        }

        node, err := buildNode(ctx, db, &dbNode)
        if err != nil {
                return 0, nil, fmt.Errorf("unable to build node: %w", err)
        }

        return dbNode.ID, node, nil
}

// buildNode constructs a LightningNode instance from the given database node
// record. The node's features, addresses and extra signed fields are also
// fetched from the database and set on the node.
func buildNode(ctx context.Context, db SQLQueries, dbNode *sqlc.Node) (
        *models.LightningNode, error) {

        if dbNode.Version != int16(ProtocolV1) {
                return nil, fmt.Errorf("unsupported node version: %d",
                        dbNode.Version)
        }

        var pub [33]byte
        copy(pub[:], dbNode.PubKey)

        node := &models.LightningNode{
                PubKeyBytes:     pub,
                Features:        lnwire.EmptyFeatureVector(),
                LastUpdate:      time.Unix(0, 0),
                ExtraOpaqueData: make([]byte, 0),
        }

        if len(dbNode.Signature) == 0 {
                return node, nil
        }

        node.HaveNodeAnnouncement = true
        node.AuthSigBytes = dbNode.Signature
        node.Alias = dbNode.Alias.String
        node.LastUpdate = time.Unix(dbNode.LastUpdate.Int64, 0)

        var err error
        node.Color, err = DecodeHexColor(dbNode.Color.String)
        if err != nil {
                return nil, fmt.Errorf("unable to decode color: %w", err)
        }

        // Fetch the node's features.
        node.Features, err = getNodeFeatures(ctx, db, dbNode.ID)
        if err != nil {
                return nil, fmt.Errorf("unable to fetch node(%d) "+
                        "features: %w", dbNode.ID, err)
        }

        // Fetch the node's addresses.
        _, node.Addresses, err = getNodeAddresses(ctx, db, pub[:])
        if err != nil {
                return nil, fmt.Errorf("unable to fetch node(%d) "+
                        "addresses: %w", dbNode.ID, err)
        }

        // Fetch the node's extra signed fields.
        extraTLVMap, err := getNodeExtraSignedFields(ctx, db, dbNode.ID)
        if err != nil {
                return nil, fmt.Errorf("unable to fetch node(%d) "+
                        "extra signed fields: %w", dbNode.ID, err)
        }

        recs, err := lnwire.CustomRecords(extraTLVMap).Serialize()
        if err != nil {
                return nil, fmt.Errorf("unable to serialize extra signed "+
                        "fields: %w", err)
        }

        if len(recs) != 0 {
                node.ExtraOpaqueData = recs
        }

        return node, nil
}

// getNodeFeatures fetches the feature bits and constructs the feature vector
// for a node with the given DB ID.
func getNodeFeatures(ctx context.Context, db SQLQueries,
        nodeID int64) (*lnwire.FeatureVector, error) {

        rows, err := db.GetNodeFeatures(ctx, nodeID)
        if err != nil {
                return nil, fmt.Errorf("unable to get node(%d) features: %w",
                        nodeID, err)
        }

        features := lnwire.EmptyFeatureVector()
        for _, feature := range rows {
                features.Set(lnwire.FeatureBit(feature.FeatureBit))
        }

        return features, nil
}

// getNodeExtraSignedFields fetches the extra signed fields for a node with the
// given DB ID.
func getNodeExtraSignedFields(ctx context.Context, db SQLQueries,
        nodeID int64) (map[uint64][]byte, error) {

        fields, err := db.GetExtraNodeTypes(ctx, nodeID)
        if err != nil {
                return nil, fmt.Errorf("unable to get node(%d) extra "+
                        "signed fields: %w", nodeID, err)
        }

        extraFields := make(map[uint64][]byte)
        for _, field := range fields {
                extraFields[uint64(field.Type)] = field.Value
        }

        return extraFields, nil
}

// upsertNode upserts the node record into the database. If the node already
// exists, then the node's information is updated. If the node doesn't exist,
// then a new node is created. The node's features, addresses and extra TLV
// types are also updated. The node's DB ID is returned.
func upsertNode(ctx context.Context, db SQLQueries,
        node *models.LightningNode) (int64, error) {

        params := sqlc.UpsertNodeParams{
                Version: int16(ProtocolV1),
                PubKey:  node.PubKeyBytes[:],
        }

        if node.HaveNodeAnnouncement {
                params.LastUpdate = sqldb.SQLInt64(node.LastUpdate.Unix())
                params.Color = sqldb.SQLStr(EncodeHexColor(node.Color))
                params.Alias = sqldb.SQLStr(node.Alias)
                params.Signature = node.AuthSigBytes
        }

        nodeID, err := db.UpsertNode(ctx, params)
        if err != nil {
                return 0, fmt.Errorf("upserting node(%x): %w", node.PubKeyBytes,
                        err)
        }

        // We can exit here if we don't have the announcement yet.
        if !node.HaveNodeAnnouncement {
                return nodeID, nil
        }

        // Update the node's features.
        err = upsertNodeFeatures(ctx, db, nodeID, node.Features)
        if err != nil {
                return 0, fmt.Errorf("inserting node features: %w", err)
        }

        // Update the node's addresses.
        err = upsertNodeAddresses(ctx, db, nodeID, node.Addresses)
        if err != nil {
                return 0, fmt.Errorf("inserting node addresses: %w", err)
        }

        // Convert the flat extra opaque data into a map of TLV types to
        // values.
        extra, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
        if err != nil {
                return 0, fmt.Errorf("unable to marshal extra opaque data: %w",
                        err)
        }

        // Update the node's extra signed fields.
        err = upsertNodeExtraSignedFields(ctx, db, nodeID, extra)
        if err != nil {
                return 0, fmt.Errorf("inserting node extra TLVs: %w", err)
        }

        return nodeID, nil
}

// upsertNodeFeatures updates the node's features node_features table. This
// includes deleting any feature bits no longer present and inserting any new
// feature bits. If the feature bit does not yet exist in the features table,
// then an entry is created in that table first.
func upsertNodeFeatures(ctx context.Context, db SQLQueries, nodeID int64,
        features *lnwire.FeatureVector) error {

        // Get any existing features for the node.
        existingFeatures, err := db.GetNodeFeatures(ctx, nodeID)
        if err != nil && !errors.Is(err, sql.ErrNoRows) {
                return err
        }

        // Copy the node's latest set of feature bits.
        newFeatures := make(map[int32]struct{})
        if features != nil {
                for feature := range features.Features() {
                        newFeatures[int32(feature)] = struct{}{}
                }
        }

        // For any current feature that already exists in the DB, remove it from
        // the in-memory map. For any existing feature that does not exist in
        // the in-memory map, delete it from the database.
        for _, feature := range existingFeatures {
                // The feature is still present, so there are no updates to be
                // made.
                if _, ok := newFeatures[feature.FeatureBit]; ok {
                        delete(newFeatures, feature.FeatureBit)
                        continue
                }

                // The feature is no longer present, so we remove it from the
                // database.
                err := db.DeleteNodeFeature(ctx, sqlc.DeleteNodeFeatureParams{
                        NodeID:     nodeID,
                        FeatureBit: feature.FeatureBit,
                })
                if err != nil {
                        return fmt.Errorf("unable to delete node(%d) "+
                                "feature(%v): %w", nodeID, feature.FeatureBit,
                                err)
                }
        }

        // Any remaining entries in newFeatures are new features that need to be
        // added to the database for the first time.
        for feature := range newFeatures {
                err = db.InsertNodeFeature(ctx, sqlc.InsertNodeFeatureParams{
                        NodeID:     nodeID,
                        FeatureBit: feature,
                })
                if err != nil {
                        return fmt.Errorf("unable to insert node(%d) "+
                                "feature(%v): %w", nodeID, feature, err)
                }
        }

        return nil
}

// fetchNodeFeatures fetches the features for a node with the given public key.
func fetchNodeFeatures(ctx context.Context, queries SQLQueries,
        nodePub route.Vertex) (*lnwire.FeatureVector, error) {

        rows, err := queries.GetNodeFeaturesByPubKey(
                ctx, sqlc.GetNodeFeaturesByPubKeyParams{
                        PubKey:  nodePub[:],
                        Version: int16(ProtocolV1),
                },
        )
        if err != nil {
                return nil, fmt.Errorf("unable to get node(%s) features: %w",
                        nodePub, err)
        }

        features := lnwire.EmptyFeatureVector()
        for _, bit := range rows {
                features.Set(lnwire.FeatureBit(bit))
        }

        return features, nil
}

// dbAddressType is an enum type that represents the different address types
// that we store in the node_addresses table. The address type determines how
// the address is to be serialised/deserialised.
type dbAddressType uint8

const (
        addressTypeIPv4   dbAddressType = 1
        addressTypeIPv6   dbAddressType = 2
        addressTypeTorV2  dbAddressType = 3
        addressTypeTorV3  dbAddressType = 4
        addressTypeOpaque dbAddressType = math.MaxInt8
)

// upsertNodeAddresses updates the node's addresses in the database. This
// includes deleting any existing addresses and inserting the new set of
// addresses. The deletion is necessary since the ordering of the addresses may
// change, and we need to ensure that the database reflects the latest set of
// addresses so that at the time of reconstructing the node announcement, the
// order is preserved and the signature over the message remains valid.
func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64,
        addresses []net.Addr) error {

        // Delete any existing addresses for the node. This is required since
        // even if the new set of addresses is the same, the ordering may have
        // changed for a given address type.
        err := db.DeleteNodeAddresses(ctx, nodeID)
        if err != nil {
                return fmt.Errorf("unable to delete node(%d) addresses: %w",
                        nodeID, err)
        }

        // Copy the node's latest set of addresses.
        newAddresses := map[dbAddressType][]string{
                addressTypeIPv4:   {},
                addressTypeIPv6:   {},
                addressTypeTorV2:  {},
                addressTypeTorV3:  {},
                addressTypeOpaque: {},
        }
        addAddr := func(t dbAddressType, addr net.Addr) {
                newAddresses[t] = append(newAddresses[t], addr.String())
        }

        for _, address := range addresses {
                switch addr := address.(type) {
                case *net.TCPAddr:
                        if ip4 := addr.IP.To4(); ip4 != nil {
                                addAddr(addressTypeIPv4, addr)
                        } else if ip6 := addr.IP.To16(); ip6 != nil {
                                addAddr(addressTypeIPv6, addr)
                        } else {
                                return fmt.Errorf("unhandled IP address: %v",
                                        addr)
                        }

                case *tor.OnionAddr:
                        switch len(addr.OnionService) {
                        case tor.V2Len:
                                addAddr(addressTypeTorV2, addr)
                        case tor.V3Len:
                                addAddr(addressTypeTorV3, addr)
                        default:
                                return fmt.Errorf("invalid length for a tor " +
                                        "address")
                        }

                case *lnwire.OpaqueAddrs:
                        addAddr(addressTypeOpaque, addr)

                default:
                        return fmt.Errorf("unhandled address type: %T", addr)
                }
        }

        // Any remaining entries in newAddresses are new addresses that need to
        // be added to the database for the first time.
        for addrType, addrList := range newAddresses {
                for position, addr := range addrList {
                        err := db.InsertNodeAddress(
                                ctx, sqlc.InsertNodeAddressParams{
                                        NodeID:   nodeID,
                                        Type:     int16(addrType),
                                        Address:  addr,
                                        Position: int32(position),
                                },
                        )
                        if err != nil {
                                return fmt.Errorf("unable to insert "+
                                        "node(%d) address(%v): %w", nodeID,
                                        addr, err)
                        }
                }
        }

        return nil
}

// getNodeAddresses fetches the addresses for a node with the given public key.
func getNodeAddresses(ctx context.Context, db SQLQueries, nodePub []byte) (bool,
        []net.Addr, error) {

        // GetNodeAddressesByPubKey ensures that the addresses for a given type
        // are returned in the same order as they were inserted.
        rows, err := db.GetNodeAddressesByPubKey(
                ctx, sqlc.GetNodeAddressesByPubKeyParams{
                        Version: int16(ProtocolV1),
                        PubKey:  nodePub,
                },
        )
        if err != nil {
                return false, nil, err
        }

        // GetNodeAddressesByPubKey uses a left join so there should always be
        // at least one row returned if the node exists even if it has no
        // addresses.
        if len(rows) == 0 {
                return false, nil, nil
        }

        addresses := make([]net.Addr, 0, len(rows))
        for _, addr := range rows {
                if !(addr.Type.Valid && addr.Address.Valid) {
                        continue
                }

                address := addr.Address.String

                switch dbAddressType(addr.Type.Int16) {
                case addressTypeIPv4:
                        tcp, err := net.ResolveTCPAddr("tcp4", address)
                        if err != nil {
                                return false, nil, nil
                        }
                        tcp.IP = tcp.IP.To4()

                        addresses = append(addresses, tcp)

                case addressTypeIPv6:
                        tcp, err := net.ResolveTCPAddr("tcp6", address)
                        if err != nil {
                                return false, nil, nil
                        }
                        addresses = append(addresses, tcp)

                case addressTypeTorV3, addressTypeTorV2:
                        service, portStr, err := net.SplitHostPort(address)
                        if err != nil {
                                return false, nil, fmt.Errorf("unable to "+
                                        "split tor v3 address: %v",
                                        addr.Address)
                        }

                        port, err := strconv.Atoi(portStr)
                        if err != nil {
                                return false, nil, err
                        }

                        addresses = append(addresses, &tor.OnionAddr{
                                OnionService: service,
                                Port:         port,
                        })

                case addressTypeOpaque:
                        opaque, err := hex.DecodeString(address)
                        if err != nil {
                                return false, nil, fmt.Errorf("unable to "+
                                        "decode opaque address: %v", addr)
                        }

                        addresses = append(addresses, &lnwire.OpaqueAddrs{
                                Payload: opaque,
                        })

                default:
                        return false, nil, fmt.Errorf("unknown address "+
                                "type: %v", addr.Type)
                }
        }

        return true, addresses, nil
}

// upsertNodeExtraSignedFields updates the node's extra signed fields in the
// database. This includes updating any existing types, inserting any new types,
// and deleting any types that are no longer present.
func upsertNodeExtraSignedFields(ctx context.Context, db SQLQueries,
        nodeID int64, extraFields map[uint64][]byte) error {

        // Get any existing extra signed fields for the node.
        existingFields, err := db.GetExtraNodeTypes(ctx, nodeID)
        if err != nil {
                return err
        }

        // Make a lookup map of the existing field types so that we can use it
        // to keep track of any fields we should delete.
        m := make(map[uint64]bool)
        for _, field := range existingFields {
                m[uint64(field.Type)] = true
        }

        // For all the new fields, we'll upsert them and remove them from the
        // map of existing fields.
        for tlvType, value := range extraFields {
                err = db.UpsertNodeExtraType(
                        ctx, sqlc.UpsertNodeExtraTypeParams{
                                NodeID: nodeID,
                                Type:   int64(tlvType),
                                Value:  value,
                        },
                )
                if err != nil {
                        return fmt.Errorf("unable to upsert node(%d) extra "+
                                "signed field(%v): %w", nodeID, tlvType, err)
                }

                // Remove the field from the map of existing fields if it was
                // present.
                delete(m, tlvType)
        }

        // For all the fields that are left in the map of existing fields, we'll
        // delete them as they are no longer present in the new set of fields.
        for tlvType := range m {
                err = db.DeleteExtraNodeType(
                        ctx, sqlc.DeleteExtraNodeTypeParams{
                                NodeID: nodeID,
                                Type:   int64(tlvType),
                        },
                )
                if err != nil {
                        return fmt.Errorf("unable to delete node(%d) extra "+
                                "signed field(%v): %w", nodeID, tlvType, err)
                }
        }

        return nil
}

// getSourceNode returns the DB node ID and pub key of the source node for the
// specified protocol version.
func getSourceNode(ctx context.Context, db SQLQueries,
        version ProtocolVersion) (int64, route.Vertex, error) {

        var pubKey route.Vertex

        nodes, err := db.GetSourceNodesByVersion(ctx, int16(version))
        if err != nil {
                return 0, pubKey, fmt.Errorf("unable to fetch source node: %w",
                        err)
        }

        if len(nodes) == 0 {
                return 0, pubKey, ErrSourceNodeNotSet
        } else if len(nodes) > 1 {
                return 0, pubKey, fmt.Errorf("multiple source nodes for "+
                        "protocol %s found", version)
        }

        copy(pubKey[:], nodes[0].PubKey)

        return nodes[0].NodeID, pubKey, nil
}

// marshalExtraOpaqueData takes a flat byte slice and parses it as a TLV stream.
// This then produces a map from TLV type to value. If the input is not a
// valid TLV stream, then an error is returned.
func marshalExtraOpaqueData(data []byte) (map[uint64][]byte, error) {
        r := bytes.NewReader(data)

        tlvStream, err := tlv.NewStream()
        if err != nil {
                return nil, err
        }

        // Since ExtraOpaqueData is provided by a potentially malicious peer,
        // pass it into the P2P decoding variant.
        parsedTypes, err := tlvStream.DecodeWithParsedTypesP2P(r)
        if err != nil {
                return nil, err
        }
        if len(parsedTypes) == 0 {
                return nil, nil
        }

        records := make(map[uint64][]byte)
        for k, v := range parsedTypes {
                records[uint64(k)] = v
        }

        return records, nil
}
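
The node CRUD above is currently exercised only by unit tests, which is why every line in this new file reports as uncovered. As a quick orientation for readers, here is a minimal usage sketch, not part of sql_store.go: it assumes the code would sit in the same graphdb package (so the imports above apply), and openBatchedGraphQueries / openKVStore are hypothetical placeholders for whatever backend wiring a caller already has. Only constructors and methods shown in this file are used.

// Illustrative usage sketch (not part of sql_store.go). The helpers
// openBatchedGraphQueries() BatchedSQLQueries and openKVStore() *KVStore are
// assumed placeholders for backend setup that lives outside this file.
func exampleNodeCRUD(pubKey route.Vertex) error {
        // Wire up the store as NewSQLStore expects: a batched query backend
        // plus the temporary KVStore fallback.
        store, err := NewSQLStore(openBatchedGraphQueries(), openKVStore())
        if err != nil {
                return err
        }

        // Upsert a bare node shell keyed by its public key. With
        // HaveNodeAnnouncement unset, upsertNode only writes the version and
        // pub key columns.
        node := &models.LightningNode{
                PubKeyBytes: pubKey,
        }
        if err := store.AddLightningNode(node); err != nil {
                return err
        }

        // Read it back; unknown keys return ErrGraphNodeNotFound. Writes go
        // through the nodeScheduler batch, while reads use ExecTx with a
        // read-only TxOptions.
        if _, err := store.FetchLightningNode(pubKey); err != nil {
                return err
        }

        // Remove the node again by its public key.
        return store.DeleteLightningNode(pubKey)
}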