• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15294244242

28 May 2025 07:32AM UTC coverage: 58.279% (-0.08%) from 58.362%
15294244242

Pull #9869

github

web-flow
Merge 08fc1d655 into 8e96bd030
Pull Request #9869: sqldb+graph/db: add channel tables and implement some channel CRUD

2 of 176 new or added lines in 2 files covered. (1.14%)

72 existing lines in 13 files now uncovered.

97453 of 167217 relevant lines covered (58.28%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/graph/db/sql_store.go
1
package graphdb
2

3
import (
4
        "bytes"
5
        "context"
6
        "database/sql"
7
        "encoding/hex"
8
        "errors"
9
        "fmt"
10
        "math"
11
        "net"
12
        "strconv"
13
        "sync"
14
        "time"
15

16
        "github.com/btcsuite/btcd/btcec/v2"
17
        "github.com/lightningnetwork/lnd/batch"
18
        "github.com/lightningnetwork/lnd/graph/db/models"
19
        "github.com/lightningnetwork/lnd/lnwire"
20
        "github.com/lightningnetwork/lnd/routing/route"
21
        "github.com/lightningnetwork/lnd/sqldb"
22
        "github.com/lightningnetwork/lnd/sqldb/sqlc"
23
        "github.com/lightningnetwork/lnd/tlv"
24
        "github.com/lightningnetwork/lnd/tor"
25
)
26

27
// ProtocolVersion is an enum that defines the gossip protocol version of a
// message. It is persisted alongside each node/channel row so that multiple
// gossip protocol versions can share the same tables.
type ProtocolVersion uint8

const (
        // ProtocolV1 is the gossip protocol version defined in BOLT #7.
        ProtocolV1 ProtocolVersion = 1
)
35

36
// String returns a string representation of the protocol version, e.g. "V1"
// for ProtocolV1.
func (v ProtocolVersion) String() string {
        return fmt.Sprintf("V%d", v)
}
40

41
// SQLQueries is a subset of the sqlc.Querier interface that can be used to
// execute queries against the SQL graph tables. Keeping this as a narrow,
// locally-defined interface (rather than depending on the full generated
// Querier) documents exactly which queries the graph store needs and keeps
// the store testable.
//
//nolint:ll,interfacebloat
type SQLQueries interface {
        /*
                Node queries.
        */
        UpsertNode(ctx context.Context, arg sqlc.UpsertNodeParams) (int64, error)
        GetNodeByPubKey(ctx context.Context, arg sqlc.GetNodeByPubKeyParams) (sqlc.Node, error)
        GetNodesByLastUpdateRange(ctx context.Context, arg sqlc.GetNodesByLastUpdateRangeParams) ([]sqlc.Node, error)
        DeleteNodeByPubKey(ctx context.Context, arg sqlc.DeleteNodeByPubKeyParams) (sql.Result, error)

        GetExtraNodeTypes(ctx context.Context, nodeID int64) ([]sqlc.NodeExtraType, error)
        UpsertNodeExtraType(ctx context.Context, arg sqlc.UpsertNodeExtraTypeParams) error
        DeleteExtraNodeType(ctx context.Context, arg sqlc.DeleteExtraNodeTypeParams) error

        InsertNodeAddress(ctx context.Context, arg sqlc.InsertNodeAddressParams) error
        GetNodeAddressesByPubKey(ctx context.Context, arg sqlc.GetNodeAddressesByPubKeyParams) ([]sqlc.GetNodeAddressesByPubKeyRow, error)
        DeleteNodeAddresses(ctx context.Context, nodeID int64) error

        InsertNodeFeature(ctx context.Context, arg sqlc.InsertNodeFeatureParams) error
        GetNodeFeatures(ctx context.Context, nodeID int64) ([]sqlc.NodeFeature, error)
        GetNodeFeaturesByPubKey(ctx context.Context, arg sqlc.GetNodeFeaturesByPubKeyParams) ([]int32, error)
        DeleteNodeFeature(ctx context.Context, arg sqlc.DeleteNodeFeatureParams) error

        /*
                Source node queries.
        */
        AddSourceNode(ctx context.Context, nodeID int64) error
        GetSourceNodesByVersion(ctx context.Context, version int16) ([]sqlc.GetSourceNodesByVersionRow, error)

        /*
                Channel queries.
        */
        CreateChannel(ctx context.Context, arg sqlc.CreateChannelParams) (int64, error)
        GetChannelBySCID(ctx context.Context, arg sqlc.GetChannelBySCIDParams) (sqlc.Channel, error)
        HighestSCID(ctx context.Context, version int16) ([]byte, error)

        CreateChannelExtraType(ctx context.Context, arg sqlc.CreateChannelExtraTypeParams) error
        InsertChannelFeature(ctx context.Context, arg sqlc.InsertChannelFeatureParams) error
}
83

84
// BatchedSQLQueries is a version of SQLQueries that's capable of batched
// database operations: it adds transactional execution (ExecTx) on top of the
// plain query set.
type BatchedSQLQueries interface {
        SQLQueries
        sqldb.BatchedTx[SQLQueries]
}
90

91
// SQLStore is an implementation of the V1Store interface that uses a SQL
// database as the backend.
//
// NOTE: currently, this temporarily embeds the KVStore struct so that we can
// implement the V1Store interface incrementally. For any method not
// implemented,  things will fall back to the KVStore. This is ONLY the case
// for the time being while this struct is purely used in unit tests only.
type SQLStore struct {
        // db provides transactional, batch-capable access to the graph
        // tables.
        db BatchedSQLQueries

        // cacheMu guards all caches (rejectCache and chanCache). If
        // this mutex will be acquired at the same time as the DB mutex then
        // the cacheMu MUST be acquired first to prevent deadlock.
        cacheMu     sync.RWMutex
        rejectCache *rejectCache
        chanCache   *channelCache

        // chanScheduler and nodeScheduler batch concurrent channel and node
        // writes respectively (see batch.NewTimeScheduler usage in
        // NewSQLStore). The channel scheduler is handed cacheMu so that its
        // commits respect the lock-ordering rule above.
        chanScheduler batch.Scheduler[SQLQueries]
        nodeScheduler batch.Scheduler[SQLQueries]

        // Temporary fall-back to the KVStore so that we can implement the
        // interface incrementally.
        *KVStore
}

// A compile-time assertion to ensure that SQLStore implements the V1Store
// interface.
var _ V1Store = (*SQLStore)(nil)
119

120
// NewSQLStore creates a new SQLStore instance given an open BatchedSQLQueries
// storage backend. The kvStore argument is the temporary fall-back store used
// for any V1Store methods not yet implemented here. Functional options may be
// supplied to tweak cache sizes and batch commit behaviour.
func NewSQLStore(db BatchedSQLQueries, kvStore *KVStore,
        options ...StoreOptionModifier) (*SQLStore, error) {

        opts := DefaultOptions()
        for _, o := range options {
                o(opts)
        }

        // Migrations are always run for SQL stores today; reject the option
        // explicitly rather than silently ignoring it.
        if opts.NoMigration {
                return nil, fmt.Errorf("the NoMigration option is not yet " +
                        "supported for SQL stores")
        }

        s := &SQLStore{
                db:          db,
                KVStore:     kvStore,
                rejectCache: newRejectCache(opts.RejectCacheSize),
                chanCache:   newChannelCache(opts.ChannelCacheSize),
        }

        // The channel scheduler is given cacheMu so that batched channel
        // commits are serialised with cache access (cacheMu must be taken
        // before the DB lock — see the SQLStore field docs). The node
        // scheduler touches no caches and so needs no lock.
        s.chanScheduler = batch.NewTimeScheduler(
                db, &s.cacheMu, opts.BatchCommitInterval,
        )
        s.nodeScheduler = batch.NewTimeScheduler(
                db, nil, opts.BatchCommitInterval,
        )

        return s, nil
}
151

152
// TxOptions defines the set of db txn options the SQLQueries
// understands.
type TxOptions struct {
        // readOnly governs if a read only transaction is needed or not.
        readOnly bool
}
158

159
// ReadOnly returns true if the transaction should be read only.
//
// NOTE: This implements the TxOptions.
func (a *TxOptions) ReadOnly() bool {
        return a.readOnly
}
165

166
// NewReadTx creates a new read transaction option set.
167
func NewReadTx() *TxOptions {
×
168
        return &TxOptions{
×
169
                readOnly: true,
×
170
        }
×
171
}
×
172

173
// AddLightningNode adds a vertex/node to the graph database. If the node is not
// in the database from before, this will add a new, unconnected one to the
// graph. If it is present from before, this will update that node's
// information.
//
// The write is funnelled through the node batch scheduler so that many
// concurrent announcements can share a single DB transaction.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) AddLightningNode(node *models.LightningNode,
        opts ...batch.SchedulerOption) error {

        ctx := context.TODO()

        r := &batch.Request[SQLQueries]{
                Opts: batch.NewSchedulerOptions(opts...),
                Do: func(queries SQLQueries) error {
                        _, err := upsertNode(ctx, queries, node)
                        return err
                },
        }

        return s.nodeScheduler.Execute(ctx, r)
}
194

195
// FetchLightningNode attempts to look up a target node by its identity public
196
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
197
// returned.
198
//
199
// NOTE: part of the V1Store interface.
200
func (s *SQLStore) FetchLightningNode(pubKey route.Vertex) (
201
        *models.LightningNode, error) {
×
202

×
203
        ctx := context.TODO()
×
204

×
205
        var (
×
206
                readTx = NewReadTx()
×
207
                node   *models.LightningNode
×
208
        )
×
209
        err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
×
210
                var err error
×
211
                _, node, err = getNodeByPubKey(ctx, db, pubKey)
×
212

×
213
                return err
×
214
        }, func() {})
×
215
        if err != nil {
×
216
                return nil, fmt.Errorf("unable to fetch node: %w", err)
×
217
        }
×
218

219
        return node, nil
×
220
}
221

222
// HasLightningNode determines if the graph has a vertex identified by the
// target node identity public key. If the node exists in the database, a
// timestamp of when the data for the node was lasted updated is returned along
// with a true boolean. Otherwise, an empty time.Time is returned with a false
// boolean.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) HasLightningNode(pubKey [33]byte) (time.Time, bool,
        error) {

        ctx := context.TODO()

        var (
                readTx     = NewReadTx()
                exists     bool
                lastUpdate time.Time
        )
        err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
                dbNode, err := db.GetNodeByPubKey(
                        ctx, sqlc.GetNodeByPubKeyParams{
                                Version: int16(ProtocolV1),
                                PubKey:  pubKey[:],
                        },
                )
                // A missing row is not an error for this query: it just means
                // the node is unknown, so (zero time, false, nil) is returned.
                if errors.Is(err, sql.ErrNoRows) {
                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to fetch node: %w", err)
                }

                exists = true

                // LastUpdate is nullable: per upsertNode it is only populated
                // once we have the node's announcement, so a node shell yields
                // the zero time here.
                if dbNode.LastUpdate.Valid {
                        lastUpdate = time.Unix(dbNode.LastUpdate.Int64, 0)
                }

                return nil
        }, func() {})
        if err != nil {
                return time.Time{}, false,
                        fmt.Errorf("unable to fetch node: %w", err)
        }

        return lastUpdate, exists, nil
}
267

268
// AddrsForNode returns all known addresses for the target node public key
// that the graph DB is aware of. The returned boolean indicates if the
// given node is unknown to the graph DB or not.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr,
        error) {

        ctx := context.TODO()

        var (
                readTx    = NewReadTx()
                addresses []net.Addr
                // known reports whether the node exists at all; a known node
                // may still have an empty address list.
                known bool
        )
        err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
                var err error
                known, addresses, err = getNodeAddresses(
                        ctx, db, nodePub.SerializeCompressed(),
                )
                if err != nil {
                        return fmt.Errorf("unable to fetch node addresses: %w",
                                err)
                }

                return nil
        }, func() {})
        if err != nil {
                return false, nil, fmt.Errorf("unable to get addresses for "+
                        "node(%x): %w", nodePub.SerializeCompressed(), err)
        }

        return known, addresses, nil
}
302

303
// DeleteLightningNode starts a new database transaction to remove a vertex/node
304
// from the database according to the node's public key.
305
//
306
// NOTE: part of the V1Store interface.
307
func (s *SQLStore) DeleteLightningNode(pubKey route.Vertex) error {
×
308
        ctx := context.TODO()
×
309

×
310
        var writeTxOpts TxOptions
×
311
        err := s.db.ExecTx(ctx, &writeTxOpts, func(db SQLQueries) error {
×
312
                res, err := db.DeleteNodeByPubKey(
×
313
                        ctx, sqlc.DeleteNodeByPubKeyParams{
×
314
                                Version: int16(ProtocolV1),
×
315
                                PubKey:  pubKey[:],
×
316
                        },
×
317
                )
×
318
                if err != nil {
×
319
                        return err
×
320
                }
×
321

322
                rows, err := res.RowsAffected()
×
323
                if err != nil {
×
324
                        return err
×
325
                }
×
326

327
                if rows == 0 {
×
328
                        return ErrGraphNodeNotFound
×
329
                } else if rows > 1 {
×
330
                        return fmt.Errorf("deleted %d rows, expected 1", rows)
×
331
                }
×
332

333
                return err
×
334
        }, func() {})
×
335
        if err != nil {
×
336
                return fmt.Errorf("unable to delete node: %w", err)
×
337
        }
×
338

339
        return nil
×
340
}
341

342
// FetchNodeFeatures returns the features of the given node. If no features are
// known for the node, an empty feature vector is returned.
//
// This is a thin wrapper that runs the package-level fetchNodeFeatures helper
// against this store's query backend.
//
// NOTE: this is part of the graphdb.NodeTraverser interface.
func (s *SQLStore) FetchNodeFeatures(nodePub route.Vertex) (
        *lnwire.FeatureVector, error) {

        ctx := context.TODO()

        return fetchNodeFeatures(ctx, s.db, nodePub)
}
353

354
// LookupAlias attempts to return the alias as advertised by the target node.
355
//
356
// NOTE: part of the V1Store interface.
357
func (s *SQLStore) LookupAlias(pub *btcec.PublicKey) (string, error) {
×
358
        var (
×
359
                ctx    = context.TODO()
×
360
                readTx = NewReadTx()
×
361
                alias  string
×
362
        )
×
363
        err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
×
364
                dbNode, err := db.GetNodeByPubKey(
×
365
                        ctx, sqlc.GetNodeByPubKeyParams{
×
366
                                Version: int16(ProtocolV1),
×
367
                                PubKey:  pub.SerializeCompressed(),
×
368
                        },
×
369
                )
×
370
                if errors.Is(err, sql.ErrNoRows) {
×
371
                        return ErrNodeAliasNotFound
×
372
                } else if err != nil {
×
373
                        return fmt.Errorf("unable to fetch node: %w", err)
×
374
                }
×
375

376
                if !dbNode.Alias.Valid {
×
377
                        return ErrNodeAliasNotFound
×
378
                }
×
379

380
                alias = dbNode.Alias.String
×
381

×
382
                return nil
×
383
        }, func() {})
×
384
        if err != nil {
×
385
                return "", fmt.Errorf("unable to look up alias: %w", err)
×
386
        }
×
387

388
        return alias, nil
×
389
}
390

391
// SourceNode returns the source node of the graph. The source node is treated
// as the center node within a star-graph. This method may be used to kick off
// a path finding algorithm in order to explore the reachability of another
// node based off the source node.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) SourceNode() (*models.LightningNode, error) {
        ctx := context.TODO()

        var (
                readTx = NewReadTx()
                node   *models.LightningNode
        )
        err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
                // First resolve which node is marked as the V1 source node,
                // then load its full record by public key.
                _, nodePub, err := getSourceNode(ctx, db, ProtocolV1)
                if err != nil {
                        return fmt.Errorf("unable to fetch V1 source node: %w",
                                err)
                }

                _, node, err = getNodeByPubKey(ctx, db, nodePub)

                return err
        }, func() {})
        if err != nil {
                return nil, fmt.Errorf("unable to fetch source node: %w", err)
        }

        return node, nil
}
421

422
// SetSourceNode sets the source node within the graph database. The source
// node is to be used as the center of a star-graph within path finding
// algorithms. Setting the same node again is a no-op, but attempting to
// replace an already-set source node with a different one is an error.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) SetSourceNode(node *models.LightningNode) error {
        ctx := context.TODO()
        var writeTxOpts TxOptions

        return s.db.ExecTx(ctx, &writeTxOpts, func(db SQLQueries) error {
                // Upsert the node record itself first so we have its DB ID.
                id, err := upsertNode(ctx, db, node)
                if err != nil {
                        return fmt.Errorf("unable to upsert source node: %w",
                                err)
                }

                // Make sure that if a source node for this version is already
                // set, then the ID is the same as the one we are about to set.
                dbSourceNodeID, _, err := getSourceNode(ctx, db, ProtocolV1)
                if err != nil && !errors.Is(err, ErrSourceNodeNotSet) {
                        return fmt.Errorf("unable to fetch source node: %w",
                                err)
                } else if err == nil {
                        if dbSourceNodeID != id {
                                return fmt.Errorf("v1 source node already "+
                                        "set to a different node: %d vs %d",
                                        dbSourceNodeID, id)
                        }

                        // Already set to this node: nothing more to do.
                        return nil
                }

                // No source node recorded yet (ErrSourceNodeNotSet): record
                // this node as the source.
                return db.AddSourceNode(ctx, id)
        }, func() {})
}
457

458
// NodeUpdatesInHorizon returns all the known lightning node which have an
// update timestamp within the passed range. This method can be used by two
// nodes to quickly determine if they have the same set of up to date node
// announcements.
//
// NOTE: This is part of the V1Store interface.
func (s *SQLStore) NodeUpdatesInHorizon(startTime,
        endTime time.Time) ([]models.LightningNode, error) {

        ctx := context.TODO()

        var (
                readTx = NewReadTx()
                nodes  []models.LightningNode
        )
        err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
                // Range bounds are compared against the nodes' stored
                // last_update unix timestamps.
                dbNodes, err := db.GetNodesByLastUpdateRange(
                        ctx, sqlc.GetNodesByLastUpdateRangeParams{
                                StartTime: sqldb.SQLInt64(startTime.Unix()),
                                EndTime:   sqldb.SQLInt64(endTime.Unix()),
                        },
                )
                if err != nil {
                        return fmt.Errorf("unable to fetch nodes: %w", err)
                }

                // Hydrate each row into a full LightningNode (features,
                // addresses and extra TLV fields included).
                for _, dbNode := range dbNodes {
                        node, err := buildNode(ctx, db, &dbNode)
                        if err != nil {
                                return fmt.Errorf("unable to build node: %w",
                                        err)
                        }

                        nodes = append(nodes, *node)
                }

                return nil
        }, func() {})
        if err != nil {
                return nil, fmt.Errorf("unable to fetch nodes: %w", err)
        }

        return nodes, nil
}
502

503
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
// undirected edge from the two target nodes are created. The information stored
// denotes the static attributes of the channel, such as the channelID, the keys
// involved in creation of the channel, and the set of features that the channel
// supports. The chanPoint and chanID are used to uniquely identify the edge
// globally within the database.
//
// The write is funnelled through the channel batch scheduler. Because a batch
// transaction may be retried, Reset is used to clear per-attempt state, and
// ErrEdgeAlreadyExist is swallowed inside Do (so one duplicate does not fail
// the whole batch) and re-surfaced for this caller in OnCommit.
//
// NOTE: part of the V1Store interface.
func (s *SQLStore) AddChannelEdge(edge *models.ChannelEdgeInfo,
        opts ...batch.SchedulerOption) error {

        ctx := context.TODO()

        var alreadyExists bool
        r := &batch.Request[SQLQueries]{
                Opts: batch.NewSchedulerOptions(opts...),
                Reset: func() {
                        alreadyExists = false
                },
                Do: func(tx SQLQueries) error {
                        err := insertChannel(ctx, tx, edge)

                        // Silence ErrEdgeAlreadyExist so that the batch can
                        // succeed, but propagate the error via local state.
                        if errors.Is(err, ErrEdgeAlreadyExist) {
                                alreadyExists = true
                                return nil
                        }

                        return err
                },
                OnCommit: func(err error) error {
                        switch {
                        case err != nil:
                                return err
                        case alreadyExists:
                                return ErrEdgeAlreadyExist
                        default:
                                // On a successful insert, evict any stale
                                // cache entries for this channel.
                                s.rejectCache.remove(edge.ChannelID)
                                s.chanCache.remove(edge.ChannelID)
                                return nil
                        }
                },
        }

        return s.chanScheduler.Execute(ctx, r)
}
550

551
// HighestChanID returns the "highest" known channel ID in the channel graph.
// This represents the "newest" channel from the PoV of the chain. This method
// can be used by peers to quickly determine if they're graphs are in sync.
// If no channels exist yet, 0 is returned.
//
// NOTE: This is part of the V1Store interface.
func (s *SQLStore) HighestChanID() (uint64, error) {
        ctx := context.TODO()

        var (
                readTx        = NewReadTx()
                highestChanID uint64
        )
        err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
                chanID, err := db.HighestSCID(ctx, int16(ProtocolV1))
                // No rows simply means no channels are stored yet; leave the
                // result at zero.
                if errors.Is(err, sql.ErrNoRows) {
                        return nil
                } else if err != nil {
                        return fmt.Errorf("unable to fetch highest chan ID: %w",
                                err)
                }

                // NOTE(review): this assumes HighestSCID always returns at
                // least 8 bytes when a row exists; a shorter slice would make
                // Uint64 panic — confirm against the SCID column encoding.
                highestChanID = byteOrder.Uint64(chanID)

                return nil
        }, func() {})
        if err != nil {
                return 0, fmt.Errorf("unable to fetch highest chan ID: %w", err)
        }

        return highestChanID, nil
}
582

583
// getNodeByPubKey attempts to look up a target node by its public key.
584
func getNodeByPubKey(ctx context.Context, db SQLQueries,
585
        pubKey route.Vertex) (int64, *models.LightningNode, error) {
×
586

×
587
        dbNode, err := db.GetNodeByPubKey(
×
588
                ctx, sqlc.GetNodeByPubKeyParams{
×
589
                        Version: int16(ProtocolV1),
×
590
                        PubKey:  pubKey[:],
×
591
                },
×
592
        )
×
593
        if errors.Is(err, sql.ErrNoRows) {
×
594
                return 0, nil, ErrGraphNodeNotFound
×
595
        } else if err != nil {
×
596
                return 0, nil, fmt.Errorf("unable to fetch node: %w", err)
×
597
        }
×
598

599
        node, err := buildNode(ctx, db, &dbNode)
×
600
        if err != nil {
×
601
                return 0, nil, fmt.Errorf("unable to build node: %w", err)
×
602
        }
×
603

604
        return dbNode.ID, node, nil
×
605
}
606

607
// buildNode constructs a LightningNode instance from the given database node
// record. The node's features, addresses and extra signed fields are also
// fetched from the database and set on the node.
func buildNode(ctx context.Context, db SQLQueries, dbNode *sqlc.Node) (
        *models.LightningNode, error) {

        if dbNode.Version != int16(ProtocolV1) {
                return nil, fmt.Errorf("unsupported node version: %d",
                        dbNode.Version)
        }

        var pub [33]byte
        copy(pub[:], dbNode.PubKey)

        node := &models.LightningNode{
                PubKeyBytes: pub,
                Features:    lnwire.EmptyFeatureVector(),
                LastUpdate:  time.Unix(0, 0),
        }

        // An empty signature means no node announcement has been stored (per
        // upsertNode, the signature is only persisted with an announcement),
        // so return the bare node shell as-is.
        if len(dbNode.Signature) == 0 {
                return node, nil
        }

        node.HaveNodeAnnouncement = true
        node.AuthSigBytes = dbNode.Signature
        // NOTE(review): Alias, LastUpdate and Color are read here without
        // checking their Valid flags; upsertNode sets all of them alongside
        // the signature, but confirm no other writer can leave them NULL.
        node.Alias = dbNode.Alias.String
        node.LastUpdate = time.Unix(dbNode.LastUpdate.Int64, 0)

        var err error
        node.Color, err = DecodeHexColor(dbNode.Color.String)
        if err != nil {
                return nil, fmt.Errorf("unable to decode color: %w", err)
        }

        // Fetch the node's features.
        node.Features, err = getNodeFeatures(ctx, db, dbNode.ID)
        if err != nil {
                return nil, fmt.Errorf("unable to fetch node(%d) "+
                        "features: %w", dbNode.ID, err)
        }

        // Fetch the node's addresses.
        _, node.Addresses, err = getNodeAddresses(ctx, db, pub[:])
        if err != nil {
                return nil, fmt.Errorf("unable to fetch node(%d) "+
                        "addresses: %w", dbNode.ID, err)
        }

        // Fetch the node's extra signed fields.
        extraTLVMap, err := getNodeExtraSignedFields(ctx, db, dbNode.ID)
        if err != nil {
                return nil, fmt.Errorf("unable to fetch node(%d) "+
                        "extra signed fields: %w", dbNode.ID, err)
        }

        // Re-serialise the TLV map back into the flat wire-encoded form that
        // LightningNode carries.
        recs, err := lnwire.CustomRecords(extraTLVMap).Serialize()
        if err != nil {
                return nil, fmt.Errorf("unable to serialize extra signed "+
                        "fields: %w", err)
        }

        if len(recs) != 0 {
                node.ExtraOpaqueData = recs
        }

        return node, nil
}
675

676
// getNodeFeatures fetches the feature bits and constructs the feature vector
677
// for a node with the given DB ID.
678
func getNodeFeatures(ctx context.Context, db SQLQueries,
679
        nodeID int64) (*lnwire.FeatureVector, error) {
×
680

×
681
        rows, err := db.GetNodeFeatures(ctx, nodeID)
×
682
        if err != nil {
×
683
                return nil, fmt.Errorf("unable to get node(%d) features: %w",
×
684
                        nodeID, err)
×
685
        }
×
686

687
        features := lnwire.EmptyFeatureVector()
×
688
        for _, feature := range rows {
×
689
                features.Set(lnwire.FeatureBit(feature.FeatureBit))
×
690
        }
×
691

692
        return features, nil
×
693
}
694

695
// getNodeExtraSignedFields fetches the extra signed fields for a node with the
696
// given DB ID.
697
func getNodeExtraSignedFields(ctx context.Context, db SQLQueries,
698
        nodeID int64) (map[uint64][]byte, error) {
×
699

×
700
        fields, err := db.GetExtraNodeTypes(ctx, nodeID)
×
701
        if err != nil {
×
702
                return nil, fmt.Errorf("unable to get node(%d) extra "+
×
703
                        "signed fields: %w", nodeID, err)
×
704
        }
×
705

706
        extraFields := make(map[uint64][]byte)
×
707
        for _, field := range fields {
×
708
                extraFields[uint64(field.Type)] = field.Value
×
709
        }
×
710

711
        return extraFields, nil
×
712
}
713

714
// upsertNode upserts the node record into the database. If the node already
// exists, then the node's information is updated. If the node doesn't exist,
// then a new node is created. The node's features, addresses and extra TLV
// types are also updated. The node's DB ID is returned.
func upsertNode(ctx context.Context, db SQLQueries,
        node *models.LightningNode) (int64, error) {

        params := sqlc.UpsertNodeParams{
                Version: int16(ProtocolV1),
                PubKey:  node.PubKeyBytes[:],
        }

        // Announcement-derived fields (timestamp, colour, alias, signature)
        // are only persisted once we actually have the node announcement;
        // otherwise a bare "shell" row is written with just version+pubkey.
        if node.HaveNodeAnnouncement {
                params.LastUpdate = sqldb.SQLInt64(node.LastUpdate.Unix())
                params.Color = sqldb.SQLStr(EncodeHexColor(node.Color))
                params.Alias = sqldb.SQLStr(node.Alias)
                params.Signature = node.AuthSigBytes
        }

        nodeID, err := db.UpsertNode(ctx, params)
        if err != nil {
                return 0, fmt.Errorf("upserting node(%x): %w", node.PubKeyBytes,
                        err)
        }

        // We can exit here if we don't have the announcement yet.
        if !node.HaveNodeAnnouncement {
                return nodeID, nil
        }

        // Update the node's features.
        err = upsertNodeFeatures(ctx, db, nodeID, node.Features)
        if err != nil {
                return 0, fmt.Errorf("inserting node features: %w", err)
        }

        // Update the node's addresses.
        err = upsertNodeAddresses(ctx, db, nodeID, node.Addresses)
        if err != nil {
                return 0, fmt.Errorf("inserting node addresses: %w", err)
        }

        // Convert the flat extra opaque data into a map of TLV types to
        // values.
        extra, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
        if err != nil {
                return 0, fmt.Errorf("unable to marshal extra opaque data: %w",
                        err)
        }

        // Update the node's extra signed fields.
        err = upsertNodeExtraSignedFields(ctx, db, nodeID, extra)
        if err != nil {
                return 0, fmt.Errorf("inserting node extra TLVs: %w", err)
        }

        return nodeID, nil
}
772

773
// upsertNodeFeatures updates the node's features node_features table. This
774
// includes deleting any feature bits no longer present and inserting any new
775
// feature bits. If the feature bit does not yet exist in the features table,
776
// then an entry is created in that table first.
777
func upsertNodeFeatures(ctx context.Context, db SQLQueries, nodeID int64,
778
        features *lnwire.FeatureVector) error {
×
779

×
780
        // Get any existing features for the node.
×
781
        existingFeatures, err := db.GetNodeFeatures(ctx, nodeID)
×
782
        if err != nil && !errors.Is(err, sql.ErrNoRows) {
×
783
                return err
×
784
        }
×
785

786
        // Copy the nodes latest set of feature bits.
787
        newFeatures := make(map[int32]struct{})
×
788
        if features != nil {
×
789
                for feature := range features.Features() {
×
790
                        newFeatures[int32(feature)] = struct{}{}
×
791
                }
×
792
        }
793

794
        // For any current feature that already exists in the DB, remove it from
795
        // the in-memory map. For any existing feature that does not exist in
796
        // the in-memory map, delete it from the database.
797
        for _, feature := range existingFeatures {
×
798
                // The feature is still present, so there are no updates to be
×
799
                // made.
×
800
                if _, ok := newFeatures[feature.FeatureBit]; ok {
×
801
                        delete(newFeatures, feature.FeatureBit)
×
802
                        continue
×
803
                }
804

805
                // The feature is no longer present, so we remove it from the
806
                // database.
807
                err := db.DeleteNodeFeature(ctx, sqlc.DeleteNodeFeatureParams{
×
808
                        NodeID:     nodeID,
×
809
                        FeatureBit: feature.FeatureBit,
×
810
                })
×
811
                if err != nil {
×
812
                        return fmt.Errorf("unable to delete node(%d) "+
×
813
                                "feature(%v): %w", nodeID, feature.FeatureBit,
×
814
                                err)
×
815
                }
×
816
        }
817

818
        // Any remaining entries in newFeatures are new features that need to be
819
        // added to the database for the first time.
820
        for feature := range newFeatures {
×
821
                err = db.InsertNodeFeature(ctx, sqlc.InsertNodeFeatureParams{
×
822
                        NodeID:     nodeID,
×
823
                        FeatureBit: feature,
×
824
                })
×
825
                if err != nil {
×
826
                        return fmt.Errorf("unable to insert node(%d) "+
×
827
                                "feature(%v): %w", nodeID, feature, err)
×
828
                }
×
829
        }
830

831
        return nil
×
832
}
833

834
// fetchNodeFeatures fetches the features for a node with the given public key.
835
func fetchNodeFeatures(ctx context.Context, queries SQLQueries,
836
        nodePub route.Vertex) (*lnwire.FeatureVector, error) {
×
837

×
838
        rows, err := queries.GetNodeFeaturesByPubKey(
×
839
                ctx, sqlc.GetNodeFeaturesByPubKeyParams{
×
840
                        PubKey:  nodePub[:],
×
841
                        Version: int16(ProtocolV1),
×
842
                },
×
843
        )
×
844
        if err != nil {
×
845
                return nil, fmt.Errorf("unable to get node(%s) features: %w",
×
846
                        nodePub, err)
×
847
        }
×
848

849
        features := lnwire.EmptyFeatureVector()
×
850
        for _, bit := range rows {
×
851
                features.Set(lnwire.FeatureBit(bit))
×
852
        }
×
853

854
        return features, nil
×
855
}
856

857
// dbAddressType is an enum type that represents the different address types
// that we store in the node_addresses table. The address type determines how
// the address is to be serialised/deserialised.
type dbAddressType uint8

const (
	// addressTypeIPv4 is an IPv4 TCP address, stored as "host:port".
	addressTypeIPv4   dbAddressType = 1

	// addressTypeIPv6 is an IPv6 TCP address, stored as "host:port".
	addressTypeIPv6   dbAddressType = 2

	// addressTypeTorV2 is a v2 onion service address ("service:port").
	addressTypeTorV2  dbAddressType = 3

	// addressTypeTorV3 is a v3 onion service address ("service:port").
	addressTypeTorV3  dbAddressType = 4

	// addressTypeOpaque is a catch-all for address payloads we don't
	// otherwise recognise; the payload is stored as a hex string (it is
	// hex-decoded on read in getNodeAddresses). Pinned to the max int8
	// value so new concrete types can be added before it.
	addressTypeOpaque dbAddressType = math.MaxInt8
)
869

870
// upsertNodeAddresses updates the node's addresses in the database. This
871
// includes deleting any existing addresses and inserting the new set of
872
// addresses. The deletion is necessary since the ordering of the addresses may
873
// change, and we need to ensure that the database reflects the latest set of
874
// addresses so that at the time of reconstructing the node announcement, the
875
// order is preserved and the signature over the message remains valid.
876
func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64,
877
        addresses []net.Addr) error {
×
878

×
879
        // Delete any existing addresses for the node. This is required since
×
880
        // even if the new set of addresses is the same, the ordering may have
×
881
        // changed for a given address type.
×
882
        err := db.DeleteNodeAddresses(ctx, nodeID)
×
883
        if err != nil {
×
884
                return fmt.Errorf("unable to delete node(%d) addresses: %w",
×
885
                        nodeID, err)
×
886
        }
×
887

888
        // Copy the nodes latest set of addresses.
889
        newAddresses := map[dbAddressType][]string{
×
890
                addressTypeIPv4:   {},
×
891
                addressTypeIPv6:   {},
×
892
                addressTypeTorV2:  {},
×
893
                addressTypeTorV3:  {},
×
894
                addressTypeOpaque: {},
×
895
        }
×
896
        addAddr := func(t dbAddressType, addr net.Addr) {
×
897
                newAddresses[t] = append(newAddresses[t], addr.String())
×
898
        }
×
899

900
        for _, address := range addresses {
×
901
                switch addr := address.(type) {
×
902
                case *net.TCPAddr:
×
903
                        if ip4 := addr.IP.To4(); ip4 != nil {
×
904
                                addAddr(addressTypeIPv4, addr)
×
905
                        } else if ip6 := addr.IP.To16(); ip6 != nil {
×
906
                                addAddr(addressTypeIPv6, addr)
×
907
                        } else {
×
908
                                return fmt.Errorf("unhandled IP address: %v",
×
909
                                        addr)
×
910
                        }
×
911

912
                case *tor.OnionAddr:
×
913
                        switch len(addr.OnionService) {
×
914
                        case tor.V2Len:
×
915
                                addAddr(addressTypeTorV2, addr)
×
916
                        case tor.V3Len:
×
917
                                addAddr(addressTypeTorV3, addr)
×
918
                        default:
×
919
                                return fmt.Errorf("invalid length for a tor " +
×
920
                                        "address")
×
921
                        }
922

923
                case *lnwire.OpaqueAddrs:
×
924
                        addAddr(addressTypeOpaque, addr)
×
925

926
                default:
×
927
                        return fmt.Errorf("unhandled address type: %T", addr)
×
928
                }
929
        }
930

931
        // Any remaining entries in newAddresses are new addresses that need to
932
        // be added to the database for the first time.
933
        for addrType, addrList := range newAddresses {
×
934
                for position, addr := range addrList {
×
935
                        err := db.InsertNodeAddress(
×
936
                                ctx, sqlc.InsertNodeAddressParams{
×
937
                                        NodeID:   nodeID,
×
938
                                        Type:     int16(addrType),
×
939
                                        Address:  addr,
×
940
                                        Position: int32(position),
×
941
                                },
×
942
                        )
×
943
                        if err != nil {
×
944
                                return fmt.Errorf("unable to insert "+
×
945
                                        "node(%d) address(%v): %w", nodeID,
×
946
                                        addr, err)
×
947
                        }
×
948
                }
949
        }
950

951
        return nil
×
952
}
953

954
// getNodeAddresses fetches the addresses for a node with the given public key.
955
func getNodeAddresses(ctx context.Context, db SQLQueries, nodePub []byte) (bool,
956
        []net.Addr, error) {
×
957

×
958
        // GetNodeAddressesByPubKey ensures that the addresses for a given type
×
959
        // are returned in the same order as they were inserted.
×
960
        rows, err := db.GetNodeAddressesByPubKey(
×
961
                ctx, sqlc.GetNodeAddressesByPubKeyParams{
×
962
                        Version: int16(ProtocolV1),
×
963
                        PubKey:  nodePub,
×
964
                },
×
965
        )
×
966
        if err != nil {
×
967
                return false, nil, err
×
968
        }
×
969

970
        // GetNodeAddressesByPubKey uses a left join so there should always be
971
        // at least one row returned if the node exists even if it has no
972
        // addresses.
973
        if len(rows) == 0 {
×
974
                return false, nil, nil
×
975
        }
×
976

977
        addresses := make([]net.Addr, 0, len(rows))
×
978
        for _, addr := range rows {
×
979
                if !(addr.Type.Valid && addr.Address.Valid) {
×
980
                        continue
×
981
                }
982

983
                address := addr.Address.String
×
984

×
985
                switch dbAddressType(addr.Type.Int16) {
×
986
                case addressTypeIPv4:
×
987
                        tcp, err := net.ResolveTCPAddr("tcp4", address)
×
988
                        if err != nil {
×
989
                                return false, nil, nil
×
990
                        }
×
991
                        tcp.IP = tcp.IP.To4()
×
992

×
993
                        addresses = append(addresses, tcp)
×
994

995
                case addressTypeIPv6:
×
996
                        tcp, err := net.ResolveTCPAddr("tcp6", address)
×
997
                        if err != nil {
×
998
                                return false, nil, nil
×
999
                        }
×
1000
                        addresses = append(addresses, tcp)
×
1001

1002
                case addressTypeTorV3, addressTypeTorV2:
×
1003
                        service, portStr, err := net.SplitHostPort(address)
×
1004
                        if err != nil {
×
1005
                                return false, nil, fmt.Errorf("unable to "+
×
1006
                                        "split tor v3 address: %v",
×
1007
                                        addr.Address)
×
1008
                        }
×
1009

1010
                        port, err := strconv.Atoi(portStr)
×
1011
                        if err != nil {
×
1012
                                return false, nil, err
×
1013
                        }
×
1014

1015
                        addresses = append(addresses, &tor.OnionAddr{
×
1016
                                OnionService: service,
×
1017
                                Port:         port,
×
1018
                        })
×
1019

1020
                case addressTypeOpaque:
×
1021
                        opaque, err := hex.DecodeString(address)
×
1022
                        if err != nil {
×
1023
                                return false, nil, fmt.Errorf("unable to "+
×
1024
                                        "decode opaque address: %v", addr)
×
1025
                        }
×
1026

1027
                        addresses = append(addresses, &lnwire.OpaqueAddrs{
×
1028
                                Payload: opaque,
×
1029
                        })
×
1030

1031
                default:
×
1032
                        return false, nil, fmt.Errorf("unknown address "+
×
1033
                                "type: %v", addr.Type)
×
1034
                }
1035
        }
1036

1037
        return true, addresses, nil
×
1038
}
1039

1040
// upsertNodeExtraSignedFields updates the node's extra signed fields in the
1041
// database. This includes updating any existing types, inserting any new types,
1042
// and deleting any types that are no longer present.
1043
func upsertNodeExtraSignedFields(ctx context.Context, db SQLQueries,
1044
        nodeID int64, extraFields map[uint64][]byte) error {
×
1045

×
1046
        // Get any existing extra signed fields for the node.
×
1047
        existingFields, err := db.GetExtraNodeTypes(ctx, nodeID)
×
1048
        if err != nil {
×
1049
                return err
×
1050
        }
×
1051

1052
        // Make a lookup map of the existing field types so that we can use it
1053
        // to keep track of any fields we should delete.
1054
        m := make(map[uint64]bool)
×
1055
        for _, field := range existingFields {
×
1056
                m[uint64(field.Type)] = true
×
1057
        }
×
1058

1059
        // For all the new fields, we'll upsert them and remove them from the
1060
        // map of existing fields.
1061
        for tlvType, value := range extraFields {
×
1062
                err = db.UpsertNodeExtraType(
×
1063
                        ctx, sqlc.UpsertNodeExtraTypeParams{
×
1064
                                NodeID: nodeID,
×
1065
                                Type:   int64(tlvType),
×
1066
                                Value:  value,
×
1067
                        },
×
1068
                )
×
1069
                if err != nil {
×
1070
                        return fmt.Errorf("unable to upsert node(%d) extra "+
×
1071
                                "signed field(%v): %w", nodeID, tlvType, err)
×
1072
                }
×
1073

1074
                // Remove the field from the map of existing fields if it was
1075
                // present.
1076
                delete(m, tlvType)
×
1077
        }
1078

1079
        // For all the fields that are left in the map of existing fields, we'll
1080
        // delete them as they are no longer present in the new set of fields.
1081
        for tlvType := range m {
×
1082
                err = db.DeleteExtraNodeType(
×
1083
                        ctx, sqlc.DeleteExtraNodeTypeParams{
×
1084
                                NodeID: nodeID,
×
1085
                                Type:   int64(tlvType),
×
1086
                        },
×
1087
                )
×
1088
                if err != nil {
×
1089
                        return fmt.Errorf("unable to delete node(%d) extra "+
×
1090
                                "signed field(%v): %w", nodeID, tlvType, err)
×
1091
                }
×
1092
        }
1093

1094
        return nil
×
1095
}
1096

1097
// getSourceNode returns the DB node ID and pub key of the source node for the
1098
// specified protocol version.
1099
func getSourceNode(ctx context.Context, db SQLQueries,
1100
        version ProtocolVersion) (int64, route.Vertex, error) {
×
1101

×
1102
        var pubKey route.Vertex
×
1103

×
1104
        nodes, err := db.GetSourceNodesByVersion(ctx, int16(version))
×
1105
        if err != nil {
×
1106
                return 0, pubKey, fmt.Errorf("unable to fetch source node: %w",
×
1107
                        err)
×
1108
        }
×
1109

1110
        if len(nodes) == 0 {
×
1111
                return 0, pubKey, ErrSourceNodeNotSet
×
1112
        } else if len(nodes) > 1 {
×
1113
                return 0, pubKey, fmt.Errorf("multiple source nodes for "+
×
1114
                        "protocol %s found", version)
×
1115
        }
×
1116

1117
        copy(pubKey[:], nodes[0].PubKey)
×
1118

×
1119
        return nodes[0].NodeID, pubKey, nil
×
1120
}
1121

1122
// marshalExtraOpaqueData takes a flat byte slice parses it as a TLV stream.
1123
// This then produces a map from TLV type to value. If the input is not a
1124
// valid TLV stream, then an error is returned.
1125
func marshalExtraOpaqueData(data []byte) (map[uint64][]byte, error) {
×
1126
        r := bytes.NewReader(data)
×
1127

×
1128
        tlvStream, err := tlv.NewStream()
×
1129
        if err != nil {
×
1130
                return nil, err
×
1131
        }
×
1132

1133
        // Since ExtraOpaqueData is provided by a potentially malicious peer,
1134
        // pass it into the P2P decoding variant.
1135
        parsedTypes, err := tlvStream.DecodeWithParsedTypesP2P(r)
×
1136
        if err != nil {
×
1137
                return nil, err
×
1138
        }
×
1139
        if len(parsedTypes) == 0 {
×
1140
                return nil, nil
×
1141
        }
×
1142

1143
        records := make(map[uint64][]byte)
×
1144
        for k, v := range parsedTypes {
×
1145
                records[uint64(k)] = v
×
1146
        }
×
1147

1148
        return records, nil
×
1149
}
1150

1151
// insertChannel inserts a new channel record into the database. This includes
// creating shell entries for both endpoint nodes, persisting the channel's
// feature bits, and storing any extra signed TLV fields carried in the
// channel announcement. ErrEdgeAlreadyExist is returned if a channel with
// the same SCID already exists for this protocol version.
//
// NOTE(review): the duplicate-check comment below suggests this runs inside a
// larger batched transaction — confirm at call sites.
func insertChannel(ctx context.Context, db SQLQueries,
	edge *models.ChannelEdgeInfo) error {

	// Serialise the channel ID (SCID) using the package byte order so it
	// can be used as the DB lookup key.
	var chanIDB [8]byte
	byteOrder.PutUint64(chanIDB[:], edge.ChannelID)

	// Make sure that the channel doesn't already exist. We do this
	// explicitly instead of relying on catching a unique constraint error
	// because relying on SQL to throw that error would abort the entire
	// batch of transactions.
	_, err := db.GetChannelBySCID(
		ctx, sqlc.GetChannelBySCIDParams{
			Scid:    chanIDB[:],
			Version: int16(ProtocolV1),
		},
	)
	if err == nil {
		return ErrEdgeAlreadyExist
	} else if !errors.Is(err, sql.ErrNoRows) {
		return fmt.Errorf("unable to fetch channel: %w", err)
	}

	// Make sure that at least a "shell" entry for each node is present in
	// the nodes table.
	node1DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey1Bytes)
	if err != nil {
		return fmt.Errorf("unable to create shell node: %w", err)
	}

	node2DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey2Bytes)
	if err != nil {
		return fmt.Errorf("unable to create shell node: %w", err)
	}

	// A zero capacity is left as the NullInt64 zero value and so is
	// stored as NULL rather than 0.
	var capacity sql.NullInt64
	if edge.Capacity != 0 {
		capacity = sqldb.SQLInt64(int64(edge.Capacity))
	}

	createParams := sqlc.CreateChannelParams{
		Version:     int16(ProtocolV1),
		Scid:        chanIDB[:],
		NodeID1:     node1DBID,
		NodeID2:     node2DBID,
		Outpoint:    edge.ChannelPoint.String(),
		Capacity:    capacity,
		BitcoinKey1: edge.BitcoinKey1Bytes[:],
		BitcoinKey2: edge.BitcoinKey2Bytes[:],
	}

	// The announcement signatures are only populated when an auth proof
	// is attached to the edge.
	if edge.AuthProof != nil {
		proof := edge.AuthProof

		createParams.Node1Signature = proof.NodeSig1Bytes
		createParams.Node2Signature = proof.NodeSig2Bytes
		createParams.Bitcoin1Signature = proof.BitcoinSig1Bytes
		createParams.Bitcoin2Signature = proof.BitcoinSig2Bytes
	}

	// Insert the new channel record.
	dbChanID, err := db.CreateChannel(ctx, createParams)
	if err != nil {
		return err
	}

	// Insert any channel features. The raw feature bytes are decoded into
	// a feature vector first so each set bit can be stored as its own row.
	if len(edge.Features) != 0 {
		chanFeatures := lnwire.NewRawFeatureVector()
		err := chanFeatures.Decode(bytes.NewReader(edge.Features))
		if err != nil {
			return err
		}

		fv := lnwire.NewFeatureVector(chanFeatures, lnwire.Features)
		for feature := range fv.Features() {
			err = db.InsertChannelFeature(
				ctx, sqlc.InsertChannelFeatureParams{
					ChannelID: dbChanID,
					Bit:       int32(feature),
				},
			)
			if err != nil {
				return fmt.Errorf("unable to insert "+
					"channel(%d) feature(%v): %w", dbChanID,
					feature, err)
			}
		}
	}

	// Finally, insert any extra TLV fields in the channel announcement.
	extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
	if err != nil {
		return fmt.Errorf("unable to marshal extra opaque data: %w",
			err)
	}

	for tlvType, value := range extra {
		err := db.CreateChannelExtraType(
			ctx, sqlc.CreateChannelExtraTypeParams{
				ChannelID: dbChanID,
				Type:      int64(tlvType),
				Value:     value,
			},
		)
		if err != nil {
			return fmt.Errorf("unable to upsert channel(%d) extra "+
				"signed field(%v): %w", edge.ChannelID,
				tlvType, err)
		}

	}

	return nil
}
1266

1267
// maybeCreateShellNode checks if a shell node entry exists for the
1268
// given public key. If it does not exist, then a new shell node entry is
1269
// created. The ID of the node is returned. A shell node only has a protocol
1270
// version and public key persisted.
1271
func maybeCreateShellNode(ctx context.Context, db SQLQueries,
NEW
1272
        pubKey route.Vertex) (int64, error) {
×
NEW
1273

×
NEW
1274
        dbNode, err := db.GetNodeByPubKey(
×
NEW
1275
                ctx, sqlc.GetNodeByPubKeyParams{
×
NEW
1276
                        PubKey:  pubKey[:],
×
NEW
1277
                        Version: int16(ProtocolV1),
×
NEW
1278
                },
×
NEW
1279
        )
×
NEW
1280
        // The node exists. Return the ID.
×
NEW
1281
        if err == nil {
×
NEW
1282
                return dbNode.ID, nil
×
NEW
1283
        } else if !errors.Is(err, sql.ErrNoRows) {
×
NEW
1284
                return 0, err
×
NEW
1285
        }
×
1286

1287
        // Otherwise, the node does not exist, so we create a shell entry for
1288
        // it.
NEW
1289
        id, err := db.UpsertNode(ctx, sqlc.UpsertNodeParams{
×
NEW
1290
                Version: int16(ProtocolV1),
×
NEW
1291
                PubKey:  pubKey[:],
×
NEW
1292
        })
×
NEW
1293
        if err != nil {
×
NEW
1294
                return 0, fmt.Errorf("unable to create shell node: %w", err)
×
NEW
1295
        }
×
1296

NEW
1297
        return id, nil
×
1298
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc