• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17917482292

22 Sep 2025 01:50PM UTC coverage: 56.562% (-10.1%) from 66.668%
17917482292

Pull #10182

github

web-flow
Merge 9efe3bd8c into 055fb436e
Pull Request #10182: Aux feature bits

32 of 68 new or added lines in 5 files covered. (47.06%)

29734 existing lines in 467 files now uncovered.

98449 of 174056 relevant lines covered (56.56%)

1.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.47
/discovery/gossiper.go
1
package discovery
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "strings"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcec/v2"
14
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
15
        "github.com/btcsuite/btcd/btcutil"
16
        "github.com/btcsuite/btcd/chaincfg"
17
        "github.com/btcsuite/btcd/chaincfg/chainhash"
18
        "github.com/btcsuite/btcd/txscript"
19
        "github.com/btcsuite/btcd/wire"
20
        "github.com/lightninglabs/neutrino/cache"
21
        "github.com/lightninglabs/neutrino/cache/lru"
22
        "github.com/lightningnetwork/lnd/batch"
23
        "github.com/lightningnetwork/lnd/chainntnfs"
24
        "github.com/lightningnetwork/lnd/channeldb"
25
        "github.com/lightningnetwork/lnd/fn/v2"
26
        "github.com/lightningnetwork/lnd/graph"
27
        graphdb "github.com/lightningnetwork/lnd/graph/db"
28
        "github.com/lightningnetwork/lnd/graph/db/models"
29
        "github.com/lightningnetwork/lnd/input"
30
        "github.com/lightningnetwork/lnd/keychain"
31
        "github.com/lightningnetwork/lnd/lnpeer"
32
        "github.com/lightningnetwork/lnd/lnutils"
33
        "github.com/lightningnetwork/lnd/lnwallet"
34
        "github.com/lightningnetwork/lnd/lnwallet/btcwallet"
35
        "github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
36
        "github.com/lightningnetwork/lnd/lnwire"
37
        "github.com/lightningnetwork/lnd/multimutex"
38
        "github.com/lightningnetwork/lnd/netann"
39
        "github.com/lightningnetwork/lnd/routing/route"
40
        "github.com/lightningnetwork/lnd/ticker"
41
        "golang.org/x/time/rate"
42
)
43

44
const (
45
        // DefaultMaxChannelUpdateBurst is the default maximum number of updates
46
        // for a specific channel and direction that we'll accept over an
47
        // interval.
48
        DefaultMaxChannelUpdateBurst = 10
49

50
        // DefaultChannelUpdateInterval is the default interval we'll use to
51
        // determine how often we should allow a new update for a specific
52
        // channel and direction.
53
        DefaultChannelUpdateInterval = time.Minute
54

55
        // maxPrematureUpdates is the maximum number of premature channel
56
        // updates that we'll hold onto.
57
        maxPrematureUpdates = 100
58

59
        // maxFutureMessages is the maximum number of future messages that
60
        // we'll hold onto.
61
        maxFutureMessages = 1000
62

63
        // DefaultSubBatchDelay is the default delay we'll use when
64
        // broadcasting the next announcement batch.
65
        DefaultSubBatchDelay = 5 * time.Second
66

67
        // maxRejectedUpdates is the maximum number of rejected channel updates
68
        // we'll maintain. This is the global size across all peers. We'll
69
        // allocate ~3 MB max to the cache.
70
        maxRejectedUpdates = 10_000
71

72
        // DefaultProofMatureDelta specifies the default value used for
73
        // ProofMatureDelta, which is the number of confirmations needed before
74
        // processing the announcement signatures.
75
        DefaultProofMatureDelta = 6
76
)
77

78
var (
79
        // ErrGossiperShuttingDown is an error that is returned if the gossiper
80
        // is in the process of being shut down.
81
        ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
82

83
        // ErrGossipSyncerNotFound signals that we were unable to find an active
84
        // gossip syncer corresponding to a gossip query message received from
85
        // the remote peer.
86
        ErrGossipSyncerNotFound = errors.New("gossip syncer not found")
87

88
        // ErrNoFundingTransaction is returned when we are unable to find the
89
        // funding transaction described by the short channel ID on chain.
90
        ErrNoFundingTransaction = errors.New(
91
                "unable to find the funding transaction",
92
        )
93

94
        // ErrInvalidFundingOutput is returned if the channel funding output
95
        // fails validation.
96
        ErrInvalidFundingOutput = errors.New(
97
                "channel funding output validation failed",
98
        )
99

100
        // ErrChannelSpent is returned when we go to validate a channel, but
101
        // the purported funding output has actually already been spent on
102
        // chain.
103
        ErrChannelSpent = errors.New("channel output has been spent")
104

105
        // emptyPubkey is a zero-valued 33-byte array used to compare
106
        // compressed pubkeys against an empty byte array.
107
        emptyPubkey [33]byte
108
)
109

110
// optionalMsgFields is a set of optional message fields that external callers
111
// can provide and that are useful when processing a specific network
112
// announcement.
113
type optionalMsgFields struct {
114
        capacity      *btcutil.Amount // set by the ChannelCapacity option
115
        channelPoint  *wire.OutPoint // set by the ChannelPoint option
116
        remoteAlias   *lnwire.ShortChannelID // set by the RemoteAlias option
117
        tapscriptRoot fn.Option[chainhash.Hash] // set by the TapscriptRoot option
118
}
119

120
// apply invokes each of the given functional options on f, in order.
121
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
2✔
122
        for _, optionalMsgField := range optionalMsgFields {
4✔
123
                optionalMsgField(f)
2✔
124
        }
2✔
125
}
126

127
// OptionalMsgField is a functional option parameter that can be used to provide
128
// external information that is not included within a network message but is
129
// useful when processing it.
130
type OptionalMsgField func(*optionalMsgFields)
131

132
// ChannelCapacity returns an OptionalMsgField that lets the gossiper know of the
133
// capacity of a channel by storing it in the capacity field.
134
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
2✔
135
        return func(f *optionalMsgFields) {
4✔
136
                f.capacity = &capacity
2✔
137
        }
2✔
138
}
139

140
// ChannelPoint returns an OptionalMsgField that lets the gossiper know of the
141
// outpoint of a channel by storing it in the channelPoint field.
142
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
2✔
143
        return func(f *optionalMsgFields) {
4✔
144
                f.channelPoint = &op
2✔
145
        }
2✔
146
}
147

148
// TapscriptRoot returns an OptionalMsgField that lets the gossiper know of the
149
// root of the tapscript tree for a custom channel.
150
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
2✔
151
        return func(f *optionalMsgFields) {
4✔
152
                f.tapscriptRoot = root
2✔
153
        }
2✔
154
}
155

156
// RemoteAlias returns an OptionalMsgField that lets the gossiper know that a
157
// locally sent channel update is actually an update for the peer that should
158
// replace the ShortChannelID field with the remote's alias. This is only used
159
// for channels with peers where the option-scid-alias feature bit was
160
// negotiated. The channel update will be added to the graph under the original
161
// SCID, but will be modified and re-signed with this alias.
162
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
2✔
163
        return func(f *optionalMsgFields) {
4✔
164
                f.remoteAlias = alias
2✔
165
        }
2✔
166
}
167

168
// networkMsg couples a routing related wire message with the peer that
169
// originally sent it and any optional fields supplied alongside it.
170
type networkMsg struct {
171
        peer              lnpeer.Peer
172
        source            *btcec.PublicKey
173
        msg               lnwire.Message
174
        optionalMsgFields *optionalMsgFields
175

176
        isRemote bool
177

178
        err chan error
179
}
180

181
// chanPolicyUpdateRequest is a request that is sent to the gossiper when a
182
// caller wishes to update a particular set of channels. New ChannelUpdate
183
// messages will be crafted to be sent out during the next broadcast epoch and
184
// the fee updates committed to the lower layer.
185
type chanPolicyUpdateRequest struct {
186
        edgesToUpdate []EdgeWithInfo
187
        errChan       chan error
188
}
189

190
// PinnedSyncers is a set of node pubkeys, keyed by route.Vertex, for which we
191
// will maintain an active syncer at all times.
192
type PinnedSyncers map[route.Vertex]struct{}
193

194
// Config defines the configuration for the service. ALL elements within the
195
// configuration MUST be non-nil for the service to carry out its duties.
196
type Config struct {
197
        // ChainParams holds the chain parameters for the active network this
198
        // node is participating on.
199
        ChainParams *chaincfg.Params
200

201
        // Graph is the subsystem which is responsible for managing the
202
        // topology of the lightning network. After incoming channel, node,
203
        // and channel update announcements are validated, they are sent to
204
        // the router in order to be included in the LN graph.
205
        Graph graph.ChannelGraphSource
206

207
        // ChainIO represents an abstraction over a source that can query the
208
        // blockchain.
209
        ChainIO lnwallet.BlockChainIO
210

211
        // ChanSeries is an interface that provides access to a time series
212
        // view of the current known channel graph. Each GossipSyncer enabled
213
        // peer will utilize this in order to create and respond to channel
214
        // graph time series queries.
215
        ChanSeries ChannelGraphTimeSeries
216

217
        // Notifier is used for receiving notifications of incoming blocks.
218
        // With each new incoming block found we process previously premature
219
        // announcements.
220
        //
221
        // TODO(roasbeef): could possibly just replace this with an epoch
222
        // channel.
223
        Notifier chainntnfs.ChainNotifier
224

225
        // Broadcast broadcasts a particular set of announcements to all peers
226
        // that the daemon is connected to. If supplied, the skips parameter
227
        // indicates the set of peers that should be excluded from the
228
        // broadcast.
229
        Broadcast func(skips map[route.Vertex]struct{},
230
                msg ...lnwire.Message) error
231

232
        // NotifyWhenOnline is a function that allows the gossiper to be
233
        // notified when a certain peer comes online, allowing it to
234
        // retry sending a peer message.
235
        //
236
        // NOTE: The peerChan channel must be buffered.
237
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
238

239
        // NotifyWhenOffline is a function that allows the gossiper to be
240
        // notified when a certain peer disconnects, allowing it to request a
241
        // notification for when it reconnects.
242
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
243

244
        // FetchSelfAnnouncement retrieves our current node announcement, for
245
        // use when determining whether we should update our peers about our
246
        // presence in the network.
247
        FetchSelfAnnouncement func() lnwire.NodeAnnouncement
248

249
        // UpdateSelfAnnouncement produces a new announcement for our node with
250
        // an updated timestamp which can be broadcast to our peers.
251
        UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement, error)
252

253
        // ProofMatureDelta is the number of confirmations needed before
254
        // exchanging the channel announcement proofs.
255
        ProofMatureDelta uint32
256

257
        // TrickleDelay is the period of the trickle timer which flushes to the
258
        // network the pending batch of new announcements we've received since
259
        // the last trickle tick.
260
        TrickleDelay time.Duration
261

262
        // RetransmitTicker is a ticker that ticks with a period which
263
        // indicates that we should check if we need to re-broadcast any of
264
        // our personal channels.
265
        RetransmitTicker ticker.Ticker
266

267
        // RebroadcastInterval is the maximum time we wait between sending out
268
        // channel updates for our active channels and our own node
269
        // announcement. We do this to ensure our active presence on the
270
        // network is known, and we are not being considered a zombie node or
271
        // having zombie channels.
272
        RebroadcastInterval time.Duration
273

274
        // WaitingProofStore is a persistent storage of partial channel proof
275
        // announcement messages. We use it to buffer half of the material
276
        // needed to reconstruct a full authenticated channel announcement.
277
        // Once we receive the other half of the channel proof, we'll be able
278
        // to properly validate it and re-broadcast it out to the network.
279
        //
280
        // TODO(wilmer): make interface to prevent channeldb dependency.
281
        WaitingProofStore *channeldb.WaitingProofStore
282

283
        // MessageStore is a persistent storage of gossip messages which we will
284
        // use to determine which messages need to be resent for a given peer.
285
        MessageStore GossipMessageStore
286

287
        // AnnSigner is an instance of the MessageSigner interface which will
288
        // be used to manually sign any outgoing channel updates. The signer
289
        // implementation should be backed by the public key of the backing
290
        // Lightning node.
291
        //
292
        // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
293
        // here?
294
        AnnSigner lnwallet.MessageSigner
295

296
        // ScidCloser is an instance of ClosedChannelTracker that helps the
297
        // gossiper cut down on spam channel announcements for already closed
298
        // channels.
299
        ScidCloser ClosedChannelTracker
300

301
        // NumActiveSyncers is the number of peers for which we should have
302
        // active syncers. After reaching NumActiveSyncers, any future
303
        // gossip syncers will be passive.
304
        NumActiveSyncers int
305

306
        // NoTimestampQueries will prevent the GossipSyncer from querying
307
        // timestamps of announcement messages from the peer and from replying
308
        // to timestamp queries.
309
        NoTimestampQueries bool
310

311
        // RotateTicker is a ticker responsible for notifying the SyncManager
312
        // when it should rotate its active syncers. A single active syncer with
313
        // a chansSynced state will be exchanged for a passive syncer in order
314
        // to ensure we don't keep syncing with the same peers.
315
        RotateTicker ticker.Ticker
316

317
        // HistoricalSyncTicker is a ticker responsible for notifying the
318
        // syncManager when it should attempt a historical sync with a gossip
319
        // sync peer.
320
        HistoricalSyncTicker ticker.Ticker
321

322
        // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
323
        // syncManager when it should attempt to start the next pending
324
        // activeSyncer due to the current one not completing its state machine
325
        // within the timeout.
326
        ActiveSyncerTimeoutTicker ticker.Ticker
327

328
        // MinimumBatchSize is the minimum size of a sub batch of announcement
329
        // messages.
330
        MinimumBatchSize int
331

332
        // SubBatchDelay is the delay between sending sub batches of
333
        // gossip messages.
334
        SubBatchDelay time.Duration
335

336
        // IgnoreHistoricalFilters will prevent syncers from replying with
337
        // historical data when the remote peer sets a gossip_timestamp_range.
338
        // This prevents ranges with old start times from causing us to dump the
339
        // graph on connect.
340
        IgnoreHistoricalFilters bool
341

342
        // PinnedSyncers is a set of peers that will always transition to
343
        // ActiveSync upon connection. These peers will never transition to
344
        // PassiveSync.
345
        PinnedSyncers PinnedSyncers
346

347
        // MaxChannelUpdateBurst specifies the maximum number of updates for a
348
        // specific channel and direction that we'll accept over an interval.
349
        MaxChannelUpdateBurst int
350

351
        // ChannelUpdateInterval specifies the interval we'll use to determine
352
        // how often we should allow a new update for a specific channel and
353
        // direction.
354
        ChannelUpdateInterval time.Duration
355

356
        // IsAlias returns true if a given ShortChannelID is an alias for
357
        // option_scid_alias channels.
358
        IsAlias func(scid lnwire.ShortChannelID) bool
359

360
        // SignAliasUpdate is used to re-sign a channel update using the
361
        // remote's alias if the option-scid-alias feature bit was negotiated.
362
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
363
                error)
364

365
        // FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
366
        // This is used for channels that have negotiated the option-scid-alias
367
        // feature bit.
368
        FindBaseByAlias func(alias lnwire.ShortChannelID) (
369
                lnwire.ShortChannelID, error)
370

371
        // GetAlias allows the gossiper to look up the peer's alias for a given
372
        // ChannelID. This is used to sign updates for them if the channel has
373
        // no AuthProof and the option-scid-alias feature bit was negotiated.
374
        GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)
375

376
        // FindChannel allows the gossiper to find a channel that we're party
377
        // to without iterating over the entire set of open channels.
378
        FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
379
                *channeldb.OpenChannel, error)
380

381
        // IsStillZombieChannel takes the timestamps of the latest channel
382
        // updates for a channel and returns true if the channel should be
383
        // considered a zombie based on these timestamps.
384
        IsStillZombieChannel func(time.Time, time.Time) bool
385

386
        // AssumeChannelValid toggles whether the gossiper will check for
387
        // spent-ness of channel outpoints. For neutrino, this saves long
388
        // rescans from blocking initial usage of the daemon.
389
        AssumeChannelValid bool
390

391
        // MsgRateBytes is the rate limit for the number of bytes per second
392
        // that we'll allocate to outbound gossip messages.
393
        MsgRateBytes uint64
394

395
        // MsgBurstBytes is the allotted burst amount in bytes. This is the
396
        // number of starting tokens in our token bucket algorithm.
397
        MsgBurstBytes uint64
398

399
        // FilterConcurrency is the maximum number of concurrent gossip filter
400
        // applications that can be processed.
401
        FilterConcurrency int
402

403
        // BanThreshold is the score used to decide whether a given peer is
404
        // banned or not.
405
        BanThreshold uint64
406

407
        // PeerMsgRateBytes is the rate limit for the number of bytes per second
408
        // that we'll allocate to outbound gossip messages for a single peer.
409
        PeerMsgRateBytes uint64
410
}
411

412
// processedNetworkMsg is a wrapper around networkMsg and a boolean flag. The
413
// flag lets the caller of the lru.Cache know if a message has already been
414
// processed or not.
415
type processedNetworkMsg struct {
416
        processed bool
417
        msg       *networkMsg
418
}
419

420
// cachedNetworkMsg is a wrapper around a network message that can be used with
421
// *lru.Cache.
422
//
423
// NOTE: This struct is not thread safe, which means you need to ensure there
424
// is no concurrent read/write access to it or to any of its contents, which
425
// are pointers as well.
426
type cachedNetworkMsg struct {
427
        msgs []*processedNetworkMsg
428
}
429

430
// Size returns the "size" of an entry. We return the number of items as we
431
// just want to limit the total amount of entries rather than do accurate size
432
// accounting. The returned error is always nil.
433
func (c *cachedNetworkMsg) Size() (uint64, error) {
2✔
434
        return uint64(len(c.msgs)), nil
2✔
435
}
2✔
436

437
// rejectCacheKey is the cache key that we'll use to track announcements we've
438
// recently rejected. It pairs a 33-byte pubkey with a channel ID.
439
type rejectCacheKey struct {
440
        pubkey [33]byte
441
        chanID uint64
442
}
443

444
// newRejectCacheKey builds a reject cache key from a channel ID and pubkey.
445
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
2✔
446
        k := rejectCacheKey{
2✔
447
                chanID: cid,
2✔
448
                pubkey: pub,
2✔
449
        }
2✔
450

2✔
451
        return k
2✔
452
}
2✔
453

454
// sourceToPub returns the serialized-compressed form of the given public key
455
// as a fixed-size array for use in the reject cache.
456
func sourceToPub(pk *btcec.PublicKey) [33]byte {
2✔
457
        var pub [33]byte
2✔
458
        copy(pub[:], pk.SerializeCompressed())
2✔
459
        return pub
2✔
460
}
2✔
461

462
// cachedReject is the empty value stored in the reject cache for each key.
463
type cachedReject struct {
464
}
465

466
// Size returns the "size" of an entry. We return 1 as we just want to limit
467
// the total number of entries. The returned error is always nil.
UNCOV
468
func (c *cachedReject) Size() (uint64, error) {
×
UNCOV
469
        return 1, nil
×
UNCOV
470
}
×
471

472
// AuthenticatedGossiper is a subsystem which is responsible for receiving
473
// announcements, validating them and applying the changes to router, syncing
474
// lightning network with newly connected nodes, broadcasting announcements
475
// after validation, negotiating the channel announcement proofs exchange and
476
// handling the premature announcements. All outgoing announcements are
477
// expected to be properly signed as dictated in BOLT#7, additionally, all
478
// incoming messages are expected to be well formed and signed. Invalid messages
479
// will be rejected by this struct.
480
type AuthenticatedGossiper struct {
481
        // Parameters which are needed to properly handle the start and stop of
482
        // the service.
483
        started sync.Once
484
        stopped sync.Once
485

486
        // bestHeight is the height of the block at the tip of the main chain
487
        // as we know it. Accesses *MUST* be done with the gossiper's lock
488
        // held.
489
        bestHeight uint32
490

491
        // cfg is a copy of the configuration struct that the gossiper service
492
        // was initialized with.
493
        cfg *Config
494

495
        // blockEpochs encapsulates a stream of block epochs that are sent at
496
        // every new block height.
497
        blockEpochs *chainntnfs.BlockEpochEvent
498

499
        // prematureChannelUpdates is a cache of ChannelUpdates we have received
500
        // that weren't associated with any channel we know about.  We store
501
        // them temporarily, such that we can reprocess them when a
502
        // ChannelAnnouncement for the channel is received.
503
        prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]
504

505
        // banman tracks our peer's ban status.
506
        banman *banman
507

508
        // networkMsgs is a channel that carries new network broadcasted
509
        // messages from outside the gossiper service to be processed by the
510
        // networkHandler.
511
        networkMsgs chan *networkMsg
512

513
        // futureMsgs is a list of premature network messages that have a block
514
        // height specified in the future. We will save them and resend them to
515
        // the chan networkMsgs once the block height has been reached. The
516
        // cached map format is,
517
        //   {msgID1: msg1, msgID2: msg2, ...}
518
        futureMsgs *futureMsgCache
519

520
        // chanPolicyUpdates is a channel over which requests to update the
521
        // forwarding policy of a set of channels are sent.
522
        chanPolicyUpdates chan *chanPolicyUpdateRequest
523

524
        // selfKey is the identity public key of the backing Lightning node.
525
        selfKey *btcec.PublicKey
526

527
        // selfKeyLoc is the locator for the identity public key of the backing
528
        // Lightning node.
529
        selfKeyLoc keychain.KeyLocator
530

531
        // channelMtx is used to restrict the database access to one
532
        // goroutine per channel ID. This is done to ensure that when
533
        // the gossiper is handling an announcement, the db state stays
534
        // consistent between when the DB is first read until it's written.
535
        channelMtx *multimutex.Mutex[uint64]
536

537
        // recentRejects is a cache of announcements we've recently rejected,
        // keyed by rejectCacheKey.
        recentRejects *lru.Cache[rejectCacheKey, *cachedReject]
538

539
        // syncMgr is a subsystem responsible for managing the gossip syncers
540
        // for peers currently connected. When a new peer is connected, the
541
        // manager will create its accompanying gossip syncer and determine
542
        // whether it should have an activeSync or passiveSync sync type based
543
        // on how many other gossip syncers are currently active. Any activeSync
544
        // gossip syncers are started in a round-robin manner to ensure we're
545
        // not syncing with multiple peers at the same time.
546
        syncMgr *SyncManager
547

548
        // reliableSender is a subsystem responsible for handling reliable
549
        // message send requests to peers. This should only be used for channels
550
        // that are unadvertised at the time of handling the message since if it
551
        // is advertised, then peers should be able to get the message from the
552
        // network.
553
        reliableSender *reliableSender
554

555
        // chanUpdateRateLimiter contains rate limiters for each direction of
556
        // a channel update we've processed. We'll use these to determine
557
        // whether we should accept a new update for a specific channel and
558
        // direction.
559
        //
560
        // NOTE: This map must be synchronized with the main
561
        // AuthenticatedGossiper lock.
562
        chanUpdateRateLimiter map[uint64][2]*rate.Limiter
563

564
        // vb is used to enforce job dependency ordering of gossip messages.
565
        vb *ValidationBarrier
566

567
        sync.Mutex
568

569
        cancel fn.Option[context.CancelFunc]
570
        quit   chan struct{}
571
        wg     sync.WaitGroup
572
}
573

574
// New creates a new AuthenticatedGossiper instance, initialized with the
575
// passed configuration parameters and the node's identity key descriptor.
576
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
2✔
577
        gossiper := &AuthenticatedGossiper{
2✔
578
                selfKey:           selfKeyDesc.PubKey,
2✔
579
                selfKeyLoc:        selfKeyDesc.KeyLocator,
2✔
580
                cfg:               &cfg,
2✔
581
                networkMsgs:       make(chan *networkMsg),
2✔
582
                futureMsgs:        newFutureMsgCache(maxFutureMessages),
2✔
583
                quit:              make(chan struct{}),
2✔
584
                chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
2✔
585
                prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
2✔
586
                        maxPrematureUpdates,
2✔
587
                ),
2✔
588
                channelMtx: multimutex.NewMutex[uint64](),
2✔
589
                recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
2✔
590
                        maxRejectedUpdates,
2✔
591
                ),
2✔
592
                chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
2✔
593
                banman:                newBanman(cfg.BanThreshold),
2✔
594
        }
2✔
595

2✔
596
        gossiper.vb = NewValidationBarrier(1000, gossiper.quit)
2✔
597

2✔
598
        gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
2✔
599
                ChainHash:                *cfg.ChainParams.GenesisHash,
2✔
600
                ChanSeries:               cfg.ChanSeries,
2✔
601
                RotateTicker:             cfg.RotateTicker,
2✔
602
                HistoricalSyncTicker:     cfg.HistoricalSyncTicker,
2✔
603
                NumActiveSyncers:         cfg.NumActiveSyncers,
2✔
604
                NoTimestampQueries:       cfg.NoTimestampQueries,
2✔
605
                IgnoreHistoricalFilters:  cfg.IgnoreHistoricalFilters,
2✔
606
                BestHeight:               gossiper.latestHeight,
2✔
607
                PinnedSyncers:            cfg.PinnedSyncers,
2✔
608
                IsStillZombieChannel:     cfg.IsStillZombieChannel,
2✔
609
                AllotedMsgBytesPerSecond: cfg.MsgRateBytes,
2✔
610
                AllotedMsgBytesBurst:     cfg.MsgBurstBytes,
2✔
611
                FilterConcurrency:        cfg.FilterConcurrency,
2✔
612
                PeerMsgBytesPerSecond:    cfg.PeerMsgRateBytes,
2✔
613
        })
2✔
614

2✔
615
        gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
2✔
616
                NotifyWhenOnline:  cfg.NotifyWhenOnline,
2✔
617
                NotifyWhenOffline: cfg.NotifyWhenOffline,
2✔
618
                MessageStore:      cfg.MessageStore,
2✔
619
                IsMsgStale:        gossiper.isMsgStale,
2✔
620
        })
2✔
621

2✔
622
        return gossiper
2✔
623
}
2✔
624

625
// EdgeWithInfo bundles the channel info and policy required to update an edge.
626
type EdgeWithInfo struct {
627
        // Info describes the channel.
628
        Info *models.ChannelEdgeInfo
629

630
        // Edge describes the policy in one direction of the channel.
631
        Edge *models.ChannelEdgePolicy
632
}
633

634
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
635
// specified edge updates. Updates are done in two stages: first, the
636
// AuthenticatedGossiper ensures the update has been committed by dependent
637
// sub-systems, then it signs and broadcasts new updates to the network. The
638
// call blocks until the request has been handled and returns its result, or
639
// returns an error if the gossiper is shutting down before it is accepted.
640
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
641
        edgesToUpdate []EdgeWithInfo) error {
2✔
642

2✔
643
        errChan := make(chan error, 1)
2✔
644
        policyUpdate := &chanPolicyUpdateRequest{
2✔
645
                edgesToUpdate: edgesToUpdate,
2✔
646
                errChan:       errChan,
2✔
647
        }
2✔
648

2✔
649
        select {
2✔
650
        case d.chanPolicyUpdates <- policyUpdate:
2✔
651
                err := <-errChan
2✔
652
                return err
2✔
653
        case <-d.quit:
×
654
                return fmt.Errorf("AuthenticatedGossiper shutting down")
×
655
        }
656
}
657

658
// Start spawns network messages handler goroutine and registers on new block
659
// notifications in order to properly handle the premature announcements. Only
// the first call does any work; later calls are no-ops that return nil.
660
func (d *AuthenticatedGossiper) Start() error {
2✔
661
        var err error
2✔
662
        d.started.Do(func() {
4✔
663
                ctx, cancel := context.WithCancel(context.Background())
2✔
664
                d.cancel = fn.Some(cancel)
2✔
665

2✔
666
                log.Info("Authenticated Gossiper starting")
2✔
667
                err = d.start(ctx)
2✔
668
        })
2✔
669
        return err
2✔
670
}
671

672
func (d *AuthenticatedGossiper) start(ctx context.Context) error {
2✔
673
        // First we register for new notifications of newly discovered blocks.
2✔
674
        // We do this immediately so we'll later be able to consume any/all
2✔
675
        // blocks which were discovered.
2✔
676
        blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
2✔
677
        if err != nil {
2✔
678
                return err
×
679
        }
×
680
        d.blockEpochs = blockEpochs
2✔
681

2✔
682
        height, err := d.cfg.Graph.CurrentBlockHeight()
2✔
683
        if err != nil {
2✔
684
                return err
×
685
        }
×
686
        d.bestHeight = height
2✔
687

2✔
688
        // Start the reliable sender. In case we had any pending messages ready
2✔
689
        // to be sent when the gossiper was last shut down, we must continue on
2✔
690
        // our quest to deliver them to their respective peers.
2✔
691
        if err := d.reliableSender.Start(); err != nil {
2✔
692
                return err
×
693
        }
×
694

695
        d.syncMgr.Start()
2✔
696

2✔
697
        d.banman.start()
2✔
698

2✔
699
        // Start receiving blocks in its dedicated goroutine.
2✔
700
        d.wg.Add(2)
2✔
701
        go d.syncBlockHeight()
2✔
702
        go d.networkHandler(ctx)
2✔
703

2✔
704
        return nil
2✔
705
}
706

707
// syncBlockHeight syncs the best block height for the gossiper by reading
708
// blockEpochs.
709
//
710
// NOTE: must be run as a goroutine.
711
func (d *AuthenticatedGossiper) syncBlockHeight() {
2✔
712
        defer d.wg.Done()
2✔
713

2✔
714
        for {
4✔
715
                select {
2✔
716
                // A new block has arrived, so we can re-process the previously
717
                // premature announcements.
718
                case newBlock, ok := <-d.blockEpochs.Epochs:
2✔
719
                        // If the channel has been closed, then this indicates
2✔
720
                        // the daemon is shutting down, so we exit ourselves.
2✔
721
                        if !ok {
4✔
722
                                return
2✔
723
                        }
2✔
724

725
                        // Once a new block arrives, we update our running
726
                        // track of the height of the chain tip.
727
                        d.Lock()
2✔
728
                        blockHeight := uint32(newBlock.Height)
2✔
729
                        d.bestHeight = blockHeight
2✔
730
                        d.Unlock()
2✔
731

2✔
732
                        log.Debugf("New block: height=%d, hash=%s", blockHeight,
2✔
733
                                newBlock.Hash)
2✔
734

2✔
735
                        // Resend future messages, if any.
2✔
736
                        d.resendFutureMessages(blockHeight)
2✔
737

UNCOV
738
                case <-d.quit:
×
UNCOV
739
                        return
×
740
                }
741
        }
742
}
743

744
// futureMsgCache embeds a `lru.Cache` with a message counter that's served as
745
// the unique ID when saving the message.
746
type futureMsgCache struct {
747
        *lru.Cache[uint64, *cachedFutureMsg]
748

749
        // msgID is a monotonically increased integer.
750
        msgID atomic.Uint64
751
}
752

753
// nextMsgID returns a unique message ID.
754
func (f *futureMsgCache) nextMsgID() uint64 {
2✔
755
        return f.msgID.Add(1)
2✔
756
}
2✔
757

758
// newFutureMsgCache creates a new future message cache with the underlying lru
759
// cache being initialized with the specified capacity.
760
func newFutureMsgCache(capacity uint64) *futureMsgCache {
2✔
761
        // Create a new cache.
2✔
762
        cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)
2✔
763

2✔
764
        return &futureMsgCache{
2✔
765
                Cache: cache,
2✔
766
        }
2✔
767
}
2✔
768

769
// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
770
type cachedFutureMsg struct {
771
        // msg is the network message.
772
        msg *networkMsg
773

774
        // height is the block height.
775
        height uint32
776
}
777

778
// Size returns the size of the message.
779
func (c *cachedFutureMsg) Size() (uint64, error) {
2✔
780
        // Return a constant 1.
2✔
781
        return 1, nil
2✔
782
}
2✔
783

784
// resendFutureMessages takes a block height, resends all the future messages
785
// found below and equal to that height and deletes those messages found in the
786
// gossiper's futureMsgs.
787
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
2✔
788
        var (
2✔
789
                // msgs are the target messages.
2✔
790
                msgs []*networkMsg
2✔
791

2✔
792
                // keys are the target messages' caching keys.
2✔
793
                keys []uint64
2✔
794
        )
2✔
795

2✔
796
        // filterMsgs is the visitor used when iterating the future cache.
2✔
797
        filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
4✔
798
                if cmsg.height <= height {
4✔
799
                        msgs = append(msgs, cmsg.msg)
2✔
800
                        keys = append(keys, k)
2✔
801
                }
2✔
802

803
                return true
2✔
804
        }
805

806
        // Filter out the target messages.
807
        d.futureMsgs.Range(filterMsgs)
2✔
808

2✔
809
        // Return early if no messages found.
2✔
810
        if len(msgs) == 0 {
4✔
811
                return
2✔
812
        }
2✔
813

814
        // Remove the filtered messages.
815
        for _, key := range keys {
4✔
816
                d.futureMsgs.Delete(key)
2✔
817
        }
2✔
818

819
        log.Debugf("Resending %d network messages at height %d",
2✔
820
                len(msgs), height)
2✔
821

2✔
822
        for _, msg := range msgs {
4✔
823
                select {
2✔
824
                case d.networkMsgs <- msg:
2✔
825
                case <-d.quit:
×
826
                        msg.err <- ErrGossiperShuttingDown
×
827
                }
828
        }
829
}
830

831
// Stop signals any active goroutines for a graceful closure.
832
func (d *AuthenticatedGossiper) Stop() error {
2✔
833
        d.stopped.Do(func() {
4✔
834
                log.Info("Authenticated gossiper shutting down...")
2✔
835
                defer log.Debug("Authenticated gossiper shutdown complete")
2✔
836

2✔
837
                d.stop()
2✔
838
        })
2✔
839
        return nil
2✔
840
}
841

842
func (d *AuthenticatedGossiper) stop() {
2✔
843
        log.Debug("Authenticated Gossiper is stopping")
2✔
844
        defer log.Debug("Authenticated Gossiper stopped")
2✔
845

2✔
846
        // `blockEpochs` is only initialized in the start routine so we make
2✔
847
        // sure we don't panic here in the case where the `Stop` method is
2✔
848
        // called when the `Start` method does not complete.
2✔
849
        if d.blockEpochs != nil {
4✔
850
                d.blockEpochs.Cancel()
2✔
851
        }
2✔
852

853
        d.syncMgr.Stop()
2✔
854

2✔
855
        d.banman.stop()
2✔
856

2✔
857
        d.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
4✔
858
        close(d.quit)
2✔
859
        d.wg.Wait()
2✔
860

2✔
861
        // We'll stop our reliable sender after all of the gossiper's goroutines
2✔
862
        // have exited to ensure nothing can cause it to continue executing.
2✔
863
        d.reliableSender.Stop()
2✔
864
}
865

866
// TODO(roasbeef): need method to get current gossip timestamp?
867
//  * using mtx, check time rotate forward is needed?
868

869
// ProcessRemoteAnnouncement sends a new remote announcement message along with
870
// the peer that sent the routing message. The announcement will be processed
871
// then added to a queue for batched trickled announcement to all connected
872
// peers.  Remote channel announcements should contain the announcement proof
873
// and be fully validated.
874
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context,
875
        msg lnwire.Message, peer lnpeer.Peer) chan error {
2✔
876

2✔
877
        errChan := make(chan error, 1)
2✔
878

2✔
879
        // For messages in the known set of channel series queries, we'll
2✔
880
        // dispatch the message directly to the GossipSyncer, and skip the main
2✔
881
        // processing loop.
2✔
882
        switch m := msg.(type) {
2✔
883
        case *lnwire.QueryShortChanIDs,
884
                *lnwire.QueryChannelRange,
885
                *lnwire.ReplyChannelRange,
886
                *lnwire.ReplyShortChanIDsEnd:
2✔
887

2✔
888
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
2✔
889
                if !ok {
2✔
890
                        log.Warnf("Gossip syncer for peer=%x not found",
×
891
                                peer.PubKey())
×
892

×
893
                        errChan <- ErrGossipSyncerNotFound
×
894
                        return errChan
×
895
                }
×
896

897
                // If we've found the message target, then we'll dispatch the
898
                // message directly to it.
899
                err := syncer.ProcessQueryMsg(m, peer.QuitSignal())
2✔
900
                if err != nil {
2✔
901
                        log.Errorf("Process query msg from peer %x got %v",
×
902
                                peer.PubKey(), err)
×
903
                }
×
904

905
                errChan <- err
2✔
906
                return errChan
2✔
907

908
        // If a peer is updating its current update horizon, then we'll dispatch
909
        // that directly to the proper GossipSyncer.
910
        case *lnwire.GossipTimestampRange:
2✔
911
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
2✔
912
                if !ok {
2✔
913
                        log.Warnf("Gossip syncer for peer=%x not found",
×
914
                                peer.PubKey())
×
915

×
916
                        errChan <- ErrGossipSyncerNotFound
×
917
                        return errChan
×
918
                }
×
919

920
                // Queue the message for asynchronous processing to prevent
921
                // blocking the gossiper when rate limiting is active.
922
                if !syncer.QueueTimestampRange(m) {
2✔
923
                        log.Warnf("Unable to queue gossip filter for peer=%x: "+
×
924
                                "queue full", peer.PubKey())
×
925

×
926
                        // Return nil to indicate we've handled the message,
×
927
                        // even though it was dropped. This prevents the peer
×
928
                        // from being disconnected.
×
929
                        errChan <- nil
×
930
                        return errChan
×
931
                }
×
932

933
                errChan <- nil
2✔
934
                return errChan
2✔
935

936
        // To avoid inserting edges in the graph for our own channels that we
937
        // have already closed, we ignore such channel announcements coming
938
        // from the remote.
939
        case *lnwire.ChannelAnnouncement1:
2✔
940
                ownKey := d.selfKey.SerializeCompressed()
2✔
941
                ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement1 " +
2✔
942
                        "for own channel")
2✔
943

2✔
944
                if bytes.Equal(m.NodeID1[:], ownKey) ||
2✔
945
                        bytes.Equal(m.NodeID2[:], ownKey) {
4✔
946

2✔
947
                        log.Warn(ownErr)
2✔
948
                        errChan <- ownErr
2✔
949
                        return errChan
2✔
950
                }
2✔
951
        }
952

953
        nMsg := &networkMsg{
2✔
954
                msg:      msg,
2✔
955
                isRemote: true,
2✔
956
                peer:     peer,
2✔
957
                source:   peer.IdentityKey(),
2✔
958
                err:      errChan,
2✔
959
        }
2✔
960

2✔
961
        select {
2✔
962
        case d.networkMsgs <- nMsg:
2✔
963

964
        // If the peer that sent us this error is quitting, then we don't need
965
        // to send back an error and can return immediately.
966
        // TODO(elle): the peer should now just rely on canceling the passed
967
        //  context.
968
        case <-peer.QuitSignal():
×
969
                return nil
×
970
        case <-ctx.Done():
×
971
                return nil
×
972
        case <-d.quit:
×
973
                nMsg.err <- ErrGossiperShuttingDown
×
974
        }
975

976
        return nMsg.err
2✔
977
}
978

979
// ProcessLocalAnnouncement sends a new remote announcement message along with
980
// the peer that sent the routing message. The announcement will be processed
981
// then added to a queue for batched trickled announcement to all connected
982
// peers.  Local channel announcements don't contain the announcement proof and
983
// will not be fully validated. Once the channel proofs are received, the
984
// entire channel announcement and update messages will be re-constructed and
985
// broadcast to the rest of the network.
986
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
987
        optionalFields ...OptionalMsgField) chan error {
2✔
988

2✔
989
        optionalMsgFields := &optionalMsgFields{}
2✔
990
        optionalMsgFields.apply(optionalFields...)
2✔
991

2✔
992
        nMsg := &networkMsg{
2✔
993
                msg:               msg,
2✔
994
                optionalMsgFields: optionalMsgFields,
2✔
995
                isRemote:          false,
2✔
996
                source:            d.selfKey,
2✔
997
                err:               make(chan error, 1),
2✔
998
        }
2✔
999

2✔
1000
        select {
2✔
1001
        case d.networkMsgs <- nMsg:
2✔
1002
        case <-d.quit:
×
1003
                nMsg.err <- ErrGossiperShuttingDown
×
1004
        }
1005

1006
        return nMsg.err
2✔
1007
}
1008

1009
// channelUpdateID is a unique identifier for ChannelUpdate messages, as
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
// tuple. It is used as the key when de-duplicating channel updates.
type channelUpdateID struct {
	// channelID represents the set of data which is needed to
	// retrieve all necessary data to validate the channel existence.
	channelID lnwire.ShortChannelID

	// flags holds the channel flags of the update. The least-significant
	// bit must be set to 0 if the creating node corresponds to the first
	// node in the previously sent channel announcement and 1 otherwise.
	flags lnwire.ChanUpdateChanFlags
}
1022

1023
// msgWithSenders is a wrapper struct around a message, and the set of peers
1024
// that originally sent us this message. Using this struct, we can ensure that
1025
// we don't re-send a message to the peer that sent it to us in the first
1026
// place.
1027
type msgWithSenders struct {
1028
        // msg is the wire message itself.
1029
        msg lnwire.Message
1030

1031
        // isLocal is true if this was a message that originated locally. We'll
1032
        // use this to bypass our normal checks to ensure we prioritize sending
1033
        // out our own updates.
1034
        isLocal bool
1035

1036
        // sender is the set of peers that sent us this message.
1037
        senders map[route.Vertex]struct{}
1038
}
1039

1040
// mergeSyncerMap is used to merge the set of senders of a particular message
1041
// with peers that we have an active GossipSyncer with. We do this to ensure
1042
// that we don't broadcast messages to any peers that we have active gossip
1043
// syncers for.
1044
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
2✔
1045
        for peerPub := range syncers {
4✔
1046
                m.senders[peerPub] = struct{}{}
2✔
1047
        }
2✔
1048
}
1049

1050
// deDupedAnnouncements de-duplicates announcements that have been added to the
1051
// batch. Internally, announcements are stored in three maps
1052
// (one each for channel announcements, channel updates, and node
1053
// announcements). These maps keep track of unique announcements and ensure no
1054
// announcements are duplicated. We keep the three message types separate, such
1055
// that we can send channel announcements first, then channel updates, and
1056
// finally node announcements when it's time to broadcast them.
1057
type deDupedAnnouncements struct {
1058
        // channelAnnouncements are identified by the short channel id field.
1059
        channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
1060

1061
        // channelUpdates are identified by the channel update id field.
1062
        channelUpdates map[channelUpdateID]msgWithSenders
1063

1064
        // nodeAnnouncements are identified by the Vertex field.
1065
        nodeAnnouncements map[route.Vertex]msgWithSenders
1066

1067
        sync.Mutex
1068
}
1069

1070
// Reset operates on deDupedAnnouncements to reset the storage of
1071
// announcements.
1072
func (d *deDupedAnnouncements) Reset() {
2✔
1073
        d.Lock()
2✔
1074
        defer d.Unlock()
2✔
1075

2✔
1076
        d.reset()
2✔
1077
}
2✔
1078

1079
// reset is the private version of the Reset method. We have this so we can
1080
// call this method within method that are already holding the lock.
1081
func (d *deDupedAnnouncements) reset() {
2✔
1082
        // Storage of each type of announcement (channel announcements, channel
2✔
1083
        // updates, node announcements) is set to an empty map where the
2✔
1084
        // appropriate key points to the corresponding lnwire.Message.
2✔
1085
        d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
2✔
1086
        d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
2✔
1087
        d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
2✔
1088
}
2✔
1089

1090
// addMsg adds a new message to the current batch. If the message is already
1091
// present in the current batch, then this new instance replaces the latter,
1092
// and the set of senders is updated to reflect which node sent us this
1093
// message.
1094
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
2✔
1095
        log.Tracef("Adding network message: %v to batch", message.msg.MsgType())
2✔
1096

2✔
1097
        // Depending on the message type (channel announcement, channel update,
2✔
1098
        // or node announcement), the message is added to the corresponding map
2✔
1099
        // in deDupedAnnouncements. Because each identifying key can have at
2✔
1100
        // most one value, the announcements are de-duplicated, with newer ones
2✔
1101
        // replacing older ones.
2✔
1102
        switch msg := message.msg.(type) {
2✔
1103

1104
        // Channel announcements are identified by the short channel id field.
1105
        case *lnwire.ChannelAnnouncement1:
2✔
1106
                deDupKey := msg.ShortChannelID
2✔
1107
                sender := route.NewVertex(message.source)
2✔
1108

2✔
1109
                mws, ok := d.channelAnnouncements[deDupKey]
2✔
1110
                if !ok {
4✔
1111
                        mws = msgWithSenders{
2✔
1112
                                msg:     msg,
2✔
1113
                                isLocal: !message.isRemote,
2✔
1114
                                senders: make(map[route.Vertex]struct{}),
2✔
1115
                        }
2✔
1116
                        mws.senders[sender] = struct{}{}
2✔
1117

2✔
1118
                        d.channelAnnouncements[deDupKey] = mws
2✔
1119

2✔
1120
                        return
2✔
1121
                }
2✔
1122

UNCOV
1123
                mws.msg = msg
×
UNCOV
1124
                mws.senders[sender] = struct{}{}
×
UNCOV
1125
                d.channelAnnouncements[deDupKey] = mws
×
1126

1127
        // Channel updates are identified by the (short channel id,
1128
        // channelflags) tuple.
1129
        case *lnwire.ChannelUpdate1:
2✔
1130
                sender := route.NewVertex(message.source)
2✔
1131
                deDupKey := channelUpdateID{
2✔
1132
                        msg.ShortChannelID,
2✔
1133
                        msg.ChannelFlags,
2✔
1134
                }
2✔
1135

2✔
1136
                oldTimestamp := uint32(0)
2✔
1137
                mws, ok := d.channelUpdates[deDupKey]
2✔
1138
                if ok {
2✔
UNCOV
1139
                        // If we already have seen this message, record its
×
UNCOV
1140
                        // timestamp.
×
UNCOV
1141
                        update, ok := mws.msg.(*lnwire.ChannelUpdate1)
×
UNCOV
1142
                        if !ok {
×
1143
                                log.Errorf("Expected *lnwire.ChannelUpdate1, "+
×
1144
                                        "got: %T", mws.msg)
×
1145

×
1146
                                return
×
1147
                        }
×
1148

UNCOV
1149
                        oldTimestamp = update.Timestamp
×
1150
                }
1151

1152
                // If we already had this message with a strictly newer
1153
                // timestamp, then we'll just discard the message we got.
1154
                if oldTimestamp > msg.Timestamp {
2✔
UNCOV
1155
                        log.Debugf("Ignored outdated network message: "+
×
UNCOV
1156
                                "peer=%v, msg=%s", message.peer, msg.MsgType())
×
UNCOV
1157
                        return
×
UNCOV
1158
                }
×
1159

1160
                // If the message we just got is newer than what we previously
1161
                // have seen, or this is the first time we see it, then we'll
1162
                // add it to our map of announcements.
1163
                if oldTimestamp < msg.Timestamp {
4✔
1164
                        mws = msgWithSenders{
2✔
1165
                                msg:     msg,
2✔
1166
                                isLocal: !message.isRemote,
2✔
1167
                                senders: make(map[route.Vertex]struct{}),
2✔
1168
                        }
2✔
1169

2✔
1170
                        // We'll mark the sender of the message in the
2✔
1171
                        // senders map.
2✔
1172
                        mws.senders[sender] = struct{}{}
2✔
1173

2✔
1174
                        d.channelUpdates[deDupKey] = mws
2✔
1175

2✔
1176
                        return
2✔
1177
                }
2✔
1178

1179
                // Lastly, if we had seen this exact message from before, with
1180
                // the same timestamp, we'll add the sender to the map of
1181
                // senders, such that we can skip sending this message back in
1182
                // the next batch.
UNCOV
1183
                mws.msg = msg
×
UNCOV
1184
                mws.senders[sender] = struct{}{}
×
UNCOV
1185
                d.channelUpdates[deDupKey] = mws
×
1186

1187
        // Node announcements are identified by the Vertex field.  Use the
1188
        // NodeID to create the corresponding Vertex.
1189
        case *lnwire.NodeAnnouncement:
2✔
1190
                sender := route.NewVertex(message.source)
2✔
1191
                deDupKey := route.Vertex(msg.NodeID)
2✔
1192

2✔
1193
                // We do the same for node announcements as we did for channel
2✔
1194
                // updates, as they also carry a timestamp.
2✔
1195
                oldTimestamp := uint32(0)
2✔
1196
                mws, ok := d.nodeAnnouncements[deDupKey]
2✔
1197
                if ok {
4✔
1198
                        oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
2✔
1199
                }
2✔
1200

1201
                // Discard the message if it's old.
1202
                if oldTimestamp > msg.Timestamp {
4✔
1203
                        return
2✔
1204
                }
2✔
1205

1206
                // Replace if it's newer.
1207
                if oldTimestamp < msg.Timestamp {
4✔
1208
                        mws = msgWithSenders{
2✔
1209
                                msg:     msg,
2✔
1210
                                isLocal: !message.isRemote,
2✔
1211
                                senders: make(map[route.Vertex]struct{}),
2✔
1212
                        }
2✔
1213

2✔
1214
                        mws.senders[sender] = struct{}{}
2✔
1215

2✔
1216
                        d.nodeAnnouncements[deDupKey] = mws
2✔
1217

2✔
1218
                        return
2✔
1219
                }
2✔
1220

1221
                // Add to senders map if it's the same as we had.
1222
                mws.msg = msg
2✔
1223
                mws.senders[sender] = struct{}{}
2✔
1224
                d.nodeAnnouncements[deDupKey] = mws
2✔
1225
        }
1226
}
1227

1228
// AddMsgs is a helper method to add multiple messages to the announcement
1229
// batch.
1230
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
2✔
1231
        d.Lock()
2✔
1232
        defer d.Unlock()
2✔
1233

2✔
1234
        for _, msg := range msgs {
4✔
1235
                d.addMsg(msg)
2✔
1236
        }
2✔
1237
}
1238

1239
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
1240
// to broadcast next into messages that are locally sourced and those that are
1241
// sourced remotely.
1242
type msgsToBroadcast struct {
1243
        // localMsgs is the set of messages we created locally.
1244
        localMsgs []msgWithSenders
1245

1246
        // remoteMsgs is the set of messages that we received from a remote
1247
        // party.
1248
        remoteMsgs []msgWithSenders
1249
}
1250

1251
// addMsg adds a new message to the appropriate sub-slice.
1252
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
2✔
1253
        if msg.isLocal {
4✔
1254
                m.localMsgs = append(m.localMsgs, msg)
2✔
1255
        } else {
4✔
1256
                m.remoteMsgs = append(m.remoteMsgs, msg)
2✔
1257
        }
2✔
1258
}
1259

1260
// isEmpty returns true if the batch is empty.
1261
func (m *msgsToBroadcast) isEmpty() bool {
2✔
1262
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
2✔
1263
}
2✔
1264

1265
// length returns the length of the combined message set.
UNCOV
1266
func (m *msgsToBroadcast) length() int {
×
UNCOV
1267
        return len(m.localMsgs) + len(m.remoteMsgs)
×
UNCOV
1268
}
×
1269

1270
// Emit returns the set of de-duplicated announcements to be sent out during
1271
// the next announcement epoch, in the order of channel announcements, channel
1272
// updates, and node announcements. Each message emitted, contains the set of
1273
// peers that sent us the message. This way, we can ensure that we don't waste
1274
// bandwidth by re-sending a message to the peer that sent it to us in the
1275
// first place. Additionally, the set of stored messages are reset.
1276
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
2✔
1277
        d.Lock()
2✔
1278
        defer d.Unlock()
2✔
1279

2✔
1280
        // Get the total number of announcements.
2✔
1281
        numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
2✔
1282
                len(d.nodeAnnouncements)
2✔
1283

2✔
1284
        // Create an empty array of lnwire.Messages with a length equal to
2✔
1285
        // the total number of announcements.
2✔
1286
        msgs := msgsToBroadcast{
2✔
1287
                localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
2✔
1288
                remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
2✔
1289
        }
2✔
1290

2✔
1291
        // Add the channel announcements to the array first.
2✔
1292
        for _, message := range d.channelAnnouncements {
4✔
1293
                msgs.addMsg(message)
2✔
1294
        }
2✔
1295

1296
        // Then add the channel updates.
1297
        for _, message := range d.channelUpdates {
4✔
1298
                msgs.addMsg(message)
2✔
1299
        }
2✔
1300

1301
        // Finally add the node announcements.
1302
        for _, message := range d.nodeAnnouncements {
4✔
1303
                msgs.addMsg(message)
2✔
1304
        }
2✔
1305

1306
        d.reset()
2✔
1307

2✔
1308
        // Return the array of lnwire.messages.
2✔
1309
        return msgs
2✔
1310
}
1311

1312
// calculateSubBatchSize is a helper function that calculates the size to break
// down the batchSize into. The result is batchSize scaled by the ratio of
// subBatchDelay to totalDelay, rounded up, and never smaller than
// minimumBatchSize.
//
// If subBatchDelay exceeds totalDelay, or if totalDelay is not positive,
// batchSize is returned unchanged. The latter guards against a division by
// zero when both delays are zero.
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
	minimumBatchSize, batchSize int) int {

	if totalDelay <= 0 || subBatchDelay > totalDelay {
		return batchSize
	}

	// Compute ceil(batchSize * subBatchDelay / totalDelay). The arithmetic
	// is done on 64-bit integers so that the nanosecond counts are not
	// truncated on platforms where int is 32 bits wide.
	total := int64(totalDelay)
	sub := int64(subBatchDelay)
	subBatchSize := int((int64(batchSize)*sub + total - 1) / total)

	if subBatchSize < minimumBatchSize {
		return minimumBatchSize
	}

	return subBatchSize
}

// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
// this variable so the function can be mocked in our test.
var batchSizeCalculator = calculateSubBatchSize
1333

1334
// splitAnnouncementBatches takes an exiting list of announcements and
1335
// decomposes it into sub batches controlled by the `subBatchSize`.
1336
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
1337
        announcementBatch []msgWithSenders) [][]msgWithSenders {
2✔
1338

2✔
1339
        subBatchSize := batchSizeCalculator(
2✔
1340
                d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
2✔
1341
                d.cfg.MinimumBatchSize, len(announcementBatch),
2✔
1342
        )
2✔
1343

2✔
1344
        var splitAnnouncementBatch [][]msgWithSenders
2✔
1345

2✔
1346
        for subBatchSize < len(announcementBatch) {
4✔
1347
                // For slicing with minimal allocation
2✔
1348
                // https://github.com/golang/go/wiki/SliceTricks
2✔
1349
                announcementBatch, splitAnnouncementBatch =
2✔
1350
                        announcementBatch[subBatchSize:],
2✔
1351
                        append(splitAnnouncementBatch,
2✔
1352
                                announcementBatch[0:subBatchSize:subBatchSize])
2✔
1353
        }
2✔
1354
        splitAnnouncementBatch = append(
2✔
1355
                splitAnnouncementBatch, announcementBatch,
2✔
1356
        )
2✔
1357

2✔
1358
        return splitAnnouncementBatch
2✔
1359
}
1360

1361
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
1362
// split size, and then sends out all items to the set of target peers. Locally
1363
// generated announcements are always sent before remotely generated
1364
// announcements.
1365
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(ctx context.Context,
1366
        annBatch msgsToBroadcast) {
2✔
1367

2✔
1368
        // delayNextBatch is a helper closure that blocks for `SubBatchDelay`
2✔
1369
        // duration to delay the sending of next announcement batch.
2✔
1370
        delayNextBatch := func() {
4✔
1371
                select {
2✔
1372
                case <-time.After(d.cfg.SubBatchDelay):
2✔
UNCOV
1373
                case <-d.quit:
×
UNCOV
1374
                        return
×
1375
                }
1376
        }
1377

1378
        // Fetch the local and remote announcements.
1379
        localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
2✔
1380
        remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)
2✔
1381

2✔
1382
        d.wg.Add(1)
2✔
1383
        go func() {
4✔
1384
                defer d.wg.Done()
2✔
1385

2✔
1386
                log.Debugf("Broadcasting %v new local announcements in %d "+
2✔
1387
                        "sub batches", len(annBatch.localMsgs),
2✔
1388
                        len(localBatches))
2✔
1389

2✔
1390
                // Send out the local announcements first.
2✔
1391
                for _, annBatch := range localBatches {
4✔
1392
                        d.sendLocalBatch(annBatch)
2✔
1393
                        delayNextBatch()
2✔
1394
                }
2✔
1395

1396
                log.Debugf("Broadcasting %v new remote announcements in %d "+
2✔
1397
                        "sub batches", len(annBatch.remoteMsgs),
2✔
1398
                        len(remoteBatches))
2✔
1399

2✔
1400
                // Now send the remote announcements.
2✔
1401
                for _, annBatch := range remoteBatches {
4✔
1402
                        d.sendRemoteBatch(ctx, annBatch)
2✔
1403
                        delayNextBatch()
2✔
1404
                }
2✔
1405
        }()
1406
}
1407

1408
// sendLocalBatch broadcasts a list of locally generated announcements to our
1409
// peers. For local announcements, we skip the filter and dedup logic and just
1410
// send the announcements out to all our coonnected peers.
1411
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
2✔
1412
        msgsToSend := lnutils.Map(
2✔
1413
                annBatch, func(m msgWithSenders) lnwire.Message {
4✔
1414
                        return m.msg
2✔
1415
                },
2✔
1416
        )
1417

1418
        err := d.cfg.Broadcast(nil, msgsToSend...)
2✔
1419
        if err != nil {
2✔
1420
                log.Errorf("Unable to send local batch announcements: %v", err)
×
1421
        }
×
1422
}
1423

1424
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
1425
// peers.
1426
func (d *AuthenticatedGossiper) sendRemoteBatch(ctx context.Context,
1427
        annBatch []msgWithSenders) {
2✔
1428

2✔
1429
        syncerPeers := d.syncMgr.GossipSyncers()
2✔
1430

2✔
1431
        // We'll first attempt to filter out this new message for all peers
2✔
1432
        // that have active gossip syncers active.
2✔
1433
        for pub, syncer := range syncerPeers {
4✔
1434
                log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
2✔
1435
                syncer.FilterGossipMsgs(ctx, annBatch...)
2✔
1436
        }
2✔
1437

1438
        for _, msgChunk := range annBatch {
4✔
1439
                msgChunk := msgChunk
2✔
1440

2✔
1441
                // With the syncers taken care of, we'll merge the sender map
2✔
1442
                // with the set of syncers, so we don't send out duplicate
2✔
1443
                // messages.
2✔
1444
                msgChunk.mergeSyncerMap(syncerPeers)
2✔
1445

2✔
1446
                err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
2✔
1447
                if err != nil {
2✔
1448
                        log.Errorf("Unable to send batch "+
×
1449
                                "announcements: %v", err)
×
1450
                        continue
×
1451
                }
1452
        }
1453
}
1454

1455
// networkHandler is the primary goroutine that drives this service. The roles
1456
// of this goroutine includes answering queries related to the state of the
1457
// network, syncing up newly connected peers, and also periodically
1458
// broadcasting our latest topology state to all connected peers.
1459
//
1460
// NOTE: This MUST be run as a goroutine.
1461
func (d *AuthenticatedGossiper) networkHandler(ctx context.Context) {
2✔
1462
        defer d.wg.Done()
2✔
1463

2✔
1464
        // Initialize empty deDupedAnnouncements to store announcement batch.
2✔
1465
        announcements := deDupedAnnouncements{}
2✔
1466
        announcements.Reset()
2✔
1467

2✔
1468
        d.cfg.RetransmitTicker.Resume()
2✔
1469
        defer d.cfg.RetransmitTicker.Stop()
2✔
1470

2✔
1471
        trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
2✔
1472
        defer trickleTimer.Stop()
2✔
1473

2✔
1474
        // To start, we'll first check to see if there are any stale channel or
2✔
1475
        // node announcements that we need to re-transmit.
2✔
1476
        if err := d.retransmitStaleAnns(ctx, time.Now()); err != nil {
2✔
1477
                log.Errorf("Unable to rebroadcast stale announcements: %v", err)
×
1478
        }
×
1479

1480
        for {
4✔
1481
                select {
2✔
1482
                // A new policy update has arrived. We'll commit it to the
1483
                // sub-systems below us, then craft, sign, and broadcast a new
1484
                // ChannelUpdate for the set of affected clients.
1485
                case policyUpdate := <-d.chanPolicyUpdates:
2✔
1486
                        log.Tracef("Received channel %d policy update requests",
2✔
1487
                                len(policyUpdate.edgesToUpdate))
2✔
1488

2✔
1489
                        // First, we'll now create new fully signed updates for
2✔
1490
                        // the affected channels and also update the underlying
2✔
1491
                        // graph with the new state.
2✔
1492
                        newChanUpdates, err := d.processChanPolicyUpdate(
2✔
1493
                                ctx, policyUpdate.edgesToUpdate,
2✔
1494
                        )
2✔
1495
                        policyUpdate.errChan <- err
2✔
1496
                        if err != nil {
2✔
1497
                                log.Errorf("Unable to craft policy updates: %v",
×
1498
                                        err)
×
1499
                                continue
×
1500
                        }
1501

1502
                        // Finally, with the updates committed, we'll now add
1503
                        // them to the announcement batch to be flushed at the
1504
                        // start of the next epoch.
1505
                        announcements.AddMsgs(newChanUpdates...)
2✔
1506

1507
                case announcement := <-d.networkMsgs:
2✔
1508
                        log.Tracef("Received network message: "+
2✔
1509
                                "peer=%v, msg=%s, is_remote=%v",
2✔
1510
                                announcement.peer, announcement.msg.MsgType(),
2✔
1511
                                announcement.isRemote)
2✔
1512

2✔
1513
                        switch announcement.msg.(type) {
2✔
1514
                        // Channel announcement signatures are amongst the only
1515
                        // messages that we'll process serially.
1516
                        case *lnwire.AnnounceSignatures1:
2✔
1517
                                emittedAnnouncements, _ := d.processNetworkAnnouncement(
2✔
1518
                                        ctx, announcement,
2✔
1519
                                )
2✔
1520
                                log.Debugf("Processed network message %s, "+
2✔
1521
                                        "returned len(announcements)=%v",
2✔
1522
                                        announcement.msg.MsgType(),
2✔
1523
                                        len(emittedAnnouncements))
2✔
1524

2✔
1525
                                if emittedAnnouncements != nil {
4✔
1526
                                        announcements.AddMsgs(
2✔
1527
                                                emittedAnnouncements...,
2✔
1528
                                        )
2✔
1529
                                }
2✔
1530
                                continue
2✔
1531
                        }
1532

1533
                        // If this message was recently rejected, then we won't
1534
                        // attempt to re-process it.
1535
                        if announcement.isRemote && d.isRecentlyRejectedMsg(
2✔
1536
                                announcement.msg,
2✔
1537
                                sourceToPub(announcement.source),
2✔
1538
                        ) {
2✔
UNCOV
1539

×
UNCOV
1540
                                announcement.err <- fmt.Errorf("recently " +
×
UNCOV
1541
                                        "rejected")
×
UNCOV
1542
                                continue
×
1543
                        }
1544

1545
                        // We'll set up any dependent, and wait until a free
1546
                        // slot for this job opens up, this allow us to not
1547
                        // have thousands of goroutines active.
1548
                        annJobID, err := d.vb.InitJobDependencies(
2✔
1549
                                announcement.msg,
2✔
1550
                        )
2✔
1551
                        if err != nil {
2✔
1552
                                announcement.err <- err
×
1553
                                continue
×
1554
                        }
1555

1556
                        d.wg.Add(1)
2✔
1557
                        go d.handleNetworkMessages(
2✔
1558
                                ctx, announcement, &announcements, annJobID,
2✔
1559
                        )
2✔
1560

1561
                // The trickle timer has ticked, which indicates we should
1562
                // flush to the network the pending batch of new announcements
1563
                // we've received since the last trickle tick.
1564
                case <-trickleTimer.C:
2✔
1565
                        // Emit the current batch of announcements from
2✔
1566
                        // deDupedAnnouncements.
2✔
1567
                        announcementBatch := announcements.Emit()
2✔
1568

2✔
1569
                        // If the current announcements batch is nil, then we
2✔
1570
                        // have no further work here.
2✔
1571
                        if announcementBatch.isEmpty() {
4✔
1572
                                continue
2✔
1573
                        }
1574

1575
                        // At this point, we have the set of local and remote
1576
                        // announcements we want to send out. We'll do the
1577
                        // batching as normal for both, but for local
1578
                        // announcements, we'll blast them out w/o regard for
1579
                        // our peer's policies so we ensure they propagate
1580
                        // properly.
1581
                        d.splitAndSendAnnBatch(ctx, announcementBatch)
2✔
1582

1583
                // The retransmission timer has ticked which indicates that we
1584
                // should check if we need to prune or re-broadcast any of our
1585
                // personal channels or node announcement. This addresses the
1586
                // case of "zombie" channels and channel advertisements that
1587
                // have been dropped, or not properly propagated through the
1588
                // network.
UNCOV
1589
                case tick := <-d.cfg.RetransmitTicker.Ticks():
×
UNCOV
1590
                        if err := d.retransmitStaleAnns(ctx, tick); err != nil {
×
1591
                                log.Errorf("unable to rebroadcast stale "+
×
1592
                                        "announcements: %v", err)
×
1593
                        }
×
1594

1595
                // The gossiper has been signalled to exit, to we exit our
1596
                // main loop so the wait group can be decremented.
1597
                case <-d.quit:
2✔
1598
                        return
2✔
1599
                }
1600
        }
1601
}
1602

1603
// handleNetworkMessages is responsible for waiting for dependencies for a
1604
// given network message and processing the message. Once processed, it will
1605
// signal its dependants and add the new announcements to the announce batch.
1606
//
1607
// NOTE: must be run as a goroutine.
1608
func (d *AuthenticatedGossiper) handleNetworkMessages(ctx context.Context,
1609
        nMsg *networkMsg, deDuped *deDupedAnnouncements, jobID JobID) {
2✔
1610

2✔
1611
        defer d.wg.Done()
2✔
1612
        defer d.vb.CompleteJob()
2✔
1613

2✔
1614
        // We should only broadcast this message forward if it originated from
2✔
1615
        // us or it wasn't received as part of our initial historical sync.
2✔
1616
        shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
2✔
1617

2✔
1618
        // If this message has an existing dependency, then we'll wait until
2✔
1619
        // that has been fully validated before we proceed.
2✔
1620
        err := d.vb.WaitForParents(jobID, nMsg.msg)
2✔
1621
        if err != nil {
2✔
1622
                log.Debugf("Validating network message %s got err: %v",
×
1623
                        nMsg.msg.MsgType(), err)
×
1624

×
1625
                if errors.Is(err, ErrVBarrierShuttingDown) {
×
1626
                        log.Warnf("unexpected error during validation "+
×
1627
                                "barrier shutdown: %v", err)
×
1628
                }
×
1629
                nMsg.err <- err
×
1630

×
1631
                return
×
1632
        }
1633

1634
        // Process the network announcement to determine if this is either a
1635
        // new announcement from our PoV or an edges to a prior vertex/edge we
1636
        // previously proceeded.
1637
        newAnns, allow := d.processNetworkAnnouncement(ctx, nMsg)
2✔
1638

2✔
1639
        log.Tracef("Processed network message %s, returned "+
2✔
1640
                "len(announcements)=%v, allowDependents=%v",
2✔
1641
                nMsg.msg.MsgType(), len(newAnns), allow)
2✔
1642

2✔
1643
        // If this message had any dependencies, then we can now signal them to
2✔
1644
        // continue.
2✔
1645
        err = d.vb.SignalDependents(nMsg.msg, jobID)
2✔
1646
        if err != nil {
2✔
1647
                // Something is wrong if SignalDependents returns an error.
×
1648
                log.Errorf("SignalDependents returned error for msg=%v with "+
×
1649
                        "JobID=%v", lnutils.SpewLogClosure(nMsg.msg), jobID)
×
1650

×
1651
                nMsg.err <- err
×
1652

×
1653
                return
×
1654
        }
×
1655

1656
        // If the announcement was accepted, then add the emitted announcements
1657
        // to our announce batch to be broadcast once the trickle timer ticks
1658
        // gain.
1659
        if newAnns != nil && shouldBroadcast {
4✔
1660
                // TODO(roasbeef): exclude peer that sent.
2✔
1661
                deDuped.AddMsgs(newAnns...)
2✔
1662
        } else if newAnns != nil {
6✔
1663
                log.Trace("Skipping broadcast of announcements received " +
2✔
1664
                        "during initial graph sync")
2✔
1665
        }
2✔
1666
}
1667

1668
// TODO(roasbeef): d/c peers that send updates not on our chain
1669

1670
// InitSyncState is called by outside sub-systems when a connection is
1671
// established to a new peer that understands how to perform channel range
1672
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
1673
// needed to handle new queries.
1674
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
2✔
1675
        d.syncMgr.InitSyncState(syncPeer)
2✔
1676
}
2✔
1677

1678
// PruneSyncState is called by outside sub-systems once a peer that we were
1679
// previously connected to has been disconnected. In this case we can stop the
1680
// existing GossipSyncer assigned to the peer and free up resources.
1681
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
2✔
1682
        d.syncMgr.PruneSyncState(peer)
2✔
1683
}
2✔
1684

1685
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
1686
// false otherwise, This avoids expensive reprocessing of the message.
1687
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
1688
        peerPub [33]byte) bool {
2✔
1689

2✔
1690
        var scid uint64
2✔
1691
        switch m := msg.(type) {
2✔
1692
        case *lnwire.ChannelUpdate1:
2✔
1693
                scid = m.ShortChannelID.ToUint64()
2✔
1694

1695
        case *lnwire.ChannelAnnouncement1:
2✔
1696
                scid = m.ShortChannelID.ToUint64()
2✔
1697

1698
        default:
2✔
1699
                return false
2✔
1700
        }
1701

1702
        _, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
2✔
1703
        return err != cache.ErrElementNotFound
2✔
1704
}
1705

1706
// retransmitStaleAnns examines all outgoing channels that the source node is
1707
// known to maintain to check to see if any of them are "stale". A channel is
1708
// stale iff, the last timestamp of its rebroadcast is older than the
1709
// RebroadcastInterval. We also check if a refreshed node announcement should
1710
// be resent.
1711
func (d *AuthenticatedGossiper) retransmitStaleAnns(ctx context.Context,
1712
        now time.Time) error {
2✔
1713

2✔
1714
        // Iterate over all of our channels and check if any of them fall
2✔
1715
        // within the prune interval or re-broadcast interval.
2✔
1716
        type updateTuple struct {
2✔
1717
                info *models.ChannelEdgeInfo
2✔
1718
                edge *models.ChannelEdgePolicy
2✔
1719
        }
2✔
1720

2✔
1721
        var (
2✔
1722
                havePublicChannels bool
2✔
1723
                edgesToUpdate      []updateTuple
2✔
1724
        )
2✔
1725
        err := d.cfg.Graph.ForAllOutgoingChannels(ctx, func(
2✔
1726
                info *models.ChannelEdgeInfo,
2✔
1727
                edge *models.ChannelEdgePolicy) error {
4✔
1728

2✔
1729
                // If there's no auth proof attached to this edge, it means
2✔
1730
                // that it is a private channel not meant to be announced to
2✔
1731
                // the greater network, so avoid sending channel updates for
2✔
1732
                // this channel to not leak its
2✔
1733
                // existence.
2✔
1734
                if info.AuthProof == nil {
4✔
1735
                        log.Debugf("Skipping retransmission of channel "+
2✔
1736
                                "without AuthProof: %v", info.ChannelID)
2✔
1737
                        return nil
2✔
1738
                }
2✔
1739

1740
                // We make a note that we have at least one public channel. We
1741
                // use this to determine whether we should send a node
1742
                // announcement below.
1743
                havePublicChannels = true
2✔
1744

2✔
1745
                // If this edge has a ChannelUpdate that was created before the
2✔
1746
                // introduction of the MaxHTLC field, then we'll update this
2✔
1747
                // edge to propagate this information in the network.
2✔
1748
                if !edge.MessageFlags.HasMaxHtlc() {
2✔
1749
                        // We'll make sure we support the new max_htlc field if
×
1750
                        // not already present.
×
1751
                        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
×
1752
                        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
×
1753

×
1754
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
1755
                                info: info,
×
1756
                                edge: edge,
×
1757
                        })
×
1758
                        return nil
×
1759
                }
×
1760

1761
                timeElapsed := now.Sub(edge.LastUpdate)
2✔
1762

2✔
1763
                // If it's been longer than RebroadcastInterval since we've
2✔
1764
                // re-broadcasted the channel, add the channel to the set of
2✔
1765
                // edges we need to update.
2✔
1766
                if timeElapsed >= d.cfg.RebroadcastInterval {
2✔
UNCOV
1767
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
UNCOV
1768
                                info: info,
×
UNCOV
1769
                                edge: edge,
×
UNCOV
1770
                        })
×
UNCOV
1771
                }
×
1772

1773
                return nil
2✔
1774
        }, func() {
2✔
1775
                havePublicChannels = false
2✔
1776
                edgesToUpdate = nil
2✔
1777
        })
2✔
1778
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
2✔
1779
                return fmt.Errorf("unable to retrieve outgoing channels: %w",
×
1780
                        err)
×
1781
        }
×
1782

1783
        var signedUpdates []lnwire.Message
2✔
1784
        for _, chanToUpdate := range edgesToUpdate {
2✔
UNCOV
1785
                // Re-sign and update the channel on disk and retrieve our
×
UNCOV
1786
                // ChannelUpdate to broadcast.
×
UNCOV
1787
                chanAnn, chanUpdate, err := d.updateChannel(
×
UNCOV
1788
                        ctx, chanToUpdate.info, chanToUpdate.edge,
×
UNCOV
1789
                )
×
UNCOV
1790
                if err != nil {
×
1791
                        return fmt.Errorf("unable to update channel: %w", err)
×
1792
                }
×
1793

1794
                // If we have a valid announcement to transmit, then we'll send
1795
                // that along with the update.
UNCOV
1796
                if chanAnn != nil {
×
UNCOV
1797
                        signedUpdates = append(signedUpdates, chanAnn)
×
UNCOV
1798
                }
×
1799

UNCOV
1800
                signedUpdates = append(signedUpdates, chanUpdate)
×
1801
        }
1802

1803
        // If we don't have any public channels, we return as we don't want to
1804
        // broadcast anything that would reveal our existence.
1805
        if !havePublicChannels {
4✔
1806
                return nil
2✔
1807
        }
2✔
1808

1809
        // We'll also check that our NodeAnnouncement is not too old.
1810
        currentNodeAnn := d.cfg.FetchSelfAnnouncement()
2✔
1811
        timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
2✔
1812
        timeElapsed := now.Sub(timestamp)
2✔
1813

2✔
1814
        // If it's been a full day since we've re-broadcasted the
2✔
1815
        // node announcement, refresh it and resend it.
2✔
1816
        nodeAnnStr := ""
2✔
1817
        if timeElapsed >= d.cfg.RebroadcastInterval {
2✔
UNCOV
1818
                newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
×
UNCOV
1819
                if err != nil {
×
1820
                        return fmt.Errorf("unable to get refreshed node "+
×
1821
                                "announcement: %v", err)
×
1822
                }
×
1823

UNCOV
1824
                signedUpdates = append(signedUpdates, &newNodeAnn)
×
UNCOV
1825
                nodeAnnStr = " and our refreshed node announcement"
×
UNCOV
1826

×
UNCOV
1827
                // Before broadcasting the refreshed node announcement, add it
×
UNCOV
1828
                // to our own graph.
×
UNCOV
1829
                if err := d.addNode(ctx, &newNodeAnn); err != nil {
×
UNCOV
1830
                        log.Errorf("Unable to add refreshed node announcement "+
×
UNCOV
1831
                                "to graph: %v", err)
×
UNCOV
1832
                }
×
1833
        }
1834

1835
        // If we don't have any updates to re-broadcast, then we'll exit
1836
        // early.
1837
        if len(signedUpdates) == 0 {
4✔
1838
                return nil
2✔
1839
        }
2✔
1840

UNCOV
1841
        log.Infof("Retransmitting %v outgoing channels%v",
×
UNCOV
1842
                len(edgesToUpdate), nodeAnnStr)
×
UNCOV
1843

×
UNCOV
1844
        // With all the wire announcements properly crafted, we'll broadcast
×
UNCOV
1845
        // our known outgoing channels to all our immediate peers.
×
UNCOV
1846
        if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
×
1847
                return fmt.Errorf("unable to re-broadcast channels: %w", err)
×
1848
        }
×
1849

UNCOV
1850
        return nil
×
1851
}
1852

1853
// processChanPolicyUpdate generates a new set of channel updates for the
1854
// provided list of edges and updates the backing ChannelGraphSource.
1855
func (d *AuthenticatedGossiper) processChanPolicyUpdate(ctx context.Context,
1856
        edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
2✔
1857

2✔
1858
        var chanUpdates []networkMsg
2✔
1859
        for _, edgeInfo := range edgesToUpdate {
4✔
1860
                // Now that we've collected all the channels we need to update,
2✔
1861
                // we'll re-sign and update the backing ChannelGraphSource, and
2✔
1862
                // retrieve our ChannelUpdate to broadcast.
2✔
1863
                _, chanUpdate, err := d.updateChannel(
2✔
1864
                        ctx, edgeInfo.Info, edgeInfo.Edge,
2✔
1865
                )
2✔
1866
                if err != nil {
2✔
1867
                        return nil, err
×
1868
                }
×
1869

1870
                // We'll avoid broadcasting any updates for private channels to
1871
                // avoid directly giving away their existence. Instead, we'll
1872
                // send the update directly to the remote party.
1873
                if edgeInfo.Info.AuthProof == nil {
4✔
1874
                        // If AuthProof is nil and an alias was found for this
2✔
1875
                        // ChannelID (meaning the option-scid-alias feature was
2✔
1876
                        // negotiated), we'll replace the ShortChannelID in the
2✔
1877
                        // update with the peer's alias. We do this after
2✔
1878
                        // updateChannel so that the alias isn't persisted to
2✔
1879
                        // the database.
2✔
1880
                        chanID := lnwire.NewChanIDFromOutPoint(
2✔
1881
                                edgeInfo.Info.ChannelPoint,
2✔
1882
                        )
2✔
1883

2✔
1884
                        var defaultAlias lnwire.ShortChannelID
2✔
1885
                        foundAlias, _ := d.cfg.GetAlias(chanID)
2✔
1886
                        if foundAlias != defaultAlias {
4✔
1887
                                chanUpdate.ShortChannelID = foundAlias
2✔
1888

2✔
1889
                                sig, err := d.cfg.SignAliasUpdate(chanUpdate)
2✔
1890
                                if err != nil {
2✔
1891
                                        log.Errorf("Unable to sign alias "+
×
1892
                                                "update: %v", err)
×
1893
                                        continue
×
1894
                                }
1895

1896
                                lnSig, err := lnwire.NewSigFromSignature(sig)
2✔
1897
                                if err != nil {
2✔
1898
                                        log.Errorf("Unable to create sig: %v",
×
1899
                                                err)
×
1900
                                        continue
×
1901
                                }
1902

1903
                                chanUpdate.Signature = lnSig
2✔
1904
                        }
1905

1906
                        remotePubKey := remotePubFromChanInfo(
2✔
1907
                                edgeInfo.Info, chanUpdate.ChannelFlags,
2✔
1908
                        )
2✔
1909
                        err := d.reliableSender.sendMessage(
2✔
1910
                                ctx, chanUpdate, remotePubKey,
2✔
1911
                        )
2✔
1912
                        if err != nil {
2✔
1913
                                log.Errorf("Unable to reliably send %v for "+
×
1914
                                        "channel=%v to peer=%x: %v",
×
1915
                                        chanUpdate.MsgType(),
×
1916
                                        chanUpdate.ShortChannelID,
×
1917
                                        remotePubKey, err)
×
1918
                        }
×
1919
                        continue
2✔
1920
                }
1921

1922
                // We set ourselves as the source of this message to indicate
1923
                // that we shouldn't skip any peers when sending this message.
1924
                chanUpdates = append(chanUpdates, networkMsg{
2✔
1925
                        source:   d.selfKey,
2✔
1926
                        isRemote: false,
2✔
1927
                        msg:      chanUpdate,
2✔
1928
                })
2✔
1929
        }
1930

1931
        return chanUpdates, nil
2✔
1932
}
1933

1934
// remotePubFromChanInfo returns the public key of the remote peer given a
1935
// ChannelEdgeInfo that describe a channel we have with them.
1936
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
1937
        chanFlags lnwire.ChanUpdateChanFlags) [33]byte {
2✔
1938

2✔
1939
        var remotePubKey [33]byte
2✔
1940
        switch {
2✔
1941
        case chanFlags&lnwire.ChanUpdateDirection == 0:
2✔
1942
                remotePubKey = chanInfo.NodeKey2Bytes
2✔
1943
        case chanFlags&lnwire.ChanUpdateDirection == 1:
2✔
1944
                remotePubKey = chanInfo.NodeKey1Bytes
2✔
1945
        }
1946

1947
        return remotePubKey
2✔
1948
}
1949

1950
// processRejectedEdge examines a rejected edge to see if we can extract any
1951
// new announcements from it.  An edge will get rejected if we already added
1952
// the same edge without AuthProof to the graph. If the received announcement
1953
// contains a proof, we can add this proof to our edge.  We can end up in this
1954
// situation in the case where we create a channel, but for some reason fail
1955
// to receive the remote peer's proof, while the remote peer is able to fully
1956
// assemble the proof and craft the ChannelAnnouncement.
1957
func (d *AuthenticatedGossiper) processRejectedEdge(_ context.Context,
1958
        chanAnnMsg *lnwire.ChannelAnnouncement1,
1959
        proof *models.ChannelAuthProof) ([]networkMsg, error) {
2✔
1960

2✔
1961
        // First, we'll fetch the state of the channel as we know if from the
2✔
1962
        // database.
2✔
1963
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
2✔
1964
                chanAnnMsg.ShortChannelID,
2✔
1965
        )
2✔
1966
        if err != nil {
2✔
1967
                return nil, err
×
1968
        }
×
1969

1970
        // The edge is in the graph, and has a proof attached, then we'll just
1971
        // reject it as normal.
1972
        if chanInfo.AuthProof != nil {
4✔
1973
                return nil, nil
2✔
1974
        }
2✔
1975

1976
        // Otherwise, this means that the edge is within the graph, but it
1977
        // doesn't yet have a proper proof attached. If we did not receive
1978
        // the proof such that we now can add it, there's nothing more we
1979
        // can do.
1980
        if proof == nil {
×
1981
                return nil, nil
×
1982
        }
×
1983

1984
        // We'll then create then validate the new fully assembled
1985
        // announcement.
1986
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
×
1987
                proof, chanInfo, e1, e2,
×
1988
        )
×
1989
        if err != nil {
×
1990
                return nil, err
×
1991
        }
×
1992
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
×
1993
        if err != nil {
×
1994
                err := fmt.Errorf("assembled channel announcement proof "+
×
1995
                        "for shortChanID=%v isn't valid: %v",
×
1996
                        chanAnnMsg.ShortChannelID, err)
×
1997
                log.Error(err)
×
1998
                return nil, err
×
1999
        }
×
2000

2001
        // If everything checks out, then we'll add the fully assembled proof
2002
        // to the database.
2003
        err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
×
2004
        if err != nil {
×
2005
                err := fmt.Errorf("unable add proof to shortChanID=%v: %w",
×
2006
                        chanAnnMsg.ShortChannelID, err)
×
2007
                log.Error(err)
×
2008
                return nil, err
×
2009
        }
×
2010

2011
        // As we now have a complete channel announcement for this channel,
2012
        // we'll construct the announcement so they can be broadcast out to all
2013
        // our peers.
2014
        announcements := make([]networkMsg, 0, 3)
×
2015
        announcements = append(announcements, networkMsg{
×
2016
                source: d.selfKey,
×
2017
                msg:    chanAnn,
×
2018
        })
×
2019
        if e1Ann != nil {
×
2020
                announcements = append(announcements, networkMsg{
×
2021
                        source: d.selfKey,
×
2022
                        msg:    e1Ann,
×
2023
                })
×
2024
        }
×
2025
        if e2Ann != nil {
×
2026
                announcements = append(announcements, networkMsg{
×
2027
                        source: d.selfKey,
×
2028
                        msg:    e2Ann,
×
2029
                })
×
2030

×
2031
        }
×
2032

2033
        return announcements, nil
×
2034
}
2035

2036
// fetchPKScript fetches the output script for the given SCID.
2037
func (d *AuthenticatedGossiper) fetchPKScript(chanID lnwire.ShortChannelID) (
2038
        txscript.ScriptClass, btcutil.Address, error) {
×
2039

×
2040
        pkScript, err := lnwallet.FetchPKScriptWithQuit(
×
2041
                d.cfg.ChainIO, chanID, d.quit,
×
2042
        )
×
2043
        if err != nil {
×
2044
                return txscript.WitnessUnknownTy, nil, err
×
2045
        }
×
2046

2047
        scriptClass, addrs, _, err := txscript.ExtractPkScriptAddrs(
×
2048
                pkScript, d.cfg.ChainParams,
×
2049
        )
×
2050
        if err != nil {
×
2051
                return txscript.WitnessUnknownTy, nil, err
×
2052
        }
×
2053

2054
        if len(addrs) != 1 {
×
2055
                return txscript.WitnessUnknownTy, nil, fmt.Errorf("expected "+
×
2056
                        "1 address, got: %d", len(addrs))
×
2057
        }
×
2058

2059
        return scriptClass, addrs[0], nil
×
2060
}
2061

2062
// addNode processes the given node announcement, and adds it to our channel
2063
// graph.
2064
func (d *AuthenticatedGossiper) addNode(ctx context.Context,
2065
        msg *lnwire.NodeAnnouncement, op ...batch.SchedulerOption) error {
2✔
2066

2✔
2067
        if err := netann.ValidateNodeAnn(msg); err != nil {
2✔
UNCOV
2068
                return fmt.Errorf("unable to validate node announcement: %w",
×
UNCOV
2069
                        err)
×
UNCOV
2070
        }
×
2071

2072
        return d.cfg.Graph.AddNode(
2✔
2073
                ctx, models.NodeFromWireAnnouncement(msg), op...,
2✔
2074
        )
2✔
2075
}
2076

2077
// isPremature decides whether a given network message has a block height+delta
2078
// value specified in the future. If so, the message will be added to the
2079
// future message map and be processed when the block height as reached.
2080
//
2081
// NOTE: must be used inside a lock.
2082
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
2083
        delta uint32, msg *networkMsg) bool {
2✔
2084

2✔
2085
        // The channel is already confirmed at chanID.BlockHeight so we minus
2✔
2086
        // one block. For instance, if the required confirmation for this
2✔
2087
        // channel announcement is 6, we then only need to wait for 5 more
2✔
2088
        // blocks once the funding tx is confirmed.
2✔
2089
        if delta > 0 {
4✔
2090
                delta--
2✔
2091
        }
2✔
2092

2093
        msgHeight := chanID.BlockHeight + delta
2✔
2094

2✔
2095
        // The message height is smaller or equal to our best known height,
2✔
2096
        // thus the message is mature.
2✔
2097
        if msgHeight <= d.bestHeight {
4✔
2098
                return false
2✔
2099
        }
2✔
2100

2101
        // Add the premature message to our future messages which will be
2102
        // resent once the block height has reached.
2103
        //
2104
        // Copy the networkMsgs since the old message's err chan will be
2105
        // consumed.
2106
        copied := &networkMsg{
2✔
2107
                peer:              msg.peer,
2✔
2108
                source:            msg.source,
2✔
2109
                msg:               msg.msg,
2✔
2110
                optionalMsgFields: msg.optionalMsgFields,
2✔
2111
                isRemote:          msg.isRemote,
2✔
2112
                err:               make(chan error, 1),
2✔
2113
        }
2✔
2114

2✔
2115
        // Create the cached message.
2✔
2116
        cachedMsg := &cachedFutureMsg{
2✔
2117
                msg:    copied,
2✔
2118
                height: msgHeight,
2✔
2119
        }
2✔
2120

2✔
2121
        // Increment the msg ID and add it to the cache.
2✔
2122
        nextMsgID := d.futureMsgs.nextMsgID()
2✔
2123
        _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
2✔
2124
        if err != nil {
2✔
2125
                log.Errorf("Adding future message got error: %v", err)
×
2126
        }
×
2127

2128
        log.Debugf("Network message: %v added to future messages for "+
2✔
2129
                "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
2✔
2130
                msgHeight, d.bestHeight)
2✔
2131

2✔
2132
        return true
2✔
2133
}
2134

2135
// processNetworkAnnouncement processes a new network relate authenticated
2136
// channel or node announcement or announcements proofs. If the announcement
2137
// didn't affect the internal state due to either being out of date, invalid,
2138
// or redundant, then nil is returned. Otherwise, the set of announcements will
2139
// be returned which should be broadcasted to the rest of the network. The
2140
// boolean returned indicates whether any dependents of the announcement should
2141
// attempt to be processed as well.
2142
func (d *AuthenticatedGossiper) processNetworkAnnouncement(ctx context.Context,
2143
        nMsg *networkMsg) ([]networkMsg, bool) {
2✔
2144

2✔
2145
        // If this is a remote update, we set the scheduler option to lazily
2✔
2146
        // add it to the graph.
2✔
2147
        var schedulerOp []batch.SchedulerOption
2✔
2148
        if nMsg.isRemote {
4✔
2149
                schedulerOp = append(schedulerOp, batch.LazyAdd())
2✔
2150
        }
2✔
2151

2152
        switch msg := nMsg.msg.(type) {
2✔
2153
        // A new node announcement has arrived which either presents new
2154
        // information about a node in one of the channels we know about, or a
2155
        // updating previously advertised information.
2156
        case *lnwire.NodeAnnouncement:
2✔
2157
                return d.handleNodeAnnouncement(ctx, nMsg, msg, schedulerOp)
2✔
2158

2159
        // A new channel announcement has arrived, this indicates the
2160
        // *creation* of a new channel within the network. This only advertises
2161
        // the existence of a channel and not yet the routing policies in
2162
        // either direction of the channel.
2163
        case *lnwire.ChannelAnnouncement1:
2✔
2164
                return d.handleChanAnnouncement(ctx, nMsg, msg, schedulerOp...)
2✔
2165

2166
        // A new authenticated channel edge update has arrived. This indicates
2167
        // that the directional information for an already known channel has
2168
        // been updated.
2169
        case *lnwire.ChannelUpdate1:
2✔
2170
                return d.handleChanUpdate(ctx, nMsg, msg, schedulerOp)
2✔
2171

2172
        // A new signature announcement has been received. This indicates
2173
        // willingness of nodes involved in the funding of a channel to
2174
        // announce this new channel to the rest of the world.
2175
        case *lnwire.AnnounceSignatures1:
2✔
2176
                return d.handleAnnSig(ctx, nMsg, msg)
2✔
2177

2178
        default:
×
2179
                err := errors.New("wrong type of the announcement")
×
2180
                nMsg.err <- err
×
2181
                return nil, false
×
2182
        }
2183
}
2184

2185
// processZombieUpdate determines whether the provided channel update should
2186
// resurrect a given zombie edge.
2187
//
2188
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
2189
// should be inspected.
2190
func (d *AuthenticatedGossiper) processZombieUpdate(_ context.Context,
2191
        chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
UNCOV
2192
        msg *lnwire.ChannelUpdate1) error {
×
UNCOV
2193

×
UNCOV
2194
        // The least-significant bit in the flag on the channel update tells us
×
UNCOV
2195
        // which edge is being updated.
×
UNCOV
2196
        isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0
×
UNCOV
2197

×
UNCOV
2198
        // Since we've deemed the update as not stale above, before marking it
×
UNCOV
2199
        // live, we'll make sure it has been signed by the correct party. If we
×
UNCOV
2200
        // have both pubkeys, either party can resurrect the channel. If we've
×
UNCOV
2201
        // already marked this with the stricter, single-sided resurrection we
×
UNCOV
2202
        // will only have the pubkey of the node with the oldest timestamp.
×
UNCOV
2203
        var pubKey *btcec.PublicKey
×
UNCOV
2204
        switch {
×
2205
        case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
×
2206
                pubKey, _ = chanInfo.NodeKey1()
×
UNCOV
2207
        case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
×
UNCOV
2208
                pubKey, _ = chanInfo.NodeKey2()
×
2209
        }
UNCOV
2210
        if pubKey == nil {
×
UNCOV
2211
                return fmt.Errorf("incorrect pubkey to resurrect zombie "+
×
UNCOV
2212
                        "with chan_id=%v", msg.ShortChannelID)
×
UNCOV
2213
        }
×
2214

UNCOV
2215
        err := netann.VerifyChannelUpdateSignature(msg, pubKey)
×
UNCOV
2216
        if err != nil {
×
UNCOV
2217
                return fmt.Errorf("unable to verify channel "+
×
UNCOV
2218
                        "update signature: %v", err)
×
UNCOV
2219
        }
×
2220

2221
        // With the signature valid, we'll proceed to mark the
2222
        // edge as live and wait for the channel announcement to
2223
        // come through again.
UNCOV
2224
        err = d.cfg.Graph.MarkEdgeLive(scid)
×
UNCOV
2225
        switch {
×
2226
        case errors.Is(err, graphdb.ErrZombieEdgeNotFound):
×
2227
                log.Errorf("edge with chan_id=%v was not found in the "+
×
2228
                        "zombie index: %v", err)
×
2229

×
2230
                return nil
×
2231

2232
        case err != nil:
×
2233
                return fmt.Errorf("unable to remove edge with "+
×
2234
                        "chan_id=%v from zombie index: %v",
×
2235
                        msg.ShortChannelID, err)
×
2236

UNCOV
2237
        default:
×
2238
        }
2239

UNCOV
2240
        log.Debugf("Removed edge with chan_id=%v from zombie "+
×
UNCOV
2241
                "index", msg.ShortChannelID)
×
UNCOV
2242

×
UNCOV
2243
        return nil
×
2244
}
2245

2246
// fetchNodeAnn fetches the latest signed node announcement from our point of
2247
// view for the node with the given public key. It also validates the node
2248
// announcement fields and returns an error if they are invalid to prevent
2249
// forwarding invalid node announcements to our peers.
2250
func (d *AuthenticatedGossiper) fetchNodeAnn(ctx context.Context,
2251
        pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {
2✔
2252

2✔
2253
        node, err := d.cfg.Graph.FetchNode(ctx, pubKey)
2✔
2254
        if err != nil {
2✔
UNCOV
2255
                return nil, err
×
UNCOV
2256
        }
×
2257

2258
        nodeAnn, err := node.NodeAnnouncement(true)
2✔
2259
        if err != nil {
4✔
2260
                return nil, err
2✔
2261
        }
2✔
2262

2263
        return nodeAnn, netann.ValidateNodeAnnFields(nodeAnn)
2✔
2264
}
2265

2266
// isMsgStale determines whether a message retrieved from the backing
2267
// MessageStore is seen as stale by the current graph.
2268
func (d *AuthenticatedGossiper) isMsgStale(_ context.Context,
2269
        msg lnwire.Message) bool {
2✔
2270

2✔
2271
        switch msg := msg.(type) {
2✔
2272
        case *lnwire.AnnounceSignatures1:
2✔
2273
                chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
2✔
2274
                        msg.ShortChannelID,
2✔
2275
                )
2✔
2276

2✔
2277
                // If the channel cannot be found, it is most likely a leftover
2✔
2278
                // message for a channel that was closed, so we can consider it
2✔
2279
                // stale.
2✔
2280
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
4✔
2281
                        return true
2✔
2282
                }
2✔
2283
                if err != nil {
2✔
2284
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2285
                                "%v", msg.ShortChannelID, err)
×
2286
                        return false
×
2287
                }
×
2288

2289
                // If the proof exists in the graph, then we have successfully
2290
                // received the remote proof and assembled the full proof, so we
2291
                // can safely delete the local proof from the database.
2292
                return chanInfo.AuthProof != nil
2✔
2293

2294
        case *lnwire.ChannelUpdate1:
2✔
2295
                _, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)
2✔
2296

2✔
2297
                // If the channel cannot be found, it is most likely a leftover
2✔
2298
                // message for a channel that was closed, so we can consider it
2✔
2299
                // stale.
2✔
2300
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
4✔
2301
                        return true
2✔
2302
                }
2✔
2303
                if err != nil {
2✔
2304
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2305
                                "%v", msg.ShortChannelID, err)
×
2306
                        return false
×
2307
                }
×
2308

2309
                // Otherwise, we'll retrieve the correct policy that we
2310
                // currently have stored within our graph to check if this
2311
                // message is stale by comparing its timestamp.
2312
                var p *models.ChannelEdgePolicy
2✔
2313
                if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
4✔
2314
                        p = p1
2✔
2315
                } else {
4✔
2316
                        p = p2
2✔
2317
                }
2✔
2318

2319
                // If the policy is still unknown, then we can consider this
2320
                // policy fresh.
2321
                if p == nil {
2✔
2322
                        return false
×
2323
                }
×
2324

2325
                timestamp := time.Unix(int64(msg.Timestamp), 0)
2✔
2326
                return p.LastUpdate.After(timestamp)
2✔
2327

2328
        default:
×
2329
                // We'll make sure to not mark any unsupported messages as stale
×
2330
                // to ensure they are not removed.
×
2331
                return false
×
2332
        }
2333
}
2334

2335
// updateChannel creates a new fully signed update for the channel, and updates
2336
// the underlying graph with the new state.
2337
func (d *AuthenticatedGossiper) updateChannel(ctx context.Context,
2338
        info *models.ChannelEdgeInfo,
2339
        edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
2340
        *lnwire.ChannelUpdate1, error) {
2✔
2341

2✔
2342
        // Parse the unsigned edge into a channel update.
2✔
2343
        chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)
2✔
2344

2✔
2345
        // We'll generate a new signature over a digest of the channel
2✔
2346
        // announcement itself and update the timestamp to ensure it propagate.
2✔
2347
        err := netann.SignChannelUpdate(
2✔
2348
                d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
2✔
2349
                netann.ChanUpdSetTimestamp,
2✔
2350
        )
2✔
2351
        if err != nil {
2✔
2352
                return nil, nil, err
×
2353
        }
×
2354

2355
        // Next, we'll set the new signature in place, and update the reference
2356
        // in the backing slice.
2357
        edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
2✔
2358
        edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()
2✔
2359

2✔
2360
        // To ensure that our signature is valid, we'll verify it ourself
2✔
2361
        // before committing it to the slice returned.
2✔
2362
        err = netann.ValidateChannelUpdateAnn(
2✔
2363
                d.selfKey, info.Capacity, chanUpdate,
2✔
2364
        )
2✔
2365
        if err != nil {
2✔
2366
                return nil, nil, fmt.Errorf("generated invalid channel "+
×
2367
                        "update sig: %v", err)
×
2368
        }
×
2369

2370
        // Finally, we'll write the new edge policy to disk.
2371
        if err := d.cfg.Graph.UpdateEdge(ctx, edge); err != nil {
2✔
2372
                return nil, nil, err
×
2373
        }
×
2374

2375
        // We'll also create the original channel announcement so the two can
2376
        // be broadcast along side each other (if necessary), but only if we
2377
        // have a full channel announcement for this channel.
2378
        var chanAnn *lnwire.ChannelAnnouncement1
2✔
2379
        if info.AuthProof != nil {
4✔
2380
                chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
2✔
2381
                chanAnn = &lnwire.ChannelAnnouncement1{
2✔
2382
                        ShortChannelID:  chanID,
2✔
2383
                        NodeID1:         info.NodeKey1Bytes,
2✔
2384
                        NodeID2:         info.NodeKey2Bytes,
2✔
2385
                        ChainHash:       info.ChainHash,
2✔
2386
                        BitcoinKey1:     info.BitcoinKey1Bytes,
2✔
2387
                        Features:        lnwire.NewRawFeatureVector(),
2✔
2388
                        BitcoinKey2:     info.BitcoinKey2Bytes,
2✔
2389
                        ExtraOpaqueData: info.ExtraOpaqueData,
2✔
2390
                }
2✔
2391
                chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
2✔
2392
                        info.AuthProof.NodeSig1Bytes,
2✔
2393
                )
2✔
2394
                if err != nil {
2✔
2395
                        return nil, nil, err
×
2396
                }
×
2397
                chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
2✔
2398
                        info.AuthProof.NodeSig2Bytes,
2✔
2399
                )
2✔
2400
                if err != nil {
2✔
2401
                        return nil, nil, err
×
2402
                }
×
2403
                chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
2✔
2404
                        info.AuthProof.BitcoinSig1Bytes,
2✔
2405
                )
2✔
2406
                if err != nil {
2✔
2407
                        return nil, nil, err
×
2408
                }
×
2409
                chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
2✔
2410
                        info.AuthProof.BitcoinSig2Bytes,
2✔
2411
                )
2✔
2412
                if err != nil {
2✔
2413
                        return nil, nil, err
×
2414
                }
×
2415
        }
2416

2417
        return chanAnn, chanUpdate, err
2✔
2418
}
2419

2420
// SyncManager returns the gossiper's SyncManager instance.
2421
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
2✔
2422
        return d.syncMgr
2✔
2423
}
2✔
2424

2425
// IsKeepAliveUpdate determines whether this channel update is considered a
2426
// keep-alive update based on the previous channel update processed for the same
2427
// direction.
2428
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
2429
        prev *models.ChannelEdgePolicy) bool {
2✔
2430

2✔
2431
        // Both updates should be from the same direction.
2✔
2432
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
2✔
2433
                prev.ChannelFlags&lnwire.ChanUpdateDirection {
2✔
2434

×
2435
                return false
×
2436
        }
×
2437

2438
        // The timestamp should always increase for a keep-alive update.
2439
        timestamp := time.Unix(int64(update.Timestamp), 0)
2✔
2440
        if !timestamp.After(prev.LastUpdate) {
2✔
2441
                return false
×
2442
        }
×
2443

2444
        // None of the remaining fields should change for a keep-alive update.
2445
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
4✔
2446
                return false
2✔
2447
        }
2✔
2448
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
4✔
2449
                return false
2✔
2450
        }
2✔
2451
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
4✔
2452
                return false
2✔
2453
        }
2✔
2454
        if update.TimeLockDelta != prev.TimeLockDelta {
2✔
2455
                return false
×
2456
        }
×
2457
        if update.HtlcMinimumMsat != prev.MinHTLC {
2✔
2458
                return false
×
2459
        }
×
2460
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
2✔
2461
                return false
×
2462
        }
×
2463
        if update.HtlcMaximumMsat != prev.MaxHTLC {
2✔
2464
                return false
×
2465
        }
×
2466
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
4✔
2467
                return false
2✔
2468
        }
2✔
2469
        return true
2✔
2470
}
2471

2472
// latestHeight returns the gossiper's latest height known of the chain.
2473
func (d *AuthenticatedGossiper) latestHeight() uint32 {
2✔
2474
        d.Lock()
2✔
2475
        defer d.Unlock()
2✔
2476
        return d.bestHeight
2✔
2477
}
2✔
2478

2479
// handleNodeAnnouncement processes a new node announcement.
2480
func (d *AuthenticatedGossiper) handleNodeAnnouncement(ctx context.Context,
2481
        nMsg *networkMsg, nodeAnn *lnwire.NodeAnnouncement,
2482
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
2✔
2483

2✔
2484
        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
2✔
2485

2✔
2486
        log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
2✔
2487
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
2✔
2488
                nMsg.source.SerializeCompressed())
2✔
2489

2✔
2490
        // We'll quickly ask the router if it already has a newer update for
2✔
2491
        // this node so we can skip validating signatures if not required.
2✔
2492
        if d.cfg.Graph.IsStaleNode(ctx, nodeAnn.NodeID, timestamp) {
4✔
2493
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
2✔
2494
                nMsg.err <- nil
2✔
2495
                return nil, true
2✔
2496
        }
2✔
2497

2498
        if err := d.addNode(ctx, nodeAnn, ops...); err != nil {
4✔
2499
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
2✔
2500
                        err)
2✔
2501

2✔
2502
                if !graph.IsError(
2✔
2503
                        err,
2✔
2504
                        graph.ErrOutdated,
2✔
2505
                        graph.ErrIgnored,
2✔
2506
                ) {
2✔
2507

×
2508
                        log.Error(err)
×
2509
                }
×
2510

2511
                nMsg.err <- err
2✔
2512
                return nil, false
2✔
2513
        }
2514

2515
        // In order to ensure we don't leak unadvertised nodes, we'll make a
2516
        // quick check to ensure this node intends to publicly advertise itself
2517
        // to the network.
2518
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
2✔
2519
        if err != nil {
2✔
2520
                log.Errorf("Unable to determine if node %x is advertised: %v",
×
2521
                        nodeAnn.NodeID, err)
×
2522
                nMsg.err <- err
×
2523
                return nil, false
×
2524
        }
×
2525

2526
        var announcements []networkMsg
2✔
2527

2✔
2528
        // If it does, we'll add their announcement to our batch so that it can
2✔
2529
        // be broadcast to the rest of our peers.
2✔
2530
        if isPublic {
4✔
2531
                announcements = append(announcements, networkMsg{
2✔
2532
                        peer:     nMsg.peer,
2✔
2533
                        isRemote: nMsg.isRemote,
2✔
2534
                        source:   nMsg.source,
2✔
2535
                        msg:      nodeAnn,
2✔
2536
                })
2✔
2537
        } else {
4✔
2538
                log.Tracef("Skipping broadcasting node announcement for %x "+
2✔
2539
                        "due to being unadvertised", nodeAnn.NodeID)
2✔
2540
        }
2✔
2541

2542
        nMsg.err <- nil
2✔
2543
        // TODO(roasbeef): get rid of the above
2✔
2544

2✔
2545
        log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
2✔
2546
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
2✔
2547
                nMsg.source.SerializeCompressed())
2✔
2548

2✔
2549
        return announcements, true
2✔
2550
}
2551

2552
// handleChanAnnouncement processes a new channel announcement.
2553
//
2554
//nolint:funlen
2555
func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
2556
        nMsg *networkMsg, ann *lnwire.ChannelAnnouncement1,
2557
        ops ...batch.SchedulerOption) ([]networkMsg, bool) {
2✔
2558

2✔
2559
        scid := ann.ShortChannelID
2✔
2560
        chainHash := d.cfg.ChainParams.GenesisHash
2✔
2561

2✔
2562
        log.Debugf("Processing ChannelAnnouncement1: peer=%v, short_chan_id=%v",
2✔
2563
                nMsg.peer, scid.ToUint64())
2✔
2564

2✔
2565
        // We'll ignore any channel announcements that target any chain other
2✔
2566
        // than the set of chains we know of.
2✔
2567
        if !bytes.Equal(ann.ChainHash[:], chainHash[:]) {
2✔
2568
                err := fmt.Errorf("ignoring ChannelAnnouncement1 from chain=%v"+
×
2569
                        ", gossiper on chain=%v", ann.ChainHash, chainHash)
×
2570
                log.Errorf(err.Error())
×
2571

×
2572
                key := newRejectCacheKey(
×
2573
                        scid.ToUint64(),
×
2574
                        sourceToPub(nMsg.source),
×
2575
                )
×
2576
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2577

×
2578
                nMsg.err <- err
×
2579
                return nil, false
×
2580
        }
×
2581

2582
        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
2583
        // reject the announcement. Since the router accepts alias SCIDs,
2584
        // not erroring out would be a DoS vector.
2585
        if nMsg.isRemote && d.cfg.IsAlias(scid) {
2✔
2586
                err := fmt.Errorf("ignoring remote alias channel=%v", scid)
×
2587
                log.Errorf(err.Error())
×
2588

×
2589
                key := newRejectCacheKey(
×
2590
                        scid.ToUint64(),
×
2591
                        sourceToPub(nMsg.source),
×
2592
                )
×
2593
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2594

×
2595
                nMsg.err <- err
×
2596
                return nil, false
×
2597
        }
×
2598

2599
        // If the advertised inclusionary block is beyond our knowledge of the
2600
        // chain tip, then we'll ignore it for now.
2601
        d.Lock()
2✔
2602
        if nMsg.isRemote && d.isPremature(scid, 0, nMsg) {
2✔
UNCOV
2603
                log.Warnf("Announcement for chan_id=(%v), is premature: "+
×
UNCOV
2604
                        "advertises height %v, only height %v is known",
×
UNCOV
2605
                        scid.ToUint64(), scid.BlockHeight, d.bestHeight)
×
UNCOV
2606
                d.Unlock()
×
UNCOV
2607
                nMsg.err <- nil
×
UNCOV
2608
                return nil, false
×
UNCOV
2609
        }
×
2610
        d.Unlock()
2✔
2611

2✔
2612
        // At this point, we'll now ask the router if this is a zombie/known
2✔
2613
        // edge. If so we can skip all the processing below.
2✔
2614
        if d.cfg.Graph.IsKnownEdge(scid) {
4✔
2615
                nMsg.err <- nil
2✔
2616
                return nil, true
2✔
2617
        }
2✔
2618

2619
        // Check if the channel is already closed in which case we can ignore
2620
        // it.
2621
        closed, err := d.cfg.ScidCloser.IsClosedScid(scid)
2✔
2622
        if err != nil {
2✔
2623
                log.Errorf("failed to check if scid %v is closed: %v", scid,
×
2624
                        err)
×
2625
                nMsg.err <- err
×
2626

×
2627
                return nil, false
×
2628
        }
×
2629

2630
        if closed {
2✔
UNCOV
2631
                err = fmt.Errorf("ignoring closed channel %v", scid)
×
UNCOV
2632

×
UNCOV
2633
                // If this is an announcement from us, we'll just ignore it.
×
UNCOV
2634
                if !nMsg.isRemote {
×
2635
                        nMsg.err <- err
×
2636
                        return nil, false
×
2637
                }
×
2638

UNCOV
2639
                log.Warnf("Increasing ban score for peer=%v due to outdated "+
×
UNCOV
2640
                        "channel announcement for channel %v", nMsg.peer, scid)
×
UNCOV
2641

×
UNCOV
2642
                // Increment the peer's ban score if they are sending closed
×
UNCOV
2643
                // channel announcements.
×
UNCOV
2644
                dcErr := d.handleBadPeer(nMsg.peer)
×
UNCOV
2645
                if dcErr != nil {
×
2646
                        err = dcErr
×
2647
                }
×
2648

UNCOV
2649
                nMsg.err <- err
×
UNCOV
2650

×
UNCOV
2651
                return nil, false
×
2652
        }
2653

2654
        // If this is a remote channel announcement, then we'll validate all
2655
        // the signatures within the proof as it should be well formed.
2656
        var proof *models.ChannelAuthProof
2✔
2657
        if nMsg.isRemote {
4✔
2658
                err := netann.ValidateChannelAnn(ann, d.fetchPKScript)
2✔
2659
                if err != nil {
2✔
2660
                        err := fmt.Errorf("unable to validate announcement: "+
×
2661
                                "%v", err)
×
2662

×
2663
                        key := newRejectCacheKey(
×
2664
                                scid.ToUint64(),
×
2665
                                sourceToPub(nMsg.source),
×
2666
                        )
×
2667
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2668

×
2669
                        log.Error(err)
×
2670
                        nMsg.err <- err
×
2671
                        return nil, false
×
2672
                }
×
2673

2674
                // If the proof checks out, then we'll save the proof itself to
2675
                // the database so we can fetch it later when gossiping with
2676
                // other nodes.
2677
                proof = &models.ChannelAuthProof{
2✔
2678
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
2✔
2679
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
2✔
2680
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
2✔
2681
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
2✔
2682
                }
2✔
2683
        }
2684

2685
        // With the proof validated (if necessary), we can now store it within
2686
        // the database for our path finding and syncing needs.
2687
        edge := &models.ChannelEdgeInfo{
2✔
2688
                ChannelID:        scid.ToUint64(),
2✔
2689
                ChainHash:        ann.ChainHash,
2✔
2690
                NodeKey1Bytes:    ann.NodeID1,
2✔
2691
                NodeKey2Bytes:    ann.NodeID2,
2✔
2692
                BitcoinKey1Bytes: ann.BitcoinKey1,
2✔
2693
                BitcoinKey2Bytes: ann.BitcoinKey2,
2✔
2694
                AuthProof:        proof,
2✔
2695
                Features: lnwire.NewFeatureVector(
2✔
2696
                        ann.Features, lnwire.Features,
2✔
2697
                ),
2✔
2698
                ExtraOpaqueData: ann.ExtraOpaqueData,
2✔
2699
        }
2✔
2700

2✔
2701
        // If there were any optional message fields provided, we'll include
2✔
2702
        // them in its serialized disk representation now.
2✔
2703
        var tapscriptRoot fn.Option[chainhash.Hash]
2✔
2704
        if nMsg.optionalMsgFields != nil {
4✔
2705
                if nMsg.optionalMsgFields.capacity != nil {
4✔
2706
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
2✔
2707
                }
2✔
2708
                if nMsg.optionalMsgFields.channelPoint != nil {
4✔
2709
                        cp := *nMsg.optionalMsgFields.channelPoint
2✔
2710
                        edge.ChannelPoint = cp
2✔
2711
                }
2✔
2712

2713
                // Optional tapscript root for custom channels.
2714
                tapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
2✔
2715
        }
2716

2717
        // Before we start validation or add the edge to the database, we obtain
2718
        // the mutex for this channel ID. We do this to ensure no other
2719
        // goroutine has read the database and is now making decisions based on
2720
        // this DB state, before it writes to the DB. It also ensures that we
2721
        // don't perform the expensive validation check on the same channel
2722
        // announcement at the same time.
2723
        d.channelMtx.Lock(scid.ToUint64())
2✔
2724

2✔
2725
        // If AssumeChannelValid is present, then we are unable to perform any
2✔
2726
        // of the expensive checks below, so we'll short-circuit our path
2✔
2727
        // straight to adding the edge to our graph. If the passed
2✔
2728
        // ShortChannelID is an alias, then we'll skip validation as it will
2✔
2729
        // not map to a legitimate tx. This is not a DoS vector as only we can
2✔
2730
        // add an alias ChannelAnnouncement from the gossiper.
2✔
2731
        if !(d.cfg.AssumeChannelValid || d.cfg.IsAlias(scid)) {
4✔
2732
                op, capacity, script, err := d.validateFundingTransaction(
2✔
2733
                        ctx, ann, tapscriptRoot,
2✔
2734
                )
2✔
2735
                if err != nil {
2✔
UNCOV
2736
                        defer d.channelMtx.Unlock(scid.ToUint64())
×
UNCOV
2737

×
UNCOV
2738
                        switch {
×
2739
                        case errors.Is(err, ErrNoFundingTransaction),
UNCOV
2740
                                errors.Is(err, ErrInvalidFundingOutput):
×
UNCOV
2741

×
UNCOV
2742
                                key := newRejectCacheKey(
×
UNCOV
2743
                                        scid.ToUint64(),
×
UNCOV
2744
                                        sourceToPub(nMsg.source),
×
UNCOV
2745
                                )
×
UNCOV
2746
                                _, _ = d.recentRejects.Put(
×
UNCOV
2747
                                        key, &cachedReject{},
×
UNCOV
2748
                                )
×
2749

UNCOV
2750
                        case errors.Is(err, ErrChannelSpent):
×
UNCOV
2751
                                key := newRejectCacheKey(
×
UNCOV
2752
                                        scid.ToUint64(),
×
UNCOV
2753
                                        sourceToPub(nMsg.source),
×
UNCOV
2754
                                )
×
UNCOV
2755
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
2756

×
UNCOV
2757
                                // Since this channel has already been closed,
×
UNCOV
2758
                                // we'll add it to the graph's closed channel
×
UNCOV
2759
                                // index such that we won't attempt to do
×
UNCOV
2760
                                // expensive validation checks on it again.
×
UNCOV
2761
                                // TODO: Populate the ScidCloser by using closed
×
UNCOV
2762
                                // channel notifications.
×
UNCOV
2763
                                dbErr := d.cfg.ScidCloser.PutClosedScid(scid)
×
UNCOV
2764
                                if dbErr != nil {
×
2765
                                        log.Errorf("failed to mark scid(%v) "+
×
2766
                                                "as closed: %v", scid, dbErr)
×
2767

×
2768
                                        nMsg.err <- dbErr
×
2769

×
2770
                                        return nil, false
×
2771
                                }
×
2772

2773
                        default:
×
2774
                                // Otherwise, this is just a regular rejected
×
2775
                                // edge. We won't increase the ban score for the
×
2776
                                // remote peer.
×
2777
                                key := newRejectCacheKey(
×
2778
                                        scid.ToUint64(),
×
2779
                                        sourceToPub(nMsg.source),
×
2780
                                )
×
2781
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2782

×
2783
                                nMsg.err <- err
×
2784

×
2785
                                return nil, false
×
2786
                        }
2787

UNCOV
2788
                        if !nMsg.isRemote {
×
2789
                                log.Errorf("failed to add edge for local "+
×
2790
                                        "channel: %v", err)
×
2791
                                nMsg.err <- err
×
2792

×
2793
                                return nil, false
×
2794
                        }
×
2795

UNCOV
2796
                        log.Warnf("Increasing ban score for peer=%v due to "+
×
UNCOV
2797
                                "invalid channel announcement for channel %v",
×
UNCOV
2798
                                nMsg.peer, scid)
×
UNCOV
2799

×
UNCOV
2800
                        // Increment the peer's ban score if they are sending
×
UNCOV
2801
                        // us invalid channel announcements.
×
UNCOV
2802
                        dcErr := d.handleBadPeer(nMsg.peer)
×
UNCOV
2803
                        if dcErr != nil {
×
2804
                                err = dcErr
×
2805
                        }
×
2806

UNCOV
2807
                        nMsg.err <- err
×
UNCOV
2808

×
UNCOV
2809
                        return nil, false
×
2810
                }
2811

2812
                edge.FundingScript = fn.Some(script)
2✔
2813

2✔
2814
                // TODO(roasbeef): this is a hack, needs to be removed after
2✔
2815
                //  commitment fees are dynamic.
2✔
2816
                edge.Capacity = capacity
2✔
2817
                edge.ChannelPoint = op
2✔
2818
        }
2819

2820
        log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64())
2✔
2821

2✔
2822
        // We will add the edge to the channel router. If the nodes present in
2✔
2823
        // this channel are not present in the database, a partial node will be
2✔
2824
        // added to represent each node while we wait for a node announcement.
2✔
2825
        err = d.cfg.Graph.AddEdge(ctx, edge, ops...)
2✔
2826
        if err != nil {
4✔
2827
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
2✔
2828
                        scid.ToUint64(), err)
2✔
2829

2✔
2830
                defer d.channelMtx.Unlock(scid.ToUint64())
2✔
2831

2✔
2832
                // If the edge was rejected due to already being known, then it
2✔
2833
                // may be the case that this new message has a fresh channel
2✔
2834
                // proof, so we'll check.
2✔
2835
                if graph.IsError(err, graph.ErrIgnored) {
4✔
2836
                        // Attempt to process the rejected message to see if we
2✔
2837
                        // get any new announcements.
2✔
2838
                        anns, rErr := d.processRejectedEdge(ctx, ann, proof)
2✔
2839
                        if rErr != nil {
2✔
2840
                                key := newRejectCacheKey(
×
2841
                                        scid.ToUint64(),
×
2842
                                        sourceToPub(nMsg.source),
×
2843
                                )
×
2844
                                cr := &cachedReject{}
×
2845
                                _, _ = d.recentRejects.Put(key, cr)
×
2846

×
2847
                                nMsg.err <- rErr
×
2848

×
2849
                                return nil, false
×
2850
                        }
×
2851

2852
                        log.Debugf("Extracted %v announcements from rejected "+
2✔
2853
                                "msgs", len(anns))
2✔
2854

2✔
2855
                        // If while processing this rejected edge, we realized
2✔
2856
                        // there's a set of announcements we could extract,
2✔
2857
                        // then we'll return those directly.
2✔
2858
                        //
2✔
2859
                        // NOTE: since this is an ErrIgnored, we can return
2✔
2860
                        // true here to signal "allow" to its dependants.
2✔
2861
                        nMsg.err <- nil
2✔
2862

2✔
2863
                        return anns, true
2✔
2864
                }
2865

2866
                // Otherwise, this is just a regular rejected edge.
UNCOV
2867
                key := newRejectCacheKey(
×
UNCOV
2868
                        scid.ToUint64(),
×
UNCOV
2869
                        sourceToPub(nMsg.source),
×
UNCOV
2870
                )
×
UNCOV
2871
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
2872

×
UNCOV
2873
                if !nMsg.isRemote {
×
2874
                        log.Errorf("failed to add edge for local channel: %v",
×
2875
                                err)
×
2876
                        nMsg.err <- err
×
2877

×
2878
                        return nil, false
×
2879
                }
×
2880

UNCOV
2881
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
×
UNCOV
2882
                if dcErr != nil {
×
2883
                        log.Errorf("failed to check if we should disconnect "+
×
2884
                                "peer: %v", dcErr)
×
2885
                        nMsg.err <- dcErr
×
2886

×
2887
                        return nil, false
×
2888
                }
×
2889

UNCOV
2890
                if shouldDc {
×
2891
                        nMsg.peer.Disconnect(ErrPeerBanned)
×
2892
                }
×
2893

UNCOV
2894
                nMsg.err <- err
×
UNCOV
2895

×
UNCOV
2896
                return nil, false
×
2897
        }
2898

2899
        // If err is nil, release the lock immediately.
2900
        d.channelMtx.Unlock(scid.ToUint64())
2✔
2901

2✔
2902
        log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64())
2✔
2903

2✔
2904
        // If we earlier received any ChannelUpdates for this channel, we can
2✔
2905
        // now process them, as the channel is added to the graph.
2✔
2906
        var channelUpdates []*processedNetworkMsg
2✔
2907

2✔
2908
        earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64())
2✔
2909
        if err == nil {
4✔
2910
                // There was actually an entry in the map, so we'll accumulate
2✔
2911
                // it. We don't worry about deletion, since it'll eventually
2✔
2912
                // fall out anyway.
2✔
2913
                chanMsgs := earlyChanUpdates
2✔
2914
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
2✔
2915
        }
2✔
2916

2917
        // Launch a new goroutine to handle each ChannelUpdate, this is to
2918
        // ensure we don't block here, as we can handle only one announcement
2919
        // at a time.
2920
        for _, cu := range channelUpdates {
4✔
2921
                // Skip if already processed.
2✔
2922
                if cu.processed {
3✔
2923
                        continue
1✔
2924
                }
2925

2926
                // Mark the ChannelUpdate as processed. This ensures that a
2927
                // subsequent announcement in the option-scid-alias case does
2928
                // not re-use an old ChannelUpdate.
2929
                cu.processed = true
2✔
2930

2✔
2931
                d.wg.Add(1)
2✔
2932
                go func(updMsg *networkMsg) {
4✔
2933
                        defer d.wg.Done()
2✔
2934

2✔
2935
                        switch msg := updMsg.msg.(type) {
2✔
2936
                        // Reprocess the message, making sure we return an
2937
                        // error to the original caller in case the gossiper
2938
                        // shuts down.
2939
                        case *lnwire.ChannelUpdate1:
2✔
2940
                                log.Debugf("Reprocessing ChannelUpdate for "+
2✔
2941
                                        "shortChanID=%v", scid.ToUint64())
2✔
2942

2✔
2943
                                select {
2✔
2944
                                case d.networkMsgs <- updMsg:
2✔
2945
                                case <-d.quit:
×
2946
                                        updMsg.err <- ErrGossiperShuttingDown
×
2947
                                }
2948

2949
                        // We don't expect any other message type than
2950
                        // ChannelUpdate to be in this cache.
2951
                        default:
×
2952
                                log.Errorf("Unsupported message type found "+
×
2953
                                        "among ChannelUpdates: %T", msg)
×
2954
                        }
2955
                }(cu.msg)
2956
        }
2957

2958
        // Channel announcement was successfully processed and now it might be
2959
        // broadcast to other connected nodes if it was an announcement with
2960
        // proof (remote).
2961
        var announcements []networkMsg
2✔
2962

2✔
2963
        if proof != nil {
4✔
2964
                announcements = append(announcements, networkMsg{
2✔
2965
                        peer:     nMsg.peer,
2✔
2966
                        isRemote: nMsg.isRemote,
2✔
2967
                        source:   nMsg.source,
2✔
2968
                        msg:      ann,
2✔
2969
                })
2✔
2970
        }
2✔
2971

2972
        nMsg.err <- nil
2✔
2973

2✔
2974
        log.Debugf("Processed ChannelAnnouncement1: peer=%v, short_chan_id=%v",
2✔
2975
                nMsg.peer, scid.ToUint64())
2✔
2976

2✔
2977
        return announcements, true
2✔
2978
}
2979

2980
// handleChanUpdate processes a new channel update.
2981
//
2982
//nolint:funlen
2983
func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context,
2984
        nMsg *networkMsg, upd *lnwire.ChannelUpdate1,
2985
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
2✔
2986

2✔
2987
        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
2✔
2988
                nMsg.peer, upd.ShortChannelID.ToUint64())
2✔
2989

2✔
2990
        chainHash := d.cfg.ChainParams.GenesisHash
2✔
2991

2✔
2992
        // We'll ignore any channel updates that target any chain other than
2✔
2993
        // the set of chains we know of.
2✔
2994
        if !bytes.Equal(upd.ChainHash[:], chainHash[:]) {
2✔
2995
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
×
2996
                        "gossiper on chain=%v", upd.ChainHash, chainHash)
×
2997
                log.Errorf(err.Error())
×
2998

×
2999
                key := newRejectCacheKey(
×
3000
                        upd.ShortChannelID.ToUint64(),
×
3001
                        sourceToPub(nMsg.source),
×
3002
                )
×
3003
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3004

×
3005
                nMsg.err <- err
×
3006
                return nil, false
×
3007
        }
×
3008

3009
        blockHeight := upd.ShortChannelID.BlockHeight
2✔
3010
        shortChanID := upd.ShortChannelID.ToUint64()
2✔
3011

2✔
3012
        // If the advertised inclusionary block is beyond our knowledge of the
2✔
3013
        // chain tip, then we'll put the announcement in limbo to be fully
2✔
3014
        // verified once we advance forward in the chain. If the update has an
2✔
3015
        // alias SCID, we'll skip the isPremature check. This is necessary
2✔
3016
        // since aliases start at block height 16_000_000.
2✔
3017
        d.Lock()
2✔
3018
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
2✔
3019
                d.isPremature(upd.ShortChannelID, 0, nMsg) {
3✔
3020

1✔
3021
                log.Warnf("Update announcement for short_chan_id(%v), is "+
1✔
3022
                        "premature: advertises height %v, only height %v is "+
1✔
3023
                        "known", shortChanID, blockHeight, d.bestHeight)
1✔
3024
                d.Unlock()
1✔
3025
                nMsg.err <- nil
1✔
3026
                return nil, false
1✔
3027
        }
1✔
3028
        d.Unlock()
2✔
3029

2✔
3030
        // Before we perform any of the expensive checks below, we'll check
2✔
3031
        // whether this update is stale or is for a zombie channel in order to
2✔
3032
        // quickly reject it.
2✔
3033
        timestamp := time.Unix(int64(upd.Timestamp), 0)
2✔
3034

2✔
3035
        // Fetch the SCID we should be using to lock the channelMtx and make
2✔
3036
        // graph queries with.
2✔
3037
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
2✔
3038
        if err != nil {
4✔
3039
                // Fallback and set the graphScid to the peer-provided SCID.
2✔
3040
                // This will occur for non-option-scid-alias channels and for
2✔
3041
                // public option-scid-alias channels after 6 confirmations.
2✔
3042
                // Once public option-scid-alias channels have 6 confs, we'll
2✔
3043
                // ignore ChannelUpdates with one of their aliases.
2✔
3044
                graphScid = upd.ShortChannelID
2✔
3045
        }
2✔
3046

3047
        // We make sure to obtain the mutex for this channel ID before we access
3048
        // the database. This ensures the state we read from the database has
3049
        // not changed between this point and when we call UpdateEdge() later.
3050
        d.channelMtx.Lock(graphScid.ToUint64())
2✔
3051
        defer d.channelMtx.Unlock(graphScid.ToUint64())
2✔
3052

2✔
3053
        if d.cfg.Graph.IsStaleEdgePolicy(
2✔
3054
                graphScid, timestamp, upd.ChannelFlags,
2✔
3055
        ) {
4✔
3056

2✔
3057
                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
2✔
3058
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
2✔
3059
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
2✔
3060
                )
2✔
3061

2✔
3062
                nMsg.err <- nil
2✔
3063
                return nil, true
2✔
3064
        }
2✔
3065

3066
        // Check that the ChanUpdate is not too far into the future, this could
3067
        // reveal some faulty implementation therefore we log an error.
3068
        if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
2✔
3069
                err := fmt.Errorf("skewed timestamp of edge policy, "+
×
3070
                        "timestamp too far in the future: %v", timestamp.Unix())
×
3071

×
3072
                // If this is a channel_update from us, we'll just ignore it.
×
3073
                if !nMsg.isRemote {
×
3074
                        nMsg.err <- err
×
3075
                        return nil, false
×
3076
                }
×
3077

3078
                log.Errorf("Increasing ban score for peer=%v due to bad "+
×
3079
                        "channel_update with short_chan_id(%v): timestamp(%v) "+
×
3080
                        "too far in the future", nMsg.peer, shortChanID,
×
3081
                        timestamp.Unix())
×
3082

×
3083
                // Increment the peer's ban score if they are sending skewed
×
3084
                // channel updates.
×
3085
                dcErr := d.handleBadPeer(nMsg.peer)
×
3086
                if dcErr != nil {
×
3087
                        err = dcErr
×
3088
                }
×
3089

3090
                nMsg.err <- err
×
3091

×
3092
                return nil, false
×
3093
        }
3094

3095
        // Get the node pub key, since we don't have it in the channel
3096
        // update announcement message. We'll need this to properly verify the
3097
        // message's signature.
3098
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
2✔
3099
        switch {
2✔
3100
        // No error, break.
3101
        case err == nil:
2✔
3102
                break
2✔
3103

UNCOV
3104
        case errors.Is(err, graphdb.ErrZombieEdge):
×
UNCOV
3105
                err = d.processZombieUpdate(ctx, chanInfo, graphScid, upd)
×
UNCOV
3106
                if err != nil {
×
UNCOV
3107
                        log.Debug(err)
×
UNCOV
3108
                        nMsg.err <- err
×
UNCOV
3109
                        return nil, false
×
UNCOV
3110
                }
×
3111

3112
                // We'll fallthrough to ensure we stash the update until we
3113
                // receive its corresponding ChannelAnnouncement. This is
3114
                // needed to ensure the edge exists in the graph before
3115
                // applying the update.
UNCOV
3116
                fallthrough
×
UNCOV
3117
        case errors.Is(err, graphdb.ErrGraphNotFound):
×
UNCOV
3118
                fallthrough
×
UNCOV
3119
        case errors.Is(err, graphdb.ErrGraphNoEdgesFound):
×
UNCOV
3120
                fallthrough
×
3121
        case errors.Is(err, graphdb.ErrEdgeNotFound):
2✔
3122
                // If the edge corresponding to this ChannelUpdate was not
2✔
3123
                // found in the graph, this might be a channel in the process
2✔
3124
                // of being opened, and we haven't processed our own
2✔
3125
                // ChannelAnnouncement yet, hence it is not found in the
2✔
3126
                // graph. This usually gets resolved after the channel proofs
2✔
3127
                // are exchanged and the channel is broadcasted to the rest of
2✔
3128
                // the network, but in case this is a private channel this
2✔
3129
                // won't ever happen. This can also happen in the case of a
2✔
3130
                // zombie channel with a fresh update for which we don't have a
2✔
3131
                // ChannelAnnouncement since we reject them. Because of
2✔
3132
                // this, we temporarily add it to a map, and reprocess it after
2✔
3133
                // our own ChannelAnnouncement has been processed.
2✔
3134
                //
2✔
3135
                // The shortChanID may be an alias, but it is fine to use here
2✔
3136
                // since we don't have an edge in the graph and if the peer is
2✔
3137
                // not buggy, we should be able to use it once the gossiper
2✔
3138
                // receives the local announcement.
2✔
3139
                pMsg := &processedNetworkMsg{msg: nMsg}
2✔
3140

2✔
3141
                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
2✔
3142
                switch {
2✔
3143
                // Nothing in the cache yet, we can just directly insert this
3144
                // element.
3145
                case err == cache.ErrElementNotFound:
2✔
3146
                        _, _ = d.prematureChannelUpdates.Put(
2✔
3147
                                shortChanID, &cachedNetworkMsg{
2✔
3148
                                        msgs: []*processedNetworkMsg{pMsg},
2✔
3149
                                })
2✔
3150

3151
                // There's already something in the cache, so we'll combine the
3152
                // set of messages into a single value.
3153
                default:
2✔
3154
                        msgs := earlyMsgs.msgs
2✔
3155
                        msgs = append(msgs, pMsg)
2✔
3156
                        _, _ = d.prematureChannelUpdates.Put(
2✔
3157
                                shortChanID, &cachedNetworkMsg{
2✔
3158
                                        msgs: msgs,
2✔
3159
                                })
2✔
3160
                }
3161

3162
                log.Debugf("Got ChannelUpdate for edge not found in graph"+
2✔
3163
                        "(shortChanID=%v), saving for reprocessing later",
2✔
3164
                        shortChanID)
2✔
3165

2✔
3166
                // NOTE: We don't return anything on the error channel for this
2✔
3167
                // message, as we expect that will be done when this
2✔
3168
                // ChannelUpdate is later reprocessed. This might never happen
2✔
3169
                // if the corresponding ChannelAnnouncement is never received
2✔
3170
                // or the LRU cache is filled up and the entry is evicted.
2✔
3171
                return nil, false
2✔
3172

3173
        default:
×
3174
                err := fmt.Errorf("unable to validate channel update "+
×
3175
                        "short_chan_id=%v: %v", shortChanID, err)
×
3176
                log.Error(err)
×
3177
                nMsg.err <- err
×
3178

×
3179
                key := newRejectCacheKey(
×
3180
                        upd.ShortChannelID.ToUint64(),
×
3181
                        sourceToPub(nMsg.source),
×
3182
                )
×
3183
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3184

×
3185
                return nil, false
×
3186
        }
3187

3188
        // The least-significant bit in the flag on the channel update
3189
        // announcement tells us "which" side of the channels directed edge is
3190
        // being updated.
3191
        var (
2✔
3192
                pubKey       *btcec.PublicKey
2✔
3193
                edgeToUpdate *models.ChannelEdgePolicy
2✔
3194
        )
2✔
3195
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
2✔
3196
        switch direction {
2✔
3197
        case 0:
2✔
3198
                pubKey, _ = chanInfo.NodeKey1()
2✔
3199
                edgeToUpdate = e1
2✔
3200
        case 1:
2✔
3201
                pubKey, _ = chanInfo.NodeKey2()
2✔
3202
                edgeToUpdate = e2
2✔
3203
        }
3204

3205
        log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
2✔
3206
                "edge policy=%v", chanInfo.ChannelID,
2✔
3207
                pubKey.SerializeCompressed(), edgeToUpdate != nil)
2✔
3208

2✔
3209
        // Validate the channel announcement with the expected public key and
2✔
3210
        // channel capacity. In the case of an invalid channel update, we'll
2✔
3211
        // return an error to the caller and exit early.
2✔
3212
        err = netann.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
2✔
3213
        if err != nil {
2✔
UNCOV
3214
                rErr := fmt.Errorf("unable to validate channel update "+
×
UNCOV
3215
                        "announcement for short_chan_id=%v: %v",
×
UNCOV
3216
                        lnutils.SpewLogClosure(upd.ShortChannelID), err)
×
UNCOV
3217

×
UNCOV
3218
                log.Error(rErr)
×
UNCOV
3219
                nMsg.err <- rErr
×
UNCOV
3220
                return nil, false
×
UNCOV
3221
        }
×
3222

3223
        // If we have a previous version of the edge being updated, we'll want
3224
        // to rate limit its updates to prevent spam throughout the network.
3225
        if nMsg.isRemote && edgeToUpdate != nil {
4✔
3226
                // If it's a keep-alive update, we'll only propagate one if
2✔
3227
                // it's been a day since the previous. This follows our own
2✔
3228
                // heuristic of sending keep-alive updates after the same
2✔
3229
                // duration (see retransmitStaleAnns).
2✔
3230
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
2✔
3231
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
4✔
3232
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
4✔
3233
                                log.Debugf("Ignoring keep alive update not "+
2✔
3234
                                        "within %v period for channel %v",
2✔
3235
                                        d.cfg.RebroadcastInterval, shortChanID)
2✔
3236
                                nMsg.err <- nil
2✔
3237
                                return nil, false
2✔
3238
                        }
2✔
3239
                } else {
2✔
3240
                        // If it's not, we'll allow an update per minute with a
2✔
3241
                        // maximum burst of 10. If we haven't seen an update
2✔
3242
                        // for this channel before, we'll need to initialize a
2✔
3243
                        // rate limiter for each direction.
2✔
3244
                        //
2✔
3245
                        // Since the edge exists in the graph, we'll create a
2✔
3246
                        // rate limiter for chanInfo.ChannelID rather than the
2✔
3247
                        // SCID the peer sent. This is because there may be
2✔
3248
                        // multiple aliases for a channel and we may otherwise
2✔
3249
                        // rate-limit only a single alias of the channel,
2✔
3250
                        // instead of the whole channel.
2✔
3251
                        baseScid := chanInfo.ChannelID
2✔
3252
                        d.Lock()
2✔
3253
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
2✔
3254
                        if !ok {
4✔
3255
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
2✔
3256
                                b := d.cfg.MaxChannelUpdateBurst
2✔
3257
                                rls = [2]*rate.Limiter{
2✔
3258
                                        rate.NewLimiter(r, b),
2✔
3259
                                        rate.NewLimiter(r, b),
2✔
3260
                                }
2✔
3261
                                d.chanUpdateRateLimiter[baseScid] = rls
2✔
3262
                        }
2✔
3263
                        d.Unlock()
2✔
3264

2✔
3265
                        if !rls[direction].Allow() {
4✔
3266
                                log.Debugf("Rate limiting update for channel "+
2✔
3267
                                        "%v from direction %x", shortChanID,
2✔
3268
                                        pubKey.SerializeCompressed())
2✔
3269
                                nMsg.err <- nil
2✔
3270
                                return nil, false
2✔
3271
                        }
2✔
3272
                }
3273
        }
3274

3275
        // We'll use chanInfo.ChannelID rather than the peer-supplied
3276
        // ShortChannelID in the ChannelUpdate to avoid the router having to
3277
        // lookup the stored SCID. If we're sending the update, we'll always
3278
        // use the SCID stored in the database rather than a potentially
3279
        // different alias. This might mean that SigBytes is incorrect as it
3280
        // signs a different SCID than the database SCID, but since there will
3281
        // only be a difference if AuthProof == nil, this is fine.
3282
        update := &models.ChannelEdgePolicy{
2✔
3283
                SigBytes:                  upd.Signature.ToSignatureBytes(),
2✔
3284
                ChannelID:                 chanInfo.ChannelID,
2✔
3285
                LastUpdate:                timestamp,
2✔
3286
                MessageFlags:              upd.MessageFlags,
2✔
3287
                ChannelFlags:              upd.ChannelFlags,
2✔
3288
                TimeLockDelta:             upd.TimeLockDelta,
2✔
3289
                MinHTLC:                   upd.HtlcMinimumMsat,
2✔
3290
                MaxHTLC:                   upd.HtlcMaximumMsat,
2✔
3291
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
2✔
3292
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
2✔
3293
                InboundFee:                upd.InboundFee.ValOpt(),
2✔
3294
                ExtraOpaqueData:           upd.ExtraOpaqueData,
2✔
3295
        }
2✔
3296

2✔
3297
        if err := d.cfg.Graph.UpdateEdge(ctx, update, ops...); err != nil {
2✔
3298
                if graph.IsError(
×
3299
                        err, graph.ErrOutdated,
×
3300
                        graph.ErrIgnored,
×
3301
                ) {
×
3302

×
3303
                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
×
3304
                                shortChanID, err)
×
3305
                } else {
×
3306
                        // Since we know the stored SCID in the graph, we'll
×
3307
                        // cache that SCID.
×
3308
                        key := newRejectCacheKey(
×
3309
                                chanInfo.ChannelID,
×
3310
                                sourceToPub(nMsg.source),
×
3311
                        )
×
3312
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3313

×
3314
                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
×
3315
                                shortChanID, err)
×
3316
                }
×
3317

3318
                nMsg.err <- err
×
3319
                return nil, false
×
3320
        }
3321

3322
        // If this is a local ChannelUpdate without an AuthProof, it means it
3323
        // is an update to a channel that is not (yet) supposed to be announced
3324
        // to the greater network. However, our channel counter party will need
3325
        // to be given the update, so we'll try sending the update directly to
3326
        // the remote peer.
3327
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
4✔
3328
                if nMsg.optionalMsgFields != nil {
4✔
3329
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
2✔
3330
                        if remoteAlias != nil {
4✔
3331
                                // The remoteAlias field was specified, meaning
2✔
3332
                                // that we should replace the SCID in the
2✔
3333
                                // update with the remote's alias. We'll also
2✔
3334
                                // need to re-sign the channel update. This is
2✔
3335
                                // required for option-scid-alias feature-bit
2✔
3336
                                // negotiated channels.
2✔
3337
                                upd.ShortChannelID = *remoteAlias
2✔
3338

2✔
3339
                                sig, err := d.cfg.SignAliasUpdate(upd)
2✔
3340
                                if err != nil {
2✔
3341
                                        log.Error(err)
×
3342
                                        nMsg.err <- err
×
3343
                                        return nil, false
×
3344
                                }
×
3345

3346
                                lnSig, err := lnwire.NewSigFromSignature(sig)
2✔
3347
                                if err != nil {
2✔
3348
                                        log.Error(err)
×
3349
                                        nMsg.err <- err
×
3350
                                        return nil, false
×
3351
                                }
×
3352

3353
                                upd.Signature = lnSig
2✔
3354
                        }
3355
                }
3356

3357
                // Get our peer's public key.
3358
                remotePubKey := remotePubFromChanInfo(
2✔
3359
                        chanInfo, upd.ChannelFlags,
2✔
3360
                )
2✔
3361

2✔
3362
                log.Debugf("The message %v has no AuthProof, sending the "+
2✔
3363
                        "update to remote peer %x", upd.MsgType(), remotePubKey)
2✔
3364

2✔
3365
                // Now we'll attempt to send the channel update message
2✔
3366
                // reliably to the remote peer in the background, so that we
2✔
3367
                // don't block if the peer happens to be offline at the moment.
2✔
3368
                err := d.reliableSender.sendMessage(ctx, upd, remotePubKey)
2✔
3369
                if err != nil {
2✔
3370
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3371
                                "channel=%v to peer=%x: %v", upd.MsgType(),
×
3372
                                upd.ShortChannelID, remotePubKey, err)
×
3373
                        nMsg.err <- err
×
3374
                        return nil, false
×
3375
                }
×
3376
        }
3377

3378
        // Channel update announcement was successfully processed and now it
3379
        // can be broadcast to the rest of the network. However, we'll only
3380
        // broadcast the channel update announcement if it has an attached
3381
        // authentication proof. We also won't broadcast the update if it
3382
        // contains an alias because the network would reject this.
3383
        var announcements []networkMsg
2✔
3384
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
4✔
3385
                announcements = append(announcements, networkMsg{
2✔
3386
                        peer:     nMsg.peer,
2✔
3387
                        source:   nMsg.source,
2✔
3388
                        isRemote: nMsg.isRemote,
2✔
3389
                        msg:      upd,
2✔
3390
                })
2✔
3391
        }
2✔
3392

3393
        nMsg.err <- nil
2✔
3394

2✔
3395
        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
2✔
3396
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
2✔
3397
                timestamp)
2✔
3398
        return announcements, true
2✔
3399
}
3400

3401
// handleAnnSig processes a new announcement signatures message.
3402
//
3403
//nolint:funlen
3404
func (d *AuthenticatedGossiper) handleAnnSig(ctx context.Context,
3405
        nMsg *networkMsg, ann *lnwire.AnnounceSignatures1) ([]networkMsg,
3406
        bool) {
2✔
3407

2✔
3408
        needBlockHeight := ann.ShortChannelID.BlockHeight +
2✔
3409
                d.cfg.ProofMatureDelta
2✔
3410
        shortChanID := ann.ShortChannelID.ToUint64()
2✔
3411

2✔
3412
        prefix := "local"
2✔
3413
        if nMsg.isRemote {
4✔
3414
                prefix = "remote"
2✔
3415
        }
2✔
3416

3417
        log.Infof("Received new %v announcement signature for %v", prefix,
2✔
3418
                ann.ShortChannelID)
2✔
3419

2✔
3420
        // By the specification, channel announcement proofs should be sent
2✔
3421
        // after some number of confirmations after channel was registered in
2✔
3422
        // bitcoin blockchain. Therefore, we check if the proof is mature.
2✔
3423
        d.Lock()
2✔
3424
        premature := d.isPremature(
2✔
3425
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
2✔
3426
        )
2✔
3427
        if premature {
4✔
3428
                log.Warnf("Premature proof announcement, current block height"+
2✔
3429
                        "lower than needed: %v < %v", d.bestHeight,
2✔
3430
                        needBlockHeight)
2✔
3431
                d.Unlock()
2✔
3432
                nMsg.err <- nil
2✔
3433
                return nil, false
2✔
3434
        }
2✔
3435
        d.Unlock()
2✔
3436

2✔
3437
        // Ensure that we know of a channel with the target channel ID before
2✔
3438
        // proceeding further.
2✔
3439
        //
2✔
3440
        // We must acquire the mutex for this channel ID before getting the
2✔
3441
        // channel from the database, to ensure what we read does not change
2✔
3442
        // before we call AddProof() later.
2✔
3443
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
2✔
3444
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
2✔
3445

2✔
3446
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
2✔
3447
                ann.ShortChannelID,
2✔
3448
        )
2✔
3449
        if err != nil {
4✔
3450
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
2✔
3451
                if err != nil {
4✔
3452
                        err := fmt.Errorf("unable to store the proof for "+
2✔
3453
                                "short_chan_id=%v: %v", shortChanID, err)
2✔
3454
                        log.Error(err)
2✔
3455
                        nMsg.err <- err
2✔
3456

2✔
3457
                        return nil, false
2✔
3458
                }
2✔
3459

3460
                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
2✔
3461
                err := d.cfg.WaitingProofStore.Add(proof)
2✔
3462
                if err != nil {
2✔
3463
                        err := fmt.Errorf("unable to store the proof for "+
×
3464
                                "short_chan_id=%v: %v", shortChanID, err)
×
3465
                        log.Error(err)
×
3466
                        nMsg.err <- err
×
3467
                        return nil, false
×
3468
                }
×
3469

3470
                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
2✔
3471
                        ", adding to waiting batch", prefix, shortChanID)
2✔
3472
                nMsg.err <- nil
2✔
3473
                return nil, false
2✔
3474
        }
3475

3476
        nodeID := nMsg.source.SerializeCompressed()
2✔
3477
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
2✔
3478
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
2✔
3479

2✔
3480
        // Ensure that channel that was retrieved belongs to the peer which
2✔
3481
        // sent the proof announcement.
2✔
3482
        if !(isFirstNode || isSecondNode) {
2✔
3483
                err := fmt.Errorf("channel that was received doesn't belong "+
×
3484
                        "to the peer which sent the proof, short_chan_id=%v",
×
3485
                        shortChanID)
×
3486
                log.Error(err)
×
3487
                nMsg.err <- err
×
3488
                return nil, false
×
3489
        }
×
3490

3491
        // If proof was sent by a local sub-system, then we'll send the
3492
        // announcement signature to the remote node so they can also
3493
        // reconstruct the full channel announcement.
3494
        if !nMsg.isRemote {
4✔
3495
                var remotePubKey [33]byte
2✔
3496
                if isFirstNode {
4✔
3497
                        remotePubKey = chanInfo.NodeKey2Bytes
2✔
3498
                } else {
4✔
3499
                        remotePubKey = chanInfo.NodeKey1Bytes
2✔
3500
                }
2✔
3501

3502
                // Since the remote peer might not be online we'll call a
3503
                // method that will attempt to deliver the proof when it comes
3504
                // online.
3505
                err := d.reliableSender.sendMessage(ctx, ann, remotePubKey)
2✔
3506
                if err != nil {
2✔
3507
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3508
                                "channel=%v to peer=%x: %v", ann.MsgType(),
×
3509
                                ann.ShortChannelID, remotePubKey, err)
×
3510
                        nMsg.err <- err
×
3511
                        return nil, false
×
3512
                }
×
3513
        }
3514

3515
        // Check if we already have the full proof for this channel.
3516
        if chanInfo.AuthProof != nil {
4✔
3517
                // If we already have the fully assembled proof, then the peer
2✔
3518
                // sending us their proof has probably not received our local
2✔
3519
                // proof yet. So be kind and send them the full proof.
2✔
3520
                if nMsg.isRemote {
4✔
3521
                        peerID := nMsg.source.SerializeCompressed()
2✔
3522
                        log.Debugf("Got AnnounceSignatures for channel with " +
2✔
3523
                                "full proof.")
2✔
3524

2✔
3525
                        d.wg.Add(1)
2✔
3526
                        go func() {
4✔
3527
                                defer d.wg.Done()
2✔
3528

2✔
3529
                                log.Debugf("Received half proof for channel "+
2✔
3530
                                        "%v with existing full proof. Sending"+
2✔
3531
                                        " full proof to peer=%x",
2✔
3532
                                        ann.ChannelID, peerID)
2✔
3533

2✔
3534
                                ca, _, _, err := netann.CreateChanAnnouncement(
2✔
3535
                                        chanInfo.AuthProof, chanInfo, e1, e2,
2✔
3536
                                )
2✔
3537
                                if err != nil {
2✔
3538
                                        log.Errorf("unable to gen ann: %v",
×
3539
                                                err)
×
3540
                                        return
×
3541
                                }
×
3542

3543
                                err = nMsg.peer.SendMessage(false, ca)
2✔
3544
                                if err != nil {
2✔
3545
                                        log.Errorf("Failed sending full proof"+
×
3546
                                                " to peer=%x: %v", peerID, err)
×
3547
                                        return
×
3548
                                }
×
3549

3550
                                log.Debugf("Full proof sent to peer=%x for "+
2✔
3551
                                        "chanID=%v", peerID, ann.ChannelID)
2✔
3552
                        }()
3553
                }
3554

3555
                log.Debugf("Already have proof for channel with chanID=%v",
2✔
3556
                        ann.ChannelID)
2✔
3557
                nMsg.err <- nil
2✔
3558
                return nil, true
2✔
3559
        }
3560

3561
        // Check that we received the opposite proof. If so, then we're now
3562
        // able to construct the full proof, and create the channel
3563
        // announcement. If we didn't receive the opposite half of the proof
3564
        // then we should store this one, and wait for the opposite to be
3565
        // received.
3566
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
2✔
3567
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
2✔
3568
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
2✔
3569
                err := fmt.Errorf("unable to get the opposite proof for "+
×
3570
                        "short_chan_id=%v: %v", shortChanID, err)
×
3571
                log.Error(err)
×
3572
                nMsg.err <- err
×
3573
                return nil, false
×
3574
        }
×
3575

3576
        if err == channeldb.ErrWaitingProofNotFound {
4✔
3577
                err := d.cfg.WaitingProofStore.Add(proof)
2✔
3578
                if err != nil {
2✔
3579
                        err := fmt.Errorf("unable to store the proof for "+
×
3580
                                "short_chan_id=%v: %v", shortChanID, err)
×
3581
                        log.Error(err)
×
3582
                        nMsg.err <- err
×
3583
                        return nil, false
×
3584
                }
×
3585

3586
                log.Infof("1/2 of channel ann proof received for "+
2✔
3587
                        "short_chan_id=%v, waiting for other half",
2✔
3588
                        shortChanID)
2✔
3589

2✔
3590
                nMsg.err <- nil
2✔
3591
                return nil, false
2✔
3592
        }
3593

3594
        // We now have both halves of the channel announcement proof, then
3595
        // we'll reconstruct the initial announcement so we can validate it
3596
        // shortly below.
3597
        var dbProof models.ChannelAuthProof
2✔
3598
        if isFirstNode {
4✔
3599
                dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
2✔
3600
                dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
2✔
3601
                dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
2✔
3602
                dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
2✔
3603
        } else {
4✔
3604
                dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
2✔
3605
                dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
2✔
3606
                dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
2✔
3607
                dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
2✔
3608
        }
2✔
3609

3610
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
2✔
3611
                &dbProof, chanInfo, e1, e2,
2✔
3612
        )
2✔
3613
        if err != nil {
2✔
3614
                log.Error(err)
×
3615
                nMsg.err <- err
×
3616
                return nil, false
×
3617
        }
×
3618

3619
        // With all the necessary components assembled validate the full
3620
        // channel announcement proof.
3621
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
2✔
3622
        if err != nil {
2✔
3623
                err := fmt.Errorf("channel announcement proof for "+
×
3624
                        "short_chan_id=%v isn't valid: %v", shortChanID, err)
×
3625

×
3626
                log.Error(err)
×
3627
                nMsg.err <- err
×
3628
                return nil, false
×
3629
        }
×
3630

3631
        // If the channel was returned by the router it means that existence of
3632
        // funding point and inclusion of nodes bitcoin keys in it already
3633
        // checked by the router. In this stage we should check that node keys
3634
        // attest to the bitcoin keys by validating the signatures of
3635
        // announcement. If proof is valid then we'll populate the channel edge
3636
        // with it, so we can announce it on peer connect.
3637
        err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
2✔
3638
        if err != nil {
2✔
3639
                err := fmt.Errorf("unable add proof to the channel chanID=%v:"+
×
3640
                        " %v", ann.ChannelID, err)
×
3641
                log.Error(err)
×
3642
                nMsg.err <- err
×
3643
                return nil, false
×
3644
        }
×
3645

3646
        err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
2✔
3647
        if err != nil {
2✔
3648
                err := fmt.Errorf("unable to remove opposite proof for the "+
×
3649
                        "channel with chanID=%v: %v", ann.ChannelID, err)
×
3650
                log.Error(err)
×
3651
                nMsg.err <- err
×
3652
                return nil, false
×
3653
        }
×
3654

3655
        // Proof was successfully created and now can announce the channel to
3656
        // the remain network.
3657
        log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
2✔
3658
                ", adding to next ann batch", shortChanID)
2✔
3659

2✔
3660
        // Assemble the necessary announcements to add to the next broadcasting
2✔
3661
        // batch.
2✔
3662
        var announcements []networkMsg
2✔
3663
        announcements = append(announcements, networkMsg{
2✔
3664
                peer:   nMsg.peer,
2✔
3665
                source: nMsg.source,
2✔
3666
                msg:    chanAnn,
2✔
3667
        })
2✔
3668
        if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
4✔
3669
                announcements = append(announcements, networkMsg{
2✔
3670
                        peer:   nMsg.peer,
2✔
3671
                        source: src,
2✔
3672
                        msg:    e1Ann,
2✔
3673
                })
2✔
3674
        }
2✔
3675
        if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
4✔
3676
                announcements = append(announcements, networkMsg{
2✔
3677
                        peer:   nMsg.peer,
2✔
3678
                        source: src,
2✔
3679
                        msg:    e2Ann,
2✔
3680
                })
2✔
3681
        }
2✔
3682

3683
        // We'll also send along the node announcements for each channel
3684
        // participant if we know of them. To ensure our node announcement
3685
        // propagates to our channel counterparty, we'll set the source for
3686
        // each announcement to the node it belongs to, otherwise we won't send
3687
        // it since the source gets skipped. This isn't necessary for channel
3688
        // updates and announcement signatures since we send those directly to
3689
        // our channel counterparty through the gossiper's reliable sender.
3690
        node1Ann, err := d.fetchNodeAnn(ctx, chanInfo.NodeKey1Bytes)
2✔
3691
        if err != nil {
4✔
3692
                log.Debugf("Unable to fetch node announcement for %x: %v",
2✔
3693
                        chanInfo.NodeKey1Bytes, err)
2✔
3694
        } else {
4✔
3695
                if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
4✔
3696
                        announcements = append(announcements, networkMsg{
2✔
3697
                                peer:   nMsg.peer,
2✔
3698
                                source: nodeKey1,
2✔
3699
                                msg:    node1Ann,
2✔
3700
                        })
2✔
3701
                }
2✔
3702
        }
3703

3704
        node2Ann, err := d.fetchNodeAnn(ctx, chanInfo.NodeKey2Bytes)
2✔
3705
        if err != nil {
4✔
3706
                log.Debugf("Unable to fetch node announcement for %x: %v",
2✔
3707
                        chanInfo.NodeKey2Bytes, err)
2✔
3708
        } else {
4✔
3709
                if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
4✔
3710
                        announcements = append(announcements, networkMsg{
2✔
3711
                                peer:   nMsg.peer,
2✔
3712
                                source: nodeKey2,
2✔
3713
                                msg:    node2Ann,
2✔
3714
                        })
2✔
3715
                }
2✔
3716
        }
3717

3718
        nMsg.err <- nil
2✔
3719
        return announcements, true
2✔
3720
}
3721

3722
// isBanned returns true if the peer identified by pubkey is banned for sending
3723
// invalid channel announcements.
3724
func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
2✔
3725
        return d.banman.isBanned(pubkey)
2✔
3726
}
2✔
3727

3728
// ShouldDisconnect returns true if we should disconnect the peer identified by
3729
// pubkey.
3730
func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
3731
        bool, error) {
2✔
3732

2✔
3733
        pubkeySer := pubkey.SerializeCompressed()
2✔
3734

2✔
3735
        var pubkeyBytes [33]byte
2✔
3736
        copy(pubkeyBytes[:], pubkeySer)
2✔
3737

2✔
3738
        // If the public key is banned, check whether or not this is a channel
2✔
3739
        // peer.
2✔
3740
        if d.isBanned(pubkeyBytes) {
2✔
UNCOV
3741
                isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
×
UNCOV
3742
                if err != nil {
×
3743
                        return false, err
×
3744
                }
×
3745

3746
                // We should only disconnect non-channel peers.
UNCOV
3747
                if !isChanPeer {
×
UNCOV
3748
                        return true, nil
×
UNCOV
3749
                }
×
3750
        }
3751

3752
        return false, nil
2✔
3753
}
3754

3755
// validateFundingTransaction fetches the channel announcements claimed funding
3756
// transaction from chain to ensure that it exists, is not spent and matches
3757
// the channel announcement proof. The transaction's outpoint and value are
3758
// returned if we can glean them from the work done in this method.
3759
func (d *AuthenticatedGossiper) validateFundingTransaction(_ context.Context,
3760
        ann *lnwire.ChannelAnnouncement1,
3761
        tapscriptRoot fn.Option[chainhash.Hash]) (wire.OutPoint, btcutil.Amount,
3762
        []byte, error) {
2✔
3763

2✔
3764
        scid := ann.ShortChannelID
2✔
3765

2✔
3766
        // Before we can add the channel to the channel graph, we need to obtain
2✔
3767
        // the full funding outpoint that's encoded within the channel ID.
2✔
3768
        fundingTx, err := lnwallet.FetchFundingTxWrapper(
2✔
3769
                d.cfg.ChainIO, scid, d.quit,
2✔
3770
        )
2✔
3771
        if err != nil {
2✔
UNCOV
3772
                //nolint:ll
×
UNCOV
3773
                //
×
UNCOV
3774
                // In order to ensure we don't erroneously mark a channel as a
×
UNCOV
3775
                // zombie due to an RPC failure, we'll attempt to string match
×
UNCOV
3776
                // for the relevant errors.
×
UNCOV
3777
                //
×
UNCOV
3778
                // * btcd:
×
UNCOV
3779
                //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316
×
UNCOV
3780
                //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086
×
UNCOV
3781
                // * bitcoind:
×
UNCOV
3782
                //    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770
×
UNCOV
3783
                //     * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954
×
UNCOV
3784
                switch {
×
UNCOV
3785
                case strings.Contains(err.Error(), "not found"):
×
UNCOV
3786
                        fallthrough
×
3787

UNCOV
3788
                case strings.Contains(err.Error(), "out of range"):
×
UNCOV
3789
                        // If the funding transaction isn't found at all, then
×
UNCOV
3790
                        // we'll mark the edge itself as a zombie so we don't
×
UNCOV
3791
                        // continue to request it. We use the "zero key" for
×
UNCOV
3792
                        // both node pubkeys so this edge can't be resurrected.
×
UNCOV
3793
                        zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
×
UNCOV
3794
                        if zErr != nil {
×
3795
                                return wire.OutPoint{}, 0, nil, zErr
×
3796
                        }
×
3797

3798
                default:
×
3799
                }
3800

UNCOV
3801
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
×
UNCOV
3802
                        ErrNoFundingTransaction, err)
×
3803
        }
3804

3805
        // Recreate witness output to be sure that declared in channel edge
3806
        // bitcoin keys and channel value corresponds to the reality.
3807
        fundingPkScript, err := makeFundingScript(
2✔
3808
                ann.BitcoinKey1[:], ann.BitcoinKey2[:], ann.Features,
2✔
3809
                tapscriptRoot,
2✔
3810
        )
2✔
3811
        if err != nil {
2✔
3812
                return wire.OutPoint{}, 0, nil, err
×
3813
        }
×
3814

3815
        // Next we'll validate that this channel is actually well formed. If
3816
        // this check fails, then this channel either doesn't exist, or isn't
3817
        // the one that was meant to be created according to the passed channel
3818
        // proofs.
3819
        fundingPoint, err := chanvalidate.Validate(
2✔
3820
                &chanvalidate.Context{
2✔
3821
                        Locator: &chanvalidate.ShortChanIDChanLocator{
2✔
3822
                                ID: scid,
2✔
3823
                        },
2✔
3824
                        MultiSigPkScript: fundingPkScript,
2✔
3825
                        FundingTx:        fundingTx,
2✔
3826
                },
2✔
3827
        )
2✔
3828
        if err != nil {
2✔
UNCOV
3829
                // Mark the edge as a zombie so we won't try to re-validate it
×
UNCOV
3830
                // on start up.
×
UNCOV
3831
                zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
×
UNCOV
3832
                if zErr != nil {
×
3833
                        return wire.OutPoint{}, 0, nil, zErr
×
3834
                }
×
3835

UNCOV
3836
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
×
UNCOV
3837
                        ErrInvalidFundingOutput, err)
×
3838
        }
3839

3840
        // Now that we have the funding outpoint of the channel, ensure
3841
        // that it hasn't yet been spent. If so, then this channel has
3842
        // been closed so we'll ignore it.
3843
        chanUtxo, err := d.cfg.ChainIO.GetUtxo(
2✔
3844
                fundingPoint, fundingPkScript, scid.BlockHeight, d.quit,
2✔
3845
        )
2✔
3846
        if err != nil {
2✔
UNCOV
3847
                if errors.Is(err, btcwallet.ErrOutputSpent) {
×
UNCOV
3848
                        zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
×
UNCOV
3849
                        if zErr != nil {
×
3850
                                return wire.OutPoint{}, 0, nil, zErr
×
3851
                        }
×
3852
                }
3853

UNCOV
3854
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: unable to "+
×
UNCOV
3855
                        "fetch utxo for chan_id=%v, chan_point=%v: %w",
×
UNCOV
3856
                        ErrChannelSpent, scid.ToUint64(), fundingPoint, err)
×
3857
        }
3858

3859
        return *fundingPoint, btcutil.Amount(chanUtxo.Value), fundingPkScript,
2✔
3860
                nil
2✔
3861
}
3862

3863
// handleBadPeer takes a misbehaving peer and increases its ban score. Once
3864
// increased, it will disconnect the peer if its ban score has reached
3865
// `banThreshold` and it doesn't have a channel with us.
UNCOV
3866
func (d *AuthenticatedGossiper) handleBadPeer(peer lnpeer.Peer) error {
×
UNCOV
3867
        // Increment the peer's ban score for misbehavior.
×
UNCOV
3868
        d.banman.incrementBanScore(peer.PubKey())
×
UNCOV
3869

×
UNCOV
3870
        // If the peer is banned and not a channel peer, we'll disconnect them.
×
UNCOV
3871
        shouldDc, dcErr := d.ShouldDisconnect(peer.IdentityKey())
×
UNCOV
3872
        if dcErr != nil {
×
3873
                log.Errorf("failed to check if we should disconnect peer: %v",
×
3874
                        dcErr)
×
3875

×
3876
                return dcErr
×
3877
        }
×
3878

UNCOV
3879
        if shouldDc {
×
UNCOV
3880
                peer.Disconnect(ErrPeerBanned)
×
UNCOV
3881
        }
×
3882

UNCOV
3883
        return nil
×
3884
}
3885

3886
// makeFundingScript is used to make the funding script for both segwit v0 and
3887
// segwit v1 (taproot) channels.
3888
func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte,
3889
        features *lnwire.RawFeatureVector,
3890
        tapscriptRoot fn.Option[chainhash.Hash]) ([]byte, error) {
2✔
3891

2✔
3892
        legacyFundingScript := func() ([]byte, error) {
4✔
3893
                witnessScript, err := input.GenMultiSigScript(
2✔
3894
                        bitcoinKey1, bitcoinKey2,
2✔
3895
                )
2✔
3896
                if err != nil {
2✔
3897
                        return nil, err
×
3898
                }
×
3899
                pkScript, err := input.WitnessScriptHash(witnessScript)
2✔
3900
                if err != nil {
2✔
3901
                        return nil, err
×
3902
                }
×
3903

3904
                return pkScript, nil
2✔
3905
        }
3906

3907
        if features.IsEmpty() {
4✔
3908
                return legacyFundingScript()
2✔
3909
        }
2✔
3910

3911
        chanFeatureBits := lnwire.NewFeatureVector(features, lnwire.Features)
2✔
3912
        if chanFeatureBits.HasFeature(
2✔
3913
                lnwire.SimpleTaprootChannelsOptionalStaging,
2✔
3914
        ) {
4✔
3915

2✔
3916
                pubKey1, err := btcec.ParsePubKey(bitcoinKey1)
2✔
3917
                if err != nil {
2✔
3918
                        return nil, err
×
3919
                }
×
3920
                pubKey2, err := btcec.ParsePubKey(bitcoinKey2)
2✔
3921
                if err != nil {
2✔
3922
                        return nil, err
×
3923
                }
×
3924

3925
                fundingScript, _, err := input.GenTaprootFundingScript(
2✔
3926
                        pubKey1, pubKey2, 0, tapscriptRoot,
2✔
3927
                )
2✔
3928
                if err != nil {
2✔
3929
                        return nil, err
×
3930
                }
×
3931

3932
                // TODO(roasbeef): add tapscript root to gossip v1.5
3933

3934
                return fundingScript, nil
2✔
3935
        }
3936

3937
        return legacyFundingScript()
×
3938
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc