• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17101605539

20 Aug 2025 02:35PM UTC coverage: 57.321% (-9.4%) from 66.68%
17101605539

push

github

web-flow
Merge pull request #10102 from yyforyongyu/fix-UpdatesInHorizon

Catch bad gossip peer and fix `UpdatesInHorizon`

28 of 89 new or added lines in 4 files covered. (31.46%)

29163 existing lines in 459 files now uncovered.

99187 of 173038 relevant lines covered (57.32%)

1.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.44
/discovery/gossiper.go
1
package discovery
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "strings"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcec/v2"
14
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
15
        "github.com/btcsuite/btcd/btcutil"
16
        "github.com/btcsuite/btcd/chaincfg/chainhash"
17
        "github.com/btcsuite/btcd/wire"
18
        "github.com/davecgh/go-spew/spew"
19
        "github.com/lightninglabs/neutrino/cache"
20
        "github.com/lightninglabs/neutrino/cache/lru"
21
        "github.com/lightningnetwork/lnd/batch"
22
        "github.com/lightningnetwork/lnd/chainntnfs"
23
        "github.com/lightningnetwork/lnd/channeldb"
24
        "github.com/lightningnetwork/lnd/fn/v2"
25
        "github.com/lightningnetwork/lnd/graph"
26
        graphdb "github.com/lightningnetwork/lnd/graph/db"
27
        "github.com/lightningnetwork/lnd/graph/db/models"
28
        "github.com/lightningnetwork/lnd/input"
29
        "github.com/lightningnetwork/lnd/keychain"
30
        "github.com/lightningnetwork/lnd/lnpeer"
31
        "github.com/lightningnetwork/lnd/lnutils"
32
        "github.com/lightningnetwork/lnd/lnwallet"
33
        "github.com/lightningnetwork/lnd/lnwallet/btcwallet"
34
        "github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
35
        "github.com/lightningnetwork/lnd/lnwire"
36
        "github.com/lightningnetwork/lnd/multimutex"
37
        "github.com/lightningnetwork/lnd/netann"
38
        "github.com/lightningnetwork/lnd/routing/route"
39
        "github.com/lightningnetwork/lnd/ticker"
40
        "golang.org/x/time/rate"
41
)
42

43
const (
44
        // DefaultMaxChannelUpdateBurst is the default maximum number of updates
45
        // for a specific channel and direction that we'll accept over an
46
        // interval.
47
        DefaultMaxChannelUpdateBurst = 10
48

49
        // DefaultChannelUpdateInterval is the default interval we'll use to
50
        // determine how often we should allow a new update for a specific
51
        // channel and direction.
52
        DefaultChannelUpdateInterval = time.Minute
53

54
        // maxPrematureUpdates tracks the max amount of premature channel
55
        // updates that we'll hold onto.
56
        maxPrematureUpdates = 100
57

58
        // maxFutureMessages tracks the max amount of future messages that
59
        // we'll hold onto.
60
        maxFutureMessages = 1000
61

62
        // DefaultSubBatchDelay is the default delay we'll use when
63
        // broadcasting the next announcement batch.
64
        DefaultSubBatchDelay = 5 * time.Second
65

66
        // maxRejectedUpdates tracks the max amount of rejected channel updates
67
        // we'll maintain. This is the global size across all peers. We'll
68
        // allocate ~3 MB max to the cache.
69
        maxRejectedUpdates = 10_000
70

71
        // DefaultProofMatureDelta specifies the default value used for
72
        // ProofMatureDelta, which is the number of confirmations needed before
73
        // processing the announcement signatures.
74
        DefaultProofMatureDelta = 6
75
)
76

77
var (
78
        // ErrGossiperShuttingDown is an error that is returned if the gossiper
79
        // is in the process of being shut down.
80
        ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
81

82
        // ErrGossipSyncerNotFound signals that we were unable to find an active
83
        // gossip syncer corresponding to a gossip query message received from
84
        // the remote peer.
85
        ErrGossipSyncerNotFound = errors.New("gossip syncer not found")
86

87
        // ErrNoFundingTransaction is returned when we are unable to find the
88
        // funding transaction described by the short channel ID on chain.
89
        ErrNoFundingTransaction = errors.New(
90
                "unable to find the funding transaction",
91
        )
92

93
        // ErrInvalidFundingOutput is returned if the channel funding output
94
        // fails validation.
95
        ErrInvalidFundingOutput = errors.New(
96
                "channel funding output validation failed",
97
        )
98

99
        // ErrChannelSpent is returned when we go to validate a channel, but
100
        // the purported funding output has actually already been spent on
101
        // chain.
102
        ErrChannelSpent = errors.New("channel output has been spent")
103

104
        // emptyPubkey is used to compare compressed pubkeys against an empty
105
        // byte array.
106
        emptyPubkey [33]byte
107
)
108

109
// optionalMsgFields is a set of optional message fields that external callers
110
// can provide that serve useful when processing a specific network
111
// announcement.
112
type optionalMsgFields struct {
113
        capacity      *btcutil.Amount
114
        channelPoint  *wire.OutPoint
115
        remoteAlias   *lnwire.ShortChannelID
116
        tapscriptRoot fn.Option[chainhash.Hash]
117
}
118

119
// apply applies the optional fields within the functional options.
120
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
3✔
121
        for _, optionalMsgField := range optionalMsgFields {
6✔
122
                optionalMsgField(f)
3✔
123
        }
3✔
124
}
125

126
// OptionalMsgField is a functional option parameter that can be used to provide
127
// external information that is not included within a network message but serves
128
// useful when processing it.
129
type OptionalMsgField func(*optionalMsgFields)
130

131
// ChannelCapacity is an optional field that lets the gossiper know of the
132
// capacity of a channel.
133
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
3✔
134
        return func(f *optionalMsgFields) {
6✔
135
                f.capacity = &capacity
3✔
136
        }
3✔
137
}
138

139
// ChannelPoint is an optional field that lets the gossiper know of the outpoint
140
// of a channel.
141
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
3✔
142
        return func(f *optionalMsgFields) {
6✔
143
                f.channelPoint = &op
3✔
144
        }
3✔
145
}
146

147
// TapscriptRoot is an optional field that lets the gossiper know of the root of
148
// the tapscript tree for a custom channel.
149
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
3✔
150
        return func(f *optionalMsgFields) {
6✔
151
                f.tapscriptRoot = root
3✔
152
        }
3✔
153
}
154

155
// RemoteAlias is an optional field that lets the gossiper know that a locally
156
// sent channel update is actually an update for the peer that should replace
157
// the ShortChannelID field with the remote's alias. This is only used for
158
// channels with peers where the option-scid-alias feature bit was negotiated.
159
// The channel update will be added to the graph under the original SCID, but
160
// will be modified and re-signed with this alias.
161
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
3✔
162
        return func(f *optionalMsgFields) {
6✔
163
                f.remoteAlias = alias
3✔
164
        }
3✔
165
}
166

167
// networkMsg couples a routing related wire message with the peer that
168
// originally sent it.
169
type networkMsg struct {
170
        peer              lnpeer.Peer
171
        source            *btcec.PublicKey
172
        msg               lnwire.Message
173
        optionalMsgFields *optionalMsgFields
174

175
        isRemote bool
176

177
        err chan error
178
}
179

180
// chanPolicyUpdateRequest is a request that is sent to the server when a caller
181
// wishes to update a particular set of channels. New ChannelUpdate messages
182
// will be crafted to be sent out during the next broadcast epoch and the fee
183
// updates committed to the lower layer.
184
type chanPolicyUpdateRequest struct {
185
        edgesToUpdate []EdgeWithInfo
186
        errChan       chan error
187
}
188

189
// PinnedSyncers is a set of node pubkeys for which we will maintain an active
190
// syncer at all times.
191
type PinnedSyncers map[route.Vertex]struct{}
192

193
// Config defines the configuration for the service. ALL elements within the
194
// configuration MUST be non-nil for the service to carry out its duties.
195
type Config struct {
196
        // ChainHash is a hash that indicates which resident chain of the
197
        // AuthenticatedGossiper. Any announcements that don't match this
198
        // chain hash will be ignored.
199
        //
200
        // TODO(roasbeef): eventually make into map so can de-multiplex
201
        // incoming announcements
202
        //   * also need to do same for Notifier
203
        ChainHash chainhash.Hash
204

205
        // Graph is the subsystem which is responsible for managing the
206
        // topology of lightning network. After incoming channel, node, channel
207
        // updates announcements are validated they are sent to the router in
208
        // order to be included in the LN graph.
209
        Graph graph.ChannelGraphSource
210

211
        // ChainIO represents an abstraction over a source that can query the
212
        // blockchain.
213
        ChainIO lnwallet.BlockChainIO
214

215
        // ChanSeries is an interfaces that provides access to a time series
216
        // view of the current known channel graph. Each GossipSyncer enabled
217
        // peer will utilize this in order to create and respond to channel
218
        // graph time series queries.
219
        ChanSeries ChannelGraphTimeSeries
220

221
        // Notifier is used for receiving notifications of incoming blocks.
222
        // With each new incoming block found we process previously premature
223
        // announcements.
224
        //
225
        // TODO(roasbeef): could possibly just replace this with an epoch
226
        // channel.
227
        Notifier chainntnfs.ChainNotifier
228

229
        // Broadcast broadcasts a particular set of announcements to all peers
230
        // that the daemon is connected to. If supplied, the exclude parameter
231
        // indicates that the target peer should be excluded from the
232
        // broadcast.
233
        Broadcast func(skips map[route.Vertex]struct{},
234
                msg ...lnwire.Message) error
235

236
        // NotifyWhenOnline is a function that allows the gossiper to be
237
        // notified when a certain peer comes online, allowing it to
238
        // retry sending a peer message.
239
        //
240
        // NOTE: The peerChan channel must be buffered.
241
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
242

243
        // NotifyWhenOffline is a function that allows the gossiper to be
244
        // notified when a certain peer disconnects, allowing it to request a
245
        // notification for when it reconnects.
246
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
247

248
        // FetchSelfAnnouncement retrieves our current node announcement, for
249
        // use when determining whether we should update our peers about our
250
        // presence in the network.
251
        FetchSelfAnnouncement func() lnwire.NodeAnnouncement
252

253
        // UpdateSelfAnnouncement produces a new announcement for our node with
254
        // an updated timestamp which can be broadcast to our peers.
255
        UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement, error)
256

257
        // ProofMatureDelta the number of confirmations which is needed before
258
        // exchange the channel announcement proofs.
259
        ProofMatureDelta uint32
260

261
        // TrickleDelay the period of trickle timer which flushes to the
262
        // network the pending batch of new announcements we've received since
263
        // the last trickle tick.
264
        TrickleDelay time.Duration
265

266
        // RetransmitTicker is a ticker that ticks with a period which
267
        // indicates that we should check if we need re-broadcast any of our
268
        // personal channels.
269
        RetransmitTicker ticker.Ticker
270

271
        // RebroadcastInterval is the maximum time we wait between sending out
272
        // channel updates for our active channels and our own node
273
        // announcement. We do this to ensure our active presence on the
274
        // network is known, and we are not being considered a zombie node or
275
        // having zombie channels.
276
        RebroadcastInterval time.Duration
277

278
        // WaitingProofStore is a persistent storage of partial channel proof
279
        // announcement messages. We use it to buffer half of the material
280
        // needed to reconstruct a full authenticated channel announcement.
281
        // Once we receive the other half the channel proof, we'll be able to
282
        // properly validate it and re-broadcast it out to the network.
283
        //
284
        // TODO(wilmer): make interface to prevent channeldb dependency.
285
        WaitingProofStore *channeldb.WaitingProofStore
286

287
        // MessageStore is a persistent storage of gossip messages which we will
288
        // use to determine which messages need to be resent for a given peer.
289
        MessageStore GossipMessageStore
290

291
        // AnnSigner is an instance of the MessageSigner interface which will
292
        // be used to manually sign any outgoing channel updates. The signer
293
        // implementation should be backed by the public key of the backing
294
        // Lightning node.
295
        //
296
        // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
297
        // here?
298
        AnnSigner lnwallet.MessageSigner
299

300
        // ScidCloser is an instance of ClosedChannelTracker that helps the
301
        // gossiper cut down on spam channel announcements for already closed
302
        // channels.
303
        ScidCloser ClosedChannelTracker
304

305
        // NumActiveSyncers is the number of peers for which we should have
306
        // active syncers with. After reaching NumActiveSyncers, any future
307
        // gossip syncers will be passive.
308
        NumActiveSyncers int
309

310
        // NoTimestampQueries will prevent the GossipSyncer from querying
311
        // timestamps of announcement messages from the peer and from replying
312
        // to timestamp queries.
313
        NoTimestampQueries bool
314

315
        // RotateTicker is a ticker responsible for notifying the SyncManager
316
        // when it should rotate its active syncers. A single active syncer with
317
        // a chansSynced state will be exchanged for a passive syncer in order
318
        // to ensure we don't keep syncing with the same peers.
319
        RotateTicker ticker.Ticker
320

321
        // HistoricalSyncTicker is a ticker responsible for notifying the
322
        // syncManager when it should attempt a historical sync with a gossip
323
        // sync peer.
324
        HistoricalSyncTicker ticker.Ticker
325

326
        // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
327
        // syncManager when it should attempt to start the next pending
328
        // activeSyncer due to the current one not completing its state machine
329
        // within the timeout.
330
        ActiveSyncerTimeoutTicker ticker.Ticker
331

332
        // MinimumBatchSize is minimum size of a sub batch of announcement
333
        // messages.
334
        MinimumBatchSize int
335

336
        // SubBatchDelay is the delay between sending sub batches of
337
        // gossip messages.
338
        SubBatchDelay time.Duration
339

340
        // IgnoreHistoricalFilters will prevent syncers from replying with
341
        // historical data when the remote peer sets a gossip_timestamp_range.
342
        // This prevents ranges with old start times from causing us to dump the
343
        // graph on connect.
344
        IgnoreHistoricalFilters bool
345

346
        // PinnedSyncers is a set of peers that will always transition to
347
        // ActiveSync upon connection. These peers will never transition to
348
        // PassiveSync.
349
        PinnedSyncers PinnedSyncers
350

351
        // MaxChannelUpdateBurst specifies the maximum number of updates for a
352
        // specific channel and direction that we'll accept over an interval.
353
        MaxChannelUpdateBurst int
354

355
        // ChannelUpdateInterval specifies the interval we'll use to determine
356
        // how often we should allow a new update for a specific channel and
357
        // direction.
358
        ChannelUpdateInterval time.Duration
359

360
        // IsAlias returns true if a given ShortChannelID is an alias for
361
        // option_scid_alias channels.
362
        IsAlias func(scid lnwire.ShortChannelID) bool
363

364
        // SignAliasUpdate is used to re-sign a channel update using the
365
        // remote's alias if the option-scid-alias feature bit was negotiated.
366
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
367
                error)
368

369
        // FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
370
        // This is used for channels that have negotiated the option-scid-alias
371
        // feature bit.
372
        FindBaseByAlias func(alias lnwire.ShortChannelID) (
373
                lnwire.ShortChannelID, error)
374

375
        // GetAlias allows the gossiper to look up the peer's alias for a given
376
        // ChannelID. This is used to sign updates for them if the channel has
377
        // no AuthProof and the option-scid-alias feature bit was negotiated.
378
        GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)
379

380
        // FindChannel allows the gossiper to find a channel that we're party
381
        // to without iterating over the entire set of open channels.
382
        FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
383
                *channeldb.OpenChannel, error)
384

385
        // IsStillZombieChannel takes the timestamps of the latest channel
386
        // updates for a channel and returns true if the channel should be
387
        // considered a zombie based on these timestamps.
388
        IsStillZombieChannel func(time.Time, time.Time) bool
389

390
        // AssumeChannelValid toggles whether the gossiper will check for
391
        // spent-ness of channel outpoints. For neutrino, this saves long
392
        // rescans from blocking initial usage of the daemon.
393
        AssumeChannelValid bool
394

395
        // MsgRateBytes is the rate limit for the number of bytes per second
396
        // that we'll allocate to outbound gossip messages.
397
        MsgRateBytes uint64
398

399
        // MsgBurstBytes is the allotted burst amount in bytes. This is the
400
        // number of starting tokens in our token bucket algorithm.
401
        MsgBurstBytes uint64
402

403
        // FilterConcurrency is the maximum number of concurrent gossip filter
404
        // applications that can be processed.
405
        FilterConcurrency int
406

407
        // BanThreshold is the score used to decide whether a given peer is
408
        // banned or not.
409
        BanThreshold uint64
410
}
411

412
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
413
// used to let the caller of the lru.Cache know if a message has already been
414
// processed or not.
415
type processedNetworkMsg struct {
416
        processed bool
417
        msg       *networkMsg
418
}
419

420
// cachedNetworkMsg is a wrapper around a network message that can be used with
421
// *lru.Cache.
422
//
423
// NOTE: This struct is not thread safe which means you need to assure no
424
// concurrent read write access to it and all its contents which are pointers
425
// as well.
426
type cachedNetworkMsg struct {
427
        msgs []*processedNetworkMsg
428
}
429

430
// Size returns the "size" of an entry. We return the number of items as we
431
// just want to limit the total amount of entries rather than do accurate size
432
// accounting.
433
func (c *cachedNetworkMsg) Size() (uint64, error) {
3✔
434
        return uint64(len(c.msgs)), nil
3✔
435
}
3✔
436

437
// rejectCacheKey is the cache key that we'll use to track announcements we've
438
// recently rejected.
439
type rejectCacheKey struct {
440
        pubkey [33]byte
441
        chanID uint64
442
}
443

444
// newRejectCacheKey returns a new cache key for the reject cache.
445
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
3✔
446
        k := rejectCacheKey{
3✔
447
                chanID: cid,
3✔
448
                pubkey: pub,
3✔
449
        }
3✔
450

3✔
451
        return k
3✔
452
}
3✔
453

454
// sourceToPub returns a serialized-compressed public key for use in the reject
455
// cache.
456
func sourceToPub(pk *btcec.PublicKey) [33]byte {
3✔
457
        var pub [33]byte
3✔
458
        copy(pub[:], pk.SerializeCompressed())
3✔
459
        return pub
3✔
460
}
3✔
461

462
// cachedReject is the empty value used to track the value for rejects.
463
type cachedReject struct {
464
}
465

466
// Size returns the "size" of an entry. We return 1 as we just want to limit
467
// the total size.
UNCOV
468
func (c *cachedReject) Size() (uint64, error) {
×
UNCOV
469
        return 1, nil
×
UNCOV
470
}
×
471

472
// AuthenticatedGossiper is a subsystem which is responsible for receiving
473
// announcements, validating them and applying the changes to router, syncing
474
// lightning network with newly connected nodes, broadcasting announcements
475
// after validation, negotiating the channel announcement proofs exchange and
476
// handling the premature announcements. All outgoing announcements are
477
// expected to be properly signed as dictated in BOLT#7, additionally, all
478
// incoming message are expected to be well formed and signed. Invalid messages
479
// will be rejected by this struct.
480
type AuthenticatedGossiper struct {
481
        // Parameters which are needed to properly handle the start and stop of
482
        // the service.
483
        started sync.Once
484
        stopped sync.Once
485

486
        // bestHeight is the height of the block at the tip of the main chain
487
        // as we know it. Accesses *MUST* be done with the gossiper's lock
488
        // held.
489
        bestHeight uint32
490

491
        // cfg is a copy of the configuration struct that the gossiper service
492
        // was initialized with.
493
        cfg *Config
494

495
        // blockEpochs encapsulates a stream of block epochs that are sent at
496
        // every new block height.
497
        blockEpochs *chainntnfs.BlockEpochEvent
498

499
        // prematureChannelUpdates is a map of ChannelUpdates we have received
500
        // that wasn't associated with any channel we know about.  We store
501
        // them temporarily, such that we can reprocess them when a
502
        // ChannelAnnouncement for the channel is received.
503
        prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]
504

505
        // banman tracks our peer's ban status.
506
        banman *banman
507

508
        // networkMsgs is a channel that carries new network broadcasted
509
        // message from outside the gossiper service to be processed by the
510
        // networkHandler.
511
        networkMsgs chan *networkMsg
512

513
        // futureMsgs is a list of premature network messages that have a block
514
        // height specified in the future. We will save them and resend it to
515
        // the chan networkMsgs once the block height has reached. The cached
516
        // map format is,
517
        //   {msgID1: msg1, msgID2: msg2, ...}
518
        futureMsgs *futureMsgCache
519

520
        // chanPolicyUpdates is a channel that requests to update the
521
        // forwarding policy of a set of channels is sent over.
522
        chanPolicyUpdates chan *chanPolicyUpdateRequest
523

524
        // selfKey is the identity public key of the backing Lightning node.
525
        selfKey *btcec.PublicKey
526

527
        // selfKeyLoc is the locator for the identity public key of the backing
528
        // Lightning node.
529
        selfKeyLoc keychain.KeyLocator
530

531
        // channelMtx is used to restrict the database access to one
532
        // goroutine per channel ID. This is done to ensure that when
533
        // the gossiper is handling an announcement, the db state stays
534
        // consistent between when the DB is first read until it's written.
535
        channelMtx *multimutex.Mutex[uint64]
536

537
        recentRejects *lru.Cache[rejectCacheKey, *cachedReject]
538

539
        // syncMgr is a subsystem responsible for managing the gossip syncers
540
        // for peers currently connected. When a new peer is connected, the
541
        // manager will create its accompanying gossip syncer and determine
542
        // whether it should have an activeSync or passiveSync sync type based
543
        // on how many other gossip syncers are currently active. Any activeSync
544
        // gossip syncers are started in a round-robin manner to ensure we're
545
        // not syncing with multiple peers at the same time.
546
        syncMgr *SyncManager
547

548
        // reliableSender is a subsystem responsible for handling reliable
549
        // message send requests to peers. This should only be used for channels
550
        // that are unadvertised at the time of handling the message since if it
551
        // is advertised, then peers should be able to get the message from the
552
        // network.
553
        reliableSender *reliableSender
554

555
        // chanUpdateRateLimiter contains rate limiters for each direction of
556
        // a channel update we've processed. We'll use these to determine
557
        // whether we should accept a new update for a specific channel and
558
        // direction.
559
        //
560
        // NOTE: This map must be synchronized with the main
561
        // AuthenticatedGossiper lock.
562
        chanUpdateRateLimiter map[uint64][2]*rate.Limiter
563

564
        // vb is used to enforce job dependency ordering of gossip messages.
565
        vb *ValidationBarrier
566

567
        sync.Mutex
568

569
        cancel fn.Option[context.CancelFunc]
570
        quit   chan struct{}
571
        wg     sync.WaitGroup
572
}
573

574
// New creates a new AuthenticatedGossiper instance, initialized with the
575
// passed configuration parameters.
576
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
3✔
577
        gossiper := &AuthenticatedGossiper{
3✔
578
                selfKey:           selfKeyDesc.PubKey,
3✔
579
                selfKeyLoc:        selfKeyDesc.KeyLocator,
3✔
580
                cfg:               &cfg,
3✔
581
                networkMsgs:       make(chan *networkMsg),
3✔
582
                futureMsgs:        newFutureMsgCache(maxFutureMessages),
3✔
583
                quit:              make(chan struct{}),
3✔
584
                chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
3✔
585
                prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
3✔
586
                        maxPrematureUpdates,
3✔
587
                ),
3✔
588
                channelMtx: multimutex.NewMutex[uint64](),
3✔
589
                recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
3✔
590
                        maxRejectedUpdates,
3✔
591
                ),
3✔
592
                chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
3✔
593
                banman:                newBanman(cfg.BanThreshold),
3✔
594
        }
3✔
595

3✔
596
        gossiper.vb = NewValidationBarrier(1000, gossiper.quit)
3✔
597

3✔
598
        gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
3✔
599
                ChainHash:                cfg.ChainHash,
3✔
600
                ChanSeries:               cfg.ChanSeries,
3✔
601
                RotateTicker:             cfg.RotateTicker,
3✔
602
                HistoricalSyncTicker:     cfg.HistoricalSyncTicker,
3✔
603
                NumActiveSyncers:         cfg.NumActiveSyncers,
3✔
604
                NoTimestampQueries:       cfg.NoTimestampQueries,
3✔
605
                IgnoreHistoricalFilters:  cfg.IgnoreHistoricalFilters,
3✔
606
                BestHeight:               gossiper.latestHeight,
3✔
607
                PinnedSyncers:            cfg.PinnedSyncers,
3✔
608
                IsStillZombieChannel:     cfg.IsStillZombieChannel,
3✔
609
                AllotedMsgBytesPerSecond: cfg.MsgRateBytes,
3✔
610
                AllotedMsgBytesBurst:     cfg.MsgBurstBytes,
3✔
611
                FilterConcurrency:        cfg.FilterConcurrency,
3✔
612
        })
3✔
613

3✔
614
        gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
3✔
615
                NotifyWhenOnline:  cfg.NotifyWhenOnline,
3✔
616
                NotifyWhenOffline: cfg.NotifyWhenOffline,
3✔
617
                MessageStore:      cfg.MessageStore,
3✔
618
                IsMsgStale:        gossiper.isMsgStale,
3✔
619
        })
3✔
620

3✔
621
        return gossiper
3✔
622
}
3✔
623

624
// EdgeWithInfo contains the information that is required to update an edge.
625
type EdgeWithInfo struct {
626
        // Info describes the channel.
627
        Info *models.ChannelEdgeInfo
628

629
        // Edge describes the policy in one direction of the channel.
630
        Edge *models.ChannelEdgePolicy
631
}
632

633
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
634
// specified edge updates. Updates are done in two stages: first, the
635
// AuthenticatedGossiper ensures the update has been committed by dependent
636
// sub-systems, then it signs and broadcasts new updates to the network. A
637
// mapping between outpoints and updated channel policies is returned, which is
638
// used to update the forwarding policies of the underlying links.
639
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
640
        edgesToUpdate []EdgeWithInfo) error {
3✔
641

3✔
642
        errChan := make(chan error, 1)
3✔
643
        policyUpdate := &chanPolicyUpdateRequest{
3✔
644
                edgesToUpdate: edgesToUpdate,
3✔
645
                errChan:       errChan,
3✔
646
        }
3✔
647

3✔
648
        select {
3✔
649
        case d.chanPolicyUpdates <- policyUpdate:
3✔
650
                err := <-errChan
3✔
651
                return err
3✔
652
        case <-d.quit:
×
653
                return fmt.Errorf("AuthenticatedGossiper shutting down")
×
654
        }
655
}
656

657
// Start spawns network messages handler goroutine and registers on new block
658
// notifications in order to properly handle the premature announcements.
659
func (d *AuthenticatedGossiper) Start() error {
3✔
660
        var err error
3✔
661
        d.started.Do(func() {
6✔
662
                ctx, cancel := context.WithCancel(context.Background())
3✔
663
                d.cancel = fn.Some(cancel)
3✔
664

3✔
665
                log.Info("Authenticated Gossiper starting")
3✔
666
                err = d.start(ctx)
3✔
667
        })
3✔
668
        return err
3✔
669
}
670

671
func (d *AuthenticatedGossiper) start(ctx context.Context) error {
3✔
672
        // First we register for new notifications of newly discovered blocks.
3✔
673
        // We do this immediately so we'll later be able to consume any/all
3✔
674
        // blocks which were discovered.
3✔
675
        blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
3✔
676
        if err != nil {
3✔
677
                return err
×
678
        }
×
679
        d.blockEpochs = blockEpochs
3✔
680

3✔
681
        height, err := d.cfg.Graph.CurrentBlockHeight()
3✔
682
        if err != nil {
3✔
683
                return err
×
684
        }
×
685
        d.bestHeight = height
3✔
686

3✔
687
        // Start the reliable sender. In case we had any pending messages ready
3✔
688
        // to be sent when the gossiper was last shut down, we must continue on
3✔
689
        // our quest to deliver them to their respective peers.
3✔
690
        if err := d.reliableSender.Start(); err != nil {
3✔
691
                return err
×
692
        }
×
693

694
        d.syncMgr.Start()
3✔
695

3✔
696
        d.banman.start()
3✔
697

3✔
698
        // Start receiving blocks in its dedicated goroutine.
3✔
699
        d.wg.Add(2)
3✔
700
        go d.syncBlockHeight()
3✔
701
        go d.networkHandler(ctx)
3✔
702

3✔
703
        return nil
3✔
704
}
705

706
// syncBlockHeight syncs the best block height for the gossiper by reading
707
// blockEpochs.
708
//
709
// NOTE: must be run as a goroutine.
710
func (d *AuthenticatedGossiper) syncBlockHeight() {
3✔
711
        defer d.wg.Done()
3✔
712

3✔
713
        for {
6✔
714
                select {
3✔
715
                // A new block has arrived, so we can re-process the previously
716
                // premature announcements.
717
                case newBlock, ok := <-d.blockEpochs.Epochs:
3✔
718
                        // If the channel has been closed, then this indicates
3✔
719
                        // the daemon is shutting down, so we exit ourselves.
3✔
720
                        if !ok {
6✔
721
                                return
3✔
722
                        }
3✔
723

724
                        // Once a new block arrives, we update our running
725
                        // track of the height of the chain tip.
726
                        d.Lock()
3✔
727
                        blockHeight := uint32(newBlock.Height)
3✔
728
                        d.bestHeight = blockHeight
3✔
729
                        d.Unlock()
3✔
730

3✔
731
                        log.Debugf("New block: height=%d, hash=%s", blockHeight,
3✔
732
                                newBlock.Hash)
3✔
733

3✔
734
                        // Resend future messages, if any.
3✔
735
                        d.resendFutureMessages(blockHeight)
3✔
736

UNCOV
737
                case <-d.quit:
×
UNCOV
738
                        return
×
739
                }
740
        }
741
}
742

743
// futureMsgCache embeds a `lru.Cache` with a message counter that's served as
744
// the unique ID when saving the message.
745
type futureMsgCache struct {
746
        *lru.Cache[uint64, *cachedFutureMsg]
747

748
        // msgID is a monotonically increased integer.
749
        msgID atomic.Uint64
750
}
751

752
// nextMsgID returns a unique message ID.
753
func (f *futureMsgCache) nextMsgID() uint64 {
3✔
754
        return f.msgID.Add(1)
3✔
755
}
3✔
756

757
// newFutureMsgCache creates a new future message cache with the underlying lru
758
// cache being initialized with the specified capacity.
759
func newFutureMsgCache(capacity uint64) *futureMsgCache {
3✔
760
        // Create a new cache.
3✔
761
        cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)
3✔
762

3✔
763
        return &futureMsgCache{
3✔
764
                Cache: cache,
3✔
765
        }
3✔
766
}
3✔
767

768
// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
769
type cachedFutureMsg struct {
770
        // msg is the network message.
771
        msg *networkMsg
772

773
        // height is the block height.
774
        height uint32
775
}
776

777
// Size returns the size of the message.
778
func (c *cachedFutureMsg) Size() (uint64, error) {
3✔
779
        // Return a constant 1.
3✔
780
        return 1, nil
3✔
781
}
3✔
782

783
// resendFutureMessages takes a block height, resends all the future messages
784
// found below and equal to that height and deletes those messages found in the
785
// gossiper's futureMsgs.
786
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
3✔
787
        var (
3✔
788
                // msgs are the target messages.
3✔
789
                msgs []*networkMsg
3✔
790

3✔
791
                // keys are the target messages' caching keys.
3✔
792
                keys []uint64
3✔
793
        )
3✔
794

3✔
795
        // filterMsgs is the visitor used when iterating the future cache.
3✔
796
        filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
6✔
797
                if cmsg.height <= height {
6✔
798
                        msgs = append(msgs, cmsg.msg)
3✔
799
                        keys = append(keys, k)
3✔
800
                }
3✔
801

802
                return true
3✔
803
        }
804

805
        // Filter out the target messages.
806
        d.futureMsgs.Range(filterMsgs)
3✔
807

3✔
808
        // Return early if no messages found.
3✔
809
        if len(msgs) == 0 {
6✔
810
                return
3✔
811
        }
3✔
812

813
        // Remove the filtered messages.
814
        for _, key := range keys {
6✔
815
                d.futureMsgs.Delete(key)
3✔
816
        }
3✔
817

818
        log.Debugf("Resending %d network messages at height %d",
3✔
819
                len(msgs), height)
3✔
820

3✔
821
        for _, msg := range msgs {
6✔
822
                select {
3✔
823
                case d.networkMsgs <- msg:
3✔
824
                case <-d.quit:
×
825
                        msg.err <- ErrGossiperShuttingDown
×
826
                }
827
        }
828
}
829

830
// Stop signals any active goroutines for a graceful closure.
831
func (d *AuthenticatedGossiper) Stop() error {
3✔
832
        d.stopped.Do(func() {
6✔
833
                log.Info("Authenticated gossiper shutting down...")
3✔
834
                defer log.Debug("Authenticated gossiper shutdown complete")
3✔
835

3✔
836
                d.stop()
3✔
837
        })
3✔
838
        return nil
3✔
839
}
840

841
func (d *AuthenticatedGossiper) stop() {
3✔
842
        log.Debug("Authenticated Gossiper is stopping")
3✔
843
        defer log.Debug("Authenticated Gossiper stopped")
3✔
844

3✔
845
        // `blockEpochs` is only initialized in the start routine so we make
3✔
846
        // sure we don't panic here in the case where the `Stop` method is
3✔
847
        // called when the `Start` method does not complete.
3✔
848
        if d.blockEpochs != nil {
6✔
849
                d.blockEpochs.Cancel()
3✔
850
        }
3✔
851

852
        d.syncMgr.Stop()
3✔
853

3✔
854
        d.banman.stop()
3✔
855

3✔
856
        d.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
6✔
857
        close(d.quit)
3✔
858
        d.wg.Wait()
3✔
859

3✔
860
        // We'll stop our reliable sender after all of the gossiper's goroutines
3✔
861
        // have exited to ensure nothing can cause it to continue executing.
3✔
862
        d.reliableSender.Stop()
3✔
863
}
864

865
// TODO(roasbeef): need method to get current gossip timestamp?
866
//  * using mtx, check time rotate forward is needed?
867

868
// ProcessRemoteAnnouncement sends a new remote announcement message along with
869
// the peer that sent the routing message. The announcement will be processed
870
// then added to a queue for batched trickled announcement to all connected
871
// peers.  Remote channel announcements should contain the announcement proof
872
// and be fully validated.
873
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context,
874
        msg lnwire.Message, peer lnpeer.Peer) chan error {
3✔
875

3✔
876
        errChan := make(chan error, 1)
3✔
877

3✔
878
        // For messages in the known set of channel series queries, we'll
3✔
879
        // dispatch the message directly to the GossipSyncer, and skip the main
3✔
880
        // processing loop.
3✔
881
        switch m := msg.(type) {
3✔
882
        case *lnwire.QueryShortChanIDs,
883
                *lnwire.QueryChannelRange,
884
                *lnwire.ReplyChannelRange,
885
                *lnwire.ReplyShortChanIDsEnd:
3✔
886

3✔
887
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
3✔
888
                if !ok {
3✔
889
                        log.Warnf("Gossip syncer for peer=%x not found",
×
890
                                peer.PubKey())
×
891

×
892
                        errChan <- ErrGossipSyncerNotFound
×
893
                        return errChan
×
894
                }
×
895

896
                // If we've found the message target, then we'll dispatch the
897
                // message directly to it.
898
                err := syncer.ProcessQueryMsg(m, peer.QuitSignal())
3✔
899
                if err != nil {
3✔
900
                        log.Errorf("Process query msg from peer %x got %v",
×
901
                                peer.PubKey(), err)
×
902
                }
×
903

904
                errChan <- err
3✔
905
                return errChan
3✔
906

907
        // If a peer is updating its current update horizon, then we'll dispatch
908
        // that directly to the proper GossipSyncer.
909
        case *lnwire.GossipTimestampRange:
3✔
910
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
3✔
911
                if !ok {
3✔
912
                        log.Warnf("Gossip syncer for peer=%x not found",
×
913
                                peer.PubKey())
×
914

×
915
                        errChan <- ErrGossipSyncerNotFound
×
916
                        return errChan
×
917
                }
×
918

919
                // Queue the message for asynchronous processing to prevent
920
                // blocking the gossiper when rate limiting is active.
921
                if !syncer.QueueTimestampRange(m) {
3✔
922
                        log.Warnf("Unable to queue gossip filter for peer=%x: "+
×
923
                                "queue full", peer.PubKey())
×
924

×
925
                        // Return nil to indicate we've handled the message,
×
926
                        // even though it was dropped. This prevents the peer
×
927
                        // from being disconnected.
×
928
                        errChan <- nil
×
929
                        return errChan
×
930
                }
×
931

932
                errChan <- nil
3✔
933
                return errChan
3✔
934

935
        // To avoid inserting edges in the graph for our own channels that we
936
        // have already closed, we ignore such channel announcements coming
937
        // from the remote.
938
        case *lnwire.ChannelAnnouncement1:
3✔
939
                ownKey := d.selfKey.SerializeCompressed()
3✔
940
                ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement1 " +
3✔
941
                        "for own channel")
3✔
942

3✔
943
                if bytes.Equal(m.NodeID1[:], ownKey) ||
3✔
944
                        bytes.Equal(m.NodeID2[:], ownKey) {
6✔
945

3✔
946
                        log.Warn(ownErr)
3✔
947
                        errChan <- ownErr
3✔
948
                        return errChan
3✔
949
                }
3✔
950
        }
951

952
        nMsg := &networkMsg{
3✔
953
                msg:      msg,
3✔
954
                isRemote: true,
3✔
955
                peer:     peer,
3✔
956
                source:   peer.IdentityKey(),
3✔
957
                err:      errChan,
3✔
958
        }
3✔
959

3✔
960
        select {
3✔
961
        case d.networkMsgs <- nMsg:
3✔
962

963
        // If the peer that sent us this error is quitting, then we don't need
964
        // to send back an error and can return immediately.
965
        // TODO(elle): the peer should now just rely on canceling the passed
966
        //  context.
967
        case <-peer.QuitSignal():
×
968
                return nil
×
969
        case <-ctx.Done():
×
970
                return nil
×
971
        case <-d.quit:
×
972
                nMsg.err <- ErrGossiperShuttingDown
×
973
        }
974

975
        return nMsg.err
3✔
976
}
977

978
// ProcessLocalAnnouncement sends a new remote announcement message along with
979
// the peer that sent the routing message. The announcement will be processed
980
// then added to a queue for batched trickled announcement to all connected
981
// peers.  Local channel announcements don't contain the announcement proof and
982
// will not be fully validated. Once the channel proofs are received, the
983
// entire channel announcement and update messages will be re-constructed and
984
// broadcast to the rest of the network.
985
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
986
        optionalFields ...OptionalMsgField) chan error {
3✔
987

3✔
988
        optionalMsgFields := &optionalMsgFields{}
3✔
989
        optionalMsgFields.apply(optionalFields...)
3✔
990

3✔
991
        nMsg := &networkMsg{
3✔
992
                msg:               msg,
3✔
993
                optionalMsgFields: optionalMsgFields,
3✔
994
                isRemote:          false,
3✔
995
                source:            d.selfKey,
3✔
996
                err:               make(chan error, 1),
3✔
997
        }
3✔
998

3✔
999
        select {
3✔
1000
        case d.networkMsgs <- nMsg:
3✔
1001
        case <-d.quit:
×
1002
                nMsg.err <- ErrGossiperShuttingDown
×
1003
        }
1004

1005
        return nMsg.err
3✔
1006
}
1007

1008
// channelUpdateID is a unique identifier for ChannelUpdate messages, as
1009
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
1010
// tuple.
1011
type channelUpdateID struct {
1012
        // channelID represents the set of data which is needed to
1013
        // retrieve all necessary data to validate the channel existence.
1014
        channelID lnwire.ShortChannelID
1015

1016
        // Flags least-significant bit must be set to 0 if the creating node
1017
        // corresponds to the first node in the previously sent channel
1018
        // announcement and 1 otherwise.
1019
        flags lnwire.ChanUpdateChanFlags
1020
}
1021

1022
// msgWithSenders is a wrapper struct around a message, and the set of peers
1023
// that originally sent us this message. Using this struct, we can ensure that
1024
// we don't re-send a message to the peer that sent it to us in the first
1025
// place.
1026
type msgWithSenders struct {
1027
        // msg is the wire message itself.
1028
        msg lnwire.Message
1029

1030
        // isLocal is true if this was a message that originated locally. We'll
1031
        // use this to bypass our normal checks to ensure we prioritize sending
1032
        // out our own updates.
1033
        isLocal bool
1034

1035
        // sender is the set of peers that sent us this message.
1036
        senders map[route.Vertex]struct{}
1037
}
1038

1039
// mergeSyncerMap is used to merge the set of senders of a particular message
1040
// with peers that we have an active GossipSyncer with. We do this to ensure
1041
// that we don't broadcast messages to any peers that we have active gossip
1042
// syncers for.
1043
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
3✔
1044
        for peerPub := range syncers {
6✔
1045
                m.senders[peerPub] = struct{}{}
3✔
1046
        }
3✔
1047
}
1048

1049
// deDupedAnnouncements de-duplicates announcements that have been added to the
1050
// batch. Internally, announcements are stored in three maps
1051
// (one each for channel announcements, channel updates, and node
1052
// announcements). These maps keep track of unique announcements and ensure no
1053
// announcements are duplicated. We keep the three message types separate, such
1054
// that we can send channel announcements first, then channel updates, and
1055
// finally node announcements when it's time to broadcast them.
1056
type deDupedAnnouncements struct {
1057
        // channelAnnouncements are identified by the short channel id field.
1058
        channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
1059

1060
        // channelUpdates are identified by the channel update id field.
1061
        channelUpdates map[channelUpdateID]msgWithSenders
1062

1063
        // nodeAnnouncements are identified by the Vertex field.
1064
        nodeAnnouncements map[route.Vertex]msgWithSenders
1065

1066
        sync.Mutex
1067
}
1068

1069
// Reset operates on deDupedAnnouncements to reset the storage of
1070
// announcements.
1071
func (d *deDupedAnnouncements) Reset() {
3✔
1072
        d.Lock()
3✔
1073
        defer d.Unlock()
3✔
1074

3✔
1075
        d.reset()
3✔
1076
}
3✔
1077

1078
// reset is the private version of the Reset method. We have this so we can
1079
// call this method within method that are already holding the lock.
1080
func (d *deDupedAnnouncements) reset() {
3✔
1081
        // Storage of each type of announcement (channel announcements, channel
3✔
1082
        // updates, node announcements) is set to an empty map where the
3✔
1083
        // appropriate key points to the corresponding lnwire.Message.
3✔
1084
        d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
3✔
1085
        d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
3✔
1086
        d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
3✔
1087
}
3✔
1088

1089
// addMsg adds a new message to the current batch. If the message is already
1090
// present in the current batch, then this new instance replaces the latter,
1091
// and the set of senders is updated to reflect which node sent us this
1092
// message.
1093
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
3✔
1094
        log.Tracef("Adding network message: %v to batch", message.msg.MsgType())
3✔
1095

3✔
1096
        // Depending on the message type (channel announcement, channel update,
3✔
1097
        // or node announcement), the message is added to the corresponding map
3✔
1098
        // in deDupedAnnouncements. Because each identifying key can have at
3✔
1099
        // most one value, the announcements are de-duplicated, with newer ones
3✔
1100
        // replacing older ones.
3✔
1101
        switch msg := message.msg.(type) {
3✔
1102

1103
        // Channel announcements are identified by the short channel id field.
1104
        case *lnwire.ChannelAnnouncement1:
3✔
1105
                deDupKey := msg.ShortChannelID
3✔
1106
                sender := route.NewVertex(message.source)
3✔
1107

3✔
1108
                mws, ok := d.channelAnnouncements[deDupKey]
3✔
1109
                if !ok {
6✔
1110
                        mws = msgWithSenders{
3✔
1111
                                msg:     msg,
3✔
1112
                                isLocal: !message.isRemote,
3✔
1113
                                senders: make(map[route.Vertex]struct{}),
3✔
1114
                        }
3✔
1115
                        mws.senders[sender] = struct{}{}
3✔
1116

3✔
1117
                        d.channelAnnouncements[deDupKey] = mws
3✔
1118

3✔
1119
                        return
3✔
1120
                }
3✔
1121

UNCOV
1122
                mws.msg = msg
×
UNCOV
1123
                mws.senders[sender] = struct{}{}
×
UNCOV
1124
                d.channelAnnouncements[deDupKey] = mws
×
1125

1126
        // Channel updates are identified by the (short channel id,
1127
        // channelflags) tuple.
1128
        case *lnwire.ChannelUpdate1:
3✔
1129
                sender := route.NewVertex(message.source)
3✔
1130
                deDupKey := channelUpdateID{
3✔
1131
                        msg.ShortChannelID,
3✔
1132
                        msg.ChannelFlags,
3✔
1133
                }
3✔
1134

3✔
1135
                oldTimestamp := uint32(0)
3✔
1136
                mws, ok := d.channelUpdates[deDupKey]
3✔
1137
                if ok {
3✔
UNCOV
1138
                        // If we already have seen this message, record its
×
UNCOV
1139
                        // timestamp.
×
UNCOV
1140
                        update, ok := mws.msg.(*lnwire.ChannelUpdate1)
×
UNCOV
1141
                        if !ok {
×
1142
                                log.Errorf("Expected *lnwire.ChannelUpdate1, "+
×
1143
                                        "got: %T", mws.msg)
×
1144

×
1145
                                return
×
1146
                        }
×
1147

UNCOV
1148
                        oldTimestamp = update.Timestamp
×
1149
                }
1150

1151
                // If we already had this message with a strictly newer
1152
                // timestamp, then we'll just discard the message we got.
1153
                if oldTimestamp > msg.Timestamp {
3✔
UNCOV
1154
                        log.Debugf("Ignored outdated network message: "+
×
UNCOV
1155
                                "peer=%v, msg=%s", message.peer, msg.MsgType())
×
UNCOV
1156
                        return
×
UNCOV
1157
                }
×
1158

1159
                // If the message we just got is newer than what we previously
1160
                // have seen, or this is the first time we see it, then we'll
1161
                // add it to our map of announcements.
1162
                if oldTimestamp < msg.Timestamp {
6✔
1163
                        mws = msgWithSenders{
3✔
1164
                                msg:     msg,
3✔
1165
                                isLocal: !message.isRemote,
3✔
1166
                                senders: make(map[route.Vertex]struct{}),
3✔
1167
                        }
3✔
1168

3✔
1169
                        // We'll mark the sender of the message in the
3✔
1170
                        // senders map.
3✔
1171
                        mws.senders[sender] = struct{}{}
3✔
1172

3✔
1173
                        d.channelUpdates[deDupKey] = mws
3✔
1174

3✔
1175
                        return
3✔
1176
                }
3✔
1177

1178
                // Lastly, if we had seen this exact message from before, with
1179
                // the same timestamp, we'll add the sender to the map of
1180
                // senders, such that we can skip sending this message back in
1181
                // the next batch.
UNCOV
1182
                mws.msg = msg
×
UNCOV
1183
                mws.senders[sender] = struct{}{}
×
UNCOV
1184
                d.channelUpdates[deDupKey] = mws
×
1185

1186
        // Node announcements are identified by the Vertex field.  Use the
1187
        // NodeID to create the corresponding Vertex.
1188
        case *lnwire.NodeAnnouncement:
3✔
1189
                sender := route.NewVertex(message.source)
3✔
1190
                deDupKey := route.Vertex(msg.NodeID)
3✔
1191

3✔
1192
                // We do the same for node announcements as we did for channel
3✔
1193
                // updates, as they also carry a timestamp.
3✔
1194
                oldTimestamp := uint32(0)
3✔
1195
                mws, ok := d.nodeAnnouncements[deDupKey]
3✔
1196
                if ok {
6✔
1197
                        oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
3✔
1198
                }
3✔
1199

1200
                // Discard the message if it's old.
1201
                if oldTimestamp > msg.Timestamp {
6✔
1202
                        return
3✔
1203
                }
3✔
1204

1205
                // Replace if it's newer.
1206
                if oldTimestamp < msg.Timestamp {
6✔
1207
                        mws = msgWithSenders{
3✔
1208
                                msg:     msg,
3✔
1209
                                isLocal: !message.isRemote,
3✔
1210
                                senders: make(map[route.Vertex]struct{}),
3✔
1211
                        }
3✔
1212

3✔
1213
                        mws.senders[sender] = struct{}{}
3✔
1214

3✔
1215
                        d.nodeAnnouncements[deDupKey] = mws
3✔
1216

3✔
1217
                        return
3✔
1218
                }
3✔
1219

1220
                // Add to senders map if it's the same as we had.
1221
                mws.msg = msg
3✔
1222
                mws.senders[sender] = struct{}{}
3✔
1223
                d.nodeAnnouncements[deDupKey] = mws
3✔
1224
        }
1225
}
1226

1227
// AddMsgs is a helper method to add multiple messages to the announcement
1228
// batch.
1229
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
3✔
1230
        d.Lock()
3✔
1231
        defer d.Unlock()
3✔
1232

3✔
1233
        for _, msg := range msgs {
6✔
1234
                d.addMsg(msg)
3✔
1235
        }
3✔
1236
}
1237

1238
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
1239
// to broadcast next into messages that are locally sourced and those that are
1240
// sourced remotely.
1241
type msgsToBroadcast struct {
1242
        // localMsgs is the set of messages we created locally.
1243
        localMsgs []msgWithSenders
1244

1245
        // remoteMsgs is the set of messages that we received from a remote
1246
        // party.
1247
        remoteMsgs []msgWithSenders
1248
}
1249

1250
// addMsg adds a new message to the appropriate sub-slice.
1251
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
3✔
1252
        if msg.isLocal {
6✔
1253
                m.localMsgs = append(m.localMsgs, msg)
3✔
1254
        } else {
6✔
1255
                m.remoteMsgs = append(m.remoteMsgs, msg)
3✔
1256
        }
3✔
1257
}
1258

1259
// isEmpty returns true if the batch is empty.
1260
func (m *msgsToBroadcast) isEmpty() bool {
3✔
1261
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
3✔
1262
}
3✔
1263

1264
// length returns the length of the combined message set.
UNCOV
1265
func (m *msgsToBroadcast) length() int {
×
UNCOV
1266
        return len(m.localMsgs) + len(m.remoteMsgs)
×
UNCOV
1267
}
×
1268

1269
// Emit returns the set of de-duplicated announcements to be sent out during
1270
// the next announcement epoch, in the order of channel announcements, channel
1271
// updates, and node announcements. Each message emitted, contains the set of
1272
// peers that sent us the message. This way, we can ensure that we don't waste
1273
// bandwidth by re-sending a message to the peer that sent it to us in the
1274
// first place. Additionally, the set of stored messages are reset.
1275
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
3✔
1276
        d.Lock()
3✔
1277
        defer d.Unlock()
3✔
1278

3✔
1279
        // Get the total number of announcements.
3✔
1280
        numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
3✔
1281
                len(d.nodeAnnouncements)
3✔
1282

3✔
1283
        // Create an empty array of lnwire.Messages with a length equal to
3✔
1284
        // the total number of announcements.
3✔
1285
        msgs := msgsToBroadcast{
3✔
1286
                localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
3✔
1287
                remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
3✔
1288
        }
3✔
1289

3✔
1290
        // Add the channel announcements to the array first.
3✔
1291
        for _, message := range d.channelAnnouncements {
6✔
1292
                msgs.addMsg(message)
3✔
1293
        }
3✔
1294

1295
        // Then add the channel updates.
1296
        for _, message := range d.channelUpdates {
6✔
1297
                msgs.addMsg(message)
3✔
1298
        }
3✔
1299

1300
        // Finally add the node announcements.
1301
        for _, message := range d.nodeAnnouncements {
6✔
1302
                msgs.addMsg(message)
3✔
1303
        }
3✔
1304

1305
        d.reset()
3✔
1306

3✔
1307
        // Return the array of lnwire.messages.
3✔
1308
        return msgs
3✔
1309
}
1310

1311
// calculateSubBatchSize is a helper function that calculates the size to break
1312
// down the batchSize into.
1313
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
1314
        minimumBatchSize, batchSize int) int {
3✔
1315
        if subBatchDelay > totalDelay {
3✔
UNCOV
1316
                return batchSize
×
UNCOV
1317
        }
×
1318

1319
        subBatchSize := (batchSize*int(subBatchDelay) +
3✔
1320
                int(totalDelay) - 1) / int(totalDelay)
3✔
1321

3✔
1322
        if subBatchSize < minimumBatchSize {
6✔
1323
                return minimumBatchSize
3✔
1324
        }
3✔
1325

UNCOV
1326
        return subBatchSize
×
1327
}
1328

1329
// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
1330
// this variable so the function can be mocked in our test.
1331
var batchSizeCalculator = calculateSubBatchSize
1332

1333
// splitAnnouncementBatches takes an exiting list of announcements and
1334
// decomposes it into sub batches controlled by the `subBatchSize`.
1335
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
1336
        announcementBatch []msgWithSenders) [][]msgWithSenders {
3✔
1337

3✔
1338
        subBatchSize := batchSizeCalculator(
3✔
1339
                d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
3✔
1340
                d.cfg.MinimumBatchSize, len(announcementBatch),
3✔
1341
        )
3✔
1342

3✔
1343
        var splitAnnouncementBatch [][]msgWithSenders
3✔
1344

3✔
1345
        for subBatchSize < len(announcementBatch) {
6✔
1346
                // For slicing with minimal allocation
3✔
1347
                // https://github.com/golang/go/wiki/SliceTricks
3✔
1348
                announcementBatch, splitAnnouncementBatch =
3✔
1349
                        announcementBatch[subBatchSize:],
3✔
1350
                        append(splitAnnouncementBatch,
3✔
1351
                                announcementBatch[0:subBatchSize:subBatchSize])
3✔
1352
        }
3✔
1353
        splitAnnouncementBatch = append(
3✔
1354
                splitAnnouncementBatch, announcementBatch,
3✔
1355
        )
3✔
1356

3✔
1357
        return splitAnnouncementBatch
3✔
1358
}
1359

1360
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
1361
// split size, and then sends out all items to the set of target peers. Locally
1362
// generated announcements are always sent before remotely generated
1363
// announcements.
1364
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(ctx context.Context,
1365
        annBatch msgsToBroadcast) {
3✔
1366

3✔
1367
        // delayNextBatch is a helper closure that blocks for `SubBatchDelay`
3✔
1368
        // duration to delay the sending of next announcement batch.
3✔
1369
        delayNextBatch := func() {
6✔
1370
                select {
3✔
1371
                case <-time.After(d.cfg.SubBatchDelay):
3✔
UNCOV
1372
                case <-d.quit:
×
UNCOV
1373
                        return
×
1374
                }
1375
        }
1376

1377
        // Fetch the local and remote announcements.
1378
        localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
3✔
1379
        remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)
3✔
1380

3✔
1381
        d.wg.Add(1)
3✔
1382
        go func() {
6✔
1383
                defer d.wg.Done()
3✔
1384

3✔
1385
                log.Debugf("Broadcasting %v new local announcements in %d "+
3✔
1386
                        "sub batches", len(annBatch.localMsgs),
3✔
1387
                        len(localBatches))
3✔
1388

3✔
1389
                // Send out the local announcements first.
3✔
1390
                for _, annBatch := range localBatches {
6✔
1391
                        d.sendLocalBatch(annBatch)
3✔
1392
                        delayNextBatch()
3✔
1393
                }
3✔
1394

1395
                log.Debugf("Broadcasting %v new remote announcements in %d "+
3✔
1396
                        "sub batches", len(annBatch.remoteMsgs),
3✔
1397
                        len(remoteBatches))
3✔
1398

3✔
1399
                // Now send the remote announcements.
3✔
1400
                for _, annBatch := range remoteBatches {
6✔
1401
                        d.sendRemoteBatch(ctx, annBatch)
3✔
1402
                        delayNextBatch()
3✔
1403
                }
3✔
1404
        }()
1405
}
1406

1407
// sendLocalBatch broadcasts a list of locally generated announcements to our
1408
// peers. For local announcements, we skip the filter and dedup logic and just
1409
// send the announcements out to all our coonnected peers.
1410
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
3✔
1411
        msgsToSend := lnutils.Map(
3✔
1412
                annBatch, func(m msgWithSenders) lnwire.Message {
6✔
1413
                        return m.msg
3✔
1414
                },
3✔
1415
        )
1416

1417
        err := d.cfg.Broadcast(nil, msgsToSend...)
3✔
1418
        if err != nil {
3✔
1419
                log.Errorf("Unable to send local batch announcements: %v", err)
×
1420
        }
×
1421
}
1422

1423
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
1424
// peers.
1425
func (d *AuthenticatedGossiper) sendRemoteBatch(ctx context.Context,
1426
        annBatch []msgWithSenders) {
3✔
1427

3✔
1428
        syncerPeers := d.syncMgr.GossipSyncers()
3✔
1429

3✔
1430
        // We'll first attempt to filter out this new message for all peers
3✔
1431
        // that have active gossip syncers active.
3✔
1432
        for pub, syncer := range syncerPeers {
6✔
1433
                log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
3✔
1434
                syncer.FilterGossipMsgs(ctx, annBatch...)
3✔
1435
        }
3✔
1436

1437
        for _, msgChunk := range annBatch {
6✔
1438
                msgChunk := msgChunk
3✔
1439

3✔
1440
                // With the syncers taken care of, we'll merge the sender map
3✔
1441
                // with the set of syncers, so we don't send out duplicate
3✔
1442
                // messages.
3✔
1443
                msgChunk.mergeSyncerMap(syncerPeers)
3✔
1444

3✔
1445
                err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
3✔
1446
                if err != nil {
3✔
1447
                        log.Errorf("Unable to send batch "+
×
1448
                                "announcements: %v", err)
×
1449
                        continue
×
1450
                }
1451
        }
1452
}
1453

1454
// networkHandler is the primary goroutine that drives this service. The roles
1455
// of this goroutine includes answering queries related to the state of the
1456
// network, syncing up newly connected peers, and also periodically
1457
// broadcasting our latest topology state to all connected peers.
1458
//
1459
// NOTE: This MUST be run as a goroutine.
1460
func (d *AuthenticatedGossiper) networkHandler(ctx context.Context) {
3✔
1461
        defer d.wg.Done()
3✔
1462

3✔
1463
        // Initialize empty deDupedAnnouncements to store announcement batch.
3✔
1464
        announcements := deDupedAnnouncements{}
3✔
1465
        announcements.Reset()
3✔
1466

3✔
1467
        d.cfg.RetransmitTicker.Resume()
3✔
1468
        defer d.cfg.RetransmitTicker.Stop()
3✔
1469

3✔
1470
        trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
3✔
1471
        defer trickleTimer.Stop()
3✔
1472

3✔
1473
        // To start, we'll first check to see if there are any stale channel or
3✔
1474
        // node announcements that we need to re-transmit.
3✔
1475
        if err := d.retransmitStaleAnns(ctx, time.Now()); err != nil {
3✔
1476
                log.Errorf("Unable to rebroadcast stale announcements: %v", err)
×
1477
        }
×
1478

1479
        for {
6✔
1480
                select {
3✔
1481
                // A new policy update has arrived. We'll commit it to the
1482
                // sub-systems below us, then craft, sign, and broadcast a new
1483
                // ChannelUpdate for the set of affected clients.
1484
                case policyUpdate := <-d.chanPolicyUpdates:
3✔
1485
                        log.Tracef("Received channel %d policy update requests",
3✔
1486
                                len(policyUpdate.edgesToUpdate))
3✔
1487

3✔
1488
                        // First, we'll now create new fully signed updates for
3✔
1489
                        // the affected channels and also update the underlying
3✔
1490
                        // graph with the new state.
3✔
1491
                        newChanUpdates, err := d.processChanPolicyUpdate(
3✔
1492
                                ctx, policyUpdate.edgesToUpdate,
3✔
1493
                        )
3✔
1494
                        policyUpdate.errChan <- err
3✔
1495
                        if err != nil {
3✔
1496
                                log.Errorf("Unable to craft policy updates: %v",
×
1497
                                        err)
×
1498
                                continue
×
1499
                        }
1500

1501
                        // Finally, with the updates committed, we'll now add
1502
                        // them to the announcement batch to be flushed at the
1503
                        // start of the next epoch.
1504
                        announcements.AddMsgs(newChanUpdates...)
3✔
1505

1506
                case announcement := <-d.networkMsgs:
3✔
1507
                        log.Tracef("Received network message: "+
3✔
1508
                                "peer=%v, msg=%s, is_remote=%v",
3✔
1509
                                announcement.peer, announcement.msg.MsgType(),
3✔
1510
                                announcement.isRemote)
3✔
1511

3✔
1512
                        switch announcement.msg.(type) {
3✔
1513
                        // Channel announcement signatures are amongst the only
1514
                        // messages that we'll process serially.
1515
                        case *lnwire.AnnounceSignatures1:
3✔
1516
                                emittedAnnouncements, _ := d.processNetworkAnnouncement(
3✔
1517
                                        ctx, announcement,
3✔
1518
                                )
3✔
1519
                                log.Debugf("Processed network message %s, "+
3✔
1520
                                        "returned len(announcements)=%v",
3✔
1521
                                        announcement.msg.MsgType(),
3✔
1522
                                        len(emittedAnnouncements))
3✔
1523

3✔
1524
                                if emittedAnnouncements != nil {
6✔
1525
                                        announcements.AddMsgs(
3✔
1526
                                                emittedAnnouncements...,
3✔
1527
                                        )
3✔
1528
                                }
3✔
1529
                                continue
3✔
1530
                        }
1531

1532
                        // If this message was recently rejected, then we won't
1533
                        // attempt to re-process it.
1534
                        if announcement.isRemote && d.isRecentlyRejectedMsg(
3✔
1535
                                announcement.msg,
3✔
1536
                                sourceToPub(announcement.source),
3✔
1537
                        ) {
3✔
UNCOV
1538

×
UNCOV
1539
                                announcement.err <- fmt.Errorf("recently " +
×
UNCOV
1540
                                        "rejected")
×
UNCOV
1541
                                continue
×
1542
                        }
1543

1544
                        // We'll set up any dependent, and wait until a free
1545
                        // slot for this job opens up, this allow us to not
1546
                        // have thousands of goroutines active.
1547
                        annJobID, err := d.vb.InitJobDependencies(
3✔
1548
                                announcement.msg,
3✔
1549
                        )
3✔
1550
                        if err != nil {
3✔
1551
                                announcement.err <- err
×
1552
                                continue
×
1553
                        }
1554

1555
                        d.wg.Add(1)
3✔
1556
                        go d.handleNetworkMessages(
3✔
1557
                                ctx, announcement, &announcements, annJobID,
3✔
1558
                        )
3✔
1559

1560
                // The trickle timer has ticked, which indicates we should
1561
                // flush to the network the pending batch of new announcements
1562
                // we've received since the last trickle tick.
1563
                case <-trickleTimer.C:
3✔
1564
                        // Emit the current batch of announcements from
3✔
1565
                        // deDupedAnnouncements.
3✔
1566
                        announcementBatch := announcements.Emit()
3✔
1567

3✔
1568
                        // If the current announcements batch is nil, then we
3✔
1569
                        // have no further work here.
3✔
1570
                        if announcementBatch.isEmpty() {
6✔
1571
                                continue
3✔
1572
                        }
1573

1574
                        // At this point, we have the set of local and remote
1575
                        // announcements we want to send out. We'll do the
1576
                        // batching as normal for both, but for local
1577
                        // announcements, we'll blast them out w/o regard for
1578
                        // our peer's policies so we ensure they propagate
1579
                        // properly.
1580
                        d.splitAndSendAnnBatch(ctx, announcementBatch)
3✔
1581

1582
                // The retransmission timer has ticked which indicates that we
1583
                // should check if we need to prune or re-broadcast any of our
1584
                // personal channels or node announcement. This addresses the
1585
                // case of "zombie" channels and channel advertisements that
1586
                // have been dropped, or not properly propagated through the
1587
                // network.
UNCOV
1588
                case tick := <-d.cfg.RetransmitTicker.Ticks():
×
UNCOV
1589
                        if err := d.retransmitStaleAnns(ctx, tick); err != nil {
×
1590
                                log.Errorf("unable to rebroadcast stale "+
×
1591
                                        "announcements: %v", err)
×
1592
                        }
×
1593

1594
                // The gossiper has been signalled to exit, to we exit our
1595
                // main loop so the wait group can be decremented.
1596
                case <-d.quit:
3✔
1597
                        return
3✔
1598
                }
1599
        }
1600
}
1601

1602
// handleNetworkMessages is responsible for waiting for dependencies for a
1603
// given network message and processing the message. Once processed, it will
1604
// signal its dependants and add the new announcements to the announce batch.
1605
//
1606
// NOTE: must be run as a goroutine.
1607
func (d *AuthenticatedGossiper) handleNetworkMessages(ctx context.Context,
1608
        nMsg *networkMsg, deDuped *deDupedAnnouncements, jobID JobID) {
3✔
1609

3✔
1610
        defer d.wg.Done()
3✔
1611
        defer d.vb.CompleteJob()
3✔
1612

3✔
1613
        // We should only broadcast this message forward if it originated from
3✔
1614
        // us or it wasn't received as part of our initial historical sync.
3✔
1615
        shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
3✔
1616

3✔
1617
        // If this message has an existing dependency, then we'll wait until
3✔
1618
        // that has been fully validated before we proceed.
3✔
1619
        err := d.vb.WaitForParents(jobID, nMsg.msg)
3✔
1620
        if err != nil {
3✔
1621
                log.Debugf("Validating network message %s got err: %v",
×
1622
                        nMsg.msg.MsgType(), err)
×
1623

×
1624
                if errors.Is(err, ErrVBarrierShuttingDown) {
×
1625
                        log.Warnf("unexpected error during validation "+
×
1626
                                "barrier shutdown: %v", err)
×
1627
                }
×
1628
                nMsg.err <- err
×
1629

×
1630
                return
×
1631
        }
1632

1633
        // Process the network announcement to determine if this is either a
1634
        // new announcement from our PoV or an edges to a prior vertex/edge we
1635
        // previously proceeded.
1636
        newAnns, allow := d.processNetworkAnnouncement(ctx, nMsg)
3✔
1637

3✔
1638
        log.Tracef("Processed network message %s, returned "+
3✔
1639
                "len(announcements)=%v, allowDependents=%v",
3✔
1640
                nMsg.msg.MsgType(), len(newAnns), allow)
3✔
1641

3✔
1642
        // If this message had any dependencies, then we can now signal them to
3✔
1643
        // continue.
3✔
1644
        err = d.vb.SignalDependents(nMsg.msg, jobID)
3✔
1645
        if err != nil {
3✔
1646
                // Something is wrong if SignalDependents returns an error.
×
1647
                log.Errorf("SignalDependents returned error for msg=%v with "+
×
1648
                        "JobID=%v", spew.Sdump(nMsg.msg), jobID)
×
1649

×
1650
                nMsg.err <- err
×
1651

×
1652
                return
×
1653
        }
×
1654

1655
        // If the announcement was accepted, then add the emitted announcements
1656
        // to our announce batch to be broadcast once the trickle timer ticks
1657
        // gain.
1658
        if newAnns != nil && shouldBroadcast {
6✔
1659
                // TODO(roasbeef): exclude peer that sent.
3✔
1660
                deDuped.AddMsgs(newAnns...)
3✔
1661
        } else if newAnns != nil {
9✔
1662
                log.Trace("Skipping broadcast of announcements received " +
3✔
1663
                        "during initial graph sync")
3✔
1664
        }
3✔
1665
}
1666

1667
// TODO(roasbeef): d/c peers that send updates not on our chain
1668

1669
// InitSyncState is called by outside sub-systems when a connection is
1670
// established to a new peer that understands how to perform channel range
1671
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
1672
// needed to handle new queries.
1673
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
3✔
1674
        d.syncMgr.InitSyncState(syncPeer)
3✔
1675
}
3✔
1676

1677
// PruneSyncState is called by outside sub-systems once a peer that we were
1678
// previously connected to has been disconnected. In this case we can stop the
1679
// existing GossipSyncer assigned to the peer and free up resources.
1680
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
3✔
1681
        d.syncMgr.PruneSyncState(peer)
3✔
1682
}
3✔
1683

1684
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
1685
// false otherwise, This avoids expensive reprocessing of the message.
1686
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
1687
        peerPub [33]byte) bool {
3✔
1688

3✔
1689
        var scid uint64
3✔
1690
        switch m := msg.(type) {
3✔
1691
        case *lnwire.ChannelUpdate1:
3✔
1692
                scid = m.ShortChannelID.ToUint64()
3✔
1693

1694
        case *lnwire.ChannelAnnouncement1:
3✔
1695
                scid = m.ShortChannelID.ToUint64()
3✔
1696

1697
        default:
3✔
1698
                return false
3✔
1699
        }
1700

1701
        _, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
3✔
1702
        return err != cache.ErrElementNotFound
3✔
1703
}
1704

1705
// retransmitStaleAnns examines all outgoing channels that the source node is
1706
// known to maintain to check to see if any of them are "stale". A channel is
1707
// stale iff, the last timestamp of its rebroadcast is older than the
1708
// RebroadcastInterval. We also check if a refreshed node announcement should
1709
// be resent.
1710
func (d *AuthenticatedGossiper) retransmitStaleAnns(ctx context.Context,
1711
        now time.Time) error {
3✔
1712

3✔
1713
        // Iterate over all of our channels and check if any of them fall
3✔
1714
        // within the prune interval or re-broadcast interval.
3✔
1715
        type updateTuple struct {
3✔
1716
                info *models.ChannelEdgeInfo
3✔
1717
                edge *models.ChannelEdgePolicy
3✔
1718
        }
3✔
1719

3✔
1720
        var (
3✔
1721
                havePublicChannels bool
3✔
1722
                edgesToUpdate      []updateTuple
3✔
1723
        )
3✔
1724
        err := d.cfg.Graph.ForAllOutgoingChannels(ctx, func(
3✔
1725
                info *models.ChannelEdgeInfo,
3✔
1726
                edge *models.ChannelEdgePolicy) error {
6✔
1727

3✔
1728
                // If there's no auth proof attached to this edge, it means
3✔
1729
                // that it is a private channel not meant to be announced to
3✔
1730
                // the greater network, so avoid sending channel updates for
3✔
1731
                // this channel to not leak its
3✔
1732
                // existence.
3✔
1733
                if info.AuthProof == nil {
6✔
1734
                        log.Debugf("Skipping retransmission of channel "+
3✔
1735
                                "without AuthProof: %v", info.ChannelID)
3✔
1736
                        return nil
3✔
1737
                }
3✔
1738

1739
                // We make a note that we have at least one public channel. We
1740
                // use this to determine whether we should send a node
1741
                // announcement below.
1742
                havePublicChannels = true
3✔
1743

3✔
1744
                // If this edge has a ChannelUpdate that was created before the
3✔
1745
                // introduction of the MaxHTLC field, then we'll update this
3✔
1746
                // edge to propagate this information in the network.
3✔
1747
                if !edge.MessageFlags.HasMaxHtlc() {
3✔
1748
                        // We'll make sure we support the new max_htlc field if
×
1749
                        // not already present.
×
1750
                        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
×
1751
                        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
×
1752

×
1753
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
1754
                                info: info,
×
1755
                                edge: edge,
×
1756
                        })
×
1757
                        return nil
×
1758
                }
×
1759

1760
                timeElapsed := now.Sub(edge.LastUpdate)
3✔
1761

3✔
1762
                // If it's been longer than RebroadcastInterval since we've
3✔
1763
                // re-broadcasted the channel, add the channel to the set of
3✔
1764
                // edges we need to update.
3✔
1765
                if timeElapsed >= d.cfg.RebroadcastInterval {
3✔
UNCOV
1766
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
UNCOV
1767
                                info: info,
×
UNCOV
1768
                                edge: edge,
×
UNCOV
1769
                        })
×
UNCOV
1770
                }
×
1771

1772
                return nil
3✔
1773
        }, func() {
3✔
1774
                havePublicChannels = false
3✔
1775
                edgesToUpdate = nil
3✔
1776
        })
3✔
1777
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
3✔
1778
                return fmt.Errorf("unable to retrieve outgoing channels: %w",
×
1779
                        err)
×
1780
        }
×
1781

1782
        var signedUpdates []lnwire.Message
3✔
1783
        for _, chanToUpdate := range edgesToUpdate {
3✔
UNCOV
1784
                // Re-sign and update the channel on disk and retrieve our
×
UNCOV
1785
                // ChannelUpdate to broadcast.
×
UNCOV
1786
                chanAnn, chanUpdate, err := d.updateChannel(
×
UNCOV
1787
                        ctx, chanToUpdate.info, chanToUpdate.edge,
×
UNCOV
1788
                )
×
UNCOV
1789
                if err != nil {
×
1790
                        return fmt.Errorf("unable to update channel: %w", err)
×
1791
                }
×
1792

1793
                // If we have a valid announcement to transmit, then we'll send
1794
                // that along with the update.
UNCOV
1795
                if chanAnn != nil {
×
UNCOV
1796
                        signedUpdates = append(signedUpdates, chanAnn)
×
UNCOV
1797
                }
×
1798

UNCOV
1799
                signedUpdates = append(signedUpdates, chanUpdate)
×
1800
        }
1801

1802
        // If we don't have any public channels, we return as we don't want to
1803
        // broadcast anything that would reveal our existence.
1804
        if !havePublicChannels {
6✔
1805
                return nil
3✔
1806
        }
3✔
1807

1808
        // We'll also check that our NodeAnnouncement is not too old.
1809
        currentNodeAnn := d.cfg.FetchSelfAnnouncement()
3✔
1810
        timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
3✔
1811
        timeElapsed := now.Sub(timestamp)
3✔
1812

3✔
1813
        // If it's been a full day since we've re-broadcasted the
3✔
1814
        // node announcement, refresh it and resend it.
3✔
1815
        nodeAnnStr := ""
3✔
1816
        if timeElapsed >= d.cfg.RebroadcastInterval {
3✔
UNCOV
1817
                newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
×
UNCOV
1818
                if err != nil {
×
1819
                        return fmt.Errorf("unable to get refreshed node "+
×
1820
                                "announcement: %v", err)
×
1821
                }
×
1822

UNCOV
1823
                signedUpdates = append(signedUpdates, &newNodeAnn)
×
UNCOV
1824
                nodeAnnStr = " and our refreshed node announcement"
×
UNCOV
1825

×
UNCOV
1826
                // Before broadcasting the refreshed node announcement, add it
×
UNCOV
1827
                // to our own graph.
×
UNCOV
1828
                if err := d.addNode(ctx, &newNodeAnn); err != nil {
×
UNCOV
1829
                        log.Errorf("Unable to add refreshed node announcement "+
×
UNCOV
1830
                                "to graph: %v", err)
×
UNCOV
1831
                }
×
1832
        }
1833

1834
        // If we don't have any updates to re-broadcast, then we'll exit
1835
        // early.
1836
        if len(signedUpdates) == 0 {
6✔
1837
                return nil
3✔
1838
        }
3✔
1839

UNCOV
1840
        log.Infof("Retransmitting %v outgoing channels%v",
×
UNCOV
1841
                len(edgesToUpdate), nodeAnnStr)
×
UNCOV
1842

×
UNCOV
1843
        // With all the wire announcements properly crafted, we'll broadcast
×
UNCOV
1844
        // our known outgoing channels to all our immediate peers.
×
UNCOV
1845
        if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
×
1846
                return fmt.Errorf("unable to re-broadcast channels: %w", err)
×
1847
        }
×
1848

UNCOV
1849
        return nil
×
1850
}
1851

1852
// processChanPolicyUpdate generates a new set of channel updates for the
1853
// provided list of edges and updates the backing ChannelGraphSource.
1854
func (d *AuthenticatedGossiper) processChanPolicyUpdate(ctx context.Context,
1855
        edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
3✔
1856

3✔
1857
        var chanUpdates []networkMsg
3✔
1858
        for _, edgeInfo := range edgesToUpdate {
6✔
1859
                // Now that we've collected all the channels we need to update,
3✔
1860
                // we'll re-sign and update the backing ChannelGraphSource, and
3✔
1861
                // retrieve our ChannelUpdate to broadcast.
3✔
1862
                _, chanUpdate, err := d.updateChannel(
3✔
1863
                        ctx, edgeInfo.Info, edgeInfo.Edge,
3✔
1864
                )
3✔
1865
                if err != nil {
3✔
1866
                        return nil, err
×
1867
                }
×
1868

1869
                // We'll avoid broadcasting any updates for private channels to
1870
                // avoid directly giving away their existence. Instead, we'll
1871
                // send the update directly to the remote party.
1872
                if edgeInfo.Info.AuthProof == nil {
6✔
1873
                        // If AuthProof is nil and an alias was found for this
3✔
1874
                        // ChannelID (meaning the option-scid-alias feature was
3✔
1875
                        // negotiated), we'll replace the ShortChannelID in the
3✔
1876
                        // update with the peer's alias. We do this after
3✔
1877
                        // updateChannel so that the alias isn't persisted to
3✔
1878
                        // the database.
3✔
1879
                        chanID := lnwire.NewChanIDFromOutPoint(
3✔
1880
                                edgeInfo.Info.ChannelPoint,
3✔
1881
                        )
3✔
1882

3✔
1883
                        var defaultAlias lnwire.ShortChannelID
3✔
1884
                        foundAlias, _ := d.cfg.GetAlias(chanID)
3✔
1885
                        if foundAlias != defaultAlias {
6✔
1886
                                chanUpdate.ShortChannelID = foundAlias
3✔
1887

3✔
1888
                                sig, err := d.cfg.SignAliasUpdate(chanUpdate)
3✔
1889
                                if err != nil {
3✔
1890
                                        log.Errorf("Unable to sign alias "+
×
1891
                                                "update: %v", err)
×
1892
                                        continue
×
1893
                                }
1894

1895
                                lnSig, err := lnwire.NewSigFromSignature(sig)
3✔
1896
                                if err != nil {
3✔
1897
                                        log.Errorf("Unable to create sig: %v",
×
1898
                                                err)
×
1899
                                        continue
×
1900
                                }
1901

1902
                                chanUpdate.Signature = lnSig
3✔
1903
                        }
1904

1905
                        remotePubKey := remotePubFromChanInfo(
3✔
1906
                                edgeInfo.Info, chanUpdate.ChannelFlags,
3✔
1907
                        )
3✔
1908
                        err := d.reliableSender.sendMessage(
3✔
1909
                                ctx, chanUpdate, remotePubKey,
3✔
1910
                        )
3✔
1911
                        if err != nil {
3✔
1912
                                log.Errorf("Unable to reliably send %v for "+
×
1913
                                        "channel=%v to peer=%x: %v",
×
1914
                                        chanUpdate.MsgType(),
×
1915
                                        chanUpdate.ShortChannelID,
×
1916
                                        remotePubKey, err)
×
1917
                        }
×
1918
                        continue
3✔
1919
                }
1920

1921
                // We set ourselves as the source of this message to indicate
1922
                // that we shouldn't skip any peers when sending this message.
1923
                chanUpdates = append(chanUpdates, networkMsg{
3✔
1924
                        source:   d.selfKey,
3✔
1925
                        isRemote: false,
3✔
1926
                        msg:      chanUpdate,
3✔
1927
                })
3✔
1928
        }
1929

1930
        return chanUpdates, nil
3✔
1931
}
1932

1933
// remotePubFromChanInfo returns the public key of the remote peer given a
1934
// ChannelEdgeInfo that describe a channel we have with them.
1935
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
1936
        chanFlags lnwire.ChanUpdateChanFlags) [33]byte {
3✔
1937

3✔
1938
        var remotePubKey [33]byte
3✔
1939
        switch {
3✔
1940
        case chanFlags&lnwire.ChanUpdateDirection == 0:
3✔
1941
                remotePubKey = chanInfo.NodeKey2Bytes
3✔
1942
        case chanFlags&lnwire.ChanUpdateDirection == 1:
3✔
1943
                remotePubKey = chanInfo.NodeKey1Bytes
3✔
1944
        }
1945

1946
        return remotePubKey
3✔
1947
}
1948

1949
// processRejectedEdge examines a rejected edge to see if we can extract any
1950
// new announcements from it.  An edge will get rejected if we already added
1951
// the same edge without AuthProof to the graph. If the received announcement
1952
// contains a proof, we can add this proof to our edge.  We can end up in this
1953
// situation in the case where we create a channel, but for some reason fail
1954
// to receive the remote peer's proof, while the remote peer is able to fully
1955
// assemble the proof and craft the ChannelAnnouncement.
1956
func (d *AuthenticatedGossiper) processRejectedEdge(_ context.Context,
1957
        chanAnnMsg *lnwire.ChannelAnnouncement1,
1958
        proof *models.ChannelAuthProof) ([]networkMsg, error) {
3✔
1959

3✔
1960
        // First, we'll fetch the state of the channel as we know if from the
3✔
1961
        // database.
3✔
1962
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
3✔
1963
                chanAnnMsg.ShortChannelID,
3✔
1964
        )
3✔
1965
        if err != nil {
3✔
1966
                return nil, err
×
1967
        }
×
1968

1969
        // The edge is in the graph, and has a proof attached, then we'll just
1970
        // reject it as normal.
1971
        if chanInfo.AuthProof != nil {
6✔
1972
                return nil, nil
3✔
1973
        }
3✔
1974

1975
        // Otherwise, this means that the edge is within the graph, but it
1976
        // doesn't yet have a proper proof attached. If we did not receive
1977
        // the proof such that we now can add it, there's nothing more we
1978
        // can do.
1979
        if proof == nil {
×
1980
                return nil, nil
×
1981
        }
×
1982

1983
        // We'll then create then validate the new fully assembled
1984
        // announcement.
1985
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
×
1986
                proof, chanInfo, e1, e2,
×
1987
        )
×
1988
        if err != nil {
×
1989
                return nil, err
×
1990
        }
×
1991
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
×
1992
        if err != nil {
×
1993
                err := fmt.Errorf("assembled channel announcement proof "+
×
1994
                        "for shortChanID=%v isn't valid: %v",
×
1995
                        chanAnnMsg.ShortChannelID, err)
×
1996
                log.Error(err)
×
1997
                return nil, err
×
1998
        }
×
1999

2000
        // If everything checks out, then we'll add the fully assembled proof
2001
        // to the database.
2002
        err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
×
2003
        if err != nil {
×
2004
                err := fmt.Errorf("unable add proof to shortChanID=%v: %w",
×
2005
                        chanAnnMsg.ShortChannelID, err)
×
2006
                log.Error(err)
×
2007
                return nil, err
×
2008
        }
×
2009

2010
        // As we now have a complete channel announcement for this channel,
2011
        // we'll construct the announcement so they can be broadcast out to all
2012
        // our peers.
2013
        announcements := make([]networkMsg, 0, 3)
×
2014
        announcements = append(announcements, networkMsg{
×
2015
                source: d.selfKey,
×
2016
                msg:    chanAnn,
×
2017
        })
×
2018
        if e1Ann != nil {
×
2019
                announcements = append(announcements, networkMsg{
×
2020
                        source: d.selfKey,
×
2021
                        msg:    e1Ann,
×
2022
                })
×
2023
        }
×
2024
        if e2Ann != nil {
×
2025
                announcements = append(announcements, networkMsg{
×
2026
                        source: d.selfKey,
×
2027
                        msg:    e2Ann,
×
2028
                })
×
2029

×
2030
        }
×
2031

2032
        return announcements, nil
×
2033
}
2034

2035
// fetchPKScript fetches the output script for the given SCID.
2036
func (d *AuthenticatedGossiper) fetchPKScript(chanID *lnwire.ShortChannelID) (
2037
        []byte, error) {
×
2038

×
2039
        return lnwallet.FetchPKScriptWithQuit(d.cfg.ChainIO, chanID, d.quit)
×
2040
}
×
2041

2042
// addNode processes the given node announcement, and adds it to our channel
2043
// graph.
2044
func (d *AuthenticatedGossiper) addNode(ctx context.Context,
2045
        msg *lnwire.NodeAnnouncement, op ...batch.SchedulerOption) error {
3✔
2046

3✔
2047
        if err := netann.ValidateNodeAnn(msg); err != nil {
3✔
UNCOV
2048
                return fmt.Errorf("unable to validate node announcement: %w",
×
UNCOV
2049
                        err)
×
UNCOV
2050
        }
×
2051

2052
        return d.cfg.Graph.AddNode(
3✔
2053
                ctx, models.NodeFromWireAnnouncement(msg), op...,
3✔
2054
        )
3✔
2055
}
2056

2057
// isPremature decides whether a given network message has a block height+delta
2058
// value specified in the future. If so, the message will be added to the
2059
// future message map and be processed when the block height as reached.
2060
//
2061
// NOTE: must be used inside a lock.
2062
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
2063
        delta uint32, msg *networkMsg) bool {
3✔
2064

3✔
2065
        // The channel is already confirmed at chanID.BlockHeight so we minus
3✔
2066
        // one block. For instance, if the required confirmation for this
3✔
2067
        // channel announcement is 6, we then only need to wait for 5 more
3✔
2068
        // blocks once the funding tx is confirmed.
3✔
2069
        if delta > 0 {
6✔
2070
                delta--
3✔
2071
        }
3✔
2072

2073
        msgHeight := chanID.BlockHeight + delta
3✔
2074

3✔
2075
        // The message height is smaller or equal to our best known height,
3✔
2076
        // thus the message is mature.
3✔
2077
        if msgHeight <= d.bestHeight {
6✔
2078
                return false
3✔
2079
        }
3✔
2080

2081
        // Add the premature message to our future messages which will be
2082
        // resent once the block height has reached.
2083
        //
2084
        // Copy the networkMsgs since the old message's err chan will be
2085
        // consumed.
2086
        copied := &networkMsg{
3✔
2087
                peer:              msg.peer,
3✔
2088
                source:            msg.source,
3✔
2089
                msg:               msg.msg,
3✔
2090
                optionalMsgFields: msg.optionalMsgFields,
3✔
2091
                isRemote:          msg.isRemote,
3✔
2092
                err:               make(chan error, 1),
3✔
2093
        }
3✔
2094

3✔
2095
        // Create the cached message.
3✔
2096
        cachedMsg := &cachedFutureMsg{
3✔
2097
                msg:    copied,
3✔
2098
                height: msgHeight,
3✔
2099
        }
3✔
2100

3✔
2101
        // Increment the msg ID and add it to the cache.
3✔
2102
        nextMsgID := d.futureMsgs.nextMsgID()
3✔
2103
        _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
3✔
2104
        if err != nil {
3✔
2105
                log.Errorf("Adding future message got error: %v", err)
×
2106
        }
×
2107

2108
        log.Debugf("Network message: %v added to future messages for "+
3✔
2109
                "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
3✔
2110
                msgHeight, d.bestHeight)
3✔
2111

3✔
2112
        return true
3✔
2113
}
2114

2115
// processNetworkAnnouncement processes a new network relate authenticated
2116
// channel or node announcement or announcements proofs. If the announcement
2117
// didn't affect the internal state due to either being out of date, invalid,
2118
// or redundant, then nil is returned. Otherwise, the set of announcements will
2119
// be returned which should be broadcasted to the rest of the network. The
2120
// boolean returned indicates whether any dependents of the announcement should
2121
// attempt to be processed as well.
2122
func (d *AuthenticatedGossiper) processNetworkAnnouncement(ctx context.Context,
2123
        nMsg *networkMsg) ([]networkMsg, bool) {
3✔
2124

3✔
2125
        // If this is a remote update, we set the scheduler option to lazily
3✔
2126
        // add it to the graph.
3✔
2127
        var schedulerOp []batch.SchedulerOption
3✔
2128
        if nMsg.isRemote {
6✔
2129
                schedulerOp = append(schedulerOp, batch.LazyAdd())
3✔
2130
        }
3✔
2131

2132
        switch msg := nMsg.msg.(type) {
3✔
2133
        // A new node announcement has arrived which either presents new
2134
        // information about a node in one of the channels we know about, or a
2135
        // updating previously advertised information.
2136
        case *lnwire.NodeAnnouncement:
3✔
2137
                return d.handleNodeAnnouncement(ctx, nMsg, msg, schedulerOp)
3✔
2138

2139
        // A new channel announcement has arrived, this indicates the
2140
        // *creation* of a new channel within the network. This only advertises
2141
        // the existence of a channel and not yet the routing policies in
2142
        // either direction of the channel.
2143
        case *lnwire.ChannelAnnouncement1:
3✔
2144
                return d.handleChanAnnouncement(ctx, nMsg, msg, schedulerOp...)
3✔
2145

2146
        // A new authenticated channel edge update has arrived. This indicates
2147
        // that the directional information for an already known channel has
2148
        // been updated.
2149
        case *lnwire.ChannelUpdate1:
3✔
2150
                return d.handleChanUpdate(ctx, nMsg, msg, schedulerOp)
3✔
2151

2152
        // A new signature announcement has been received. This indicates
2153
        // willingness of nodes involved in the funding of a channel to
2154
        // announce this new channel to the rest of the world.
2155
        case *lnwire.AnnounceSignatures1:
3✔
2156
                return d.handleAnnSig(ctx, nMsg, msg)
3✔
2157

2158
        default:
×
2159
                err := errors.New("wrong type of the announcement")
×
2160
                nMsg.err <- err
×
2161
                return nil, false
×
2162
        }
2163
}
2164

2165
// processZombieUpdate determines whether the provided channel update should
2166
// resurrect a given zombie edge.
2167
//
2168
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
2169
// should be inspected.
2170
func (d *AuthenticatedGossiper) processZombieUpdate(_ context.Context,
2171
        chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
UNCOV
2172
        msg *lnwire.ChannelUpdate1) error {
×
UNCOV
2173

×
UNCOV
2174
        // The least-significant bit in the flag on the channel update tells us
×
UNCOV
2175
        // which edge is being updated.
×
UNCOV
2176
        isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0
×
UNCOV
2177

×
UNCOV
2178
        // Since we've deemed the update as not stale above, before marking it
×
UNCOV
2179
        // live, we'll make sure it has been signed by the correct party. If we
×
UNCOV
2180
        // have both pubkeys, either party can resurrect the channel. If we've
×
UNCOV
2181
        // already marked this with the stricter, single-sided resurrection we
×
UNCOV
2182
        // will only have the pubkey of the node with the oldest timestamp.
×
UNCOV
2183
        var pubKey *btcec.PublicKey
×
UNCOV
2184
        switch {
×
2185
        case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
×
2186
                pubKey, _ = chanInfo.NodeKey1()
×
UNCOV
2187
        case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
×
UNCOV
2188
                pubKey, _ = chanInfo.NodeKey2()
×
2189
        }
UNCOV
2190
        if pubKey == nil {
×
UNCOV
2191
                return fmt.Errorf("incorrect pubkey to resurrect zombie "+
×
UNCOV
2192
                        "with chan_id=%v", msg.ShortChannelID)
×
UNCOV
2193
        }
×
2194

UNCOV
2195
        err := netann.VerifyChannelUpdateSignature(msg, pubKey)
×
UNCOV
2196
        if err != nil {
×
UNCOV
2197
                return fmt.Errorf("unable to verify channel "+
×
UNCOV
2198
                        "update signature: %v", err)
×
UNCOV
2199
        }
×
2200

2201
        // With the signature valid, we'll proceed to mark the
2202
        // edge as live and wait for the channel announcement to
2203
        // come through again.
UNCOV
2204
        err = d.cfg.Graph.MarkEdgeLive(scid)
×
UNCOV
2205
        switch {
×
2206
        case errors.Is(err, graphdb.ErrZombieEdgeNotFound):
×
2207
                log.Errorf("edge with chan_id=%v was not found in the "+
×
2208
                        "zombie index: %v", err)
×
2209

×
2210
                return nil
×
2211

2212
        case err != nil:
×
2213
                return fmt.Errorf("unable to remove edge with "+
×
2214
                        "chan_id=%v from zombie index: %v",
×
2215
                        msg.ShortChannelID, err)
×
2216

UNCOV
2217
        default:
×
2218
        }
2219

UNCOV
2220
        log.Debugf("Removed edge with chan_id=%v from zombie "+
×
UNCOV
2221
                "index", msg.ShortChannelID)
×
UNCOV
2222

×
UNCOV
2223
        return nil
×
2224
}
2225

2226
// fetchNodeAnn fetches the latest signed node announcement from our point of
2227
// view for the node with the given public key.
2228
func (d *AuthenticatedGossiper) fetchNodeAnn(ctx context.Context,
2229
        pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {
3✔
2230

3✔
2231
        node, err := d.cfg.Graph.FetchLightningNode(ctx, pubKey)
3✔
2232
        if err != nil {
3✔
UNCOV
2233
                return nil, err
×
UNCOV
2234
        }
×
2235

2236
        return node.NodeAnnouncement(true)
3✔
2237
}
2238

2239
// isMsgStale determines whether a message retrieved from the backing
2240
// MessageStore is seen as stale by the current graph.
2241
func (d *AuthenticatedGossiper) isMsgStale(_ context.Context,
2242
        msg lnwire.Message) bool {
3✔
2243

3✔
2244
        switch msg := msg.(type) {
3✔
2245
        case *lnwire.AnnounceSignatures1:
3✔
2246
                chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
3✔
2247
                        msg.ShortChannelID,
3✔
2248
                )
3✔
2249

3✔
2250
                // If the channel cannot be found, it is most likely a leftover
3✔
2251
                // message for a channel that was closed, so we can consider it
3✔
2252
                // stale.
3✔
2253
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
6✔
2254
                        return true
3✔
2255
                }
3✔
2256
                if err != nil {
3✔
2257
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2258
                                "%v", msg.ShortChannelID, err)
×
2259
                        return false
×
2260
                }
×
2261

2262
                // If the proof exists in the graph, then we have successfully
2263
                // received the remote proof and assembled the full proof, so we
2264
                // can safely delete the local proof from the database.
2265
                return chanInfo.AuthProof != nil
3✔
2266

2267
        case *lnwire.ChannelUpdate1:
3✔
2268
                _, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)
3✔
2269

3✔
2270
                // If the channel cannot be found, it is most likely a leftover
3✔
2271
                // message for a channel that was closed, so we can consider it
3✔
2272
                // stale.
3✔
2273
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
6✔
2274
                        return true
3✔
2275
                }
3✔
2276
                if err != nil {
3✔
2277
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2278
                                "%v", msg.ShortChannelID, err)
×
2279
                        return false
×
2280
                }
×
2281

2282
                // Otherwise, we'll retrieve the correct policy that we
2283
                // currently have stored within our graph to check if this
2284
                // message is stale by comparing its timestamp.
2285
                var p *models.ChannelEdgePolicy
3✔
2286
                if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
6✔
2287
                        p = p1
3✔
2288
                } else {
6✔
2289
                        p = p2
3✔
2290
                }
3✔
2291

2292
                // If the policy is still unknown, then we can consider this
2293
                // policy fresh.
2294
                if p == nil {
3✔
2295
                        return false
×
2296
                }
×
2297

2298
                timestamp := time.Unix(int64(msg.Timestamp), 0)
3✔
2299
                return p.LastUpdate.After(timestamp)
3✔
2300

2301
        default:
×
2302
                // We'll make sure to not mark any unsupported messages as stale
×
2303
                // to ensure they are not removed.
×
2304
                return false
×
2305
        }
2306
}
2307

2308
// updateChannel creates a new fully signed update for the channel, and updates
2309
// the underlying graph with the new state.
2310
func (d *AuthenticatedGossiper) updateChannel(ctx context.Context,
2311
        info *models.ChannelEdgeInfo,
2312
        edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
2313
        *lnwire.ChannelUpdate1, error) {
3✔
2314

3✔
2315
        // Parse the unsigned edge into a channel update.
3✔
2316
        chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)
3✔
2317

3✔
2318
        // We'll generate a new signature over a digest of the channel
3✔
2319
        // announcement itself and update the timestamp to ensure it propagate.
3✔
2320
        err := netann.SignChannelUpdate(
3✔
2321
                d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
3✔
2322
                netann.ChanUpdSetTimestamp,
3✔
2323
        )
3✔
2324
        if err != nil {
3✔
2325
                return nil, nil, err
×
2326
        }
×
2327

2328
        // Next, we'll set the new signature in place, and update the reference
2329
        // in the backing slice.
2330
        edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
3✔
2331
        edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()
3✔
2332

3✔
2333
        // To ensure that our signature is valid, we'll verify it ourself
3✔
2334
        // before committing it to the slice returned.
3✔
2335
        err = netann.ValidateChannelUpdateAnn(
3✔
2336
                d.selfKey, info.Capacity, chanUpdate,
3✔
2337
        )
3✔
2338
        if err != nil {
3✔
2339
                return nil, nil, fmt.Errorf("generated invalid channel "+
×
2340
                        "update sig: %v", err)
×
2341
        }
×
2342

2343
        // Finally, we'll write the new edge policy to disk.
2344
        if err := d.cfg.Graph.UpdateEdge(ctx, edge); err != nil {
3✔
2345
                return nil, nil, err
×
2346
        }
×
2347

2348
        // We'll also create the original channel announcement so the two can
2349
        // be broadcast along side each other (if necessary), but only if we
2350
        // have a full channel announcement for this channel.
2351
        var chanAnn *lnwire.ChannelAnnouncement1
3✔
2352
        if info.AuthProof != nil {
6✔
2353
                chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
3✔
2354
                chanAnn = &lnwire.ChannelAnnouncement1{
3✔
2355
                        ShortChannelID:  chanID,
3✔
2356
                        NodeID1:         info.NodeKey1Bytes,
3✔
2357
                        NodeID2:         info.NodeKey2Bytes,
3✔
2358
                        ChainHash:       info.ChainHash,
3✔
2359
                        BitcoinKey1:     info.BitcoinKey1Bytes,
3✔
2360
                        Features:        lnwire.NewRawFeatureVector(),
3✔
2361
                        BitcoinKey2:     info.BitcoinKey2Bytes,
3✔
2362
                        ExtraOpaqueData: info.ExtraOpaqueData,
3✔
2363
                }
3✔
2364
                chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
3✔
2365
                        info.AuthProof.NodeSig1Bytes,
3✔
2366
                )
3✔
2367
                if err != nil {
3✔
2368
                        return nil, nil, err
×
2369
                }
×
2370
                chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
3✔
2371
                        info.AuthProof.NodeSig2Bytes,
3✔
2372
                )
3✔
2373
                if err != nil {
3✔
2374
                        return nil, nil, err
×
2375
                }
×
2376
                chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
3✔
2377
                        info.AuthProof.BitcoinSig1Bytes,
3✔
2378
                )
3✔
2379
                if err != nil {
3✔
2380
                        return nil, nil, err
×
2381
                }
×
2382
                chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
3✔
2383
                        info.AuthProof.BitcoinSig2Bytes,
3✔
2384
                )
3✔
2385
                if err != nil {
3✔
2386
                        return nil, nil, err
×
2387
                }
×
2388
        }
2389

2390
        return chanAnn, chanUpdate, err
3✔
2391
}
2392

2393
// SyncManager returns the gossiper's SyncManager instance.
2394
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
3✔
2395
        return d.syncMgr
3✔
2396
}
3✔
2397

2398
// IsKeepAliveUpdate determines whether this channel update is considered a
2399
// keep-alive update based on the previous channel update processed for the same
2400
// direction.
2401
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
2402
        prev *models.ChannelEdgePolicy) bool {
3✔
2403

3✔
2404
        // Both updates should be from the same direction.
3✔
2405
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
3✔
2406
                prev.ChannelFlags&lnwire.ChanUpdateDirection {
3✔
2407

×
2408
                return false
×
2409
        }
×
2410

2411
        // The timestamp should always increase for a keep-alive update.
2412
        timestamp := time.Unix(int64(update.Timestamp), 0)
3✔
2413
        if !timestamp.After(prev.LastUpdate) {
3✔
2414
                return false
×
2415
        }
×
2416

2417
        // None of the remaining fields should change for a keep-alive update.
2418
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
6✔
2419
                return false
3✔
2420
        }
3✔
2421
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
6✔
2422
                return false
3✔
2423
        }
3✔
2424
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
6✔
2425
                return false
3✔
2426
        }
3✔
2427
        if update.TimeLockDelta != prev.TimeLockDelta {
3✔
2428
                return false
×
2429
        }
×
2430
        if update.HtlcMinimumMsat != prev.MinHTLC {
3✔
2431
                return false
×
2432
        }
×
2433
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
3✔
2434
                return false
×
2435
        }
×
2436
        if update.HtlcMaximumMsat != prev.MaxHTLC {
3✔
2437
                return false
×
2438
        }
×
2439
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
6✔
2440
                return false
3✔
2441
        }
3✔
2442
        return true
3✔
2443
}
2444

2445
// latestHeight returns the gossiper's latest height known of the chain.
2446
func (d *AuthenticatedGossiper) latestHeight() uint32 {
3✔
2447
        d.Lock()
3✔
2448
        defer d.Unlock()
3✔
2449
        return d.bestHeight
3✔
2450
}
3✔
2451

2452
// handleNodeAnnouncement processes a new node announcement.
2453
func (d *AuthenticatedGossiper) handleNodeAnnouncement(ctx context.Context,
2454
        nMsg *networkMsg, nodeAnn *lnwire.NodeAnnouncement,
2455
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
3✔
2456

3✔
2457
        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
3✔
2458

3✔
2459
        log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
3✔
2460
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
3✔
2461
                nMsg.source.SerializeCompressed())
3✔
2462

3✔
2463
        // We'll quickly ask the router if it already has a newer update for
3✔
2464
        // this node so we can skip validating signatures if not required.
3✔
2465
        if d.cfg.Graph.IsStaleNode(ctx, nodeAnn.NodeID, timestamp) {
6✔
2466
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
3✔
2467
                nMsg.err <- nil
3✔
2468
                return nil, true
3✔
2469
        }
3✔
2470

2471
        if err := d.addNode(ctx, nodeAnn, ops...); err != nil {
6✔
2472
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
3✔
2473
                        err)
3✔
2474

3✔
2475
                if !graph.IsError(
3✔
2476
                        err,
3✔
2477
                        graph.ErrOutdated,
3✔
2478
                        graph.ErrIgnored,
3✔
2479
                ) {
3✔
2480

×
2481
                        log.Error(err)
×
2482
                }
×
2483

2484
                nMsg.err <- err
3✔
2485
                return nil, false
3✔
2486
        }
2487

2488
        // In order to ensure we don't leak unadvertised nodes, we'll make a
2489
        // quick check to ensure this node intends to publicly advertise itself
2490
        // to the network.
2491
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
3✔
2492
        if err != nil {
3✔
2493
                log.Errorf("Unable to determine if node %x is advertised: %v",
×
2494
                        nodeAnn.NodeID, err)
×
2495
                nMsg.err <- err
×
2496
                return nil, false
×
2497
        }
×
2498

2499
        var announcements []networkMsg
3✔
2500

3✔
2501
        // If it does, we'll add their announcement to our batch so that it can
3✔
2502
        // be broadcast to the rest of our peers.
3✔
2503
        if isPublic {
6✔
2504
                announcements = append(announcements, networkMsg{
3✔
2505
                        peer:     nMsg.peer,
3✔
2506
                        isRemote: nMsg.isRemote,
3✔
2507
                        source:   nMsg.source,
3✔
2508
                        msg:      nodeAnn,
3✔
2509
                })
3✔
2510
        } else {
6✔
2511
                log.Tracef("Skipping broadcasting node announcement for %x "+
3✔
2512
                        "due to being unadvertised", nodeAnn.NodeID)
3✔
2513
        }
3✔
2514

2515
        nMsg.err <- nil
3✔
2516
        // TODO(roasbeef): get rid of the above
3✔
2517

3✔
2518
        log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
3✔
2519
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
3✔
2520
                nMsg.source.SerializeCompressed())
3✔
2521

3✔
2522
        return announcements, true
3✔
2523
}
2524

2525
// handleChanAnnouncement processes a new channel announcement.
2526
//
2527
//nolint:funlen
2528
func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
2529
        nMsg *networkMsg, ann *lnwire.ChannelAnnouncement1,
2530
        ops ...batch.SchedulerOption) ([]networkMsg, bool) {
3✔
2531

3✔
2532
        scid := ann.ShortChannelID
3✔
2533

3✔
2534
        log.Debugf("Processing ChannelAnnouncement1: peer=%v, short_chan_id=%v",
3✔
2535
                nMsg.peer, scid.ToUint64())
3✔
2536

3✔
2537
        // We'll ignore any channel announcements that target any chain other
3✔
2538
        // than the set of chains we know of.
3✔
2539
        if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) {
3✔
2540
                err := fmt.Errorf("ignoring ChannelAnnouncement1 from chain=%v"+
×
2541
                        ", gossiper on chain=%v", ann.ChainHash,
×
2542
                        d.cfg.ChainHash)
×
2543
                log.Errorf(err.Error())
×
2544

×
2545
                key := newRejectCacheKey(
×
2546
                        scid.ToUint64(),
×
2547
                        sourceToPub(nMsg.source),
×
2548
                )
×
2549
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2550

×
2551
                nMsg.err <- err
×
2552
                return nil, false
×
2553
        }
×
2554

2555
        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
2556
        // reject the announcement. Since the router accepts alias SCIDs,
2557
        // not erroring out would be a DoS vector.
2558
        if nMsg.isRemote && d.cfg.IsAlias(scid) {
3✔
2559
                err := fmt.Errorf("ignoring remote alias channel=%v", scid)
×
2560
                log.Errorf(err.Error())
×
2561

×
2562
                key := newRejectCacheKey(
×
2563
                        scid.ToUint64(),
×
2564
                        sourceToPub(nMsg.source),
×
2565
                )
×
2566
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2567

×
2568
                nMsg.err <- err
×
2569
                return nil, false
×
2570
        }
×
2571

2572
        // If the advertised inclusionary block is beyond our knowledge of the
2573
        // chain tip, then we'll ignore it for now.
2574
        d.Lock()
3✔
2575
        if nMsg.isRemote && d.isPremature(scid, 0, nMsg) {
3✔
UNCOV
2576
                log.Warnf("Announcement for chan_id=(%v), is premature: "+
×
UNCOV
2577
                        "advertises height %v, only height %v is known",
×
UNCOV
2578
                        scid.ToUint64(), scid.BlockHeight, d.bestHeight)
×
UNCOV
2579
                d.Unlock()
×
UNCOV
2580
                nMsg.err <- nil
×
UNCOV
2581
                return nil, false
×
UNCOV
2582
        }
×
2583
        d.Unlock()
3✔
2584

3✔
2585
        // At this point, we'll now ask the router if this is a zombie/known
3✔
2586
        // edge. If so we can skip all the processing below.
3✔
2587
        if d.cfg.Graph.IsKnownEdge(scid) {
6✔
2588
                nMsg.err <- nil
3✔
2589
                return nil, true
3✔
2590
        }
3✔
2591

2592
        // Check if the channel is already closed in which case we can ignore
2593
        // it.
2594
        closed, err := d.cfg.ScidCloser.IsClosedScid(scid)
3✔
2595
        if err != nil {
3✔
2596
                log.Errorf("failed to check if scid %v is closed: %v", scid,
×
2597
                        err)
×
2598
                nMsg.err <- err
×
2599

×
2600
                return nil, false
×
2601
        }
×
2602

2603
        if closed {
3✔
UNCOV
2604
                err = fmt.Errorf("ignoring closed channel %v", scid)
×
UNCOV
2605

×
UNCOV
2606
                // If this is an announcement from us, we'll just ignore it.
×
UNCOV
2607
                if !nMsg.isRemote {
×
2608
                        nMsg.err <- err
×
2609
                        return nil, false
×
2610
                }
×
2611

NEW
2612
                log.Warnf("Increasing ban score for peer=%v due to outdated "+
×
NEW
2613
                        "channel announcement for channel %v", nMsg.peer, scid)
×
NEW
2614

×
UNCOV
2615
                // Increment the peer's ban score if they are sending closed
×
UNCOV
2616
                // channel announcements.
×
NEW
2617
                dcErr := d.handleBadPeer(nMsg.peer)
×
UNCOV
2618
                if dcErr != nil {
×
NEW
2619
                        err = dcErr
×
2620
                }
×
2621

UNCOV
2622
                nMsg.err <- err
×
UNCOV
2623

×
UNCOV
2624
                return nil, false
×
2625
        }
2626

2627
        // If this is a remote channel announcement, then we'll validate all
2628
        // the signatures within the proof as it should be well formed.
2629
        var proof *models.ChannelAuthProof
3✔
2630
        if nMsg.isRemote {
6✔
2631
                err := netann.ValidateChannelAnn(ann, d.fetchPKScript)
3✔
2632
                if err != nil {
3✔
2633
                        err := fmt.Errorf("unable to validate announcement: "+
×
2634
                                "%v", err)
×
2635

×
2636
                        key := newRejectCacheKey(
×
2637
                                scid.ToUint64(),
×
2638
                                sourceToPub(nMsg.source),
×
2639
                        )
×
2640
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2641

×
2642
                        log.Error(err)
×
2643
                        nMsg.err <- err
×
2644
                        return nil, false
×
2645
                }
×
2646

2647
                // If the proof checks out, then we'll save the proof itself to
2648
                // the database so we can fetch it later when gossiping with
2649
                // other nodes.
2650
                proof = &models.ChannelAuthProof{
3✔
2651
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
3✔
2652
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
3✔
2653
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
3✔
2654
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
3✔
2655
                }
3✔
2656
        }
2657

2658
        // With the proof validated (if necessary), we can now store it within
2659
        // the database for our path finding and syncing needs.
2660
        edge := &models.ChannelEdgeInfo{
3✔
2661
                ChannelID:        scid.ToUint64(),
3✔
2662
                ChainHash:        ann.ChainHash,
3✔
2663
                NodeKey1Bytes:    ann.NodeID1,
3✔
2664
                NodeKey2Bytes:    ann.NodeID2,
3✔
2665
                BitcoinKey1Bytes: ann.BitcoinKey1,
3✔
2666
                BitcoinKey2Bytes: ann.BitcoinKey2,
3✔
2667
                AuthProof:        proof,
3✔
2668
                Features: lnwire.NewFeatureVector(
3✔
2669
                        ann.Features, lnwire.Features,
3✔
2670
                ),
3✔
2671
                ExtraOpaqueData: ann.ExtraOpaqueData,
3✔
2672
        }
3✔
2673

3✔
2674
        // If there were any optional message fields provided, we'll include
3✔
2675
        // them in its serialized disk representation now.
3✔
2676
        var tapscriptRoot fn.Option[chainhash.Hash]
3✔
2677
        if nMsg.optionalMsgFields != nil {
6✔
2678
                if nMsg.optionalMsgFields.capacity != nil {
6✔
2679
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
3✔
2680
                }
3✔
2681
                if nMsg.optionalMsgFields.channelPoint != nil {
6✔
2682
                        cp := *nMsg.optionalMsgFields.channelPoint
3✔
2683
                        edge.ChannelPoint = cp
3✔
2684
                }
3✔
2685

2686
                // Optional tapscript root for custom channels.
2687
                tapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
3✔
2688
        }
2689

2690
        // Before we start validation or add the edge to the database, we obtain
2691
        // the mutex for this channel ID. We do this to ensure no other
2692
        // goroutine has read the database and is now making decisions based on
2693
        // this DB state, before it writes to the DB. It also ensures that we
2694
        // don't perform the expensive validation check on the same channel
2695
        // announcement at the same time.
2696
        d.channelMtx.Lock(scid.ToUint64())
3✔
2697

3✔
2698
        // If AssumeChannelValid is present, then we are unable to perform any
3✔
2699
        // of the expensive checks below, so we'll short-circuit our path
3✔
2700
        // straight to adding the edge to our graph. If the passed
3✔
2701
        // ShortChannelID is an alias, then we'll skip validation as it will
3✔
2702
        // not map to a legitimate tx. This is not a DoS vector as only we can
3✔
2703
        // add an alias ChannelAnnouncement from the gossiper.
3✔
2704
        if !(d.cfg.AssumeChannelValid || d.cfg.IsAlias(scid)) {
6✔
2705
                op, capacity, script, err := d.validateFundingTransaction(
3✔
2706
                        ctx, ann, tapscriptRoot,
3✔
2707
                )
3✔
2708
                if err != nil {
3✔
UNCOV
2709
                        defer d.channelMtx.Unlock(scid.ToUint64())
×
UNCOV
2710

×
UNCOV
2711
                        switch {
×
2712
                        case errors.Is(err, ErrNoFundingTransaction),
UNCOV
2713
                                errors.Is(err, ErrInvalidFundingOutput):
×
UNCOV
2714

×
UNCOV
2715
                                key := newRejectCacheKey(
×
UNCOV
2716
                                        scid.ToUint64(),
×
UNCOV
2717
                                        sourceToPub(nMsg.source),
×
UNCOV
2718
                                )
×
UNCOV
2719
                                _, _ = d.recentRejects.Put(
×
UNCOV
2720
                                        key, &cachedReject{},
×
UNCOV
2721
                                )
×
2722

UNCOV
2723
                        case errors.Is(err, ErrChannelSpent):
×
UNCOV
2724
                                key := newRejectCacheKey(
×
UNCOV
2725
                                        scid.ToUint64(),
×
UNCOV
2726
                                        sourceToPub(nMsg.source),
×
UNCOV
2727
                                )
×
UNCOV
2728
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
2729

×
UNCOV
2730
                                // Since this channel has already been closed,
×
UNCOV
2731
                                // we'll add it to the graph's closed channel
×
UNCOV
2732
                                // index such that we won't attempt to do
×
UNCOV
2733
                                // expensive validation checks on it again.
×
UNCOV
2734
                                // TODO: Populate the ScidCloser by using closed
×
UNCOV
2735
                                // channel notifications.
×
UNCOV
2736
                                dbErr := d.cfg.ScidCloser.PutClosedScid(scid)
×
UNCOV
2737
                                if dbErr != nil {
×
2738
                                        log.Errorf("failed to mark scid(%v) "+
×
2739
                                                "as closed: %v", scid, dbErr)
×
2740

×
2741
                                        nMsg.err <- dbErr
×
2742

×
2743
                                        return nil, false
×
2744
                                }
×
2745

UNCOV
2746
                        default:
×
UNCOV
2747
                                // Otherwise, this is just a regular rejected
×
NEW
2748
                                // edge. We won't increase the ban score for the
×
NEW
2749
                                // remote peer.
×
2750
                                key := newRejectCacheKey(
×
2751
                                        scid.ToUint64(),
×
2752
                                        sourceToPub(nMsg.source),
×
2753
                                )
×
2754
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
NEW
2755

×
NEW
2756
                                nMsg.err <- err
×
NEW
2757

×
NEW
2758
                                return nil, false
×
2759
                        }
2760

UNCOV
2761
                        if !nMsg.isRemote {
×
2762
                                log.Errorf("failed to add edge for local "+
×
2763
                                        "channel: %v", err)
×
2764
                                nMsg.err <- err
×
2765

×
2766
                                return nil, false
×
2767
                        }
×
2768

NEW
2769
                        log.Warnf("Increasing ban score for peer=%v due to "+
×
NEW
2770
                                "invalid channel announcement for channel %v",
×
NEW
2771
                                nMsg.peer, scid)
×
UNCOV
2772

×
NEW
2773
                        // Increment the peer's ban score if they are sending
×
NEW
2774
                        // us invalid channel announcements.
×
NEW
2775
                        dcErr := d.handleBadPeer(nMsg.peer)
×
NEW
2776
                        if dcErr != nil {
×
NEW
2777
                                err = dcErr
×
UNCOV
2778
                        }
×
2779

UNCOV
2780
                        nMsg.err <- err
×
UNCOV
2781

×
UNCOV
2782
                        return nil, false
×
2783
                }
2784

2785
                edge.FundingScript = fn.Some(script)
3✔
2786

3✔
2787
                // TODO(roasbeef): this is a hack, needs to be removed after
3✔
2788
                //  commitment fees are dynamic.
3✔
2789
                edge.Capacity = capacity
3✔
2790
                edge.ChannelPoint = op
3✔
2791
        }
2792

2793
        log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64())
3✔
2794

3✔
2795
        // We will add the edge to the channel router. If the nodes present in
3✔
2796
        // this channel are not present in the database, a partial node will be
3✔
2797
        // added to represent each node while we wait for a node announcement.
3✔
2798
        err = d.cfg.Graph.AddEdge(ctx, edge, ops...)
3✔
2799
        if err != nil {
6✔
2800
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
3✔
2801
                        scid.ToUint64(), err)
3✔
2802

3✔
2803
                defer d.channelMtx.Unlock(scid.ToUint64())
3✔
2804

3✔
2805
                // If the edge was rejected due to already being known, then it
3✔
2806
                // may be the case that this new message has a fresh channel
3✔
2807
                // proof, so we'll check.
3✔
2808
                if graph.IsError(err, graph.ErrIgnored) {
6✔
2809
                        // Attempt to process the rejected message to see if we
3✔
2810
                        // get any new announcements.
3✔
2811
                        anns, rErr := d.processRejectedEdge(ctx, ann, proof)
3✔
2812
                        if rErr != nil {
3✔
2813
                                key := newRejectCacheKey(
×
2814
                                        scid.ToUint64(),
×
2815
                                        sourceToPub(nMsg.source),
×
2816
                                )
×
2817
                                cr := &cachedReject{}
×
2818
                                _, _ = d.recentRejects.Put(key, cr)
×
2819

×
2820
                                nMsg.err <- rErr
×
2821

×
2822
                                return nil, false
×
2823
                        }
×
2824

2825
                        log.Debugf("Extracted %v announcements from rejected "+
3✔
2826
                                "msgs", len(anns))
3✔
2827

3✔
2828
                        // If while processing this rejected edge, we realized
3✔
2829
                        // there's a set of announcements we could extract,
3✔
2830
                        // then we'll return those directly.
3✔
2831
                        //
3✔
2832
                        // NOTE: since this is an ErrIgnored, we can return
3✔
2833
                        // true here to signal "allow" to its dependants.
3✔
2834
                        nMsg.err <- nil
3✔
2835

3✔
2836
                        return anns, true
3✔
2837
                }
2838

2839
                // Otherwise, this is just a regular rejected edge.
UNCOV
2840
                key := newRejectCacheKey(
×
UNCOV
2841
                        scid.ToUint64(),
×
UNCOV
2842
                        sourceToPub(nMsg.source),
×
UNCOV
2843
                )
×
UNCOV
2844
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
2845

×
UNCOV
2846
                if !nMsg.isRemote {
×
2847
                        log.Errorf("failed to add edge for local channel: %v",
×
2848
                                err)
×
2849
                        nMsg.err <- err
×
2850

×
2851
                        return nil, false
×
2852
                }
×
2853

UNCOV
2854
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
×
UNCOV
2855
                if dcErr != nil {
×
2856
                        log.Errorf("failed to check if we should disconnect "+
×
2857
                                "peer: %v", dcErr)
×
2858
                        nMsg.err <- dcErr
×
2859

×
2860
                        return nil, false
×
2861
                }
×
2862

UNCOV
2863
                if shouldDc {
×
2864
                        nMsg.peer.Disconnect(ErrPeerBanned)
×
2865
                }
×
2866

UNCOV
2867
                nMsg.err <- err
×
UNCOV
2868

×
UNCOV
2869
                return nil, false
×
2870
        }
2871

2872
        // If err is nil, release the lock immediately.
2873
        d.channelMtx.Unlock(scid.ToUint64())
3✔
2874

3✔
2875
        log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64())
3✔
2876

3✔
2877
        // If we earlier received any ChannelUpdates for this channel, we can
3✔
2878
        // now process them, as the channel is added to the graph.
3✔
2879
        var channelUpdates []*processedNetworkMsg
3✔
2880

3✔
2881
        earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64())
3✔
2882
        if err == nil {
6✔
2883
                // There was actually an entry in the map, so we'll accumulate
3✔
2884
                // it. We don't worry about deletion, since it'll eventually
3✔
2885
                // fall out anyway.
3✔
2886
                chanMsgs := earlyChanUpdates
3✔
2887
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
3✔
2888
        }
3✔
2889

2890
        // Launch a new goroutine to handle each ChannelUpdate, this is to
2891
        // ensure we don't block here, as we can handle only one announcement
2892
        // at a time.
2893
        for _, cu := range channelUpdates {
6✔
2894
                // Skip if already processed.
3✔
2895
                if cu.processed {
4✔
2896
                        continue
1✔
2897
                }
2898

2899
                // Mark the ChannelUpdate as processed. This ensures that a
2900
                // subsequent announcement in the option-scid-alias case does
2901
                // not re-use an old ChannelUpdate.
2902
                cu.processed = true
3✔
2903

3✔
2904
                d.wg.Add(1)
3✔
2905
                go func(updMsg *networkMsg) {
6✔
2906
                        defer d.wg.Done()
3✔
2907

3✔
2908
                        switch msg := updMsg.msg.(type) {
3✔
2909
                        // Reprocess the message, making sure we return an
2910
                        // error to the original caller in case the gossiper
2911
                        // shuts down.
2912
                        case *lnwire.ChannelUpdate1:
3✔
2913
                                log.Debugf("Reprocessing ChannelUpdate for "+
3✔
2914
                                        "shortChanID=%v", scid.ToUint64())
3✔
2915

3✔
2916
                                select {
3✔
2917
                                case d.networkMsgs <- updMsg:
3✔
2918
                                case <-d.quit:
×
2919
                                        updMsg.err <- ErrGossiperShuttingDown
×
2920
                                }
2921

2922
                        // We don't expect any other message type than
2923
                        // ChannelUpdate to be in this cache.
2924
                        default:
×
2925
                                log.Errorf("Unsupported message type found "+
×
2926
                                        "among ChannelUpdates: %T", msg)
×
2927
                        }
2928
                }(cu.msg)
2929
        }
2930

2931
        // Channel announcement was successfully processed and now it might be
2932
        // broadcast to other connected nodes if it was an announcement with
2933
        // proof (remote).
2934
        var announcements []networkMsg
3✔
2935

3✔
2936
        if proof != nil {
6✔
2937
                announcements = append(announcements, networkMsg{
3✔
2938
                        peer:     nMsg.peer,
3✔
2939
                        isRemote: nMsg.isRemote,
3✔
2940
                        source:   nMsg.source,
3✔
2941
                        msg:      ann,
3✔
2942
                })
3✔
2943
        }
3✔
2944

2945
        nMsg.err <- nil
3✔
2946

3✔
2947
        log.Debugf("Processed ChannelAnnouncement1: peer=%v, short_chan_id=%v",
3✔
2948
                nMsg.peer, scid.ToUint64())
3✔
2949

3✔
2950
        return announcements, true
3✔
2951
}
2952

2953
// handleChanUpdate processes a new channel update.
2954
//
2955
//nolint:funlen
2956
func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context,
2957
        nMsg *networkMsg, upd *lnwire.ChannelUpdate1,
2958
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
3✔
2959

3✔
2960
        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
3✔
2961
                nMsg.peer, upd.ShortChannelID.ToUint64())
3✔
2962

3✔
2963
        // We'll ignore any channel updates that target any chain other than
3✔
2964
        // the set of chains we know of.
3✔
2965
        if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
3✔
2966
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
×
2967
                        "gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash)
×
2968
                log.Errorf(err.Error())
×
2969

×
2970
                key := newRejectCacheKey(
×
2971
                        upd.ShortChannelID.ToUint64(),
×
2972
                        sourceToPub(nMsg.source),
×
2973
                )
×
2974
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2975

×
2976
                nMsg.err <- err
×
2977
                return nil, false
×
2978
        }
×
2979

2980
        blockHeight := upd.ShortChannelID.BlockHeight
3✔
2981
        shortChanID := upd.ShortChannelID.ToUint64()
3✔
2982

3✔
2983
        // If the advertised inclusionary block is beyond our knowledge of the
3✔
2984
        // chain tip, then we'll put the announcement in limbo to be fully
3✔
2985
        // verified once we advance forward in the chain. If the update has an
3✔
2986
        // alias SCID, we'll skip the isPremature check. This is necessary
3✔
2987
        // since aliases start at block height 16_000_000.
3✔
2988
        d.Lock()
3✔
2989
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
3✔
2990
                d.isPremature(upd.ShortChannelID, 0, nMsg) {
3✔
2991

×
2992
                log.Warnf("Update announcement for short_chan_id(%v), is "+
×
2993
                        "premature: advertises height %v, only height %v is "+
×
2994
                        "known", shortChanID, blockHeight, d.bestHeight)
×
2995
                d.Unlock()
×
2996
                nMsg.err <- nil
×
2997
                return nil, false
×
2998
        }
×
2999
        d.Unlock()
3✔
3000

3✔
3001
        // Before we perform any of the expensive checks below, we'll check
3✔
3002
        // whether this update is stale or is for a zombie channel in order to
3✔
3003
        // quickly reject it.
3✔
3004
        timestamp := time.Unix(int64(upd.Timestamp), 0)
3✔
3005

3✔
3006
        // Fetch the SCID we should be using to lock the channelMtx and make
3✔
3007
        // graph queries with.
3✔
3008
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
3✔
3009
        if err != nil {
6✔
3010
                // Fallback and set the graphScid to the peer-provided SCID.
3✔
3011
                // This will occur for non-option-scid-alias channels and for
3✔
3012
                // public option-scid-alias channels after 6 confirmations.
3✔
3013
                // Once public option-scid-alias channels have 6 confs, we'll
3✔
3014
                // ignore ChannelUpdates with one of their aliases.
3✔
3015
                graphScid = upd.ShortChannelID
3✔
3016
        }
3✔
3017

3018
        // We make sure to obtain the mutex for this channel ID before we access
3019
        // the database. This ensures the state we read from the database has
3020
        // not changed between this point and when we call UpdateEdge() later.
3021
        d.channelMtx.Lock(graphScid.ToUint64())
3✔
3022
        defer d.channelMtx.Unlock(graphScid.ToUint64())
3✔
3023

3✔
3024
        if d.cfg.Graph.IsStaleEdgePolicy(
3✔
3025
                graphScid, timestamp, upd.ChannelFlags,
3✔
3026
        ) {
6✔
3027

3✔
3028
                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
3✔
3029
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
3✔
3030
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
3✔
3031
                )
3✔
3032

3✔
3033
                nMsg.err <- nil
3✔
3034
                return nil, true
3✔
3035
        }
3✔
3036

3037
        // Check that the ChanUpdate is not too far into the future, this could
3038
        // reveal some faulty implementation therefore we log an error.
3039
        if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
3✔
NEW
3040
                err := fmt.Errorf("skewed timestamp of edge policy, "+
×
3041
                        "timestamp too far in the future: %v", timestamp.Unix())
×
3042

×
NEW
3043
                // If this is a channel_update from us, we'll just ignore it.
×
NEW
3044
                if !nMsg.isRemote {
×
NEW
3045
                        nMsg.err <- err
×
NEW
3046
                        return nil, false
×
NEW
3047
                }
×
3048

NEW
3049
                log.Errorf("Increasing ban score for peer=%v due to bad "+
×
NEW
3050
                        "channel_update with short_chan_id(%v): timestamp(%v) "+
×
NEW
3051
                        "too far in the future", nMsg.peer, shortChanID,
×
NEW
3052
                        timestamp.Unix())
×
NEW
3053

×
NEW
3054
                // Increment the peer's ban score if they are skewed channel
×
NEW
3055
                // updates.
×
NEW
3056
                dcErr := d.handleBadPeer(nMsg.peer)
×
NEW
3057
                if dcErr != nil {
×
NEW
3058
                        err = dcErr
×
NEW
3059
                }
×
3060

NEW
3061
                nMsg.err <- err
×
NEW
3062

×
3063
                return nil, false
×
3064
        }
3065

3066
        // Get the node pub key as far since we don't have it in the channel
3067
        // update announcement message. We'll need this to properly verify the
3068
        // message's signature.
3069
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
3✔
3070
        switch {
3✔
3071
        // No error, break.
3072
        case err == nil:
3✔
3073
                break
3✔
3074

UNCOV
3075
        case errors.Is(err, graphdb.ErrZombieEdge):
×
UNCOV
3076
                err = d.processZombieUpdate(ctx, chanInfo, graphScid, upd)
×
UNCOV
3077
                if err != nil {
×
UNCOV
3078
                        log.Debug(err)
×
UNCOV
3079
                        nMsg.err <- err
×
UNCOV
3080
                        return nil, false
×
UNCOV
3081
                }
×
3082

3083
                // We'll fallthrough to ensure we stash the update until we
3084
                // receive its corresponding ChannelAnnouncement. This is
3085
                // needed to ensure the edge exists in the graph before
3086
                // applying the update.
UNCOV
3087
                fallthrough
×
UNCOV
3088
        case errors.Is(err, graphdb.ErrGraphNotFound):
×
UNCOV
3089
                fallthrough
×
UNCOV
3090
        case errors.Is(err, graphdb.ErrGraphNoEdgesFound):
×
UNCOV
3091
                fallthrough
×
3092
        case errors.Is(err, graphdb.ErrEdgeNotFound):
3✔
3093
                // If the edge corresponding to this ChannelUpdate was not
3✔
3094
                // found in the graph, this might be a channel in the process
3✔
3095
                // of being opened, and we haven't processed our own
3✔
3096
                // ChannelAnnouncement yet, hence it is not not found in the
3✔
3097
                // graph. This usually gets resolved after the channel proofs
3✔
3098
                // are exchanged and the channel is broadcasted to the rest of
3✔
3099
                // the network, but in case this is a private channel this
3✔
3100
                // won't ever happen. This can also happen in the case of a
3✔
3101
                // zombie channel with a fresh update for which we don't have a
3✔
3102
                // ChannelAnnouncement for since we reject them. Because of
3✔
3103
                // this, we temporarily add it to a map, and reprocess it after
3✔
3104
                // our own ChannelAnnouncement has been processed.
3✔
3105
                //
3✔
3106
                // The shortChanID may be an alias, but it is fine to use here
3✔
3107
                // since we don't have an edge in the graph and if the peer is
3✔
3108
                // not buggy, we should be able to use it once the gossiper
3✔
3109
                // receives the local announcement.
3✔
3110
                pMsg := &processedNetworkMsg{msg: nMsg}
3✔
3111

3✔
3112
                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
3✔
3113
                switch {
3✔
3114
                // Nothing in the cache yet, we can just directly insert this
3115
                // element.
3116
                case err == cache.ErrElementNotFound:
3✔
3117
                        _, _ = d.prematureChannelUpdates.Put(
3✔
3118
                                shortChanID, &cachedNetworkMsg{
3✔
3119
                                        msgs: []*processedNetworkMsg{pMsg},
3✔
3120
                                })
3✔
3121

3122
                // There's already something in the cache, so we'll combine the
3123
                // set of messages into a single value.
3124
                default:
3✔
3125
                        msgs := earlyMsgs.msgs
3✔
3126
                        msgs = append(msgs, pMsg)
3✔
3127
                        _, _ = d.prematureChannelUpdates.Put(
3✔
3128
                                shortChanID, &cachedNetworkMsg{
3✔
3129
                                        msgs: msgs,
3✔
3130
                                })
3✔
3131
                }
3132

3133
                log.Debugf("Got ChannelUpdate for edge not found in graph"+
3✔
3134
                        "(shortChanID=%v), saving for reprocessing later",
3✔
3135
                        shortChanID)
3✔
3136

3✔
3137
                // NOTE: We don't return anything on the error channel for this
3✔
3138
                // message, as we expect that will be done when this
3✔
3139
                // ChannelUpdate is later reprocessed. This might never happen
3✔
3140
                // if the corresponding ChannelAnnouncement is never received
3✔
3141
                // or the LRU cache is filled up and the entry is evicted.
3✔
3142
                return nil, false
3✔
3143

3144
        default:
×
3145
                err := fmt.Errorf("unable to validate channel update "+
×
3146
                        "short_chan_id=%v: %v", shortChanID, err)
×
3147
                log.Error(err)
×
3148
                nMsg.err <- err
×
3149

×
3150
                key := newRejectCacheKey(
×
3151
                        upd.ShortChannelID.ToUint64(),
×
3152
                        sourceToPub(nMsg.source),
×
3153
                )
×
3154
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3155

×
3156
                return nil, false
×
3157
        }
3158

3159
        // The least-significant bit in the flag on the channel update
3160
        // announcement tells us "which" side of the channels directed edge is
3161
        // being updated.
3162
        var (
3✔
3163
                pubKey       *btcec.PublicKey
3✔
3164
                edgeToUpdate *models.ChannelEdgePolicy
3✔
3165
        )
3✔
3166
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
3✔
3167
        switch direction {
3✔
3168
        case 0:
3✔
3169
                pubKey, _ = chanInfo.NodeKey1()
3✔
3170
                edgeToUpdate = e1
3✔
3171
        case 1:
3✔
3172
                pubKey, _ = chanInfo.NodeKey2()
3✔
3173
                edgeToUpdate = e2
3✔
3174
        }
3175

3176
        log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
3✔
3177
                "edge policy=%v", chanInfo.ChannelID,
3✔
3178
                pubKey.SerializeCompressed(), edgeToUpdate != nil)
3✔
3179

3✔
3180
        // Validate the channel announcement with the expected public key and
3✔
3181
        // channel capacity. In the case of an invalid channel update, we'll
3✔
3182
        // return an error to the caller and exit early.
3✔
3183
        err = netann.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
3✔
3184
        if err != nil {
3✔
UNCOV
3185
                rErr := fmt.Errorf("unable to validate channel update "+
×
UNCOV
3186
                        "announcement for short_chan_id=%v: %v",
×
UNCOV
3187
                        spew.Sdump(upd.ShortChannelID), err)
×
UNCOV
3188

×
UNCOV
3189
                log.Error(rErr)
×
UNCOV
3190
                nMsg.err <- rErr
×
UNCOV
3191
                return nil, false
×
UNCOV
3192
        }
×
3193

3194
        // If we have a previous version of the edge being updated, we'll want
3195
        // to rate limit its updates to prevent spam throughout the network.
3196
        if nMsg.isRemote && edgeToUpdate != nil {
6✔
3197
                // If it's a keep-alive update, we'll only propagate one if
3✔
3198
                // it's been a day since the previous. This follows our own
3✔
3199
                // heuristic of sending keep-alive updates after the same
3✔
3200
                // duration (see retransmitStaleAnns).
3✔
3201
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
3✔
3202
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
6✔
3203
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
6✔
3204
                                log.Debugf("Ignoring keep alive update not "+
3✔
3205
                                        "within %v period for channel %v",
3✔
3206
                                        d.cfg.RebroadcastInterval, shortChanID)
3✔
3207
                                nMsg.err <- nil
3✔
3208
                                return nil, false
3✔
3209
                        }
3✔
3210
                } else {
3✔
3211
                        // If it's not, we'll allow an update per minute with a
3✔
3212
                        // maximum burst of 10. If we haven't seen an update
3✔
3213
                        // for this channel before, we'll need to initialize a
3✔
3214
                        // rate limiter for each direction.
3✔
3215
                        //
3✔
3216
                        // Since the edge exists in the graph, we'll create a
3✔
3217
                        // rate limiter for chanInfo.ChannelID rather then the
3✔
3218
                        // SCID the peer sent. This is because there may be
3✔
3219
                        // multiple aliases for a channel and we may otherwise
3✔
3220
                        // rate-limit only a single alias of the channel,
3✔
3221
                        // instead of the whole channel.
3✔
3222
                        baseScid := chanInfo.ChannelID
3✔
3223
                        d.Lock()
3✔
3224
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
3✔
3225
                        if !ok {
6✔
3226
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
3✔
3227
                                b := d.cfg.MaxChannelUpdateBurst
3✔
3228
                                rls = [2]*rate.Limiter{
3✔
3229
                                        rate.NewLimiter(r, b),
3✔
3230
                                        rate.NewLimiter(r, b),
3✔
3231
                                }
3✔
3232
                                d.chanUpdateRateLimiter[baseScid] = rls
3✔
3233
                        }
3✔
3234
                        d.Unlock()
3✔
3235

3✔
3236
                        if !rls[direction].Allow() {
6✔
3237
                                log.Debugf("Rate limiting update for channel "+
3✔
3238
                                        "%v from direction %x", shortChanID,
3✔
3239
                                        pubKey.SerializeCompressed())
3✔
3240
                                nMsg.err <- nil
3✔
3241
                                return nil, false
3✔
3242
                        }
3✔
3243
                }
3244
        }
3245

3246
        // We'll use chanInfo.ChannelID rather than the peer-supplied
3247
        // ShortChannelID in the ChannelUpdate to avoid the router having to
3248
        // lookup the stored SCID. If we're sending the update, we'll always
3249
        // use the SCID stored in the database rather than a potentially
3250
        // different alias. This might mean that SigBytes is incorrect as it
3251
        // signs a different SCID than the database SCID, but since there will
3252
        // only be a difference if AuthProof == nil, this is fine.
3253
        update := &models.ChannelEdgePolicy{
3✔
3254
                SigBytes:                  upd.Signature.ToSignatureBytes(),
3✔
3255
                ChannelID:                 chanInfo.ChannelID,
3✔
3256
                LastUpdate:                timestamp,
3✔
3257
                MessageFlags:              upd.MessageFlags,
3✔
3258
                ChannelFlags:              upd.ChannelFlags,
3✔
3259
                TimeLockDelta:             upd.TimeLockDelta,
3✔
3260
                MinHTLC:                   upd.HtlcMinimumMsat,
3✔
3261
                MaxHTLC:                   upd.HtlcMaximumMsat,
3✔
3262
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
3✔
3263
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
3✔
3264
                InboundFee:                upd.InboundFee.ValOpt(),
3✔
3265
                ExtraOpaqueData:           upd.ExtraOpaqueData,
3✔
3266
        }
3✔
3267

3✔
3268
        if err := d.cfg.Graph.UpdateEdge(ctx, update, ops...); err != nil {
3✔
3269
                if graph.IsError(
×
3270
                        err, graph.ErrOutdated,
×
3271
                        graph.ErrIgnored,
×
3272
                ) {
×
3273

×
3274
                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
×
3275
                                shortChanID, err)
×
3276
                } else {
×
3277
                        // Since we know the stored SCID in the graph, we'll
×
3278
                        // cache that SCID.
×
3279
                        key := newRejectCacheKey(
×
3280
                                chanInfo.ChannelID,
×
3281
                                sourceToPub(nMsg.source),
×
3282
                        )
×
3283
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3284

×
3285
                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
×
3286
                                shortChanID, err)
×
3287
                }
×
3288

3289
                nMsg.err <- err
×
3290
                return nil, false
×
3291
        }
3292

3293
        // If this is a local ChannelUpdate without an AuthProof, it means it
3294
        // is an update to a channel that is not (yet) supposed to be announced
3295
        // to the greater network. However, our channel counter party will need
3296
        // to be given the update, so we'll try sending the update directly to
3297
        // the remote peer.
3298
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
6✔
3299
                if nMsg.optionalMsgFields != nil {
6✔
3300
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
3✔
3301
                        if remoteAlias != nil {
6✔
3302
                                // The remoteAlias field was specified, meaning
3✔
3303
                                // that we should replace the SCID in the
3✔
3304
                                // update with the remote's alias. We'll also
3✔
3305
                                // need to re-sign the channel update. This is
3✔
3306
                                // required for option-scid-alias feature-bit
3✔
3307
                                // negotiated channels.
3✔
3308
                                upd.ShortChannelID = *remoteAlias
3✔
3309

3✔
3310
                                sig, err := d.cfg.SignAliasUpdate(upd)
3✔
3311
                                if err != nil {
3✔
3312
                                        log.Error(err)
×
3313
                                        nMsg.err <- err
×
3314
                                        return nil, false
×
3315
                                }
×
3316

3317
                                lnSig, err := lnwire.NewSigFromSignature(sig)
3✔
3318
                                if err != nil {
3✔
3319
                                        log.Error(err)
×
3320
                                        nMsg.err <- err
×
3321
                                        return nil, false
×
3322
                                }
×
3323

3324
                                upd.Signature = lnSig
3✔
3325
                        }
3326
                }
3327

3328
                // Get our peer's public key.
3329
                remotePubKey := remotePubFromChanInfo(
3✔
3330
                        chanInfo, upd.ChannelFlags,
3✔
3331
                )
3✔
3332

3✔
3333
                log.Debugf("The message %v has no AuthProof, sending the "+
3✔
3334
                        "update to remote peer %x", upd.MsgType(), remotePubKey)
3✔
3335

3✔
3336
                // Now we'll attempt to send the channel update message
3✔
3337
                // reliably to the remote peer in the background, so that we
3✔
3338
                // don't block if the peer happens to be offline at the moment.
3✔
3339
                err := d.reliableSender.sendMessage(ctx, upd, remotePubKey)
3✔
3340
                if err != nil {
3✔
3341
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3342
                                "channel=%v to peer=%x: %v", upd.MsgType(),
×
3343
                                upd.ShortChannelID, remotePubKey, err)
×
3344
                        nMsg.err <- err
×
3345
                        return nil, false
×
3346
                }
×
3347
        }
3348

3349
        // Channel update announcement was successfully processed and now it
3350
        // can be broadcast to the rest of the network. However, we'll only
3351
        // broadcast the channel update announcement if it has an attached
3352
        // authentication proof. We also won't broadcast the update if it
3353
        // contains an alias because the network would reject this.
3354
        var announcements []networkMsg
3✔
3355
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
6✔
3356
                announcements = append(announcements, networkMsg{
3✔
3357
                        peer:     nMsg.peer,
3✔
3358
                        source:   nMsg.source,
3✔
3359
                        isRemote: nMsg.isRemote,
3✔
3360
                        msg:      upd,
3✔
3361
                })
3✔
3362
        }
3✔
3363

3364
        nMsg.err <- nil
3✔
3365

3✔
3366
        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
3✔
3367
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
3✔
3368
                timestamp)
3✔
3369
        return announcements, true
3✔
3370
}
3371

3372
// handleAnnSig processes a new announcement signatures message.
3373
//
3374
//nolint:funlen
3375
func (d *AuthenticatedGossiper) handleAnnSig(ctx context.Context,
3376
        nMsg *networkMsg, ann *lnwire.AnnounceSignatures1) ([]networkMsg,
3377
        bool) {
3✔
3378

3✔
3379
        needBlockHeight := ann.ShortChannelID.BlockHeight +
3✔
3380
                d.cfg.ProofMatureDelta
3✔
3381
        shortChanID := ann.ShortChannelID.ToUint64()
3✔
3382

3✔
3383
        prefix := "local"
3✔
3384
        if nMsg.isRemote {
6✔
3385
                prefix = "remote"
3✔
3386
        }
3✔
3387

3388
        log.Infof("Received new %v announcement signature for %v", prefix,
3✔
3389
                ann.ShortChannelID)
3✔
3390

3✔
3391
        // By the specification, channel announcement proofs should be sent
3✔
3392
        // after some number of confirmations after channel was registered in
3✔
3393
        // bitcoin blockchain. Therefore, we check if the proof is mature.
3✔
3394
        d.Lock()
3✔
3395
        premature := d.isPremature(
3✔
3396
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
3✔
3397
        )
3✔
3398
        if premature {
6✔
3399
                log.Warnf("Premature proof announcement, current block height"+
3✔
3400
                        "lower than needed: %v < %v", d.bestHeight,
3✔
3401
                        needBlockHeight)
3✔
3402
                d.Unlock()
3✔
3403
                nMsg.err <- nil
3✔
3404
                return nil, false
3✔
3405
        }
3✔
3406
        d.Unlock()
3✔
3407

3✔
3408
        // Ensure that we know of a channel with the target channel ID before
3✔
3409
        // proceeding further.
3✔
3410
        //
3✔
3411
        // We must acquire the mutex for this channel ID before getting the
3✔
3412
        // channel from the database, to ensure what we read does not change
3✔
3413
        // before we call AddProof() later.
3✔
3414
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
3✔
3415
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
3✔
3416

3✔
3417
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
3✔
3418
                ann.ShortChannelID,
3✔
3419
        )
3✔
3420
        if err != nil {
6✔
3421
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
3✔
3422
                if err != nil {
6✔
3423
                        err := fmt.Errorf("unable to store the proof for "+
3✔
3424
                                "short_chan_id=%v: %v", shortChanID, err)
3✔
3425
                        log.Error(err)
3✔
3426
                        nMsg.err <- err
3✔
3427

3✔
3428
                        return nil, false
3✔
3429
                }
3✔
3430

3431
                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
3✔
3432
                err := d.cfg.WaitingProofStore.Add(proof)
3✔
3433
                if err != nil {
3✔
3434
                        err := fmt.Errorf("unable to store the proof for "+
×
3435
                                "short_chan_id=%v: %v", shortChanID, err)
×
3436
                        log.Error(err)
×
3437
                        nMsg.err <- err
×
3438
                        return nil, false
×
3439
                }
×
3440

3441
                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
3✔
3442
                        ", adding to waiting batch", prefix, shortChanID)
3✔
3443
                nMsg.err <- nil
3✔
3444
                return nil, false
3✔
3445
        }
3446

3447
        nodeID := nMsg.source.SerializeCompressed()
3✔
3448
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
3✔
3449
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
3✔
3450

3✔
3451
        // Ensure that channel that was retrieved belongs to the peer which
3✔
3452
        // sent the proof announcement.
3✔
3453
        if !(isFirstNode || isSecondNode) {
3✔
3454
                err := fmt.Errorf("channel that was received doesn't belong "+
×
3455
                        "to the peer which sent the proof, short_chan_id=%v",
×
3456
                        shortChanID)
×
3457
                log.Error(err)
×
3458
                nMsg.err <- err
×
3459
                return nil, false
×
3460
        }
×
3461

3462
        // If proof was sent by a local sub-system, then we'll send the
3463
        // announcement signature to the remote node so they can also
3464
        // reconstruct the full channel announcement.
3465
        if !nMsg.isRemote {
6✔
3466
                var remotePubKey [33]byte
3✔
3467
                if isFirstNode {
6✔
3468
                        remotePubKey = chanInfo.NodeKey2Bytes
3✔
3469
                } else {
6✔
3470
                        remotePubKey = chanInfo.NodeKey1Bytes
3✔
3471
                }
3✔
3472

3473
                // Since the remote peer might not be online we'll call a
3474
                // method that will attempt to deliver the proof when it comes
3475
                // online.
3476
                err := d.reliableSender.sendMessage(ctx, ann, remotePubKey)
3✔
3477
                if err != nil {
3✔
3478
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3479
                                "channel=%v to peer=%x: %v", ann.MsgType(),
×
3480
                                ann.ShortChannelID, remotePubKey, err)
×
3481
                        nMsg.err <- err
×
3482
                        return nil, false
×
3483
                }
×
3484
        }
3485

3486
        // Check if we already have the full proof for this channel.
3487
        if chanInfo.AuthProof != nil {
6✔
3488
                // If we already have the fully assembled proof, then the peer
3✔
3489
                // sending us their proof has probably not received our local
3✔
3490
                // proof yet. So be kind and send them the full proof.
3✔
3491
                if nMsg.isRemote {
6✔
3492
                        peerID := nMsg.source.SerializeCompressed()
3✔
3493
                        log.Debugf("Got AnnounceSignatures for channel with " +
3✔
3494
                                "full proof.")
3✔
3495

3✔
3496
                        d.wg.Add(1)
3✔
3497
                        go func() {
6✔
3498
                                defer d.wg.Done()
3✔
3499

3✔
3500
                                log.Debugf("Received half proof for channel "+
3✔
3501
                                        "%v with existing full proof. Sending"+
3✔
3502
                                        " full proof to peer=%x",
3✔
3503
                                        ann.ChannelID, peerID)
3✔
3504

3✔
3505
                                ca, _, _, err := netann.CreateChanAnnouncement(
3✔
3506
                                        chanInfo.AuthProof, chanInfo, e1, e2,
3✔
3507
                                )
3✔
3508
                                if err != nil {
3✔
3509
                                        log.Errorf("unable to gen ann: %v",
×
3510
                                                err)
×
3511
                                        return
×
3512
                                }
×
3513

3514
                                err = nMsg.peer.SendMessage(false, ca)
3✔
3515
                                if err != nil {
3✔
3516
                                        log.Errorf("Failed sending full proof"+
×
3517
                                                " to peer=%x: %v", peerID, err)
×
3518
                                        return
×
3519
                                }
×
3520

3521
                                log.Debugf("Full proof sent to peer=%x for "+
3✔
3522
                                        "chanID=%v", peerID, ann.ChannelID)
3✔
3523
                        }()
3524
                }
3525

3526
                log.Debugf("Already have proof for channel with chanID=%v",
3✔
3527
                        ann.ChannelID)
3✔
3528
                nMsg.err <- nil
3✔
3529
                return nil, true
3✔
3530
        }
3531

3532
        // Check that we received the opposite proof. If so, then we're now
3533
        // able to construct the full proof, and create the channel
3534
        // announcement. If we didn't receive the opposite half of the proof
3535
        // then we should store this one, and wait for the opposite to be
3536
        // received.
3537
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
3✔
3538
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
3✔
3539
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
3✔
3540
                err := fmt.Errorf("unable to get the opposite proof for "+
×
3541
                        "short_chan_id=%v: %v", shortChanID, err)
×
3542
                log.Error(err)
×
3543
                nMsg.err <- err
×
3544
                return nil, false
×
3545
        }
×
3546

3547
        if err == channeldb.ErrWaitingProofNotFound {
6✔
3548
                err := d.cfg.WaitingProofStore.Add(proof)
3✔
3549
                if err != nil {
3✔
3550
                        err := fmt.Errorf("unable to store the proof for "+
×
3551
                                "short_chan_id=%v: %v", shortChanID, err)
×
3552
                        log.Error(err)
×
3553
                        nMsg.err <- err
×
3554
                        return nil, false
×
3555
                }
×
3556

3557
                log.Infof("1/2 of channel ann proof received for "+
3✔
3558
                        "short_chan_id=%v, waiting for other half",
3✔
3559
                        shortChanID)
3✔
3560

3✔
3561
                nMsg.err <- nil
3✔
3562
                return nil, false
3✔
3563
        }
3564

3565
        // We now have both halves of the channel announcement proof, then
3566
        // we'll reconstruct the initial announcement so we can validate it
3567
        // shortly below.
3568
        var dbProof models.ChannelAuthProof
3✔
3569
        if isFirstNode {
6✔
3570
                dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
3✔
3571
                dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
3✔
3572
                dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
3✔
3573
                dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
3✔
3574
        } else {
6✔
3575
                dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
3✔
3576
                dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
3✔
3577
                dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
3✔
3578
                dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
3✔
3579
        }
3✔
3580

3581
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
3✔
3582
                &dbProof, chanInfo, e1, e2,
3✔
3583
        )
3✔
3584
        if err != nil {
3✔
3585
                log.Error(err)
×
3586
                nMsg.err <- err
×
3587
                return nil, false
×
3588
        }
×
3589

3590
        // With all the necessary components assembled validate the full
3591
        // channel announcement proof.
3592
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
3✔
3593
        if err != nil {
3✔
3594
                err := fmt.Errorf("channel announcement proof for "+
×
3595
                        "short_chan_id=%v isn't valid: %v", shortChanID, err)
×
3596

×
3597
                log.Error(err)
×
3598
                nMsg.err <- err
×
3599
                return nil, false
×
3600
        }
×
3601

3602
        // If the channel was returned by the router it means that existence of
3603
        // funding point and inclusion of nodes bitcoin keys in it already
3604
        // checked by the router. In this stage we should check that node keys
3605
        // attest to the bitcoin keys by validating the signatures of
3606
        // announcement. If proof is valid then we'll populate the channel edge
3607
        // with it, so we can announce it on peer connect.
3608
        err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
3✔
3609
        if err != nil {
3✔
3610
                err := fmt.Errorf("unable add proof to the channel chanID=%v:"+
×
3611
                        " %v", ann.ChannelID, err)
×
3612
                log.Error(err)
×
3613
                nMsg.err <- err
×
3614
                return nil, false
×
3615
        }
×
3616

3617
        err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
3✔
3618
        if err != nil {
3✔
3619
                err := fmt.Errorf("unable to remove opposite proof for the "+
×
3620
                        "channel with chanID=%v: %v", ann.ChannelID, err)
×
3621
                log.Error(err)
×
3622
                nMsg.err <- err
×
3623
                return nil, false
×
3624
        }
×
3625

3626
        // Proof was successfully created and now can announce the channel to
3627
        // the remain network.
3628
        log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
3✔
3629
                ", adding to next ann batch", shortChanID)
3✔
3630

3✔
3631
        // Assemble the necessary announcements to add to the next broadcasting
3✔
3632
        // batch.
3✔
3633
        var announcements []networkMsg
3✔
3634
        announcements = append(announcements, networkMsg{
3✔
3635
                peer:   nMsg.peer,
3✔
3636
                source: nMsg.source,
3✔
3637
                msg:    chanAnn,
3✔
3638
        })
3✔
3639
        if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
6✔
3640
                announcements = append(announcements, networkMsg{
3✔
3641
                        peer:   nMsg.peer,
3✔
3642
                        source: src,
3✔
3643
                        msg:    e1Ann,
3✔
3644
                })
3✔
3645
        }
3✔
3646
        if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
6✔
3647
                announcements = append(announcements, networkMsg{
3✔
3648
                        peer:   nMsg.peer,
3✔
3649
                        source: src,
3✔
3650
                        msg:    e2Ann,
3✔
3651
                })
3✔
3652
        }
3✔
3653

3654
        // We'll also send along the node announcements for each channel
3655
        // participant if we know of them. To ensure our node announcement
3656
        // propagates to our channel counterparty, we'll set the source for
3657
        // each announcement to the node it belongs to, otherwise we won't send
3658
        // it since the source gets skipped. This isn't necessary for channel
3659
        // updates and announcement signatures since we send those directly to
3660
        // our channel counterparty through the gossiper's reliable sender.
3661
        node1Ann, err := d.fetchNodeAnn(ctx, chanInfo.NodeKey1Bytes)
3✔
3662
        if err != nil {
6✔
3663
                log.Debugf("Unable to fetch node announcement for %x: %v",
3✔
3664
                        chanInfo.NodeKey1Bytes, err)
3✔
3665
        } else {
6✔
3666
                if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
6✔
3667
                        announcements = append(announcements, networkMsg{
3✔
3668
                                peer:   nMsg.peer,
3✔
3669
                                source: nodeKey1,
3✔
3670
                                msg:    node1Ann,
3✔
3671
                        })
3✔
3672
                }
3✔
3673
        }
3674

3675
        node2Ann, err := d.fetchNodeAnn(ctx, chanInfo.NodeKey2Bytes)
3✔
3676
        if err != nil {
6✔
3677
                log.Debugf("Unable to fetch node announcement for %x: %v",
3✔
3678
                        chanInfo.NodeKey2Bytes, err)
3✔
3679
        } else {
6✔
3680
                if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
6✔
3681
                        announcements = append(announcements, networkMsg{
3✔
3682
                                peer:   nMsg.peer,
3✔
3683
                                source: nodeKey2,
3✔
3684
                                msg:    node2Ann,
3✔
3685
                        })
3✔
3686
                }
3✔
3687
        }
3688

3689
        nMsg.err <- nil
3✔
3690
        return announcements, true
3✔
3691
}
3692

3693
// isBanned returns true if the peer identified by pubkey is banned for sending
3694
// invalid channel announcements.
3695
func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
3✔
3696
        return d.banman.isBanned(pubkey)
3✔
3697
}
3✔
3698

3699
// ShouldDisconnect returns true if we should disconnect the peer identified by
3700
// pubkey.
3701
func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
3702
        bool, error) {
3✔
3703

3✔
3704
        pubkeySer := pubkey.SerializeCompressed()
3✔
3705

3✔
3706
        var pubkeyBytes [33]byte
3✔
3707
        copy(pubkeyBytes[:], pubkeySer)
3✔
3708

3✔
3709
        // If the public key is banned, check whether or not this is a channel
3✔
3710
        // peer.
3✔
3711
        if d.isBanned(pubkeyBytes) {
3✔
UNCOV
3712
                isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
×
UNCOV
3713
                if err != nil {
×
3714
                        return false, err
×
3715
                }
×
3716

3717
                // We should only disconnect non-channel peers.
UNCOV
3718
                if !isChanPeer {
×
UNCOV
3719
                        return true, nil
×
UNCOV
3720
                }
×
3721
        }
3722

3723
        return false, nil
3✔
3724
}
3725

3726
// validateFundingTransaction fetches the channel announcements claimed funding
3727
// transaction from chain to ensure that it exists, is not spent and matches
3728
// the channel announcement proof. The transaction's outpoint and value are
3729
// returned if we can glean them from the work done in this method.
3730
func (d *AuthenticatedGossiper) validateFundingTransaction(_ context.Context,
3731
        ann *lnwire.ChannelAnnouncement1,
3732
        tapscriptRoot fn.Option[chainhash.Hash]) (wire.OutPoint, btcutil.Amount,
3733
        []byte, error) {
3✔
3734

3✔
3735
        scid := ann.ShortChannelID
3✔
3736

3✔
3737
        // Before we can add the channel to the channel graph, we need to obtain
3✔
3738
        // the full funding outpoint that's encoded within the channel ID.
3✔
3739
        fundingTx, err := lnwallet.FetchFundingTxWrapper(
3✔
3740
                d.cfg.ChainIO, &scid, d.quit,
3✔
3741
        )
3✔
3742
        if err != nil {
3✔
UNCOV
3743
                //nolint:ll
×
UNCOV
3744
                //
×
UNCOV
3745
                // In order to ensure we don't erroneously mark a channel as a
×
UNCOV
3746
                // zombie due to an RPC failure, we'll attempt to string match
×
UNCOV
3747
                // for the relevant errors.
×
UNCOV
3748
                //
×
UNCOV
3749
                // * btcd:
×
UNCOV
3750
                //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316
×
UNCOV
3751
                //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086
×
UNCOV
3752
                // * bitcoind:
×
UNCOV
3753
                //    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770
×
UNCOV
3754
                //     * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954
×
UNCOV
3755
                switch {
×
UNCOV
3756
                case strings.Contains(err.Error(), "not found"):
×
UNCOV
3757
                        fallthrough
×
3758

UNCOV
3759
                case strings.Contains(err.Error(), "out of range"):
×
UNCOV
3760
                        // If the funding transaction isn't found at all, then
×
UNCOV
3761
                        // we'll mark the edge itself as a zombie so we don't
×
UNCOV
3762
                        // continue to request it. We use the "zero key" for
×
UNCOV
3763
                        // both node pubkeys so this edge can't be resurrected.
×
UNCOV
3764
                        zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
×
UNCOV
3765
                        if zErr != nil {
×
3766
                                return wire.OutPoint{}, 0, nil, zErr
×
3767
                        }
×
3768

3769
                default:
×
3770
                }
3771

UNCOV
3772
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
×
UNCOV
3773
                        ErrNoFundingTransaction, err)
×
3774
        }
3775

3776
        // Recreate witness output to be sure that declared in channel edge
3777
        // bitcoin keys and channel value corresponds to the reality.
3778
        fundingPkScript, err := makeFundingScript(
3✔
3779
                ann.BitcoinKey1[:], ann.BitcoinKey2[:], ann.Features,
3✔
3780
                tapscriptRoot,
3✔
3781
        )
3✔
3782
        if err != nil {
3✔
3783
                return wire.OutPoint{}, 0, nil, err
×
3784
        }
×
3785

3786
        // Next we'll validate that this channel is actually well formed. If
3787
        // this check fails, then this channel either doesn't exist, or isn't
3788
        // the one that was meant to be created according to the passed channel
3789
        // proofs.
3790
        fundingPoint, err := chanvalidate.Validate(
3✔
3791
                &chanvalidate.Context{
3✔
3792
                        Locator: &chanvalidate.ShortChanIDChanLocator{
3✔
3793
                                ID: scid,
3✔
3794
                        },
3✔
3795
                        MultiSigPkScript: fundingPkScript,
3✔
3796
                        FundingTx:        fundingTx,
3✔
3797
                },
3✔
3798
        )
3✔
3799
        if err != nil {
3✔
UNCOV
3800
                // Mark the edge as a zombie so we won't try to re-validate it
×
UNCOV
3801
                // on start up.
×
UNCOV
3802
                zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
×
UNCOV
3803
                if zErr != nil {
×
3804
                        return wire.OutPoint{}, 0, nil, zErr
×
3805
                }
×
3806

UNCOV
3807
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
×
UNCOV
3808
                        ErrInvalidFundingOutput, err)
×
3809
        }
3810

3811
        // Now that we have the funding outpoint of the channel, ensure
3812
        // that it hasn't yet been spent. If so, then this channel has
3813
        // been closed so we'll ignore it.
3814
        chanUtxo, err := d.cfg.ChainIO.GetUtxo(
3✔
3815
                fundingPoint, fundingPkScript, scid.BlockHeight, d.quit,
3✔
3816
        )
3✔
3817
        if err != nil {
3✔
UNCOV
3818
                if errors.Is(err, btcwallet.ErrOutputSpent) {
×
UNCOV
3819
                        zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
×
UNCOV
3820
                        if zErr != nil {
×
3821
                                return wire.OutPoint{}, 0, nil, zErr
×
3822
                        }
×
3823
                }
3824

UNCOV
3825
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: unable to "+
×
UNCOV
3826
                        "fetch utxo for chan_id=%v, chan_point=%v: %w",
×
UNCOV
3827
                        ErrChannelSpent, scid.ToUint64(), fundingPoint, err)
×
3828
        }
3829

3830
        return *fundingPoint, btcutil.Amount(chanUtxo.Value), fundingPkScript,
3✔
3831
                nil
3✔
3832
}
3833

3834
// handleBadPeer takes a misbehaving peer and increases its ban score. Once
3835
// increased, it will disconnect the peer if its ban score has reached
3836
// `banThreshold` and it doesn't have a channel with us.
NEW
3837
func (d *AuthenticatedGossiper) handleBadPeer(peer lnpeer.Peer) error {
×
NEW
3838
        // Increment the peer's ban score for misbehavior.
×
NEW
3839
        d.banman.incrementBanScore(peer.PubKey())
×
NEW
3840

×
NEW
3841
        // If the peer is banned and not a channel peer, we'll disconnect them.
×
NEW
3842
        shouldDc, dcErr := d.ShouldDisconnect(peer.IdentityKey())
×
NEW
3843
        if dcErr != nil {
×
NEW
3844
                log.Errorf("failed to check if we should disconnect peer: %v",
×
NEW
3845
                        dcErr)
×
NEW
3846

×
NEW
3847
                return dcErr
×
NEW
3848
        }
×
3849

NEW
3850
        if shouldDc {
×
NEW
3851
                peer.Disconnect(ErrPeerBanned)
×
NEW
3852
        }
×
3853

NEW
3854
        return nil
×
3855
}
3856

3857
// makeFundingScript is used to make the funding script for both segwit v0 and
3858
// segwit v1 (taproot) channels.
3859
func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte,
3860
        features *lnwire.RawFeatureVector,
3861
        tapscriptRoot fn.Option[chainhash.Hash]) ([]byte, error) {
3✔
3862

3✔
3863
        legacyFundingScript := func() ([]byte, error) {
6✔
3864
                witnessScript, err := input.GenMultiSigScript(
3✔
3865
                        bitcoinKey1, bitcoinKey2,
3✔
3866
                )
3✔
3867
                if err != nil {
3✔
3868
                        return nil, err
×
3869
                }
×
3870
                pkScript, err := input.WitnessScriptHash(witnessScript)
3✔
3871
                if err != nil {
3✔
3872
                        return nil, err
×
3873
                }
×
3874

3875
                return pkScript, nil
3✔
3876
        }
3877

3878
        if features.IsEmpty() {
6✔
3879
                return legacyFundingScript()
3✔
3880
        }
3✔
3881

3882
        chanFeatureBits := lnwire.NewFeatureVector(features, lnwire.Features)
3✔
3883
        if chanFeatureBits.HasFeature(
3✔
3884
                lnwire.SimpleTaprootChannelsOptionalStaging,
3✔
3885
        ) {
6✔
3886

3✔
3887
                pubKey1, err := btcec.ParsePubKey(bitcoinKey1)
3✔
3888
                if err != nil {
3✔
3889
                        return nil, err
×
3890
                }
×
3891
                pubKey2, err := btcec.ParsePubKey(bitcoinKey2)
3✔
3892
                if err != nil {
3✔
3893
                        return nil, err
×
3894
                }
×
3895

3896
                fundingScript, _, err := input.GenTaprootFundingScript(
3✔
3897
                        pubKey1, pubKey2, 0, tapscriptRoot,
3✔
3898
                )
3✔
3899
                if err != nil {
3✔
3900
                        return nil, err
×
3901
                }
×
3902

3903
                // TODO(roasbeef): add tapscript root to gossip v1.5
3904

3905
                return fundingScript, nil
3✔
3906
        }
3907

3908
        return legacyFundingScript()
×
3909
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc