• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17830307614

18 Sep 2025 01:29PM UTC coverage: 54.617% (-12.0%) from 66.637%
17830307614

Pull #10200

github

web-flow
Merge 181a0a7bc into b34fc964b
Pull Request #10200: github: change to form-based issue template

109249 of 200028 relevant lines covered (54.62%)

21896.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.27
/discovery/gossiper.go
1
package discovery
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "strings"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcec/v2"
14
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
15
        "github.com/btcsuite/btcd/btcutil"
16
        "github.com/btcsuite/btcd/chaincfg/chainhash"
17
        "github.com/btcsuite/btcd/wire"
18
        "github.com/lightninglabs/neutrino/cache"
19
        "github.com/lightninglabs/neutrino/cache/lru"
20
        "github.com/lightningnetwork/lnd/batch"
21
        "github.com/lightningnetwork/lnd/chainntnfs"
22
        "github.com/lightningnetwork/lnd/channeldb"
23
        "github.com/lightningnetwork/lnd/fn/v2"
24
        "github.com/lightningnetwork/lnd/graph"
25
        graphdb "github.com/lightningnetwork/lnd/graph/db"
26
        "github.com/lightningnetwork/lnd/graph/db/models"
27
        "github.com/lightningnetwork/lnd/input"
28
        "github.com/lightningnetwork/lnd/keychain"
29
        "github.com/lightningnetwork/lnd/lnpeer"
30
        "github.com/lightningnetwork/lnd/lnutils"
31
        "github.com/lightningnetwork/lnd/lnwallet"
32
        "github.com/lightningnetwork/lnd/lnwallet/btcwallet"
33
        "github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
34
        "github.com/lightningnetwork/lnd/lnwire"
35
        "github.com/lightningnetwork/lnd/multimutex"
36
        "github.com/lightningnetwork/lnd/netann"
37
        "github.com/lightningnetwork/lnd/routing/route"
38
        "github.com/lightningnetwork/lnd/ticker"
39
        "golang.org/x/time/rate"
40
)
41

42
const (
43
        // DefaultMaxChannelUpdateBurst is the default maximum number of updates
44
        // for a specific channel and direction that we'll accept over an
45
        // interval.
46
        DefaultMaxChannelUpdateBurst = 10
47

48
        // DefaultChannelUpdateInterval is the default interval we'll use to
49
        // determine how often we should allow a new update for a specific
50
        // channel and direction.
51
        DefaultChannelUpdateInterval = time.Minute
52

53
        // maxPrematureUpdates tracks the max amount of premature channel
54
        // updates that we'll hold onto.
55
        maxPrematureUpdates = 100
56

57
        // maxFutureMessages tracks the max amount of future messages that
58
        // we'll hold onto.
59
        maxFutureMessages = 1000
60

61
        // DefaultSubBatchDelay is the default delay we'll use when
62
        // broadcasting the next announcement batch.
63
        DefaultSubBatchDelay = 5 * time.Second
64

65
        // maxRejectedUpdates tracks the max amount of rejected channel updates
66
        // we'll maintain. This is the global size across all peers. We'll
67
        // allocate ~3 MB max to the cache.
68
        maxRejectedUpdates = 10_000
69

70
        // DefaultProofMatureDelta specifies the default value used for
71
        // ProofMatureDelta, which is the number of confirmations needed before
72
        // processing the announcement signatures.
73
        DefaultProofMatureDelta = 6
74
)
75

76
var (
77
        // ErrGossiperShuttingDown is an error that is returned if the gossiper
78
        // is in the process of being shut down.
79
        ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
80

81
        // ErrGossipSyncerNotFound signals that we were unable to find an active
82
        // gossip syncer corresponding to a gossip query message received from
83
        // the remote peer.
84
        ErrGossipSyncerNotFound = errors.New("gossip syncer not found")
85

86
        // ErrNoFundingTransaction is returned when we are unable to find the
87
        // funding transaction described by the short channel ID on chain.
88
        ErrNoFundingTransaction = errors.New(
89
                "unable to find the funding transaction",
90
        )
91

92
        // ErrInvalidFundingOutput is returned if the channel funding output
93
        // fails validation.
94
        ErrInvalidFundingOutput = errors.New(
95
                "channel funding output validation failed",
96
        )
97

98
        // ErrChannelSpent is returned when we go to validate a channel, but
99
        // the purported funding output has actually already been spent on
100
        // chain.
101
        ErrChannelSpent = errors.New("channel output has been spent")
102

103
        // emptyPubkey is used to compare compressed pubkeys against an empty
104
        // byte array.
105
        emptyPubkey [33]byte
106
)
107

108
// optionalMsgFields is a set of optional message fields that external callers
109
// can provide that serve useful when processing a specific network
110
// announcement.
111
type optionalMsgFields struct {
112
        capacity      *btcutil.Amount
113
        channelPoint  *wire.OutPoint
114
        remoteAlias   *lnwire.ShortChannelID
115
        tapscriptRoot fn.Option[chainhash.Hash]
116
}
117

118
// apply applies the optional fields within the functional options.
119
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
47✔
120
        for _, optionalMsgField := range optionalMsgFields {
52✔
121
                optionalMsgField(f)
5✔
122
        }
5✔
123
}
124

125
// OptionalMsgField is a functional option parameter that can be used to provide
126
// external information that is not included within a network message but serves
127
// useful when processing it.
128
type OptionalMsgField func(*optionalMsgFields)
129

130
// ChannelCapacity is an optional field that lets the gossiper know of the
131
// capacity of a channel.
132
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
27✔
133
        return func(f *optionalMsgFields) {
28✔
134
                f.capacity = &capacity
1✔
135
        }
1✔
136
}
137

138
// ChannelPoint is an optional field that lets the gossiper know of the outpoint
139
// of a channel.
140
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
30✔
141
        return func(f *optionalMsgFields) {
34✔
142
                f.channelPoint = &op
4✔
143
        }
4✔
144
}
145

146
// TapscriptRoot is an optional field that lets the gossiper know of the root of
147
// the tapscript tree for a custom channel.
148
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
26✔
149
        return func(f *optionalMsgFields) {
26✔
150
                f.tapscriptRoot = root
×
151
        }
×
152
}
153

154
// RemoteAlias is an optional field that lets the gossiper know that a locally
155
// sent channel update is actually an update for the peer that should replace
156
// the ShortChannelID field with the remote's alias. This is only used for
157
// channels with peers where the option-scid-alias feature bit was negotiated.
158
// The channel update will be added to the graph under the original SCID, but
159
// will be modified and re-signed with this alias.
160
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
26✔
161
        return func(f *optionalMsgFields) {
26✔
162
                f.remoteAlias = alias
×
163
        }
×
164
}
165

166
// networkMsg couples a routing related wire message with the peer that
167
// originally sent it.
168
type networkMsg struct {
169
        peer              lnpeer.Peer
170
        source            *btcec.PublicKey
171
        msg               lnwire.Message
172
        optionalMsgFields *optionalMsgFields
173

174
        isRemote bool
175

176
        err chan error
177
}
178

179
// chanPolicyUpdateRequest is a request that is sent to the server when a caller
180
// wishes to update a particular set of channels. New ChannelUpdate messages
181
// will be crafted to be sent out during the next broadcast epoch and the fee
182
// updates committed to the lower layer.
183
type chanPolicyUpdateRequest struct {
184
        edgesToUpdate []EdgeWithInfo
185
        errChan       chan error
186
}
187

188
// PinnedSyncers is a set of node pubkeys for which we will maintain an active
189
// syncer at all times.
190
type PinnedSyncers map[route.Vertex]struct{}
191

192
// Config defines the configuration for the service. ALL elements within the
193
// configuration MUST be non-nil for the service to carry out its duties.
194
type Config struct {
195
        // ChainHash is a hash that identifies the resident chain of the
196
        // AuthenticatedGossiper. Any announcements that don't match this
197
        // chain hash will be ignored.
198
        //
199
        // TODO(roasbeef): eventually make into map so can de-multiplex
200
        // incoming announcements
201
        //   * also need to do same for Notifier
202
        ChainHash chainhash.Hash
203

204
        // Graph is the subsystem which is responsible for managing the
205
        // topology of lightning network. After incoming channel, node, channel
206
        // updates announcements are validated they are sent to the router in
207
        // order to be included in the LN graph.
208
        Graph graph.ChannelGraphSource
209

210
        // ChainIO represents an abstraction over a source that can query the
211
        // blockchain.
212
        ChainIO lnwallet.BlockChainIO
213

214
        // ChanSeries is an interface that provides access to a time series
215
        // view of the current known channel graph. Each GossipSyncer enabled
216
        // peer will utilize this in order to create and respond to channel
217
        // graph time series queries.
218
        ChanSeries ChannelGraphTimeSeries
219

220
        // Notifier is used for receiving notifications of incoming blocks.
221
        // With each new incoming block found we process previously premature
222
        // announcements.
223
        //
224
        // TODO(roasbeef): could possibly just replace this with an epoch
225
        // channel.
226
        Notifier chainntnfs.ChainNotifier
227

228
        // Broadcast broadcasts a particular set of announcements to all peers
229
        // that the daemon is connected to. If supplied, the skips parameter
230
        // indicates the set of peers that should be excluded from the
231
        // broadcast.
232
        Broadcast func(skips map[route.Vertex]struct{},
233
                msg ...lnwire.Message) error
234

235
        // NotifyWhenOnline is a function that allows the gossiper to be
236
        // notified when a certain peer comes online, allowing it to
237
        // retry sending a peer message.
238
        //
239
        // NOTE: The peerChan channel must be buffered.
240
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
241

242
        // NotifyWhenOffline is a function that allows the gossiper to be
243
        // notified when a certain peer disconnects, allowing it to request a
244
        // notification for when it reconnects.
245
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
246

247
        // FetchSelfAnnouncement retrieves our current node announcement, for
248
        // use when determining whether we should update our peers about our
249
        // presence in the network.
250
        FetchSelfAnnouncement func() lnwire.NodeAnnouncement
251

252
        // UpdateSelfAnnouncement produces a new announcement for our node with
253
        // an updated timestamp which can be broadcast to our peers.
254
        UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement, error)
255

256
        // ProofMatureDelta is the number of confirmations needed before
257
        // exchanging the channel announcement proofs.
258
        ProofMatureDelta uint32
259

260
        // TrickleDelay is the period of the trickle timer which flushes to the
261
        // network the pending batch of new announcements we've received since
262
        // the last trickle tick.
263
        TrickleDelay time.Duration
264

265
        // RetransmitTicker is a ticker that ticks with a period which
266
        // indicates that we should check if we need re-broadcast any of our
267
        // personal channels.
268
        RetransmitTicker ticker.Ticker
269

270
        // RebroadcastInterval is the maximum time we wait between sending out
271
        // channel updates for our active channels and our own node
272
        // announcement. We do this to ensure our active presence on the
273
        // network is known, and we are not being considered a zombie node or
274
        // having zombie channels.
275
        RebroadcastInterval time.Duration
276

277
        // WaitingProofStore is a persistent storage of partial channel proof
278
        // announcement messages. We use it to buffer half of the material
279
        // needed to reconstruct a full authenticated channel announcement.
280
        // Once we receive the other half the channel proof, we'll be able to
281
        // properly validate it and re-broadcast it out to the network.
282
        //
283
        // TODO(wilmer): make interface to prevent channeldb dependency.
284
        WaitingProofStore *channeldb.WaitingProofStore
285

286
        // MessageStore is a persistent storage of gossip messages which we will
287
        // use to determine which messages need to be resent for a given peer.
288
        MessageStore GossipMessageStore
289

290
        // AnnSigner is an instance of the MessageSigner interface which will
291
        // be used to manually sign any outgoing channel updates. The signer
292
        // implementation should be backed by the public key of the backing
293
        // Lightning node.
294
        //
295
        // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
296
        // here?
297
        AnnSigner lnwallet.MessageSigner
298

299
        // ScidCloser is an instance of ClosedChannelTracker that helps the
300
        // gossiper cut down on spam channel announcements for already closed
301
        // channels.
302
        ScidCloser ClosedChannelTracker
303

304
        // NumActiveSyncers is the number of peers for which we should have
305
        // active syncers with. After reaching NumActiveSyncers, any future
306
        // gossip syncers will be passive.
307
        NumActiveSyncers int
308

309
        // NoTimestampQueries will prevent the GossipSyncer from querying
310
        // timestamps of announcement messages from the peer and from replying
311
        // to timestamp queries.
312
        NoTimestampQueries bool
313

314
        // RotateTicker is a ticker responsible for notifying the SyncManager
315
        // when it should rotate its active syncers. A single active syncer with
316
        // a chansSynced state will be exchanged for a passive syncer in order
317
        // to ensure we don't keep syncing with the same peers.
318
        RotateTicker ticker.Ticker
319

320
        // HistoricalSyncTicker is a ticker responsible for notifying the
321
        // syncManager when it should attempt a historical sync with a gossip
322
        // sync peer.
323
        HistoricalSyncTicker ticker.Ticker
324

325
        // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
326
        // syncManager when it should attempt to start the next pending
327
        // activeSyncer due to the current one not completing its state machine
328
        // within the timeout.
329
        ActiveSyncerTimeoutTicker ticker.Ticker
330

331
        // MinimumBatchSize is the minimum size of a sub batch of announcement
332
        // messages.
333
        MinimumBatchSize int
334

335
        // SubBatchDelay is the delay between sending sub batches of
336
        // gossip messages.
337
        SubBatchDelay time.Duration
338

339
        // IgnoreHistoricalFilters will prevent syncers from replying with
340
        // historical data when the remote peer sets a gossip_timestamp_range.
341
        // This prevents ranges with old start times from causing us to dump the
342
        // graph on connect.
343
        IgnoreHistoricalFilters bool
344

345
        // PinnedSyncers is a set of peers that will always transition to
346
        // ActiveSync upon connection. These peers will never transition to
347
        // PassiveSync.
348
        PinnedSyncers PinnedSyncers
349

350
        // MaxChannelUpdateBurst specifies the maximum number of updates for a
351
        // specific channel and direction that we'll accept over an interval.
352
        MaxChannelUpdateBurst int
353

354
        // ChannelUpdateInterval specifies the interval we'll use to determine
355
        // how often we should allow a new update for a specific channel and
356
        // direction.
357
        ChannelUpdateInterval time.Duration
358

359
        // IsAlias returns true if a given ShortChannelID is an alias for
360
        // option_scid_alias channels.
361
        IsAlias func(scid lnwire.ShortChannelID) bool
362

363
        // SignAliasUpdate is used to re-sign a channel update using the
364
        // remote's alias if the option-scid-alias feature bit was negotiated.
365
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
366
                error)
367

368
        // FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
369
        // This is used for channels that have negotiated the option-scid-alias
370
        // feature bit.
371
        FindBaseByAlias func(alias lnwire.ShortChannelID) (
372
                lnwire.ShortChannelID, error)
373

374
        // GetAlias allows the gossiper to look up the peer's alias for a given
375
        // ChannelID. This is used to sign updates for them if the channel has
376
        // no AuthProof and the option-scid-alias feature bit was negotiated.
377
        GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)
378

379
        // FindChannel allows the gossiper to find a channel that we're party
380
        // to without iterating over the entire set of open channels.
381
        FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
382
                *channeldb.OpenChannel, error)
383

384
        // IsStillZombieChannel takes the timestamps of the latest channel
385
        // updates for a channel and returns true if the channel should be
386
        // considered a zombie based on these timestamps.
387
        IsStillZombieChannel func(time.Time, time.Time) bool
388

389
        // AssumeChannelValid toggles whether the gossiper will check for
390
        // spent-ness of channel outpoints. For neutrino, this saves long
391
        // rescans from blocking initial usage of the daemon.
392
        AssumeChannelValid bool
393

394
        // MsgRateBytes is the rate limit for the number of bytes per second
395
        // that we'll allocate to outbound gossip messages.
396
        MsgRateBytes uint64
397

398
        // MsgBurstBytes is the allotted burst amount in bytes. This is the
399
        // number of starting tokens in our token bucket algorithm.
400
        MsgBurstBytes uint64
401

402
        // FilterConcurrency is the maximum number of concurrent gossip filter
403
        // applications that can be processed.
404
        FilterConcurrency int
405

406
        // BanThreshold is the score used to decide whether a given peer is
407
        // banned or not.
408
        BanThreshold uint64
409

410
        // PeerMsgRateBytes is the rate limit for the number of bytes per second
411
        // that we'll allocate to outbound gossip messages for a single peer.
412
        PeerMsgRateBytes uint64
413
}
414

415
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
416
// used to let the caller of the lru.Cache know if a message has already been
417
// processed or not.
418
type processedNetworkMsg struct {
419
        processed bool
420
        msg       *networkMsg
421
}
422

423
// cachedNetworkMsg is a wrapper around a network message that can be used with
424
// *lru.Cache.
425
//
426
// NOTE: This struct is not thread safe which means you need to assure no
427
// concurrent read write access to it and all its contents which are pointers
428
// as well.
429
type cachedNetworkMsg struct {
430
        msgs []*processedNetworkMsg
431
}
432

433
// Size returns the "size" of an entry. We return the number of items as we
434
// just want to limit the total amount of entries rather than do accurate size
435
// accounting.
436
func (c *cachedNetworkMsg) Size() (uint64, error) {
2✔
437
        return uint64(len(c.msgs)), nil
2✔
438
}
2✔
439

440
// rejectCacheKey identifies an entry in the cache of recently rejected
// announcements by pairing a 33-byte public key with a channel ID.
type rejectCacheKey struct {
	pubkey [33]byte
	chanID uint64
}

// newRejectCacheKey builds the reject cache key for the given channel ID and
// public key.
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
	return rejectCacheKey{pubkey: pub, chanID: cid}
}
456

457
// sourceToPub returns a serialized-compressed public key for use in the reject
458
// cache.
459
func sourceToPub(pk *btcec.PublicKey) [33]byte {
486✔
460
        var pub [33]byte
486✔
461
        copy(pub[:], pk.SerializeCompressed())
486✔
462
        return pub
486✔
463
}
486✔
464

465
// cachedReject is the empty placeholder value stored for each rejected entry.
type cachedReject struct{}

// Size reports a constant weight of 1 for every entry, so that a cache holding
// these values is bounded by entry count. The returned error is always nil.
func (c *cachedReject) Size() (uint64, error) {
	const entryWeight uint64 = 1

	return entryWeight, nil
}
474

475
// AuthenticatedGossiper is a subsystem which is responsible for receiving
476
// announcements, validating them and applying the changes to router, syncing
477
// lightning network with newly connected nodes, broadcasting announcements
478
// after validation, negotiating the channel announcement proofs exchange and
479
// handling the premature announcements. All outgoing announcements are
480
// expected to be properly signed as dictated in BOLT#7, additionally, all
481
// incoming messages are expected to be well formed and signed. Invalid messages
482
// will be rejected by this struct.
483
type AuthenticatedGossiper struct {
484
        // Parameters which are needed to properly handle the start and stop of
485
        // the service.
486
        started sync.Once
487
        stopped sync.Once
488

489
        // bestHeight is the height of the block at the tip of the main chain
490
        // as we know it. Accesses *MUST* be done with the gossiper's lock
491
        // held.
492
        bestHeight uint32
493

494
        // cfg is a copy of the configuration struct that the gossiper service
495
        // was initialized with.
496
        cfg *Config
497

498
        // blockEpochs encapsulates a stream of block epochs that are sent at
499
        // every new block height.
500
        blockEpochs *chainntnfs.BlockEpochEvent
501

502
        // prematureChannelUpdates is a map of ChannelUpdates we have received
503
        // that weren't associated with any channel we know about.  We store
504
        // them temporarily, such that we can reprocess them when a
505
        // ChannelAnnouncement for the channel is received.
506
        prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]
507

508
        // banman tracks our peer's ban status.
509
        banman *banman
510

511
        // networkMsgs is a channel that carries new network broadcasted
512
        // message from outside the gossiper service to be processed by the
513
        // networkHandler.
514
        networkMsgs chan *networkMsg
515

516
        // futureMsgs is a list of premature network messages that have a block
517
        // height specified in the future. We will save them and resend them to
518
        // the chan networkMsgs once the block height has been reached. The cached
519
        // map format is,
520
        //   {msgID1: msg1, msgID2: msg2, ...}
521
        futureMsgs *futureMsgCache
522

523
        // chanPolicyUpdates is a channel over which requests to update the
524
        // forwarding policy of a set of channels are sent.
525
        chanPolicyUpdates chan *chanPolicyUpdateRequest
526

527
        // selfKey is the identity public key of the backing Lightning node.
528
        selfKey *btcec.PublicKey
529

530
        // selfKeyLoc is the locator for the identity public key of the backing
531
        // Lightning node.
532
        selfKeyLoc keychain.KeyLocator
533

534
        // channelMtx is used to restrict the database access to one
535
        // goroutine per channel ID. This is done to ensure that when
536
        // the gossiper is handling an announcement, the db state stays
537
        // consistent between when the DB is first read until it's written.
538
        channelMtx *multimutex.Mutex[uint64]
539

540
        recentRejects *lru.Cache[rejectCacheKey, *cachedReject]
541

542
        // syncMgr is a subsystem responsible for managing the gossip syncers
543
        // for peers currently connected. When a new peer is connected, the
544
        // manager will create its accompanying gossip syncer and determine
545
        // whether it should have an activeSync or passiveSync sync type based
546
        // on how many other gossip syncers are currently active. Any activeSync
547
        // gossip syncers are started in a round-robin manner to ensure we're
548
        // not syncing with multiple peers at the same time.
549
        syncMgr *SyncManager
550

551
        // reliableSender is a subsystem responsible for handling reliable
552
        // message send requests to peers. This should only be used for channels
553
        // that are unadvertised at the time of handling the message since if it
554
        // is advertised, then peers should be able to get the message from the
555
        // network.
556
        reliableSender *reliableSender
557

558
        // chanUpdateRateLimiter contains rate limiters for each direction of
559
        // a channel update we've processed. We'll use these to determine
560
        // whether we should accept a new update for a specific channel and
561
        // direction.
562
        //
563
        // NOTE: This map must be synchronized with the main
564
        // AuthenticatedGossiper lock.
565
        chanUpdateRateLimiter map[uint64][2]*rate.Limiter
566

567
        // vb is used to enforce job dependency ordering of gossip messages.
568
        vb *ValidationBarrier
569

570
        sync.Mutex
571

572
        cancel fn.Option[context.CancelFunc]
573
        quit   chan struct{}
574
        wg     sync.WaitGroup
575
}
576

577
// New creates a new AuthenticatedGossiper instance, initialized with the
578
// passed configuration parameters.
579
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
30✔
580
        gossiper := &AuthenticatedGossiper{
30✔
581
                selfKey:           selfKeyDesc.PubKey,
30✔
582
                selfKeyLoc:        selfKeyDesc.KeyLocator,
30✔
583
                cfg:               &cfg,
30✔
584
                networkMsgs:       make(chan *networkMsg),
30✔
585
                futureMsgs:        newFutureMsgCache(maxFutureMessages),
30✔
586
                quit:              make(chan struct{}),
30✔
587
                chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
30✔
588
                prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
30✔
589
                        maxPrematureUpdates,
30✔
590
                ),
30✔
591
                channelMtx: multimutex.NewMutex[uint64](),
30✔
592
                recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
30✔
593
                        maxRejectedUpdates,
30✔
594
                ),
30✔
595
                chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
30✔
596
                banman:                newBanman(cfg.BanThreshold),
30✔
597
        }
30✔
598

30✔
599
        gossiper.vb = NewValidationBarrier(1000, gossiper.quit)
30✔
600

30✔
601
        gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
30✔
602
                ChainHash:                cfg.ChainHash,
30✔
603
                ChanSeries:               cfg.ChanSeries,
30✔
604
                RotateTicker:             cfg.RotateTicker,
30✔
605
                HistoricalSyncTicker:     cfg.HistoricalSyncTicker,
30✔
606
                NumActiveSyncers:         cfg.NumActiveSyncers,
30✔
607
                NoTimestampQueries:       cfg.NoTimestampQueries,
30✔
608
                IgnoreHistoricalFilters:  cfg.IgnoreHistoricalFilters,
30✔
609
                BestHeight:               gossiper.latestHeight,
30✔
610
                PinnedSyncers:            cfg.PinnedSyncers,
30✔
611
                IsStillZombieChannel:     cfg.IsStillZombieChannel,
30✔
612
                AllotedMsgBytesPerSecond: cfg.MsgRateBytes,
30✔
613
                AllotedMsgBytesBurst:     cfg.MsgBurstBytes,
30✔
614
                FilterConcurrency:        cfg.FilterConcurrency,
30✔
615
                PeerMsgBytesPerSecond:    cfg.PeerMsgRateBytes,
30✔
616
        })
30✔
617

30✔
618
        gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
30✔
619
                NotifyWhenOnline:  cfg.NotifyWhenOnline,
30✔
620
                NotifyWhenOffline: cfg.NotifyWhenOffline,
30✔
621
                MessageStore:      cfg.MessageStore,
30✔
622
                IsMsgStale:        gossiper.isMsgStale,
30✔
623
        })
30✔
624

30✔
625
        return gossiper
30✔
626
}
30✔
627

628
// EdgeWithInfo contains the information that is required to update an edge.
629
type EdgeWithInfo struct {
630
        // Info describes the channel.
631
        Info *models.ChannelEdgeInfo
632

633
        // Edge describes the policy in one direction of the channel.
634
        Edge *models.ChannelEdgePolicy
635
}
636

637
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
638
// specified edge updates. Updates are done in two stages: first, the
639
// AuthenticatedGossiper ensures the update has been committed by dependent
640
// sub-systems, then it signs and broadcasts new updates to the network. A
641
// mapping between outpoints and updated channel policies is returned, which is
642
// used to update the forwarding policies of the underlying links.
643
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
644
        edgesToUpdate []EdgeWithInfo) error {
1✔
645

1✔
646
        errChan := make(chan error, 1)
1✔
647
        policyUpdate := &chanPolicyUpdateRequest{
1✔
648
                edgesToUpdate: edgesToUpdate,
1✔
649
                errChan:       errChan,
1✔
650
        }
1✔
651

1✔
652
        select {
1✔
653
        case d.chanPolicyUpdates <- policyUpdate:
1✔
654
                err := <-errChan
1✔
655
                return err
1✔
656
        case <-d.quit:
×
657
                return fmt.Errorf("AuthenticatedGossiper shutting down")
×
658
        }
659
}
660

661
// Start spawns network messages handler goroutine and registers on new block
662
// notifications in order to properly handle the premature announcements.
663
func (d *AuthenticatedGossiper) Start() error {
30✔
664
        var err error
30✔
665
        d.started.Do(func() {
60✔
666
                ctx, cancel := context.WithCancel(context.Background())
30✔
667
                d.cancel = fn.Some(cancel)
30✔
668

30✔
669
                log.Info("Authenticated Gossiper starting")
30✔
670
                err = d.start(ctx)
30✔
671
        })
30✔
672
        return err
30✔
673
}
674

675
func (d *AuthenticatedGossiper) start(ctx context.Context) error {
30✔
676
        // First we register for new notifications of newly discovered blocks.
30✔
677
        // We do this immediately so we'll later be able to consume any/all
30✔
678
        // blocks which were discovered.
30✔
679
        blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
30✔
680
        if err != nil {
30✔
681
                return err
×
682
        }
×
683
        d.blockEpochs = blockEpochs
30✔
684

30✔
685
        height, err := d.cfg.Graph.CurrentBlockHeight()
30✔
686
        if err != nil {
30✔
687
                return err
×
688
        }
×
689
        d.bestHeight = height
30✔
690

30✔
691
        // Start the reliable sender. In case we had any pending messages ready
30✔
692
        // to be sent when the gossiper was last shut down, we must continue on
30✔
693
        // our quest to deliver them to their respective peers.
30✔
694
        if err := d.reliableSender.Start(); err != nil {
30✔
695
                return err
×
696
        }
×
697

698
        d.syncMgr.Start()
30✔
699

30✔
700
        d.banman.start()
30✔
701

30✔
702
        // Start receiving blocks in its dedicated goroutine.
30✔
703
        d.wg.Add(2)
30✔
704
        go d.syncBlockHeight()
30✔
705
        go d.networkHandler(ctx)
30✔
706

30✔
707
        return nil
30✔
708
}
709

710
// syncBlockHeight syncs the best block height for the gossiper by reading
711
// blockEpochs.
712
//
713
// NOTE: must be run as a goroutine.
714
func (d *AuthenticatedGossiper) syncBlockHeight() {
30✔
715
        defer d.wg.Done()
30✔
716

30✔
717
        for {
60✔
718
                select {
30✔
719
                // A new block has arrived, so we can re-process the previously
720
                // premature announcements.
721
                case newBlock, ok := <-d.blockEpochs.Epochs:
×
722
                        // If the channel has been closed, then this indicates
×
723
                        // the daemon is shutting down, so we exit ourselves.
×
724
                        if !ok {
×
725
                                return
×
726
                        }
×
727

728
                        // Once a new block arrives, we update our running
729
                        // track of the height of the chain tip.
730
                        d.Lock()
×
731
                        blockHeight := uint32(newBlock.Height)
×
732
                        d.bestHeight = blockHeight
×
733
                        d.Unlock()
×
734

×
735
                        log.Debugf("New block: height=%d, hash=%s", blockHeight,
×
736
                                newBlock.Hash)
×
737

×
738
                        // Resend future messages, if any.
×
739
                        d.resendFutureMessages(blockHeight)
×
740

741
                case <-d.quit:
30✔
742
                        return
30✔
743
                }
744
        }
745
}
746

747
// futureMsgCache embeds a `lru.Cache` with a message counter that's served as
748
// the unique ID when saving the message.
749
type futureMsgCache struct {
750
        *lru.Cache[uint64, *cachedFutureMsg]
751

752
        // msgID is a monotonically increased integer.
753
        msgID atomic.Uint64
754
}
755

756
// nextMsgID returns a unique message ID.
757
func (f *futureMsgCache) nextMsgID() uint64 {
3✔
758
        return f.msgID.Add(1)
3✔
759
}
3✔
760

761
// newFutureMsgCache creates a new future message cache with the underlying lru
762
// cache being initialized with the specified capacity.
763
func newFutureMsgCache(capacity uint64) *futureMsgCache {
31✔
764
        // Create a new cache.
31✔
765
        cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)
31✔
766

31✔
767
        return &futureMsgCache{
31✔
768
                Cache: cache,
31✔
769
        }
31✔
770
}
31✔
771

772
// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
773
type cachedFutureMsg struct {
774
        // msg is the network message.
775
        msg *networkMsg
776

777
        // height is the block height.
778
        height uint32
779
}
780

781
// Size returns the size of the message.
782
func (c *cachedFutureMsg) Size() (uint64, error) {
4✔
783
        // Return a constant 1.
4✔
784
        return 1, nil
4✔
785
}
4✔
786

787
// resendFutureMessages takes a block height, resends all the future messages
788
// found below and equal to that height and deletes those messages found in the
789
// gossiper's futureMsgs.
790
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
×
791
        var (
×
792
                // msgs are the target messages.
×
793
                msgs []*networkMsg
×
794

×
795
                // keys are the target messages' caching keys.
×
796
                keys []uint64
×
797
        )
×
798

×
799
        // filterMsgs is the visitor used when iterating the future cache.
×
800
        filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
×
801
                if cmsg.height <= height {
×
802
                        msgs = append(msgs, cmsg.msg)
×
803
                        keys = append(keys, k)
×
804
                }
×
805

806
                return true
×
807
        }
808

809
        // Filter out the target messages.
810
        d.futureMsgs.Range(filterMsgs)
×
811

×
812
        // Return early if no messages found.
×
813
        if len(msgs) == 0 {
×
814
                return
×
815
        }
×
816

817
        // Remove the filtered messages.
818
        for _, key := range keys {
×
819
                d.futureMsgs.Delete(key)
×
820
        }
×
821

822
        log.Debugf("Resending %d network messages at height %d",
×
823
                len(msgs), height)
×
824

×
825
        for _, msg := range msgs {
×
826
                select {
×
827
                case d.networkMsgs <- msg:
×
828
                case <-d.quit:
×
829
                        msg.err <- ErrGossiperShuttingDown
×
830
                }
831
        }
832
}
833

834
// Stop signals any active goroutines for a graceful closure.
835
func (d *AuthenticatedGossiper) Stop() error {
31✔
836
        d.stopped.Do(func() {
61✔
837
                log.Info("Authenticated gossiper shutting down...")
30✔
838
                defer log.Debug("Authenticated gossiper shutdown complete")
30✔
839

30✔
840
                d.stop()
30✔
841
        })
30✔
842
        return nil
31✔
843
}
844

845
func (d *AuthenticatedGossiper) stop() {
30✔
846
        log.Debug("Authenticated Gossiper is stopping")
30✔
847
        defer log.Debug("Authenticated Gossiper stopped")
30✔
848

30✔
849
        // `blockEpochs` is only initialized in the start routine so we make
30✔
850
        // sure we don't panic here in the case where the `Stop` method is
30✔
851
        // called when the `Start` method does not complete.
30✔
852
        if d.blockEpochs != nil {
60✔
853
                d.blockEpochs.Cancel()
30✔
854
        }
30✔
855

856
        d.syncMgr.Stop()
30✔
857

30✔
858
        d.banman.stop()
30✔
859

30✔
860
        d.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
60✔
861
        close(d.quit)
30✔
862
        d.wg.Wait()
30✔
863

30✔
864
        // We'll stop our reliable sender after all of the gossiper's goroutines
30✔
865
        // have exited to ensure nothing can cause it to continue executing.
30✔
866
        d.reliableSender.Stop()
30✔
867
}
868

869
// TODO(roasbeef): need method to get current gossip timestamp?
870
//  * using mtx, check time rotate forward is needed?
871

872
// ProcessRemoteAnnouncement sends a new remote announcement message along with
873
// the peer that sent the routing message. The announcement will be processed
874
// then added to a queue for batched trickled announcement to all connected
875
// peers.  Remote channel announcements should contain the announcement proof
876
// and be fully validated.
877
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context,
878
        msg lnwire.Message, peer lnpeer.Peer) chan error {
291✔
879

291✔
880
        errChan := make(chan error, 1)
291✔
881

291✔
882
        // For messages in the known set of channel series queries, we'll
291✔
883
        // dispatch the message directly to the GossipSyncer, and skip the main
291✔
884
        // processing loop.
291✔
885
        switch m := msg.(type) {
291✔
886
        case *lnwire.QueryShortChanIDs,
887
                *lnwire.QueryChannelRange,
888
                *lnwire.ReplyChannelRange,
889
                *lnwire.ReplyShortChanIDsEnd:
×
890

×
891
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
×
892
                if !ok {
×
893
                        log.Warnf("Gossip syncer for peer=%x not found",
×
894
                                peer.PubKey())
×
895

×
896
                        errChan <- ErrGossipSyncerNotFound
×
897
                        return errChan
×
898
                }
×
899

900
                // If we've found the message target, then we'll dispatch the
901
                // message directly to it.
902
                err := syncer.ProcessQueryMsg(m, peer.QuitSignal())
×
903
                if err != nil {
×
904
                        log.Errorf("Process query msg from peer %x got %v",
×
905
                                peer.PubKey(), err)
×
906
                }
×
907

908
                errChan <- err
×
909
                return errChan
×
910

911
        // If a peer is updating its current update horizon, then we'll dispatch
912
        // that directly to the proper GossipSyncer.
913
        case *lnwire.GossipTimestampRange:
×
914
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
×
915
                if !ok {
×
916
                        log.Warnf("Gossip syncer for peer=%x not found",
×
917
                                peer.PubKey())
×
918

×
919
                        errChan <- ErrGossipSyncerNotFound
×
920
                        return errChan
×
921
                }
×
922

923
                // Queue the message for asynchronous processing to prevent
924
                // blocking the gossiper when rate limiting is active.
925
                if !syncer.QueueTimestampRange(m) {
×
926
                        log.Warnf("Unable to queue gossip filter for peer=%x: "+
×
927
                                "queue full", peer.PubKey())
×
928

×
929
                        // Return nil to indicate we've handled the message,
×
930
                        // even though it was dropped. This prevents the peer
×
931
                        // from being disconnected.
×
932
                        errChan <- nil
×
933
                        return errChan
×
934
                }
×
935

936
                errChan <- nil
×
937
                return errChan
×
938

939
        // To avoid inserting edges in the graph for our own channels that we
940
        // have already closed, we ignore such channel announcements coming
941
        // from the remote.
942
        case *lnwire.ChannelAnnouncement1:
220✔
943
                ownKey := d.selfKey.SerializeCompressed()
220✔
944
                ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement1 " +
220✔
945
                        "for own channel")
220✔
946

220✔
947
                if bytes.Equal(m.NodeID1[:], ownKey) ||
220✔
948
                        bytes.Equal(m.NodeID2[:], ownKey) {
222✔
949

2✔
950
                        log.Warn(ownErr)
2✔
951
                        errChan <- ownErr
2✔
952
                        return errChan
2✔
953
                }
2✔
954
        }
955

956
        nMsg := &networkMsg{
289✔
957
                msg:      msg,
289✔
958
                isRemote: true,
289✔
959
                peer:     peer,
289✔
960
                source:   peer.IdentityKey(),
289✔
961
                err:      errChan,
289✔
962
        }
289✔
963

289✔
964
        select {
289✔
965
        case d.networkMsgs <- nMsg:
289✔
966

967
        // If the peer that sent us this error is quitting, then we don't need
968
        // to send back an error and can return immediately.
969
        // TODO(elle): the peer should now just rely on canceling the passed
970
        //  context.
971
        case <-peer.QuitSignal():
×
972
                return nil
×
973
        case <-ctx.Done():
×
974
                return nil
×
975
        case <-d.quit:
×
976
                nMsg.err <- ErrGossiperShuttingDown
×
977
        }
978

979
        return nMsg.err
289✔
980
}
981

982
// ProcessLocalAnnouncement sends a new remote announcement message along with
983
// the peer that sent the routing message. The announcement will be processed
984
// then added to a queue for batched trickled announcement to all connected
985
// peers.  Local channel announcements don't contain the announcement proof and
986
// will not be fully validated. Once the channel proofs are received, the
987
// entire channel announcement and update messages will be re-constructed and
988
// broadcast to the rest of the network.
989
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
990
        optionalFields ...OptionalMsgField) chan error {
47✔
991

47✔
992
        optionalMsgFields := &optionalMsgFields{}
47✔
993
        optionalMsgFields.apply(optionalFields...)
47✔
994

47✔
995
        nMsg := &networkMsg{
47✔
996
                msg:               msg,
47✔
997
                optionalMsgFields: optionalMsgFields,
47✔
998
                isRemote:          false,
47✔
999
                source:            d.selfKey,
47✔
1000
                err:               make(chan error, 1),
47✔
1001
        }
47✔
1002

47✔
1003
        select {
47✔
1004
        case d.networkMsgs <- nMsg:
47✔
1005
        case <-d.quit:
×
1006
                nMsg.err <- ErrGossiperShuttingDown
×
1007
        }
1008

1009
        return nMsg.err
47✔
1010
}
1011

1012
// channelUpdateID is a unique identifier for ChannelUpdate messages, as
1013
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
1014
// tuple.
1015
type channelUpdateID struct {
1016
        // channelID represents the set of data which is needed to
1017
        // retrieve all necessary data to validate the channel existence.
1018
        channelID lnwire.ShortChannelID
1019

1020
        // Flags least-significant bit must be set to 0 if the creating node
1021
        // corresponds to the first node in the previously sent channel
1022
        // announcement and 1 otherwise.
1023
        flags lnwire.ChanUpdateChanFlags
1024
}
1025

1026
// msgWithSenders is a wrapper struct around a message, and the set of peers
1027
// that originally sent us this message. Using this struct, we can ensure that
1028
// we don't re-send a message to the peer that sent it to us in the first
1029
// place.
1030
type msgWithSenders struct {
1031
        // msg is the wire message itself.
1032
        msg lnwire.Message
1033

1034
        // isLocal is true if this was a message that originated locally. We'll
1035
        // use this to bypass our normal checks to ensure we prioritize sending
1036
        // out our own updates.
1037
        isLocal bool
1038

1039
        // sender is the set of peers that sent us this message.
1040
        senders map[route.Vertex]struct{}
1041
}
1042

1043
// mergeSyncerMap is used to merge the set of senders of a particular message
1044
// with peers that we have an active GossipSyncer with. We do this to ensure
1045
// that we don't broadcast messages to any peers that we have active gossip
1046
// syncers for.
1047
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
29✔
1048
        for peerPub := range syncers {
29✔
1049
                m.senders[peerPub] = struct{}{}
×
1050
        }
×
1051
}
1052

1053
// deDupedAnnouncements de-duplicates announcements that have been added to the
1054
// batch. Internally, announcements are stored in three maps
1055
// (one each for channel announcements, channel updates, and node
1056
// announcements). These maps keep track of unique announcements and ensure no
1057
// announcements are duplicated. We keep the three message types separate, such
1058
// that we can send channel announcements first, then channel updates, and
1059
// finally node announcements when it's time to broadcast them.
1060
type deDupedAnnouncements struct {
1061
        // channelAnnouncements are identified by the short channel id field.
1062
        channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
1063

1064
        // channelUpdates are identified by the channel update id field.
1065
        channelUpdates map[channelUpdateID]msgWithSenders
1066

1067
        // nodeAnnouncements are identified by the Vertex field.
1068
        nodeAnnouncements map[route.Vertex]msgWithSenders
1069

1070
        sync.Mutex
1071
}
1072

1073
// Reset operates on deDupedAnnouncements to reset the storage of
1074
// announcements.
1075
func (d *deDupedAnnouncements) Reset() {
32✔
1076
        d.Lock()
32✔
1077
        defer d.Unlock()
32✔
1078

32✔
1079
        d.reset()
32✔
1080
}
32✔
1081

1082
// reset is the private version of the Reset method. We have this so we can
1083
// call this method within method that are already holding the lock.
1084
func (d *deDupedAnnouncements) reset() {
327✔
1085
        // Storage of each type of announcement (channel announcements, channel
327✔
1086
        // updates, node announcements) is set to an empty map where the
327✔
1087
        // appropriate key points to the corresponding lnwire.Message.
327✔
1088
        d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
327✔
1089
        d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
327✔
1090
        d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
327✔
1091
}
327✔
1092

1093
// addMsg adds a new message to the current batch. If the message is already
1094
// present in the current batch, then this new instance replaces the latter,
1095
// and the set of senders is updated to reflect which node sent us this
1096
// message.
1097
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
91✔
1098
        log.Tracef("Adding network message: %v to batch", message.msg.MsgType())
91✔
1099

91✔
1100
        // Depending on the message type (channel announcement, channel update,
91✔
1101
        // or node announcement), the message is added to the corresponding map
91✔
1102
        // in deDupedAnnouncements. Because each identifying key can have at
91✔
1103
        // most one value, the announcements are de-duplicated, with newer ones
91✔
1104
        // replacing older ones.
91✔
1105
        switch msg := message.msg.(type) {
91✔
1106

1107
        // Channel announcements are identified by the short channel id field.
1108
        case *lnwire.ChannelAnnouncement1:
23✔
1109
                deDupKey := msg.ShortChannelID
23✔
1110
                sender := route.NewVertex(message.source)
23✔
1111

23✔
1112
                mws, ok := d.channelAnnouncements[deDupKey]
23✔
1113
                if !ok {
45✔
1114
                        mws = msgWithSenders{
22✔
1115
                                msg:     msg,
22✔
1116
                                isLocal: !message.isRemote,
22✔
1117
                                senders: make(map[route.Vertex]struct{}),
22✔
1118
                        }
22✔
1119
                        mws.senders[sender] = struct{}{}
22✔
1120

22✔
1121
                        d.channelAnnouncements[deDupKey] = mws
22✔
1122

22✔
1123
                        return
22✔
1124
                }
22✔
1125

1126
                mws.msg = msg
1✔
1127
                mws.senders[sender] = struct{}{}
1✔
1128
                d.channelAnnouncements[deDupKey] = mws
1✔
1129

1130
        // Channel updates are identified by the (short channel id,
1131
        // channelflags) tuple.
1132
        case *lnwire.ChannelUpdate1:
46✔
1133
                sender := route.NewVertex(message.source)
46✔
1134
                deDupKey := channelUpdateID{
46✔
1135
                        msg.ShortChannelID,
46✔
1136
                        msg.ChannelFlags,
46✔
1137
                }
46✔
1138

46✔
1139
                oldTimestamp := uint32(0)
46✔
1140
                mws, ok := d.channelUpdates[deDupKey]
46✔
1141
                if ok {
49✔
1142
                        // If we already have seen this message, record its
3✔
1143
                        // timestamp.
3✔
1144
                        update, ok := mws.msg.(*lnwire.ChannelUpdate1)
3✔
1145
                        if !ok {
3✔
1146
                                log.Errorf("Expected *lnwire.ChannelUpdate1, "+
×
1147
                                        "got: %T", mws.msg)
×
1148

×
1149
                                return
×
1150
                        }
×
1151

1152
                        oldTimestamp = update.Timestamp
3✔
1153
                }
1154

1155
                // If we already had this message with a strictly newer
1156
                // timestamp, then we'll just discard the message we got.
1157
                if oldTimestamp > msg.Timestamp {
47✔
1158
                        log.Debugf("Ignored outdated network message: "+
1✔
1159
                                "peer=%v, msg=%s", message.peer, msg.MsgType())
1✔
1160
                        return
1✔
1161
                }
1✔
1162

1163
                // If the message we just got is newer than what we previously
1164
                // have seen, or this is the first time we see it, then we'll
1165
                // add it to our map of announcements.
1166
                if oldTimestamp < msg.Timestamp {
89✔
1167
                        mws = msgWithSenders{
44✔
1168
                                msg:     msg,
44✔
1169
                                isLocal: !message.isRemote,
44✔
1170
                                senders: make(map[route.Vertex]struct{}),
44✔
1171
                        }
44✔
1172

44✔
1173
                        // We'll mark the sender of the message in the
44✔
1174
                        // senders map.
44✔
1175
                        mws.senders[sender] = struct{}{}
44✔
1176

44✔
1177
                        d.channelUpdates[deDupKey] = mws
44✔
1178

44✔
1179
                        return
44✔
1180
                }
44✔
1181

1182
                // Lastly, if we had seen this exact message from before, with
1183
                // the same timestamp, we'll add the sender to the map of
1184
                // senders, such that we can skip sending this message back in
1185
                // the next batch.
1186
                mws.msg = msg
1✔
1187
                mws.senders[sender] = struct{}{}
1✔
1188
                d.channelUpdates[deDupKey] = mws
1✔
1189

1190
        // Node announcements are identified by the Vertex field.  Use the
1191
        // NodeID to create the corresponding Vertex.
1192
        case *lnwire.NodeAnnouncement:
22✔
1193
                sender := route.NewVertex(message.source)
22✔
1194
                deDupKey := route.Vertex(msg.NodeID)
22✔
1195

22✔
1196
                // We do the same for node announcements as we did for channel
22✔
1197
                // updates, as they also carry a timestamp.
22✔
1198
                oldTimestamp := uint32(0)
22✔
1199
                mws, ok := d.nodeAnnouncements[deDupKey]
22✔
1200
                if ok {
27✔
1201
                        oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
5✔
1202
                }
5✔
1203

1204
                // Discard the message if it's old.
1205
                if oldTimestamp > msg.Timestamp {
22✔
1206
                        return
×
1207
                }
×
1208

1209
                // Replace if it's newer.
1210
                if oldTimestamp < msg.Timestamp {
40✔
1211
                        mws = msgWithSenders{
18✔
1212
                                msg:     msg,
18✔
1213
                                isLocal: !message.isRemote,
18✔
1214
                                senders: make(map[route.Vertex]struct{}),
18✔
1215
                        }
18✔
1216

18✔
1217
                        mws.senders[sender] = struct{}{}
18✔
1218

18✔
1219
                        d.nodeAnnouncements[deDupKey] = mws
18✔
1220

18✔
1221
                        return
18✔
1222
                }
18✔
1223

1224
                // Add to senders map if it's the same as we had.
1225
                mws.msg = msg
4✔
1226
                mws.senders[sender] = struct{}{}
4✔
1227
                d.nodeAnnouncements[deDupKey] = mws
4✔
1228
        }
1229
}
1230

1231
// AddMsgs is a helper method to add multiple messages to the announcement
1232
// batch.
1233
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
59✔
1234
        d.Lock()
59✔
1235
        defer d.Unlock()
59✔
1236

59✔
1237
        for _, msg := range msgs {
150✔
1238
                d.addMsg(msg)
91✔
1239
        }
91✔
1240
}
1241

1242
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
1243
// to broadcast next into messages that are locally sourced and those that are
1244
// sourced remotely.
1245
type msgsToBroadcast struct {
1246
        // localMsgs is the set of messages we created locally.
1247
        localMsgs []msgWithSenders
1248

1249
        // remoteMsgs is the set of messages that we received from a remote
1250
        // party.
1251
        remoteMsgs []msgWithSenders
1252
}
1253

1254
// addMsg adds a new message to the appropriate sub-slice.
1255
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
76✔
1256
        if msg.isLocal {
123✔
1257
                m.localMsgs = append(m.localMsgs, msg)
47✔
1258
        } else {
76✔
1259
                m.remoteMsgs = append(m.remoteMsgs, msg)
29✔
1260
        }
29✔
1261
}
1262

1263
// isEmpty returns true if the batch is empty.
1264
func (m *msgsToBroadcast) isEmpty() bool {
294✔
1265
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
294✔
1266
}
294✔
1267

1268
// length returns the length of the combined message set.
1269
func (m *msgsToBroadcast) length() int {
1✔
1270
        return len(m.localMsgs) + len(m.remoteMsgs)
1✔
1271
}
1✔
1272

1273
// Emit returns the set of de-duplicated announcements to be sent out during
1274
// the next announcement epoch, in the order of channel announcements, channel
1275
// updates, and node announcements. Each message emitted, contains the set of
1276
// peers that sent us the message. This way, we can ensure that we don't waste
1277
// bandwidth by re-sending a message to the peer that sent it to us in the
1278
// first place. Additionally, the set of stored messages are reset.
1279
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
295✔
1280
        d.Lock()
295✔
1281
        defer d.Unlock()
295✔
1282

295✔
1283
        // Get the total number of announcements.
295✔
1284
        numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
295✔
1285
                len(d.nodeAnnouncements)
295✔
1286

295✔
1287
        // Create an empty array of lnwire.Messages with a length equal to
295✔
1288
        // the total number of announcements.
295✔
1289
        msgs := msgsToBroadcast{
295✔
1290
                localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
295✔
1291
                remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
295✔
1292
        }
295✔
1293

295✔
1294
        // Add the channel announcements to the array first.
295✔
1295
        for _, message := range d.channelAnnouncements {
314✔
1296
                msgs.addMsg(message)
19✔
1297
        }
19✔
1298

1299
        // Then add the channel updates.
1300
        for _, message := range d.channelUpdates {
335✔
1301
                msgs.addMsg(message)
40✔
1302
        }
40✔
1303

1304
        // Finally add the node announcements.
1305
        for _, message := range d.nodeAnnouncements {
312✔
1306
                msgs.addMsg(message)
17✔
1307
        }
17✔
1308

1309
        d.reset()
295✔
1310

295✔
1311
        // Return the array of lnwire.messages.
295✔
1312
        return msgs
295✔
1313
}
1314

1315
// calculateSubBatchSize is a helper function that calculates the size to break
// down the batchSize into.
//
// The result is batchSize scaled by subBatchDelay/totalDelay, rounded up, and
// raised to minimumBatchSize if it would fall below it. If subBatchDelay
// exceeds totalDelay, or totalDelay is not positive (so that no meaningful
// ratio exists), batchSize is returned unchanged.
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
	minimumBatchSize, batchSize int) int {

	// The totalDelay check avoids a division by zero below.
	if totalDelay <= 0 || subBatchDelay > totalDelay {
		return batchSize
	}

	// Durations are 64-bit nanosecond counts, so the arithmetic is done
	// in int64 to stay correct on platforms where int is 32 bits wide.
	// Adding total-1 before dividing rounds the quotient up.
	total := int64(totalDelay)
	scaled := int64(batchSize) * int64(subBatchDelay)
	subBatchSize := int((scaled + total - 1) / total)

	if subBatchSize < minimumBatchSize {
		return minimumBatchSize
	}

	return subBatchSize
}
1332

1333
// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
1334
// this variable so the function can be mocked in our test.
1335
var batchSizeCalculator = calculateSubBatchSize
1336

1337
// splitAnnouncementBatches takes an exiting list of announcements and
1338
// decomposes it into sub batches controlled by the `subBatchSize`.
1339
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
1340
        announcementBatch []msgWithSenders) [][]msgWithSenders {
75✔
1341

75✔
1342
        subBatchSize := batchSizeCalculator(
75✔
1343
                d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
75✔
1344
                d.cfg.MinimumBatchSize, len(announcementBatch),
75✔
1345
        )
75✔
1346

75✔
1347
        var splitAnnouncementBatch [][]msgWithSenders
75✔
1348

75✔
1349
        for subBatchSize < len(announcementBatch) {
196✔
1350
                // For slicing with minimal allocation
121✔
1351
                // https://github.com/golang/go/wiki/SliceTricks
121✔
1352
                announcementBatch, splitAnnouncementBatch =
121✔
1353
                        announcementBatch[subBatchSize:],
121✔
1354
                        append(splitAnnouncementBatch,
121✔
1355
                                announcementBatch[0:subBatchSize:subBatchSize])
121✔
1356
        }
121✔
1357
        splitAnnouncementBatch = append(
75✔
1358
                splitAnnouncementBatch, announcementBatch,
75✔
1359
        )
75✔
1360

75✔
1361
        return splitAnnouncementBatch
75✔
1362
}
1363

1364
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
1365
// split size, and then sends out all items to the set of target peers. Locally
1366
// generated announcements are always sent before remotely generated
1367
// announcements.
1368
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(ctx context.Context,
1369
        annBatch msgsToBroadcast) {
34✔
1370

34✔
1371
        // delayNextBatch is a helper closure that blocks for `SubBatchDelay`
34✔
1372
        // duration to delay the sending of next announcement batch.
34✔
1373
        delayNextBatch := func() {
102✔
1374
                select {
68✔
1375
                case <-time.After(d.cfg.SubBatchDelay):
51✔
1376
                case <-d.quit:
17✔
1377
                        return
17✔
1378
                }
1379
        }
1380

1381
        // Fetch the local and remote announcements.
1382
        localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
34✔
1383
        remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)
34✔
1384

34✔
1385
        d.wg.Add(1)
34✔
1386
        go func() {
68✔
1387
                defer d.wg.Done()
34✔
1388

34✔
1389
                log.Debugf("Broadcasting %v new local announcements in %d "+
34✔
1390
                        "sub batches", len(annBatch.localMsgs),
34✔
1391
                        len(localBatches))
34✔
1392

34✔
1393
                // Send out the local announcements first.
34✔
1394
                for _, annBatch := range localBatches {
68✔
1395
                        d.sendLocalBatch(annBatch)
34✔
1396
                        delayNextBatch()
34✔
1397
                }
34✔
1398

1399
                log.Debugf("Broadcasting %v new remote announcements in %d "+
34✔
1400
                        "sub batches", len(annBatch.remoteMsgs),
34✔
1401
                        len(remoteBatches))
34✔
1402

34✔
1403
                // Now send the remote announcements.
34✔
1404
                for _, annBatch := range remoteBatches {
68✔
1405
                        d.sendRemoteBatch(ctx, annBatch)
34✔
1406
                        delayNextBatch()
34✔
1407
                }
34✔
1408
        }()
1409
}
1410

1411
// sendLocalBatch broadcasts a list of locally generated announcements to our
1412
// peers. For local announcements, we skip the filter and dedup logic and just
1413
// send the announcements out to all our coonnected peers.
1414
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
34✔
1415
        msgsToSend := lnutils.Map(
34✔
1416
                annBatch, func(m msgWithSenders) lnwire.Message {
77✔
1417
                        return m.msg
43✔
1418
                },
43✔
1419
        )
1420

1421
        err := d.cfg.Broadcast(nil, msgsToSend...)
34✔
1422
        if err != nil {
34✔
1423
                log.Errorf("Unable to send local batch announcements: %v", err)
×
1424
        }
×
1425
}
1426

1427
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
1428
// peers.
1429
func (d *AuthenticatedGossiper) sendRemoteBatch(ctx context.Context,
1430
        annBatch []msgWithSenders) {
34✔
1431

34✔
1432
        syncerPeers := d.syncMgr.GossipSyncers()
34✔
1433

34✔
1434
        // We'll first attempt to filter out this new message for all peers
34✔
1435
        // that have active gossip syncers active.
34✔
1436
        for pub, syncer := range syncerPeers {
34✔
1437
                log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
×
1438
                syncer.FilterGossipMsgs(ctx, annBatch...)
×
1439
        }
×
1440

1441
        for _, msgChunk := range annBatch {
63✔
1442
                msgChunk := msgChunk
29✔
1443

29✔
1444
                // With the syncers taken care of, we'll merge the sender map
29✔
1445
                // with the set of syncers, so we don't send out duplicate
29✔
1446
                // messages.
29✔
1447
                msgChunk.mergeSyncerMap(syncerPeers)
29✔
1448

29✔
1449
                err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
29✔
1450
                if err != nil {
29✔
1451
                        log.Errorf("Unable to send batch "+
×
1452
                                "announcements: %v", err)
×
1453
                        continue
×
1454
                }
1455
        }
1456
}
1457

1458
// networkHandler is the primary goroutine that drives this service. The roles
1459
// of this goroutine includes answering queries related to the state of the
1460
// network, syncing up newly connected peers, and also periodically
1461
// broadcasting our latest topology state to all connected peers.
1462
//
1463
// NOTE: This MUST be run as a goroutine.
1464
func (d *AuthenticatedGossiper) networkHandler(ctx context.Context) {
30✔
1465
        defer d.wg.Done()
30✔
1466

30✔
1467
        // Initialize empty deDupedAnnouncements to store announcement batch.
30✔
1468
        announcements := deDupedAnnouncements{}
30✔
1469
        announcements.Reset()
30✔
1470

30✔
1471
        d.cfg.RetransmitTicker.Resume()
30✔
1472
        defer d.cfg.RetransmitTicker.Stop()
30✔
1473

30✔
1474
        trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
30✔
1475
        defer trickleTimer.Stop()
30✔
1476

30✔
1477
        // To start, we'll first check to see if there are any stale channel or
30✔
1478
        // node announcements that we need to re-transmit.
30✔
1479
        if err := d.retransmitStaleAnns(ctx, time.Now()); err != nil {
30✔
1480
                log.Errorf("Unable to rebroadcast stale announcements: %v", err)
×
1481
        }
×
1482

1483
        for {
694✔
1484
                select {
664✔
1485
                // A new policy update has arrived. We'll commit it to the
1486
                // sub-systems below us, then craft, sign, and broadcast a new
1487
                // ChannelUpdate for the set of affected clients.
1488
                case policyUpdate := <-d.chanPolicyUpdates:
1✔
1489
                        log.Tracef("Received channel %d policy update requests",
1✔
1490
                                len(policyUpdate.edgesToUpdate))
1✔
1491

1✔
1492
                        // First, we'll now create new fully signed updates for
1✔
1493
                        // the affected channels and also update the underlying
1✔
1494
                        // graph with the new state.
1✔
1495
                        newChanUpdates, err := d.processChanPolicyUpdate(
1✔
1496
                                ctx, policyUpdate.edgesToUpdate,
1✔
1497
                        )
1✔
1498
                        policyUpdate.errChan <- err
1✔
1499
                        if err != nil {
1✔
1500
                                log.Errorf("Unable to craft policy updates: %v",
×
1501
                                        err)
×
1502
                                continue
×
1503
                        }
1504

1505
                        // Finally, with the updates committed, we'll now add
1506
                        // them to the announcement batch to be flushed at the
1507
                        // start of the next epoch.
1508
                        announcements.AddMsgs(newChanUpdates...)
1✔
1509

1510
                case announcement := <-d.networkMsgs:
338✔
1511
                        log.Tracef("Received network message: "+
338✔
1512
                                "peer=%v, msg=%s, is_remote=%v",
338✔
1513
                                announcement.peer, announcement.msg.MsgType(),
338✔
1514
                                announcement.isRemote)
338✔
1515

338✔
1516
                        switch announcement.msg.(type) {
338✔
1517
                        // Channel announcement signatures are amongst the only
1518
                        // messages that we'll process serially.
1519
                        case *lnwire.AnnounceSignatures1:
21✔
1520
                                emittedAnnouncements, _ := d.processNetworkAnnouncement(
21✔
1521
                                        ctx, announcement,
21✔
1522
                                )
21✔
1523
                                log.Debugf("Processed network message %s, "+
21✔
1524
                                        "returned len(announcements)=%v",
21✔
1525
                                        announcement.msg.MsgType(),
21✔
1526
                                        len(emittedAnnouncements))
21✔
1527

21✔
1528
                                if emittedAnnouncements != nil {
31✔
1529
                                        announcements.AddMsgs(
10✔
1530
                                                emittedAnnouncements...,
10✔
1531
                                        )
10✔
1532
                                }
10✔
1533
                                continue
21✔
1534
                        }
1535

1536
                        // If this message was recently rejected, then we won't
1537
                        // attempt to re-process it.
1538
                        if announcement.isRemote && d.isRecentlyRejectedMsg(
317✔
1539
                                announcement.msg,
317✔
1540
                                sourceToPub(announcement.source),
317✔
1541
                        ) {
318✔
1542

1✔
1543
                                announcement.err <- fmt.Errorf("recently " +
1✔
1544
                                        "rejected")
1✔
1545
                                continue
1✔
1546
                        }
1547

1548
                        // We'll set up any dependent, and wait until a free
1549
                        // slot for this job opens up, this allow us to not
1550
                        // have thousands of goroutines active.
1551
                        annJobID, err := d.vb.InitJobDependencies(
316✔
1552
                                announcement.msg,
316✔
1553
                        )
316✔
1554
                        if err != nil {
316✔
1555
                                announcement.err <- err
×
1556
                                continue
×
1557
                        }
1558

1559
                        d.wg.Add(1)
316✔
1560
                        go d.handleNetworkMessages(
316✔
1561
                                ctx, announcement, &announcements, annJobID,
316✔
1562
                        )
316✔
1563

1564
                // The trickle timer has ticked, which indicates we should
1565
                // flush to the network the pending batch of new announcements
1566
                // we've received since the last trickle tick.
1567
                case <-trickleTimer.C:
294✔
1568
                        // Emit the current batch of announcements from
294✔
1569
                        // deDupedAnnouncements.
294✔
1570
                        announcementBatch := announcements.Emit()
294✔
1571

294✔
1572
                        // If the current announcements batch is nil, then we
294✔
1573
                        // have no further work here.
294✔
1574
                        if announcementBatch.isEmpty() {
554✔
1575
                                continue
260✔
1576
                        }
1577

1578
                        // At this point, we have the set of local and remote
1579
                        // announcements we want to send out. We'll do the
1580
                        // batching as normal for both, but for local
1581
                        // announcements, we'll blast them out w/o regard for
1582
                        // our peer's policies so we ensure they propagate
1583
                        // properly.
1584
                        d.splitAndSendAnnBatch(ctx, announcementBatch)
34✔
1585

1586
                // The retransmission timer has ticked which indicates that we
1587
                // should check if we need to prune or re-broadcast any of our
1588
                // personal channels or node announcement. This addresses the
1589
                // case of "zombie" channels and channel advertisements that
1590
                // have been dropped, or not properly propagated through the
1591
                // network.
1592
                case tick := <-d.cfg.RetransmitTicker.Ticks():
1✔
1593
                        if err := d.retransmitStaleAnns(ctx, tick); err != nil {
1✔
1594
                                log.Errorf("unable to rebroadcast stale "+
×
1595
                                        "announcements: %v", err)
×
1596
                        }
×
1597

1598
                // The gossiper has been signalled to exit, to we exit our
1599
                // main loop so the wait group can be decremented.
1600
                case <-d.quit:
30✔
1601
                        return
30✔
1602
                }
1603
        }
1604
}
1605

1606
// handleNetworkMessages is responsible for waiting for dependencies for a
1607
// given network message and processing the message. Once processed, it will
1608
// signal its dependants and add the new announcements to the announce batch.
1609
//
1610
// NOTE: must be run as a goroutine.
1611
func (d *AuthenticatedGossiper) handleNetworkMessages(ctx context.Context,
1612
        nMsg *networkMsg, deDuped *deDupedAnnouncements, jobID JobID) {
316✔
1613

316✔
1614
        defer d.wg.Done()
316✔
1615
        defer d.vb.CompleteJob()
316✔
1616

316✔
1617
        // We should only broadcast this message forward if it originated from
316✔
1618
        // us or it wasn't received as part of our initial historical sync.
316✔
1619
        shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
316✔
1620

316✔
1621
        // If this message has an existing dependency, then we'll wait until
316✔
1622
        // that has been fully validated before we proceed.
316✔
1623
        err := d.vb.WaitForParents(jobID, nMsg.msg)
316✔
1624
        if err != nil {
316✔
1625
                log.Debugf("Validating network message %s got err: %v",
×
1626
                        nMsg.msg.MsgType(), err)
×
1627

×
1628
                if errors.Is(err, ErrVBarrierShuttingDown) {
×
1629
                        log.Warnf("unexpected error during validation "+
×
1630
                                "barrier shutdown: %v", err)
×
1631
                }
×
1632
                nMsg.err <- err
×
1633

×
1634
                return
×
1635
        }
1636

1637
        // Process the network announcement to determine if this is either a
1638
        // new announcement from our PoV or an edges to a prior vertex/edge we
1639
        // previously proceeded.
1640
        newAnns, allow := d.processNetworkAnnouncement(ctx, nMsg)
316✔
1641

316✔
1642
        log.Tracef("Processed network message %s, returned "+
316✔
1643
                "len(announcements)=%v, allowDependents=%v",
316✔
1644
                nMsg.msg.MsgType(), len(newAnns), allow)
316✔
1645

316✔
1646
        // If this message had any dependencies, then we can now signal them to
316✔
1647
        // continue.
316✔
1648
        err = d.vb.SignalDependents(nMsg.msg, jobID)
316✔
1649
        if err != nil {
316✔
1650
                // Something is wrong if SignalDependents returns an error.
×
1651
                log.Errorf("SignalDependents returned error for msg=%v with "+
×
1652
                        "JobID=%v", lnutils.SpewLogClosure(nMsg.msg), jobID)
×
1653

×
1654
                nMsg.err <- err
×
1655

×
1656
                return
×
1657
        }
×
1658

1659
        // If the announcement was accepted, then add the emitted announcements
1660
        // to our announce batch to be broadcast once the trickle timer ticks
1661
        // gain.
1662
        if newAnns != nil && shouldBroadcast {
353✔
1663
                // TODO(roasbeef): exclude peer that sent.
37✔
1664
                deDuped.AddMsgs(newAnns...)
37✔
1665
        } else if newAnns != nil {
317✔
1666
                log.Trace("Skipping broadcast of announcements received " +
1✔
1667
                        "during initial graph sync")
1✔
1668
        }
1✔
1669
}
1670

1671
// TODO(roasbeef): d/c peers that send updates not on our chain
1672

1673
// InitSyncState is called by outside sub-systems when a connection is
1674
// established to a new peer that understands how to perform channel range
1675
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
1676
// needed to handle new queries.
1677
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
×
1678
        d.syncMgr.InitSyncState(syncPeer)
×
1679
}
×
1680

1681
// PruneSyncState is called by outside sub-systems once a peer that we were
1682
// previously connected to has been disconnected. In this case we can stop the
1683
// existing GossipSyncer assigned to the peer and free up resources.
1684
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
×
1685
        d.syncMgr.PruneSyncState(peer)
×
1686
}
×
1687

1688
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
1689
// false otherwise, This avoids expensive reprocessing of the message.
1690
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
1691
        peerPub [33]byte) bool {
280✔
1692

280✔
1693
        var scid uint64
280✔
1694
        switch m := msg.(type) {
280✔
1695
        case *lnwire.ChannelUpdate1:
48✔
1696
                scid = m.ShortChannelID.ToUint64()
48✔
1697

1698
        case *lnwire.ChannelAnnouncement1:
218✔
1699
                scid = m.ShortChannelID.ToUint64()
218✔
1700

1701
        default:
14✔
1702
                return false
14✔
1703
        }
1704

1705
        _, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
266✔
1706
        return err != cache.ErrElementNotFound
266✔
1707
}
1708

1709
// retransmitStaleAnns examines all outgoing channels that the source node is
1710
// known to maintain to check to see if any of them are "stale". A channel is
1711
// stale iff, the last timestamp of its rebroadcast is older than the
1712
// RebroadcastInterval. We also check if a refreshed node announcement should
1713
// be resent.
1714
func (d *AuthenticatedGossiper) retransmitStaleAnns(ctx context.Context,
1715
        now time.Time) error {
31✔
1716

31✔
1717
        // Iterate over all of our channels and check if any of them fall
31✔
1718
        // within the prune interval or re-broadcast interval.
31✔
1719
        type updateTuple struct {
31✔
1720
                info *models.ChannelEdgeInfo
31✔
1721
                edge *models.ChannelEdgePolicy
31✔
1722
        }
31✔
1723

31✔
1724
        var (
31✔
1725
                havePublicChannels bool
31✔
1726
                edgesToUpdate      []updateTuple
31✔
1727
        )
31✔
1728
        err := d.cfg.Graph.ForAllOutgoingChannels(ctx, func(
31✔
1729
                info *models.ChannelEdgeInfo,
31✔
1730
                edge *models.ChannelEdgePolicy) error {
33✔
1731

2✔
1732
                // If there's no auth proof attached to this edge, it means
2✔
1733
                // that it is a private channel not meant to be announced to
2✔
1734
                // the greater network, so avoid sending channel updates for
2✔
1735
                // this channel to not leak its
2✔
1736
                // existence.
2✔
1737
                if info.AuthProof == nil {
3✔
1738
                        log.Debugf("Skipping retransmission of channel "+
1✔
1739
                                "without AuthProof: %v", info.ChannelID)
1✔
1740
                        return nil
1✔
1741
                }
1✔
1742

1743
                // We make a note that we have at least one public channel. We
1744
                // use this to determine whether we should send a node
1745
                // announcement below.
1746
                havePublicChannels = true
1✔
1747

1✔
1748
                // If this edge has a ChannelUpdate that was created before the
1✔
1749
                // introduction of the MaxHTLC field, then we'll update this
1✔
1750
                // edge to propagate this information in the network.
1✔
1751
                if !edge.MessageFlags.HasMaxHtlc() {
1✔
1752
                        // We'll make sure we support the new max_htlc field if
×
1753
                        // not already present.
×
1754
                        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
×
1755
                        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
×
1756

×
1757
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
1758
                                info: info,
×
1759
                                edge: edge,
×
1760
                        })
×
1761
                        return nil
×
1762
                }
×
1763

1764
                timeElapsed := now.Sub(edge.LastUpdate)
1✔
1765

1✔
1766
                // If it's been longer than RebroadcastInterval since we've
1✔
1767
                // re-broadcasted the channel, add the channel to the set of
1✔
1768
                // edges we need to update.
1✔
1769
                if timeElapsed >= d.cfg.RebroadcastInterval {
2✔
1770
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
1✔
1771
                                info: info,
1✔
1772
                                edge: edge,
1✔
1773
                        })
1✔
1774
                }
1✔
1775

1776
                return nil
1✔
1777
        }, func() {
×
1778
                havePublicChannels = false
×
1779
                edgesToUpdate = nil
×
1780
        })
×
1781
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
31✔
1782
                return fmt.Errorf("unable to retrieve outgoing channels: %w",
×
1783
                        err)
×
1784
        }
×
1785

1786
        var signedUpdates []lnwire.Message
31✔
1787
        for _, chanToUpdate := range edgesToUpdate {
32✔
1788
                // Re-sign and update the channel on disk and retrieve our
1✔
1789
                // ChannelUpdate to broadcast.
1✔
1790
                chanAnn, chanUpdate, err := d.updateChannel(
1✔
1791
                        ctx, chanToUpdate.info, chanToUpdate.edge,
1✔
1792
                )
1✔
1793
                if err != nil {
1✔
1794
                        return fmt.Errorf("unable to update channel: %w", err)
×
1795
                }
×
1796

1797
                // If we have a valid announcement to transmit, then we'll send
1798
                // that along with the update.
1799
                if chanAnn != nil {
2✔
1800
                        signedUpdates = append(signedUpdates, chanAnn)
1✔
1801
                }
1✔
1802

1803
                signedUpdates = append(signedUpdates, chanUpdate)
1✔
1804
        }
1805

1806
        // If we don't have any public channels, we return as we don't want to
1807
        // broadcast anything that would reveal our existence.
1808
        if !havePublicChannels {
61✔
1809
                return nil
30✔
1810
        }
30✔
1811

1812
        // We'll also check that our NodeAnnouncement is not too old.
1813
        currentNodeAnn := d.cfg.FetchSelfAnnouncement()
1✔
1814
        timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
1✔
1815
        timeElapsed := now.Sub(timestamp)
1✔
1816

1✔
1817
        // If it's been a full day since we've re-broadcasted the
1✔
1818
        // node announcement, refresh it and resend it.
1✔
1819
        nodeAnnStr := ""
1✔
1820
        if timeElapsed >= d.cfg.RebroadcastInterval {
2✔
1821
                newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
1✔
1822
                if err != nil {
1✔
1823
                        return fmt.Errorf("unable to get refreshed node "+
×
1824
                                "announcement: %v", err)
×
1825
                }
×
1826

1827
                signedUpdates = append(signedUpdates, &newNodeAnn)
1✔
1828
                nodeAnnStr = " and our refreshed node announcement"
1✔
1829

1✔
1830
                // Before broadcasting the refreshed node announcement, add it
1✔
1831
                // to our own graph.
1✔
1832
                if err := d.addNode(ctx, &newNodeAnn); err != nil {
2✔
1833
                        log.Errorf("Unable to add refreshed node announcement "+
1✔
1834
                                "to graph: %v", err)
1✔
1835
                }
1✔
1836
        }
1837

1838
        // If we don't have any updates to re-broadcast, then we'll exit
1839
        // early.
1840
        if len(signedUpdates) == 0 {
1✔
1841
                return nil
×
1842
        }
×
1843

1844
        log.Infof("Retransmitting %v outgoing channels%v",
1✔
1845
                len(edgesToUpdate), nodeAnnStr)
1✔
1846

1✔
1847
        // With all the wire announcements properly crafted, we'll broadcast
1✔
1848
        // our known outgoing channels to all our immediate peers.
1✔
1849
        if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
1✔
1850
                return fmt.Errorf("unable to re-broadcast channels: %w", err)
×
1851
        }
×
1852

1853
        return nil
1✔
1854
}
1855

1856
// processChanPolicyUpdate generates a new set of channel updates for the
1857
// provided list of edges and updates the backing ChannelGraphSource.
1858
func (d *AuthenticatedGossiper) processChanPolicyUpdate(ctx context.Context,
1859
        edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
1✔
1860

1✔
1861
        var chanUpdates []networkMsg
1✔
1862
        for _, edgeInfo := range edgesToUpdate {
4✔
1863
                // Now that we've collected all the channels we need to update,
3✔
1864
                // we'll re-sign and update the backing ChannelGraphSource, and
3✔
1865
                // retrieve our ChannelUpdate to broadcast.
3✔
1866
                _, chanUpdate, err := d.updateChannel(
3✔
1867
                        ctx, edgeInfo.Info, edgeInfo.Edge,
3✔
1868
                )
3✔
1869
                if err != nil {
3✔
1870
                        return nil, err
×
1871
                }
×
1872

1873
                // We'll avoid broadcasting any updates for private channels to
1874
                // avoid directly giving away their existence. Instead, we'll
1875
                // send the update directly to the remote party.
1876
                if edgeInfo.Info.AuthProof == nil {
4✔
1877
                        // If AuthProof is nil and an alias was found for this
1✔
1878
                        // ChannelID (meaning the option-scid-alias feature was
1✔
1879
                        // negotiated), we'll replace the ShortChannelID in the
1✔
1880
                        // update with the peer's alias. We do this after
1✔
1881
                        // updateChannel so that the alias isn't persisted to
1✔
1882
                        // the database.
1✔
1883
                        chanID := lnwire.NewChanIDFromOutPoint(
1✔
1884
                                edgeInfo.Info.ChannelPoint,
1✔
1885
                        )
1✔
1886

1✔
1887
                        var defaultAlias lnwire.ShortChannelID
1✔
1888
                        foundAlias, _ := d.cfg.GetAlias(chanID)
1✔
1889
                        if foundAlias != defaultAlias {
1✔
1890
                                chanUpdate.ShortChannelID = foundAlias
×
1891

×
1892
                                sig, err := d.cfg.SignAliasUpdate(chanUpdate)
×
1893
                                if err != nil {
×
1894
                                        log.Errorf("Unable to sign alias "+
×
1895
                                                "update: %v", err)
×
1896
                                        continue
×
1897
                                }
1898

1899
                                lnSig, err := lnwire.NewSigFromSignature(sig)
×
1900
                                if err != nil {
×
1901
                                        log.Errorf("Unable to create sig: %v",
×
1902
                                                err)
×
1903
                                        continue
×
1904
                                }
1905

1906
                                chanUpdate.Signature = lnSig
×
1907
                        }
1908

1909
                        remotePubKey := remotePubFromChanInfo(
1✔
1910
                                edgeInfo.Info, chanUpdate.ChannelFlags,
1✔
1911
                        )
1✔
1912
                        err := d.reliableSender.sendMessage(
1✔
1913
                                ctx, chanUpdate, remotePubKey,
1✔
1914
                        )
1✔
1915
                        if err != nil {
1✔
1916
                                log.Errorf("Unable to reliably send %v for "+
×
1917
                                        "channel=%v to peer=%x: %v",
×
1918
                                        chanUpdate.MsgType(),
×
1919
                                        chanUpdate.ShortChannelID,
×
1920
                                        remotePubKey, err)
×
1921
                        }
×
1922
                        continue
1✔
1923
                }
1924

1925
                // We set ourselves as the source of this message to indicate
1926
                // that we shouldn't skip any peers when sending this message.
1927
                chanUpdates = append(chanUpdates, networkMsg{
2✔
1928
                        source:   d.selfKey,
2✔
1929
                        isRemote: false,
2✔
1930
                        msg:      chanUpdate,
2✔
1931
                })
2✔
1932
        }
1933

1934
        return chanUpdates, nil
1✔
1935
}
1936

1937
// remotePubFromChanInfo returns the public key of the remote peer given a
1938
// ChannelEdgeInfo that describe a channel we have with them.
1939
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
1940
        chanFlags lnwire.ChanUpdateChanFlags) [33]byte {
12✔
1941

12✔
1942
        var remotePubKey [33]byte
12✔
1943
        switch {
12✔
1944
        case chanFlags&lnwire.ChanUpdateDirection == 0:
12✔
1945
                remotePubKey = chanInfo.NodeKey2Bytes
12✔
1946
        case chanFlags&lnwire.ChanUpdateDirection == 1:
×
1947
                remotePubKey = chanInfo.NodeKey1Bytes
×
1948
        }
1949

1950
        return remotePubKey
12✔
1951
}
1952

1953
// processRejectedEdge examines a rejected edge to see if we can extract any
1954
// new announcements from it.  An edge will get rejected if we already added
1955
// the same edge without AuthProof to the graph. If the received announcement
1956
// contains a proof, we can add this proof to our edge.  We can end up in this
1957
// situation in the case where we create a channel, but for some reason fail
1958
// to receive the remote peer's proof, while the remote peer is able to fully
1959
// assemble the proof and craft the ChannelAnnouncement.
1960
func (d *AuthenticatedGossiper) processRejectedEdge(_ context.Context,
1961
        chanAnnMsg *lnwire.ChannelAnnouncement1,
1962
        proof *models.ChannelAuthProof) ([]networkMsg, error) {
×
1963

×
1964
        // First, we'll fetch the state of the channel as we know if from the
×
1965
        // database.
×
1966
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
×
1967
                chanAnnMsg.ShortChannelID,
×
1968
        )
×
1969
        if err != nil {
×
1970
                return nil, err
×
1971
        }
×
1972

1973
        // The edge is in the graph, and has a proof attached, then we'll just
1974
        // reject it as normal.
1975
        if chanInfo.AuthProof != nil {
×
1976
                return nil, nil
×
1977
        }
×
1978

1979
        // Otherwise, this means that the edge is within the graph, but it
1980
        // doesn't yet have a proper proof attached. If we did not receive
1981
        // the proof such that we now can add it, there's nothing more we
1982
        // can do.
1983
        if proof == nil {
×
1984
                return nil, nil
×
1985
        }
×
1986

1987
        // We'll then create then validate the new fully assembled
1988
        // announcement.
1989
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
×
1990
                proof, chanInfo, e1, e2,
×
1991
        )
×
1992
        if err != nil {
×
1993
                return nil, err
×
1994
        }
×
1995
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
×
1996
        if err != nil {
×
1997
                err := fmt.Errorf("assembled channel announcement proof "+
×
1998
                        "for shortChanID=%v isn't valid: %v",
×
1999
                        chanAnnMsg.ShortChannelID, err)
×
2000
                log.Error(err)
×
2001
                return nil, err
×
2002
        }
×
2003

2004
        // If everything checks out, then we'll add the fully assembled proof
2005
        // to the database.
2006
        err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
×
2007
        if err != nil {
×
2008
                err := fmt.Errorf("unable add proof to shortChanID=%v: %w",
×
2009
                        chanAnnMsg.ShortChannelID, err)
×
2010
                log.Error(err)
×
2011
                return nil, err
×
2012
        }
×
2013

2014
        // As we now have a complete channel announcement for this channel,
2015
        // we'll construct the announcement so they can be broadcast out to all
2016
        // our peers.
2017
        announcements := make([]networkMsg, 0, 3)
×
2018
        announcements = append(announcements, networkMsg{
×
2019
                source: d.selfKey,
×
2020
                msg:    chanAnn,
×
2021
        })
×
2022
        if e1Ann != nil {
×
2023
                announcements = append(announcements, networkMsg{
×
2024
                        source: d.selfKey,
×
2025
                        msg:    e1Ann,
×
2026
                })
×
2027
        }
×
2028
        if e2Ann != nil {
×
2029
                announcements = append(announcements, networkMsg{
×
2030
                        source: d.selfKey,
×
2031
                        msg:    e2Ann,
×
2032
                })
×
2033

×
2034
        }
×
2035

2036
        return announcements, nil
×
2037
}
2038

2039
// fetchPKScript fetches the output script for the given SCID.
2040
func (d *AuthenticatedGossiper) fetchPKScript(chanID *lnwire.ShortChannelID) (
2041
        []byte, error) {
×
2042

×
2043
        return lnwallet.FetchPKScriptWithQuit(d.cfg.ChainIO, chanID, d.quit)
×
2044
}
×
2045

2046
// addNode processes the given node announcement, and adds it to our channel
2047
// graph.
2048
func (d *AuthenticatedGossiper) addNode(ctx context.Context,
2049
        msg *lnwire.NodeAnnouncement, op ...batch.SchedulerOption) error {
17✔
2050

17✔
2051
        if err := netann.ValidateNodeAnn(msg); err != nil {
18✔
2052
                return fmt.Errorf("unable to validate node announcement: %w",
1✔
2053
                        err)
1✔
2054
        }
1✔
2055

2056
        return d.cfg.Graph.AddNode(
16✔
2057
                ctx, models.NodeFromWireAnnouncement(msg), op...,
16✔
2058
        )
16✔
2059
}
2060

2061
// isPremature decides whether a given network message has a block height+delta
2062
// value specified in the future. If so, the message will be added to the
2063
// future message map and be processed when the block height as reached.
2064
//
2065
// NOTE: must be used inside a lock.
2066
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
2067
        delta uint32, msg *networkMsg) bool {
289✔
2068

289✔
2069
        // The channel is already confirmed at chanID.BlockHeight so we minus
289✔
2070
        // one block. For instance, if the required confirmation for this
289✔
2071
        // channel announcement is 6, we then only need to wait for 5 more
289✔
2072
        // blocks once the funding tx is confirmed.
289✔
2073
        if delta > 0 {
289✔
2074
                delta--
×
2075
        }
×
2076

2077
        msgHeight := chanID.BlockHeight + delta
289✔
2078

289✔
2079
        // The message height is smaller or equal to our best known height,
289✔
2080
        // thus the message is mature.
289✔
2081
        if msgHeight <= d.bestHeight {
577✔
2082
                return false
288✔
2083
        }
288✔
2084

2085
        // Add the premature message to our future messages which will be
2086
        // resent once the block height has reached.
2087
        //
2088
        // Copy the networkMsgs since the old message's err chan will be
2089
        // consumed.
2090
        copied := &networkMsg{
1✔
2091
                peer:              msg.peer,
1✔
2092
                source:            msg.source,
1✔
2093
                msg:               msg.msg,
1✔
2094
                optionalMsgFields: msg.optionalMsgFields,
1✔
2095
                isRemote:          msg.isRemote,
1✔
2096
                err:               make(chan error, 1),
1✔
2097
        }
1✔
2098

1✔
2099
        // Create the cached message.
1✔
2100
        cachedMsg := &cachedFutureMsg{
1✔
2101
                msg:    copied,
1✔
2102
                height: msgHeight,
1✔
2103
        }
1✔
2104

1✔
2105
        // Increment the msg ID and add it to the cache.
1✔
2106
        nextMsgID := d.futureMsgs.nextMsgID()
1✔
2107
        _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
1✔
2108
        if err != nil {
1✔
2109
                log.Errorf("Adding future message got error: %v", err)
×
2110
        }
×
2111

2112
        log.Debugf("Network message: %v added to future messages for "+
1✔
2113
                "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
1✔
2114
                msgHeight, d.bestHeight)
1✔
2115

1✔
2116
        return true
1✔
2117
}
2118

2119
// processNetworkAnnouncement processes a new network relate authenticated
2120
// channel or node announcement or announcements proofs. If the announcement
2121
// didn't affect the internal state due to either being out of date, invalid,
2122
// or redundant, then nil is returned. Otherwise, the set of announcements will
2123
// be returned which should be broadcasted to the rest of the network. The
2124
// boolean returned indicates whether any dependents of the announcement should
2125
// attempt to be processed as well.
2126
func (d *AuthenticatedGossiper) processNetworkAnnouncement(ctx context.Context,
2127
        nMsg *networkMsg) ([]networkMsg, bool) {
337✔
2128

337✔
2129
        // If this is a remote update, we set the scheduler option to lazily
337✔
2130
        // add it to the graph.
337✔
2131
        var schedulerOp []batch.SchedulerOption
337✔
2132
        if nMsg.isRemote {
627✔
2133
                schedulerOp = append(schedulerOp, batch.LazyAdd())
290✔
2134
        }
290✔
2135

2136
        switch msg := nMsg.msg.(type) {
337✔
2137
        // A new node announcement has arrived which either presents new
2138
        // information about a node in one of the channels we know about, or a
2139
        // updating previously advertised information.
2140
        case *lnwire.NodeAnnouncement:
24✔
2141
                return d.handleNodeAnnouncement(ctx, nMsg, msg, schedulerOp)
24✔
2142

2143
        // A new channel announcement has arrived, this indicates the
2144
        // *creation* of a new channel within the network. This only advertises
2145
        // the existence of a channel and not yet the routing policies in
2146
        // either direction of the channel.
2147
        case *lnwire.ChannelAnnouncement1:
231✔
2148
                return d.handleChanAnnouncement(ctx, nMsg, msg, schedulerOp...)
231✔
2149

2150
        // A new authenticated channel edge update has arrived. This indicates
2151
        // that the directional information for an already known channel has
2152
        // been updated.
2153
        case *lnwire.ChannelUpdate1:
61✔
2154
                return d.handleChanUpdate(ctx, nMsg, msg, schedulerOp)
61✔
2155

2156
        // A new signature announcement has been received. This indicates
2157
        // willingness of nodes involved in the funding of a channel to
2158
        // announce this new channel to the rest of the world.
2159
        case *lnwire.AnnounceSignatures1:
21✔
2160
                return d.handleAnnSig(ctx, nMsg, msg)
21✔
2161

2162
        default:
×
2163
                err := errors.New("wrong type of the announcement")
×
2164
                nMsg.err <- err
×
2165
                return nil, false
×
2166
        }
2167
}
2168

2169
// processZombieUpdate determines whether the provided channel update should
2170
// resurrect a given zombie edge.
2171
//
2172
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
2173
// should be inspected.
2174
func (d *AuthenticatedGossiper) processZombieUpdate(_ context.Context,
2175
        chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
2176
        msg *lnwire.ChannelUpdate1) error {
3✔
2177

3✔
2178
        // The least-significant bit in the flag on the channel update tells us
3✔
2179
        // which edge is being updated.
3✔
2180
        isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0
3✔
2181

3✔
2182
        // Since we've deemed the update as not stale above, before marking it
3✔
2183
        // live, we'll make sure it has been signed by the correct party. If we
3✔
2184
        // have both pubkeys, either party can resurrect the channel. If we've
3✔
2185
        // already marked this with the stricter, single-sided resurrection we
3✔
2186
        // will only have the pubkey of the node with the oldest timestamp.
3✔
2187
        var pubKey *btcec.PublicKey
3✔
2188
        switch {
3✔
2189
        case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
×
2190
                pubKey, _ = chanInfo.NodeKey1()
×
2191
        case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
2✔
2192
                pubKey, _ = chanInfo.NodeKey2()
2✔
2193
        }
2194
        if pubKey == nil {
4✔
2195
                return fmt.Errorf("incorrect pubkey to resurrect zombie "+
1✔
2196
                        "with chan_id=%v", msg.ShortChannelID)
1✔
2197
        }
1✔
2198

2199
        err := netann.VerifyChannelUpdateSignature(msg, pubKey)
2✔
2200
        if err != nil {
3✔
2201
                return fmt.Errorf("unable to verify channel "+
1✔
2202
                        "update signature: %v", err)
1✔
2203
        }
1✔
2204

2205
        // With the signature valid, we'll proceed to mark the
2206
        // edge as live and wait for the channel announcement to
2207
        // come through again.
2208
        err = d.cfg.Graph.MarkEdgeLive(scid)
1✔
2209
        switch {
1✔
2210
        case errors.Is(err, graphdb.ErrZombieEdgeNotFound):
×
2211
                log.Errorf("edge with chan_id=%v was not found in the "+
×
2212
                        "zombie index: %v", err)
×
2213

×
2214
                return nil
×
2215

2216
        case err != nil:
×
2217
                return fmt.Errorf("unable to remove edge with "+
×
2218
                        "chan_id=%v from zombie index: %v",
×
2219
                        msg.ShortChannelID, err)
×
2220

2221
        default:
1✔
2222
        }
2223

2224
        log.Debugf("Removed edge with chan_id=%v from zombie "+
1✔
2225
                "index", msg.ShortChannelID)
1✔
2226

1✔
2227
        return nil
1✔
2228
}
2229

2230
// fetchNodeAnn fetches the latest signed node announcement from our point of
2231
// view for the node with the given public key. It also validates the node
2232
// announcement fields and returns an error if they are invalid to prevent
2233
// forwarding invalid node announcements to our peers.
2234
func (d *AuthenticatedGossiper) fetchNodeAnn(ctx context.Context,
2235
        pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {
20✔
2236

20✔
2237
        node, err := d.cfg.Graph.FetchNode(ctx, pubKey)
20✔
2238
        if err != nil {
26✔
2239
                return nil, err
6✔
2240
        }
6✔
2241

2242
        nodeAnn, err := node.NodeAnnouncement(true)
14✔
2243
        if err != nil {
14✔
2244
                return nil, err
×
2245
        }
×
2246

2247
        return nodeAnn, netann.ValidateNodeAnnFields(nodeAnn)
14✔
2248
}
2249

2250
// isMsgStale determines whether a message retrieved from the backing
2251
// MessageStore is seen as stale by the current graph.
2252
func (d *AuthenticatedGossiper) isMsgStale(_ context.Context,
2253
        msg lnwire.Message) bool {
12✔
2254

12✔
2255
        switch msg := msg.(type) {
12✔
2256
        case *lnwire.AnnounceSignatures1:
2✔
2257
                chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
2✔
2258
                        msg.ShortChannelID,
2✔
2259
                )
2✔
2260

2✔
2261
                // If the channel cannot be found, it is most likely a leftover
2✔
2262
                // message for a channel that was closed, so we can consider it
2✔
2263
                // stale.
2✔
2264
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
2✔
2265
                        return true
×
2266
                }
×
2267
                if err != nil {
2✔
2268
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2269
                                "%v", msg.ShortChannelID, err)
×
2270
                        return false
×
2271
                }
×
2272

2273
                // If the proof exists in the graph, then we have successfully
2274
                // received the remote proof and assembled the full proof, so we
2275
                // can safely delete the local proof from the database.
2276
                return chanInfo.AuthProof != nil
2✔
2277

2278
        case *lnwire.ChannelUpdate1:
10✔
2279
                _, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)
10✔
2280

10✔
2281
                // If the channel cannot be found, it is most likely a leftover
10✔
2282
                // message for a channel that was closed, so we can consider it
10✔
2283
                // stale.
10✔
2284
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
10✔
2285
                        return true
×
2286
                }
×
2287
                if err != nil {
10✔
2288
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2289
                                "%v", msg.ShortChannelID, err)
×
2290
                        return false
×
2291
                }
×
2292

2293
                // Otherwise, we'll retrieve the correct policy that we
2294
                // currently have stored within our graph to check if this
2295
                // message is stale by comparing its timestamp.
2296
                var p *models.ChannelEdgePolicy
10✔
2297
                if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
20✔
2298
                        p = p1
10✔
2299
                } else {
10✔
2300
                        p = p2
×
2301
                }
×
2302

2303
                // If the policy is still unknown, then we can consider this
2304
                // policy fresh.
2305
                if p == nil {
10✔
2306
                        return false
×
2307
                }
×
2308

2309
                timestamp := time.Unix(int64(msg.Timestamp), 0)
10✔
2310
                return p.LastUpdate.After(timestamp)
10✔
2311

2312
        default:
×
2313
                // We'll make sure to not mark any unsupported messages as stale
×
2314
                // to ensure they are not removed.
×
2315
                return false
×
2316
        }
2317
}
2318

2319
// updateChannel creates a new fully signed update for the channel, and updates
2320
// the underlying graph with the new state.
2321
func (d *AuthenticatedGossiper) updateChannel(ctx context.Context,
2322
        info *models.ChannelEdgeInfo,
2323
        edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
2324
        *lnwire.ChannelUpdate1, error) {
4✔
2325

4✔
2326
        // Parse the unsigned edge into a channel update.
4✔
2327
        chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)
4✔
2328

4✔
2329
        // We'll generate a new signature over a digest of the channel
4✔
2330
        // announcement itself and update the timestamp to ensure it propagate.
4✔
2331
        err := netann.SignChannelUpdate(
4✔
2332
                d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
4✔
2333
                netann.ChanUpdSetTimestamp,
4✔
2334
        )
4✔
2335
        if err != nil {
4✔
2336
                return nil, nil, err
×
2337
        }
×
2338

2339
        // Next, we'll set the new signature in place, and update the reference
2340
        // in the backing slice.
2341
        edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
4✔
2342
        edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()
4✔
2343

4✔
2344
        // To ensure that our signature is valid, we'll verify it ourself
4✔
2345
        // before committing it to the slice returned.
4✔
2346
        err = netann.ValidateChannelUpdateAnn(
4✔
2347
                d.selfKey, info.Capacity, chanUpdate,
4✔
2348
        )
4✔
2349
        if err != nil {
4✔
2350
                return nil, nil, fmt.Errorf("generated invalid channel "+
×
2351
                        "update sig: %v", err)
×
2352
        }
×
2353

2354
        // Finally, we'll write the new edge policy to disk.
2355
        if err := d.cfg.Graph.UpdateEdge(ctx, edge); err != nil {
4✔
2356
                return nil, nil, err
×
2357
        }
×
2358

2359
        // We'll also create the original channel announcement so the two can
2360
        // be broadcast along side each other (if necessary), but only if we
2361
        // have a full channel announcement for this channel.
2362
        var chanAnn *lnwire.ChannelAnnouncement1
4✔
2363
        if info.AuthProof != nil {
7✔
2364
                chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
3✔
2365
                chanAnn = &lnwire.ChannelAnnouncement1{
3✔
2366
                        ShortChannelID:  chanID,
3✔
2367
                        NodeID1:         info.NodeKey1Bytes,
3✔
2368
                        NodeID2:         info.NodeKey2Bytes,
3✔
2369
                        ChainHash:       info.ChainHash,
3✔
2370
                        BitcoinKey1:     info.BitcoinKey1Bytes,
3✔
2371
                        Features:        lnwire.NewRawFeatureVector(),
3✔
2372
                        BitcoinKey2:     info.BitcoinKey2Bytes,
3✔
2373
                        ExtraOpaqueData: info.ExtraOpaqueData,
3✔
2374
                }
3✔
2375
                chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
3✔
2376
                        info.AuthProof.NodeSig1Bytes,
3✔
2377
                )
3✔
2378
                if err != nil {
3✔
2379
                        return nil, nil, err
×
2380
                }
×
2381
                chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
3✔
2382
                        info.AuthProof.NodeSig2Bytes,
3✔
2383
                )
3✔
2384
                if err != nil {
3✔
2385
                        return nil, nil, err
×
2386
                }
×
2387
                chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
3✔
2388
                        info.AuthProof.BitcoinSig1Bytes,
3✔
2389
                )
3✔
2390
                if err != nil {
3✔
2391
                        return nil, nil, err
×
2392
                }
×
2393
                chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
3✔
2394
                        info.AuthProof.BitcoinSig2Bytes,
3✔
2395
                )
3✔
2396
                if err != nil {
3✔
2397
                        return nil, nil, err
×
2398
                }
×
2399
        }
2400

2401
        return chanAnn, chanUpdate, err
4✔
2402
}
2403

2404
// SyncManager returns the gossiper's SyncManager instance.
2405
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
×
2406
        return d.syncMgr
×
2407
}
×
2408

2409
// IsKeepAliveUpdate determines whether this channel update is considered a
2410
// keep-alive update based on the previous channel update processed for the same
2411
// direction.
2412
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
2413
        prev *models.ChannelEdgePolicy) bool {
17✔
2414

17✔
2415
        // Both updates should be from the same direction.
17✔
2416
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
17✔
2417
                prev.ChannelFlags&lnwire.ChanUpdateDirection {
17✔
2418

×
2419
                return false
×
2420
        }
×
2421

2422
        // The timestamp should always increase for a keep-alive update.
2423
        timestamp := time.Unix(int64(update.Timestamp), 0)
17✔
2424
        if !timestamp.After(prev.LastUpdate) {
17✔
2425
                return false
×
2426
        }
×
2427

2428
        // None of the remaining fields should change for a keep-alive update.
2429
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
17✔
2430
                return false
×
2431
        }
×
2432
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
32✔
2433
                return false
15✔
2434
        }
15✔
2435
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
2✔
2436
                return false
×
2437
        }
×
2438
        if update.TimeLockDelta != prev.TimeLockDelta {
2✔
2439
                return false
×
2440
        }
×
2441
        if update.HtlcMinimumMsat != prev.MinHTLC {
2✔
2442
                return false
×
2443
        }
×
2444
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
2✔
2445
                return false
×
2446
        }
×
2447
        if update.HtlcMaximumMsat != prev.MaxHTLC {
2✔
2448
                return false
×
2449
        }
×
2450
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
2✔
2451
                return false
×
2452
        }
×
2453
        return true
2✔
2454
}
2455

2456
// latestHeight returns the gossiper's latest height known of the chain.
2457
func (d *AuthenticatedGossiper) latestHeight() uint32 {
×
2458
        d.Lock()
×
2459
        defer d.Unlock()
×
2460
        return d.bestHeight
×
2461
}
×
2462

2463
// handleNodeAnnouncement processes a new node announcement.
2464
func (d *AuthenticatedGossiper) handleNodeAnnouncement(ctx context.Context,
2465
        nMsg *networkMsg, nodeAnn *lnwire.NodeAnnouncement,
2466
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
24✔
2467

24✔
2468
        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
24✔
2469

24✔
2470
        log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
24✔
2471
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
24✔
2472
                nMsg.source.SerializeCompressed())
24✔
2473

24✔
2474
        // We'll quickly ask the router if it already has a newer update for
24✔
2475
        // this node so we can skip validating signatures if not required.
24✔
2476
        if d.cfg.Graph.IsStaleNode(ctx, nodeAnn.NodeID, timestamp) {
32✔
2477
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
8✔
2478
                nMsg.err <- nil
8✔
2479
                return nil, true
8✔
2480
        }
8✔
2481

2482
        if err := d.addNode(ctx, nodeAnn, ops...); err != nil {
16✔
2483
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
×
2484
                        err)
×
2485

×
2486
                if !graph.IsError(
×
2487
                        err,
×
2488
                        graph.ErrOutdated,
×
2489
                        graph.ErrIgnored,
×
2490
                ) {
×
2491

×
2492
                        log.Error(err)
×
2493
                }
×
2494

2495
                nMsg.err <- err
×
2496
                return nil, false
×
2497
        }
2498

2499
        // In order to ensure we don't leak unadvertised nodes, we'll make a
2500
        // quick check to ensure this node intends to publicly advertise itself
2501
        // to the network.
2502
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
16✔
2503
        if err != nil {
16✔
2504
                log.Errorf("Unable to determine if node %x is advertised: %v",
×
2505
                        nodeAnn.NodeID, err)
×
2506
                nMsg.err <- err
×
2507
                return nil, false
×
2508
        }
×
2509

2510
        var announcements []networkMsg
16✔
2511

16✔
2512
        // If it does, we'll add their announcement to our batch so that it can
16✔
2513
        // be broadcast to the rest of our peers.
16✔
2514
        if isPublic {
19✔
2515
                announcements = append(announcements, networkMsg{
3✔
2516
                        peer:     nMsg.peer,
3✔
2517
                        isRemote: nMsg.isRemote,
3✔
2518
                        source:   nMsg.source,
3✔
2519
                        msg:      nodeAnn,
3✔
2520
                })
3✔
2521
        } else {
16✔
2522
                log.Tracef("Skipping broadcasting node announcement for %x "+
13✔
2523
                        "due to being unadvertised", nodeAnn.NodeID)
13✔
2524
        }
13✔
2525

2526
        nMsg.err <- nil
16✔
2527
        // TODO(roasbeef): get rid of the above
16✔
2528

16✔
2529
        log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
16✔
2530
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
16✔
2531
                nMsg.source.SerializeCompressed())
16✔
2532

16✔
2533
        return announcements, true
16✔
2534
}
2535

2536
// handleChanAnnouncement processes a new channel announcement.
2537
//
2538
//nolint:funlen
2539
func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
2540
        nMsg *networkMsg, ann *lnwire.ChannelAnnouncement1,
2541
        ops ...batch.SchedulerOption) ([]networkMsg, bool) {
234✔
2542

234✔
2543
        scid := ann.ShortChannelID
234✔
2544

234✔
2545
        log.Debugf("Processing ChannelAnnouncement1: peer=%v, short_chan_id=%v",
234✔
2546
                nMsg.peer, scid.ToUint64())
234✔
2547

234✔
2548
        // We'll ignore any channel announcements that target any chain other
234✔
2549
        // than the set of chains we know of.
234✔
2550
        if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) {
234✔
2551
                err := fmt.Errorf("ignoring ChannelAnnouncement1 from chain=%v"+
×
2552
                        ", gossiper on chain=%v", ann.ChainHash,
×
2553
                        d.cfg.ChainHash)
×
2554
                log.Errorf(err.Error())
×
2555

×
2556
                key := newRejectCacheKey(
×
2557
                        scid.ToUint64(),
×
2558
                        sourceToPub(nMsg.source),
×
2559
                )
×
2560
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2561

×
2562
                nMsg.err <- err
×
2563
                return nil, false
×
2564
        }
×
2565

2566
        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
2567
        // reject the announcement. Since the router accepts alias SCIDs,
2568
        // not erroring out would be a DoS vector.
2569
        if nMsg.isRemote && d.cfg.IsAlias(scid) {
234✔
2570
                err := fmt.Errorf("ignoring remote alias channel=%v", scid)
×
2571
                log.Errorf(err.Error())
×
2572

×
2573
                key := newRejectCacheKey(
×
2574
                        scid.ToUint64(),
×
2575
                        sourceToPub(nMsg.source),
×
2576
                )
×
2577
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2578

×
2579
                nMsg.err <- err
×
2580
                return nil, false
×
2581
        }
×
2582

2583
        // If the advertised inclusionary block is beyond our knowledge of the
2584
        // chain tip, then we'll ignore it for now.
2585
        d.Lock()
234✔
2586
        if nMsg.isRemote && d.isPremature(scid, 0, nMsg) {
235✔
2587
                log.Warnf("Announcement for chan_id=(%v), is premature: "+
1✔
2588
                        "advertises height %v, only height %v is known",
1✔
2589
                        scid.ToUint64(), scid.BlockHeight, d.bestHeight)
1✔
2590
                d.Unlock()
1✔
2591
                nMsg.err <- nil
1✔
2592
                return nil, false
1✔
2593
        }
1✔
2594
        d.Unlock()
233✔
2595

233✔
2596
        // At this point, we'll now ask the router if this is a zombie/known
233✔
2597
        // edge. If so we can skip all the processing below.
233✔
2598
        if d.cfg.Graph.IsKnownEdge(scid) {
234✔
2599
                nMsg.err <- nil
1✔
2600
                return nil, true
1✔
2601
        }
1✔
2602

2603
        // Check if the channel is already closed in which case we can ignore
2604
        // it.
2605
        closed, err := d.cfg.ScidCloser.IsClosedScid(scid)
232✔
2606
        if err != nil {
232✔
2607
                log.Errorf("failed to check if scid %v is closed: %v", scid,
×
2608
                        err)
×
2609
                nMsg.err <- err
×
2610

×
2611
                return nil, false
×
2612
        }
×
2613

2614
        if closed {
233✔
2615
                err = fmt.Errorf("ignoring closed channel %v", scid)
1✔
2616

1✔
2617
                // If this is an announcement from us, we'll just ignore it.
1✔
2618
                if !nMsg.isRemote {
1✔
2619
                        nMsg.err <- err
×
2620
                        return nil, false
×
2621
                }
×
2622

2623
                log.Warnf("Increasing ban score for peer=%v due to outdated "+
1✔
2624
                        "channel announcement for channel %v", nMsg.peer, scid)
1✔
2625

1✔
2626
                // Increment the peer's ban score if they are sending closed
1✔
2627
                // channel announcements.
1✔
2628
                dcErr := d.handleBadPeer(nMsg.peer)
1✔
2629
                if dcErr != nil {
1✔
2630
                        err = dcErr
×
2631
                }
×
2632

2633
                nMsg.err <- err
1✔
2634

1✔
2635
                return nil, false
1✔
2636
        }
2637

2638
        // If this is a remote channel announcement, then we'll validate all
2639
        // the signatures within the proof as it should be well formed.
2640
        var proof *models.ChannelAuthProof
231✔
2641
        if nMsg.isRemote {
448✔
2642
                err := netann.ValidateChannelAnn(ann, d.fetchPKScript)
217✔
2643
                if err != nil {
217✔
2644
                        err := fmt.Errorf("unable to validate announcement: "+
×
2645
                                "%v", err)
×
2646

×
2647
                        key := newRejectCacheKey(
×
2648
                                scid.ToUint64(),
×
2649
                                sourceToPub(nMsg.source),
×
2650
                        )
×
2651
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2652

×
2653
                        log.Error(err)
×
2654
                        nMsg.err <- err
×
2655
                        return nil, false
×
2656
                }
×
2657

2658
                // If the proof checks out, then we'll save the proof itself to
2659
                // the database so we can fetch it later when gossiping with
2660
                // other nodes.
2661
                proof = &models.ChannelAuthProof{
217✔
2662
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
217✔
2663
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
217✔
2664
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
217✔
2665
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
217✔
2666
                }
217✔
2667
        }
2668

2669
        // With the proof validated (if necessary), we can now store it within
2670
        // the database for our path finding and syncing needs.
2671
        edge := &models.ChannelEdgeInfo{
231✔
2672
                ChannelID:        scid.ToUint64(),
231✔
2673
                ChainHash:        ann.ChainHash,
231✔
2674
                NodeKey1Bytes:    ann.NodeID1,
231✔
2675
                NodeKey2Bytes:    ann.NodeID2,
231✔
2676
                BitcoinKey1Bytes: ann.BitcoinKey1,
231✔
2677
                BitcoinKey2Bytes: ann.BitcoinKey2,
231✔
2678
                AuthProof:        proof,
231✔
2679
                Features: lnwire.NewFeatureVector(
231✔
2680
                        ann.Features, lnwire.Features,
231✔
2681
                ),
231✔
2682
                ExtraOpaqueData: ann.ExtraOpaqueData,
231✔
2683
        }
231✔
2684

231✔
2685
        // If there were any optional message fields provided, we'll include
231✔
2686
        // them in its serialized disk representation now.
231✔
2687
        var tapscriptRoot fn.Option[chainhash.Hash]
231✔
2688
        if nMsg.optionalMsgFields != nil {
245✔
2689
                if nMsg.optionalMsgFields.capacity != nil {
15✔
2690
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
1✔
2691
                }
1✔
2692
                if nMsg.optionalMsgFields.channelPoint != nil {
18✔
2693
                        cp := *nMsg.optionalMsgFields.channelPoint
4✔
2694
                        edge.ChannelPoint = cp
4✔
2695
                }
4✔
2696

2697
                // Optional tapscript root for custom channels.
2698
                tapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
14✔
2699
        }
2700

2701
        // Before we start validation or add the edge to the database, we obtain
2702
        // the mutex for this channel ID. We do this to ensure no other
2703
        // goroutine has read the database and is now making decisions based on
2704
        // this DB state, before it writes to the DB. It also ensures that we
2705
        // don't perform the expensive validation check on the same channel
2706
        // announcement at the same time.
2707
        d.channelMtx.Lock(scid.ToUint64())
231✔
2708

231✔
2709
        // If AssumeChannelValid is present, then we are unable to perform any
231✔
2710
        // of the expensive checks below, so we'll short-circuit our path
231✔
2711
        // straight to adding the edge to our graph. If the passed
231✔
2712
        // ShortChannelID is an alias, then we'll skip validation as it will
231✔
2713
        // not map to a legitimate tx. This is not a DoS vector as only we can
231✔
2714
        // add an alias ChannelAnnouncement from the gossiper.
231✔
2715
        if !(d.cfg.AssumeChannelValid || d.cfg.IsAlias(scid)) {
460✔
2716
                op, capacity, script, err := d.validateFundingTransaction(
229✔
2717
                        ctx, ann, tapscriptRoot,
229✔
2718
                )
229✔
2719
                if err != nil {
433✔
2720
                        defer d.channelMtx.Unlock(scid.ToUint64())
204✔
2721

204✔
2722
                        switch {
204✔
2723
                        case errors.Is(err, ErrNoFundingTransaction),
2724
                                errors.Is(err, ErrInvalidFundingOutput):
202✔
2725

202✔
2726
                                key := newRejectCacheKey(
202✔
2727
                                        scid.ToUint64(),
202✔
2728
                                        sourceToPub(nMsg.source),
202✔
2729
                                )
202✔
2730
                                _, _ = d.recentRejects.Put(
202✔
2731
                                        key, &cachedReject{},
202✔
2732
                                )
202✔
2733

2734
                        case errors.Is(err, ErrChannelSpent):
2✔
2735
                                key := newRejectCacheKey(
2✔
2736
                                        scid.ToUint64(),
2✔
2737
                                        sourceToPub(nMsg.source),
2✔
2738
                                )
2✔
2739
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
2✔
2740

2✔
2741
                                // Since this channel has already been closed,
2✔
2742
                                // we'll add it to the graph's closed channel
2✔
2743
                                // index such that we won't attempt to do
2✔
2744
                                // expensive validation checks on it again.
2✔
2745
                                // TODO: Populate the ScidCloser by using closed
2✔
2746
                                // channel notifications.
2✔
2747
                                dbErr := d.cfg.ScidCloser.PutClosedScid(scid)
2✔
2748
                                if dbErr != nil {
2✔
2749
                                        log.Errorf("failed to mark scid(%v) "+
×
2750
                                                "as closed: %v", scid, dbErr)
×
2751

×
2752
                                        nMsg.err <- dbErr
×
2753

×
2754
                                        return nil, false
×
2755
                                }
×
2756

2757
                        default:
×
2758
                                // Otherwise, this is just a regular rejected
×
2759
                                // edge. We won't increase the ban score for the
×
2760
                                // remote peer.
×
2761
                                key := newRejectCacheKey(
×
2762
                                        scid.ToUint64(),
×
2763
                                        sourceToPub(nMsg.source),
×
2764
                                )
×
2765
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2766

×
2767
                                nMsg.err <- err
×
2768

×
2769
                                return nil, false
×
2770
                        }
2771

2772
                        if !nMsg.isRemote {
204✔
2773
                                log.Errorf("failed to add edge for local "+
×
2774
                                        "channel: %v", err)
×
2775
                                nMsg.err <- err
×
2776

×
2777
                                return nil, false
×
2778
                        }
×
2779

2780
                        log.Warnf("Increasing ban score for peer=%v due to "+
204✔
2781
                                "invalid channel announcement for channel %v",
204✔
2782
                                nMsg.peer, scid)
204✔
2783

204✔
2784
                        // Increment the peer's ban score if they are sending
204✔
2785
                        // us invalid channel announcements.
204✔
2786
                        dcErr := d.handleBadPeer(nMsg.peer)
204✔
2787
                        if dcErr != nil {
204✔
2788
                                err = dcErr
×
2789
                        }
×
2790

2791
                        nMsg.err <- err
204✔
2792

204✔
2793
                        return nil, false
204✔
2794
                }
2795

2796
                edge.FundingScript = fn.Some(script)
25✔
2797

25✔
2798
                // TODO(roasbeef): this is a hack, needs to be removed after
25✔
2799
                //  commitment fees are dynamic.
25✔
2800
                edge.Capacity = capacity
25✔
2801
                edge.ChannelPoint = op
25✔
2802
        }
2803

2804
        log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64())
27✔
2805

27✔
2806
        // We will add the edge to the channel router. If the nodes present in
27✔
2807
        // this channel are not present in the database, a partial node will be
27✔
2808
        // added to represent each node while we wait for a node announcement.
27✔
2809
        err = d.cfg.Graph.AddEdge(ctx, edge, ops...)
27✔
2810
        if err != nil {
28✔
2811
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
1✔
2812
                        scid.ToUint64(), err)
1✔
2813

1✔
2814
                defer d.channelMtx.Unlock(scid.ToUint64())
1✔
2815

1✔
2816
                // If the edge was rejected due to already being known, then it
1✔
2817
                // may be the case that this new message has a fresh channel
1✔
2818
                // proof, so we'll check.
1✔
2819
                if graph.IsError(err, graph.ErrIgnored) {
1✔
2820
                        // Attempt to process the rejected message to see if we
×
2821
                        // get any new announcements.
×
2822
                        anns, rErr := d.processRejectedEdge(ctx, ann, proof)
×
2823
                        if rErr != nil {
×
2824
                                key := newRejectCacheKey(
×
2825
                                        scid.ToUint64(),
×
2826
                                        sourceToPub(nMsg.source),
×
2827
                                )
×
2828
                                cr := &cachedReject{}
×
2829
                                _, _ = d.recentRejects.Put(key, cr)
×
2830

×
2831
                                nMsg.err <- rErr
×
2832

×
2833
                                return nil, false
×
2834
                        }
×
2835

2836
                        log.Debugf("Extracted %v announcements from rejected "+
×
2837
                                "msgs", len(anns))
×
2838

×
2839
                        // If while processing this rejected edge, we realized
×
2840
                        // there's a set of announcements we could extract,
×
2841
                        // then we'll return those directly.
×
2842
                        //
×
2843
                        // NOTE: since this is an ErrIgnored, we can return
×
2844
                        // true here to signal "allow" to its dependants.
×
2845
                        nMsg.err <- nil
×
2846

×
2847
                        return anns, true
×
2848
                }
2849

2850
                // Otherwise, this is just a regular rejected edge.
2851
                key := newRejectCacheKey(
1✔
2852
                        scid.ToUint64(),
1✔
2853
                        sourceToPub(nMsg.source),
1✔
2854
                )
1✔
2855
                _, _ = d.recentRejects.Put(key, &cachedReject{})
1✔
2856

1✔
2857
                if !nMsg.isRemote {
1✔
2858
                        log.Errorf("failed to add edge for local channel: %v",
×
2859
                                err)
×
2860
                        nMsg.err <- err
×
2861

×
2862
                        return nil, false
×
2863
                }
×
2864

2865
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
1✔
2866
                if dcErr != nil {
1✔
2867
                        log.Errorf("failed to check if we should disconnect "+
×
2868
                                "peer: %v", dcErr)
×
2869
                        nMsg.err <- dcErr
×
2870

×
2871
                        return nil, false
×
2872
                }
×
2873

2874
                if shouldDc {
1✔
2875
                        nMsg.peer.Disconnect(ErrPeerBanned)
×
2876
                }
×
2877

2878
                nMsg.err <- err
1✔
2879

1✔
2880
                return nil, false
1✔
2881
        }
2882

2883
        // If err is nil, release the lock immediately.
2884
        d.channelMtx.Unlock(scid.ToUint64())
26✔
2885

26✔
2886
        log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64())
26✔
2887

26✔
2888
        // If we earlier received any ChannelUpdates for this channel, we can
26✔
2889
        // now process them, as the channel is added to the graph.
26✔
2890
        var channelUpdates []*processedNetworkMsg
26✔
2891

26✔
2892
        earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64())
26✔
2893
        if err == nil {
28✔
2894
                // There was actually an entry in the map, so we'll accumulate
2✔
2895
                // it. We don't worry about deletion, since it'll eventually
2✔
2896
                // fall out anyway.
2✔
2897
                chanMsgs := earlyChanUpdates
2✔
2898
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
2✔
2899
        }
2✔
2900

2901
        // Launch a new goroutine to handle each ChannelUpdate, this is to
2902
        // ensure we don't block here, as we can handle only one announcement
2903
        // at a time.
2904
        for _, cu := range channelUpdates {
28✔
2905
                // Skip if already processed.
2✔
2906
                if cu.processed {
2✔
2907
                        continue
×
2908
                }
2909

2910
                // Mark the ChannelUpdate as processed. This ensures that a
2911
                // subsequent announcement in the option-scid-alias case does
2912
                // not re-use an old ChannelUpdate.
2913
                cu.processed = true
2✔
2914

2✔
2915
                d.wg.Add(1)
2✔
2916
                go func(updMsg *networkMsg) {
4✔
2917
                        defer d.wg.Done()
2✔
2918

2✔
2919
                        switch msg := updMsg.msg.(type) {
2✔
2920
                        // Reprocess the message, making sure we return an
2921
                        // error to the original caller in case the gossiper
2922
                        // shuts down.
2923
                        case *lnwire.ChannelUpdate1:
2✔
2924
                                log.Debugf("Reprocessing ChannelUpdate for "+
2✔
2925
                                        "shortChanID=%v", scid.ToUint64())
2✔
2926

2✔
2927
                                select {
2✔
2928
                                case d.networkMsgs <- updMsg:
2✔
2929
                                case <-d.quit:
×
2930
                                        updMsg.err <- ErrGossiperShuttingDown
×
2931
                                }
2932

2933
                        // We don't expect any other message type than
2934
                        // ChannelUpdate to be in this cache.
2935
                        default:
×
2936
                                log.Errorf("Unsupported message type found "+
×
2937
                                        "among ChannelUpdates: %T", msg)
×
2938
                        }
2939
                }(cu.msg)
2940
        }
2941

2942
        // Channel announcement was successfully processed and now it might be
2943
        // broadcast to other connected nodes if it was an announcement with
2944
        // proof (remote).
2945
        var announcements []networkMsg
26✔
2946

26✔
2947
        if proof != nil {
38✔
2948
                announcements = append(announcements, networkMsg{
12✔
2949
                        peer:     nMsg.peer,
12✔
2950
                        isRemote: nMsg.isRemote,
12✔
2951
                        source:   nMsg.source,
12✔
2952
                        msg:      ann,
12✔
2953
                })
12✔
2954
        }
12✔
2955

2956
        nMsg.err <- nil
26✔
2957

26✔
2958
        log.Debugf("Processed ChannelAnnouncement1: peer=%v, short_chan_id=%v",
26✔
2959
                nMsg.peer, scid.ToUint64())
26✔
2960

26✔
2961
        return announcements, true
26✔
2962
}
2963

2964
// handleChanUpdate processes a new channel update.
2965
//
2966
//nolint:funlen
2967
func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context,
2968
        nMsg *networkMsg, upd *lnwire.ChannelUpdate1,
2969
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
61✔
2970

61✔
2971
        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
61✔
2972
                nMsg.peer, upd.ShortChannelID.ToUint64())
61✔
2973

61✔
2974
        // We'll ignore any channel updates that target any chain other than
61✔
2975
        // the set of chains we know of.
61✔
2976
        if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
61✔
2977
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
×
2978
                        "gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash)
×
2979
                log.Errorf(err.Error())
×
2980

×
2981
                key := newRejectCacheKey(
×
2982
                        upd.ShortChannelID.ToUint64(),
×
2983
                        sourceToPub(nMsg.source),
×
2984
                )
×
2985
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2986

×
2987
                nMsg.err <- err
×
2988
                return nil, false
×
2989
        }
×
2990

2991
        blockHeight := upd.ShortChannelID.BlockHeight
61✔
2992
        shortChanID := upd.ShortChannelID.ToUint64()
61✔
2993

61✔
2994
        // If the advertised inclusionary block is beyond our knowledge of the
61✔
2995
        // chain tip, then we'll put the announcement in limbo to be fully
61✔
2996
        // verified once we advance forward in the chain. If the update has an
61✔
2997
        // alias SCID, we'll skip the isPremature check. This is necessary
61✔
2998
        // since aliases start at block height 16_000_000.
61✔
2999
        d.Lock()
61✔
3000
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
61✔
3001
                d.isPremature(upd.ShortChannelID, 0, nMsg) {
61✔
3002

×
3003
                log.Warnf("Update announcement for short_chan_id(%v), is "+
×
3004
                        "premature: advertises height %v, only height %v is "+
×
3005
                        "known", shortChanID, blockHeight, d.bestHeight)
×
3006
                d.Unlock()
×
3007
                nMsg.err <- nil
×
3008
                return nil, false
×
3009
        }
×
3010
        d.Unlock()
61✔
3011

61✔
3012
        // Before we perform any of the expensive checks below, we'll check
61✔
3013
        // whether this update is stale or is for a zombie channel in order to
61✔
3014
        // quickly reject it.
61✔
3015
        timestamp := time.Unix(int64(upd.Timestamp), 0)
61✔
3016

61✔
3017
        // Fetch the SCID we should be using to lock the channelMtx and make
61✔
3018
        // graph queries with.
61✔
3019
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
61✔
3020
        if err != nil {
122✔
3021
                // Fallback and set the graphScid to the peer-provided SCID.
61✔
3022
                // This will occur for non-option-scid-alias channels and for
61✔
3023
                // public option-scid-alias channels after 6 confirmations.
61✔
3024
                // Once public option-scid-alias channels have 6 confs, we'll
61✔
3025
                // ignore ChannelUpdates with one of their aliases.
61✔
3026
                graphScid = upd.ShortChannelID
61✔
3027
        }
61✔
3028

3029
        // We make sure to obtain the mutex for this channel ID before we access
3030
        // the database. This ensures the state we read from the database has
3031
        // not changed between this point and when we call UpdateEdge() later.
3032
        d.channelMtx.Lock(graphScid.ToUint64())
61✔
3033
        defer d.channelMtx.Unlock(graphScid.ToUint64())
61✔
3034

61✔
3035
        if d.cfg.Graph.IsStaleEdgePolicy(
61✔
3036
                graphScid, timestamp, upd.ChannelFlags,
61✔
3037
        ) {
64✔
3038

3✔
3039
                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
3✔
3040
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
3✔
3041
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
3✔
3042
                )
3✔
3043

3✔
3044
                nMsg.err <- nil
3✔
3045
                return nil, true
3✔
3046
        }
3✔
3047

3048
        // Check that the ChanUpdate is not too far into the future, this could
3049
        // reveal some faulty implementation therefore we log an error.
3050
        if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
58✔
3051
                err := fmt.Errorf("skewed timestamp of edge policy, "+
×
3052
                        "timestamp too far in the future: %v", timestamp.Unix())
×
3053

×
3054
                // If this is a channel_update from us, we'll just ignore it.
×
3055
                if !nMsg.isRemote {
×
3056
                        nMsg.err <- err
×
3057
                        return nil, false
×
3058
                }
×
3059

3060
                log.Errorf("Increasing ban score for peer=%v due to bad "+
×
3061
                        "channel_update with short_chan_id(%v): timestamp(%v) "+
×
3062
                        "too far in the future", nMsg.peer, shortChanID,
×
3063
                        timestamp.Unix())
×
3064

×
3065
                // Increment the peer's ban score if they send skewed channel
×
3066
                // updates.
×
3067
                dcErr := d.handleBadPeer(nMsg.peer)
×
3068
                if dcErr != nil {
×
3069
                        err = dcErr
×
3070
                }
×
3071

3072
                nMsg.err <- err
×
3073

×
3074
                return nil, false
×
3075
        }
3076

3077
        // Get the node pub key, since we don't have it in the channel
3078
        // update announcement message. We'll need this to properly verify the
3079
        // message's signature.
3080
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
58✔
3081
        switch {
58✔
3082
        // No error, break.
3083
        case err == nil:
54✔
3084
                break
54✔
3085

3086
        case errors.Is(err, graphdb.ErrZombieEdge):
3✔
3087
                err = d.processZombieUpdate(ctx, chanInfo, graphScid, upd)
3✔
3088
                if err != nil {
5✔
3089
                        log.Debug(err)
2✔
3090
                        nMsg.err <- err
2✔
3091
                        return nil, false
2✔
3092
                }
2✔
3093

3094
                // We'll fallthrough to ensure we stash the update until we
3095
                // receive its corresponding ChannelAnnouncement. This is
3096
                // needed to ensure the edge exists in the graph before
3097
                // applying the update.
3098
                fallthrough
1✔
3099
        case errors.Is(err, graphdb.ErrGraphNotFound):
1✔
3100
                fallthrough
1✔
3101
        case errors.Is(err, graphdb.ErrGraphNoEdgesFound):
1✔
3102
                fallthrough
1✔
3103
        case errors.Is(err, graphdb.ErrEdgeNotFound):
2✔
3104
                // If the edge corresponding to this ChannelUpdate was not
2✔
3105
                // found in the graph, this might be a channel in the process
2✔
3106
                // of being opened, and we haven't processed our own
2✔
3107
                // ChannelAnnouncement yet, hence it is not found in the
2✔
3108
                // graph. This usually gets resolved after the channel proofs
2✔
3109
                // are exchanged and the channel is broadcast to the rest of
2✔
3110
                // the network, but in case this is a private channel this
2✔
3111
                // won't ever happen. This can also happen in the case of a
2✔
3112
                // zombie channel with a fresh update for which we don't have a
2✔
3113
                // ChannelAnnouncement since we reject them. Because of
2✔
3114
                // this, we temporarily add it to a map, and reprocess it after
2✔
3115
                // our own ChannelAnnouncement has been processed.
2✔
3116
                //
2✔
3117
                // The shortChanID may be an alias, but it is fine to use here
2✔
3118
                // since we don't have an edge in the graph and if the peer is
2✔
3119
                // not buggy, we should be able to use it once the gossiper
2✔
3120
                // receives the local announcement.
2✔
3121
                pMsg := &processedNetworkMsg{msg: nMsg}
2✔
3122

2✔
3123
                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
2✔
3124
                switch {
2✔
3125
                // Nothing in the cache yet, we can just directly insert this
3126
                // element.
3127
                case err == cache.ErrElementNotFound:
2✔
3128
                        _, _ = d.prematureChannelUpdates.Put(
2✔
3129
                                shortChanID, &cachedNetworkMsg{
2✔
3130
                                        msgs: []*processedNetworkMsg{pMsg},
2✔
3131
                                })
2✔
3132

3133
                // There's already something in the cache, so we'll combine the
3134
                // set of messages into a single value.
3135
                default:
×
3136
                        msgs := earlyMsgs.msgs
×
3137
                        msgs = append(msgs, pMsg)
×
3138
                        _, _ = d.prematureChannelUpdates.Put(
×
3139
                                shortChanID, &cachedNetworkMsg{
×
3140
                                        msgs: msgs,
×
3141
                                })
×
3142
                }
3143

3144
                log.Debugf("Got ChannelUpdate for edge not found in graph"+
2✔
3145
                        "(shortChanID=%v), saving for reprocessing later",
2✔
3146
                        shortChanID)
2✔
3147

2✔
3148
                // NOTE: We don't return anything on the error channel for this
2✔
3149
                // message, as we expect that will be done when this
2✔
3150
                // ChannelUpdate is later reprocessed. This might never happen
2✔
3151
                // if the corresponding ChannelAnnouncement is never received
2✔
3152
                // or the LRU cache is filled up and the entry is evicted.
2✔
3153
                return nil, false
2✔
3154

3155
        default:
×
3156
                err := fmt.Errorf("unable to validate channel update "+
×
3157
                        "short_chan_id=%v: %v", shortChanID, err)
×
3158
                log.Error(err)
×
3159
                nMsg.err <- err
×
3160

×
3161
                key := newRejectCacheKey(
×
3162
                        upd.ShortChannelID.ToUint64(),
×
3163
                        sourceToPub(nMsg.source),
×
3164
                )
×
3165
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3166

×
3167
                return nil, false
×
3168
        }
3169

3170
        // The least-significant bit in the flag on the channel update
3171
        // announcement tells us "which" side of the channels directed edge is
3172
        // being updated.
3173
        var (
54✔
3174
                pubKey       *btcec.PublicKey
54✔
3175
                edgeToUpdate *models.ChannelEdgePolicy
54✔
3176
        )
54✔
3177
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
54✔
3178
        switch direction {
54✔
3179
        case 0:
38✔
3180
                pubKey, _ = chanInfo.NodeKey1()
38✔
3181
                edgeToUpdate = e1
38✔
3182
        case 1:
16✔
3183
                pubKey, _ = chanInfo.NodeKey2()
16✔
3184
                edgeToUpdate = e2
16✔
3185
        }
3186

3187
        log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
54✔
3188
                "edge policy=%v", chanInfo.ChannelID,
54✔
3189
                pubKey.SerializeCompressed(), edgeToUpdate != nil)
54✔
3190

54✔
3191
        // Validate the channel announcement with the expected public key and
54✔
3192
        // channel capacity. In the case of an invalid channel update, we'll
54✔
3193
        // return an error to the caller and exit early.
54✔
3194
        err = netann.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
54✔
3195
        if err != nil {
58✔
3196
                rErr := fmt.Errorf("unable to validate channel update "+
4✔
3197
                        "announcement for short_chan_id=%v: %v",
4✔
3198
                        lnutils.SpewLogClosure(upd.ShortChannelID), err)
4✔
3199

4✔
3200
                log.Error(rErr)
4✔
3201
                nMsg.err <- rErr
4✔
3202
                return nil, false
4✔
3203
        }
4✔
3204

3205
        // If we have a previous version of the edge being updated, we'll want
3206
        // to rate limit its updates to prevent spam throughout the network.
3207
        if nMsg.isRemote && edgeToUpdate != nil {
67✔
3208
                // If it's a keep-alive update, we'll only propagate one if
17✔
3209
                // it's been a day since the previous. This follows our own
17✔
3210
                // heuristic of sending keep-alive updates after the same
17✔
3211
                // duration (see retransmitStaleAnns).
17✔
3212
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
17✔
3213
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
19✔
3214
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
3✔
3215
                                log.Debugf("Ignoring keep alive update not "+
1✔
3216
                                        "within %v period for channel %v",
1✔
3217
                                        d.cfg.RebroadcastInterval, shortChanID)
1✔
3218
                                nMsg.err <- nil
1✔
3219
                                return nil, false
1✔
3220
                        }
1✔
3221
                } else {
15✔
3222
                        // If it's not, we'll allow an update per minute with a
15✔
3223
                        // maximum burst of 10. If we haven't seen an update
15✔
3224
                        // for this channel before, we'll need to initialize a
15✔
3225
                        // rate limiter for each direction.
15✔
3226
                        //
15✔
3227
                        // Since the edge exists in the graph, we'll create a
15✔
3228
                        // rate limiter for chanInfo.ChannelID rather than the
15✔
3229
                        // SCID the peer sent. This is because there may be
15✔
3230
                        // multiple aliases for a channel and we may otherwise
15✔
3231
                        // rate-limit only a single alias of the channel,
15✔
3232
                        // instead of the whole channel.
15✔
3233
                        baseScid := chanInfo.ChannelID
15✔
3234
                        d.Lock()
15✔
3235
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
15✔
3236
                        if !ok {
17✔
3237
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
2✔
3238
                                b := d.cfg.MaxChannelUpdateBurst
2✔
3239
                                rls = [2]*rate.Limiter{
2✔
3240
                                        rate.NewLimiter(r, b),
2✔
3241
                                        rate.NewLimiter(r, b),
2✔
3242
                                }
2✔
3243
                                d.chanUpdateRateLimiter[baseScid] = rls
2✔
3244
                        }
2✔
3245
                        d.Unlock()
15✔
3246

15✔
3247
                        if !rls[direction].Allow() {
21✔
3248
                                log.Debugf("Rate limiting update for channel "+
6✔
3249
                                        "%v from direction %x", shortChanID,
6✔
3250
                                        pubKey.SerializeCompressed())
6✔
3251
                                nMsg.err <- nil
6✔
3252
                                return nil, false
6✔
3253
                        }
6✔
3254
                }
3255
        }
3256

3257
        // We'll use chanInfo.ChannelID rather than the peer-supplied
3258
        // ShortChannelID in the ChannelUpdate to avoid the router having to
3259
        // lookup the stored SCID. If we're sending the update, we'll always
3260
        // use the SCID stored in the database rather than a potentially
3261
        // different alias. This might mean that SigBytes is incorrect as it
3262
        // signs a different SCID than the database SCID, but since there will
3263
        // only be a difference if AuthProof == nil, this is fine.
3264
        update := &models.ChannelEdgePolicy{
43✔
3265
                SigBytes:                  upd.Signature.ToSignatureBytes(),
43✔
3266
                ChannelID:                 chanInfo.ChannelID,
43✔
3267
                LastUpdate:                timestamp,
43✔
3268
                MessageFlags:              upd.MessageFlags,
43✔
3269
                ChannelFlags:              upd.ChannelFlags,
43✔
3270
                TimeLockDelta:             upd.TimeLockDelta,
43✔
3271
                MinHTLC:                   upd.HtlcMinimumMsat,
43✔
3272
                MaxHTLC:                   upd.HtlcMaximumMsat,
43✔
3273
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
43✔
3274
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
43✔
3275
                InboundFee:                upd.InboundFee.ValOpt(),
43✔
3276
                ExtraOpaqueData:           upd.ExtraOpaqueData,
43✔
3277
        }
43✔
3278

43✔
3279
        if err := d.cfg.Graph.UpdateEdge(ctx, update, ops...); err != nil {
43✔
3280
                if graph.IsError(
×
3281
                        err, graph.ErrOutdated,
×
3282
                        graph.ErrIgnored,
×
3283
                ) {
×
3284

×
3285
                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
×
3286
                                shortChanID, err)
×
3287
                } else {
×
3288
                        // Since we know the stored SCID in the graph, we'll
×
3289
                        // cache that SCID.
×
3290
                        key := newRejectCacheKey(
×
3291
                                chanInfo.ChannelID,
×
3292
                                sourceToPub(nMsg.source),
×
3293
                        )
×
3294
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3295

×
3296
                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
×
3297
                                shortChanID, err)
×
3298
                }
×
3299

3300
                nMsg.err <- err
×
3301
                return nil, false
×
3302
        }
3303

3304
        // If this is a local ChannelUpdate without an AuthProof, it means it
3305
        // is an update to a channel that is not (yet) supposed to be announced
3306
        // to the greater network. However, our channel counter party will need
3307
        // to be given the update, so we'll try sending the update directly to
3308
        // the remote peer.
3309
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
54✔
3310
                if nMsg.optionalMsgFields != nil {
22✔
3311
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
11✔
3312
                        if remoteAlias != nil {
11✔
3313
                                // The remoteAlias field was specified, meaning
×
3314
                                // that we should replace the SCID in the
×
3315
                                // update with the remote's alias. We'll also
×
3316
                                // need to re-sign the channel update. This is
×
3317
                                // required for option-scid-alias feature-bit
×
3318
                                // negotiated channels.
×
3319
                                upd.ShortChannelID = *remoteAlias
×
3320

×
3321
                                sig, err := d.cfg.SignAliasUpdate(upd)
×
3322
                                if err != nil {
×
3323
                                        log.Error(err)
×
3324
                                        nMsg.err <- err
×
3325
                                        return nil, false
×
3326
                                }
×
3327

3328
                                lnSig, err := lnwire.NewSigFromSignature(sig)
×
3329
                                if err != nil {
×
3330
                                        log.Error(err)
×
3331
                                        nMsg.err <- err
×
3332
                                        return nil, false
×
3333
                                }
×
3334

3335
                                upd.Signature = lnSig
×
3336
                        }
3337
                }
3338

3339
                // Get our peer's public key.
3340
                remotePubKey := remotePubFromChanInfo(
11✔
3341
                        chanInfo, upd.ChannelFlags,
11✔
3342
                )
11✔
3343

11✔
3344
                log.Debugf("The message %v has no AuthProof, sending the "+
11✔
3345
                        "update to remote peer %x", upd.MsgType(), remotePubKey)
11✔
3346

11✔
3347
                // Now we'll attempt to send the channel update message
11✔
3348
                // reliably to the remote peer in the background, so that we
11✔
3349
                // don't block if the peer happens to be offline at the moment.
11✔
3350
                err := d.reliableSender.sendMessage(ctx, upd, remotePubKey)
11✔
3351
                if err != nil {
11✔
3352
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3353
                                "channel=%v to peer=%x: %v", upd.MsgType(),
×
3354
                                upd.ShortChannelID, remotePubKey, err)
×
3355
                        nMsg.err <- err
×
3356
                        return nil, false
×
3357
                }
×
3358
        }
3359

3360
        // Channel update announcement was successfully processed and now it
3361
        // can be broadcast to the rest of the network. However, we'll only
3362
        // broadcast the channel update announcement if it has an attached
3363
        // authentication proof. We also won't broadcast the update if it
3364
        // contains an alias because the network would reject this.
3365
        var announcements []networkMsg
43✔
3366
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
66✔
3367
                announcements = append(announcements, networkMsg{
23✔
3368
                        peer:     nMsg.peer,
23✔
3369
                        source:   nMsg.source,
23✔
3370
                        isRemote: nMsg.isRemote,
23✔
3371
                        msg:      upd,
23✔
3372
                })
23✔
3373
        }
23✔
3374

3375
        nMsg.err <- nil
43✔
3376

43✔
3377
        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
43✔
3378
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
43✔
3379
                timestamp)
43✔
3380
        return announcements, true
43✔
3381
}
3382

3383
// handleAnnSig processes a new announcement signatures message.
3384
//
3385
//nolint:funlen
3386
func (d *AuthenticatedGossiper) handleAnnSig(ctx context.Context,
3387
        nMsg *networkMsg, ann *lnwire.AnnounceSignatures1) ([]networkMsg,
3388
        bool) {
21✔
3389

21✔
3390
        needBlockHeight := ann.ShortChannelID.BlockHeight +
21✔
3391
                d.cfg.ProofMatureDelta
21✔
3392
        shortChanID := ann.ShortChannelID.ToUint64()
21✔
3393

21✔
3394
        prefix := "local"
21✔
3395
        if nMsg.isRemote {
32✔
3396
                prefix = "remote"
11✔
3397
        }
11✔
3398

3399
        log.Infof("Received new %v announcement signature for %v", prefix,
21✔
3400
                ann.ShortChannelID)
21✔
3401

21✔
3402
        // By the specification, channel announcement proofs should be sent
21✔
3403
        // after some number of confirmations after channel was registered in
21✔
3404
        // bitcoin blockchain. Therefore, we check if the proof is mature.
21✔
3405
        d.Lock()
21✔
3406
        premature := d.isPremature(
21✔
3407
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
21✔
3408
        )
21✔
3409
        if premature {
21✔
3410
                log.Warnf("Premature proof announcement, current block height"+
×
3411
                        "lower than needed: %v < %v", d.bestHeight,
×
3412
                        needBlockHeight)
×
3413
                d.Unlock()
×
3414
                nMsg.err <- nil
×
3415
                return nil, false
×
3416
        }
×
3417
        d.Unlock()
21✔
3418

21✔
3419
        // Ensure that we know of a channel with the target channel ID before
21✔
3420
        // proceeding further.
21✔
3421
        //
21✔
3422
        // We must acquire the mutex for this channel ID before getting the
21✔
3423
        // channel from the database, to ensure what we read does not change
21✔
3424
        // before we call AddProof() later.
21✔
3425
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
21✔
3426
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
21✔
3427

21✔
3428
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
21✔
3429
                ann.ShortChannelID,
21✔
3430
        )
21✔
3431
        if err != nil {
22✔
3432
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
1✔
3433
                if err != nil {
1✔
3434
                        err := fmt.Errorf("unable to store the proof for "+
×
3435
                                "short_chan_id=%v: %v", shortChanID, err)
×
3436
                        log.Error(err)
×
3437
                        nMsg.err <- err
×
3438

×
3439
                        return nil, false
×
3440
                }
×
3441

3442
                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
1✔
3443
                err := d.cfg.WaitingProofStore.Add(proof)
1✔
3444
                if err != nil {
1✔
3445
                        err := fmt.Errorf("unable to store the proof for "+
×
3446
                                "short_chan_id=%v: %v", shortChanID, err)
×
3447
                        log.Error(err)
×
3448
                        nMsg.err <- err
×
3449
                        return nil, false
×
3450
                }
×
3451

3452
                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
1✔
3453
                        ", adding to waiting batch", prefix, shortChanID)
1✔
3454
                nMsg.err <- nil
1✔
3455
                return nil, false
1✔
3456
        }
3457

3458
        nodeID := nMsg.source.SerializeCompressed()
20✔
3459
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
20✔
3460
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
20✔
3461

20✔
3462
        // Ensure that channel that was retrieved belongs to the peer which
20✔
3463
        // sent the proof announcement.
20✔
3464
        if !(isFirstNode || isSecondNode) {
20✔
3465
                err := fmt.Errorf("channel that was received doesn't belong "+
×
3466
                        "to the peer which sent the proof, short_chan_id=%v",
×
3467
                        shortChanID)
×
3468
                log.Error(err)
×
3469
                nMsg.err <- err
×
3470
                return nil, false
×
3471
        }
×
3472

3473
        // If proof was sent by a local sub-system, then we'll send the
3474
        // announcement signature to the remote node so they can also
3475
        // reconstruct the full channel announcement.
3476
        if !nMsg.isRemote {
30✔
3477
                var remotePubKey [33]byte
10✔
3478
                if isFirstNode {
20✔
3479
                        remotePubKey = chanInfo.NodeKey2Bytes
10✔
3480
                } else {
10✔
3481
                        remotePubKey = chanInfo.NodeKey1Bytes
×
3482
                }
×
3483

3484
                // Since the remote peer might not be online we'll call a
3485
                // method that will attempt to deliver the proof when it comes
3486
                // online.
3487
                err := d.reliableSender.sendMessage(ctx, ann, remotePubKey)
10✔
3488
                if err != nil {
10✔
3489
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3490
                                "channel=%v to peer=%x: %v", ann.MsgType(),
×
3491
                                ann.ShortChannelID, remotePubKey, err)
×
3492
                        nMsg.err <- err
×
3493
                        return nil, false
×
3494
                }
×
3495
        }
3496

3497
        // Check if we already have the full proof for this channel.
3498
        if chanInfo.AuthProof != nil {
21✔
3499
                // If we already have the fully assembled proof, then the peer
1✔
3500
                // sending us their proof has probably not received our local
1✔
3501
                // proof yet. So be kind and send them the full proof.
1✔
3502
                if nMsg.isRemote {
2✔
3503
                        peerID := nMsg.source.SerializeCompressed()
1✔
3504
                        log.Debugf("Got AnnounceSignatures for channel with " +
1✔
3505
                                "full proof.")
1✔
3506

1✔
3507
                        d.wg.Add(1)
1✔
3508
                        go func() {
2✔
3509
                                defer d.wg.Done()
1✔
3510

1✔
3511
                                log.Debugf("Received half proof for channel "+
1✔
3512
                                        "%v with existing full proof. Sending"+
1✔
3513
                                        " full proof to peer=%x",
1✔
3514
                                        ann.ChannelID, peerID)
1✔
3515

1✔
3516
                                ca, _, _, err := netann.CreateChanAnnouncement(
1✔
3517
                                        chanInfo.AuthProof, chanInfo, e1, e2,
1✔
3518
                                )
1✔
3519
                                if err != nil {
1✔
3520
                                        log.Errorf("unable to gen ann: %v",
×
3521
                                                err)
×
3522
                                        return
×
3523
                                }
×
3524

3525
                                err = nMsg.peer.SendMessage(false, ca)
1✔
3526
                                if err != nil {
1✔
3527
                                        log.Errorf("Failed sending full proof"+
×
3528
                                                " to peer=%x: %v", peerID, err)
×
3529
                                        return
×
3530
                                }
×
3531

3532
                                log.Debugf("Full proof sent to peer=%x for "+
1✔
3533
                                        "chanID=%v", peerID, ann.ChannelID)
1✔
3534
                        }()
3535
                }
3536

3537
                log.Debugf("Already have proof for channel with chanID=%v",
1✔
3538
                        ann.ChannelID)
1✔
3539
                nMsg.err <- nil
1✔
3540
                return nil, true
1✔
3541
        }
3542

3543
        // Check that we received the opposite proof. If so, then we're now
3544
        // able to construct the full proof, and create the channel
3545
        // announcement. If we didn't receive the opposite half of the proof
3546
        // then we should store this one, and wait for the opposite to be
3547
        // received.
3548
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
19✔
3549
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
19✔
3550
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
19✔
3551
                err := fmt.Errorf("unable to get the opposite proof for "+
×
3552
                        "short_chan_id=%v: %v", shortChanID, err)
×
3553
                log.Error(err)
×
3554
                nMsg.err <- err
×
3555
                return nil, false
×
3556
        }
×
3557

3558
        if err == channeldb.ErrWaitingProofNotFound {
28✔
3559
                err := d.cfg.WaitingProofStore.Add(proof)
9✔
3560
                if err != nil {
9✔
3561
                        err := fmt.Errorf("unable to store the proof for "+
×
3562
                                "short_chan_id=%v: %v", shortChanID, err)
×
3563
                        log.Error(err)
×
3564
                        nMsg.err <- err
×
3565
                        return nil, false
×
3566
                }
×
3567

3568
                log.Infof("1/2 of channel ann proof received for "+
9✔
3569
                        "short_chan_id=%v, waiting for other half",
9✔
3570
                        shortChanID)
9✔
3571

9✔
3572
                nMsg.err <- nil
9✔
3573
                return nil, false
9✔
3574
        }
3575

3576
        // We now have both halves of the channel announcement proof, then
3577
        // we'll reconstruct the initial announcement so we can validate it
3578
        // shortly below.
3579
        var dbProof models.ChannelAuthProof
10✔
3580
        if isFirstNode {
11✔
3581
                dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
1✔
3582
                dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
1✔
3583
                dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
1✔
3584
                dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
1✔
3585
        } else {
10✔
3586
                dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
9✔
3587
                dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
9✔
3588
                dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
9✔
3589
                dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
9✔
3590
        }
9✔
3591

3592
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
10✔
3593
                &dbProof, chanInfo, e1, e2,
10✔
3594
        )
10✔
3595
        if err != nil {
10✔
3596
                log.Error(err)
×
3597
                nMsg.err <- err
×
3598
                return nil, false
×
3599
        }
×
3600

3601
        // With all the necessary components assembled validate the full
3602
        // channel announcement proof.
3603
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
10✔
3604
        if err != nil {
10✔
3605
                err := fmt.Errorf("channel announcement proof for "+
×
3606
                        "short_chan_id=%v isn't valid: %v", shortChanID, err)
×
3607

×
3608
                log.Error(err)
×
3609
                nMsg.err <- err
×
3610
                return nil, false
×
3611
        }
×
3612

3613
        // If the channel was returned by the router it means that existence of
3614
        // funding point and inclusion of nodes bitcoin keys in it already
3615
        // checked by the router. In this stage we should check that node keys
3616
        // attest to the bitcoin keys by validating the signatures of
3617
        // announcement. If proof is valid then we'll populate the channel edge
3618
        // with it, so we can announce it on peer connect.
3619
        err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
10✔
3620
        if err != nil {
10✔
3621
                err := fmt.Errorf("unable add proof to the channel chanID=%v:"+
×
3622
                        " %v", ann.ChannelID, err)
×
3623
                log.Error(err)
×
3624
                nMsg.err <- err
×
3625
                return nil, false
×
3626
        }
×
3627

3628
        err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
10✔
3629
        if err != nil {
10✔
3630
                err := fmt.Errorf("unable to remove opposite proof for the "+
×
3631
                        "channel with chanID=%v: %v", ann.ChannelID, err)
×
3632
                log.Error(err)
×
3633
                nMsg.err <- err
×
3634
                return nil, false
×
3635
        }
×
3636

3637
        // Proof was successfully created and now can announce the channel to
3638
        // the remain network.
3639
        log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
10✔
3640
                ", adding to next ann batch", shortChanID)
10✔
3641

10✔
3642
        // Assemble the necessary announcements to add to the next broadcasting
10✔
3643
        // batch.
10✔
3644
        var announcements []networkMsg
10✔
3645
        announcements = append(announcements, networkMsg{
10✔
3646
                peer:   nMsg.peer,
10✔
3647
                source: nMsg.source,
10✔
3648
                msg:    chanAnn,
10✔
3649
        })
10✔
3650
        if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
19✔
3651
                announcements = append(announcements, networkMsg{
9✔
3652
                        peer:   nMsg.peer,
9✔
3653
                        source: src,
9✔
3654
                        msg:    e1Ann,
9✔
3655
                })
9✔
3656
        }
9✔
3657
        if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
18✔
3658
                announcements = append(announcements, networkMsg{
8✔
3659
                        peer:   nMsg.peer,
8✔
3660
                        source: src,
8✔
3661
                        msg:    e2Ann,
8✔
3662
                })
8✔
3663
        }
8✔
3664

3665
        // We'll also send along the node announcements for each channel
3666
        // participant if we know of them. To ensure our node announcement
3667
        // propagates to our channel counterparty, we'll set the source for
3668
        // each announcement to the node it belongs to, otherwise we won't send
3669
        // it since the source gets skipped. This isn't necessary for channel
3670
        // updates and announcement signatures since we send those directly to
3671
        // our channel counterparty through the gossiper's reliable sender.
3672
        node1Ann, err := d.fetchNodeAnn(ctx, chanInfo.NodeKey1Bytes)
10✔
3673
        if err != nil {
12✔
3674
                log.Debugf("Unable to fetch node announcement for %x: %v",
2✔
3675
                        chanInfo.NodeKey1Bytes, err)
2✔
3676
        } else {
10✔
3677
                if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
16✔
3678
                        announcements = append(announcements, networkMsg{
8✔
3679
                                peer:   nMsg.peer,
8✔
3680
                                source: nodeKey1,
8✔
3681
                                msg:    node1Ann,
8✔
3682
                        })
8✔
3683
                }
8✔
3684
        }
3685

3686
        node2Ann, err := d.fetchNodeAnn(ctx, chanInfo.NodeKey2Bytes)
10✔
3687
        if err != nil {
14✔
3688
                log.Debugf("Unable to fetch node announcement for %x: %v",
4✔
3689
                        chanInfo.NodeKey2Bytes, err)
4✔
3690
        } else {
10✔
3691
                if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
12✔
3692
                        announcements = append(announcements, networkMsg{
6✔
3693
                                peer:   nMsg.peer,
6✔
3694
                                source: nodeKey2,
6✔
3695
                                msg:    node2Ann,
6✔
3696
                        })
6✔
3697
                }
6✔
3698
        }
3699

3700
        nMsg.err <- nil
10✔
3701
        return announcements, true
10✔
3702
}
3703

3704
// isBanned returns true if the peer identified by pubkey is banned for sending
3705
// invalid channel announcements.
3706
func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
208✔
3707
        return d.banman.isBanned(pubkey)
208✔
3708
}
208✔
3709

3710
// ShouldDisconnect returns true if we should disconnect the peer identified by
3711
// pubkey.
3712
func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
3713
        bool, error) {
206✔
3714

206✔
3715
        pubkeySer := pubkey.SerializeCompressed()
206✔
3716

206✔
3717
        var pubkeyBytes [33]byte
206✔
3718
        copy(pubkeyBytes[:], pubkeySer)
206✔
3719

206✔
3720
        // If the public key is banned, check whether or not this is a channel
206✔
3721
        // peer.
206✔
3722
        if d.isBanned(pubkeyBytes) {
208✔
3723
                isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
2✔
3724
                if err != nil {
2✔
3725
                        return false, err
×
3726
                }
×
3727

3728
                // We should only disconnect non-channel peers.
3729
                if !isChanPeer {
3✔
3730
                        return true, nil
1✔
3731
                }
1✔
3732
        }
3733

3734
        return false, nil
205✔
3735
}
3736

3737
// validateFundingTransaction fetches the channel announcements claimed funding
3738
// transaction from chain to ensure that it exists, is not spent and matches
3739
// the channel announcement proof. The transaction's outpoint and value are
3740
// returned if we can glean them from the work done in this method.
3741
func (d *AuthenticatedGossiper) validateFundingTransaction(_ context.Context,
3742
        ann *lnwire.ChannelAnnouncement1,
3743
        tapscriptRoot fn.Option[chainhash.Hash]) (wire.OutPoint, btcutil.Amount,
3744
        []byte, error) {
229✔
3745

229✔
3746
        scid := ann.ShortChannelID
229✔
3747

229✔
3748
        // Before we can add the channel to the channel graph, we need to obtain
229✔
3749
        // the full funding outpoint that's encoded within the channel ID.
229✔
3750
        fundingTx, err := lnwallet.FetchFundingTxWrapper(
229✔
3751
                d.cfg.ChainIO, &scid, d.quit,
229✔
3752
        )
229✔
3753
        if err != nil {
230✔
3754
                //nolint:ll
1✔
3755
                //
1✔
3756
                // In order to ensure we don't erroneously mark a channel as a
1✔
3757
                // zombie due to an RPC failure, we'll attempt to string match
1✔
3758
                // for the relevant errors.
1✔
3759
                //
1✔
3760
                // * btcd:
1✔
3761
                //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316
1✔
3762
                //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086
1✔
3763
                // * bitcoind:
1✔
3764
                //    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770
1✔
3765
                //     * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954
1✔
3766
                switch {
1✔
3767
                case strings.Contains(err.Error(), "not found"):
1✔
3768
                        fallthrough
1✔
3769

3770
                case strings.Contains(err.Error(), "out of range"):
1✔
3771
                        // If the funding transaction isn't found at all, then
1✔
3772
                        // we'll mark the edge itself as a zombie so we don't
1✔
3773
                        // continue to request it. We use the "zero key" for
1✔
3774
                        // both node pubkeys so this edge can't be resurrected.
1✔
3775
                        zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
1✔
3776
                        if zErr != nil {
1✔
3777
                                return wire.OutPoint{}, 0, nil, zErr
×
3778
                        }
×
3779

3780
                default:
×
3781
                }
3782

3783
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
1✔
3784
                        ErrNoFundingTransaction, err)
1✔
3785
        }
3786

3787
        // Recreate witness output to be sure that declared in channel edge
3788
        // bitcoin keys and channel value corresponds to the reality.
3789
        fundingPkScript, err := makeFundingScript(
228✔
3790
                ann.BitcoinKey1[:], ann.BitcoinKey2[:], ann.Features,
228✔
3791
                tapscriptRoot,
228✔
3792
        )
228✔
3793
        if err != nil {
228✔
3794
                return wire.OutPoint{}, 0, nil, err
×
3795
        }
×
3796

3797
        // Next we'll validate that this channel is actually well formed. If
3798
        // this check fails, then this channel either doesn't exist, or isn't
3799
        // the one that was meant to be created according to the passed channel
3800
        // proofs.
3801
        fundingPoint, err := chanvalidate.Validate(
228✔
3802
                &chanvalidate.Context{
228✔
3803
                        Locator: &chanvalidate.ShortChanIDChanLocator{
228✔
3804
                                ID: scid,
228✔
3805
                        },
228✔
3806
                        MultiSigPkScript: fundingPkScript,
228✔
3807
                        FundingTx:        fundingTx,
228✔
3808
                },
228✔
3809
        )
228✔
3810
        if err != nil {
429✔
3811
                // Mark the edge as a zombie so we won't try to re-validate it
201✔
3812
                // on start up.
201✔
3813
                zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
201✔
3814
                if zErr != nil {
201✔
3815
                        return wire.OutPoint{}, 0, nil, zErr
×
3816
                }
×
3817

3818
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
201✔
3819
                        ErrInvalidFundingOutput, err)
201✔
3820
        }
3821

3822
        // Now that we have the funding outpoint of the channel, ensure
3823
        // that it hasn't yet been spent. If so, then this channel has
3824
        // been closed so we'll ignore it.
3825
        chanUtxo, err := d.cfg.ChainIO.GetUtxo(
27✔
3826
                fundingPoint, fundingPkScript, scid.BlockHeight, d.quit,
27✔
3827
        )
27✔
3828
        if err != nil {
29✔
3829
                if errors.Is(err, btcwallet.ErrOutputSpent) {
4✔
3830
                        zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
2✔
3831
                        if zErr != nil {
2✔
3832
                                return wire.OutPoint{}, 0, nil, zErr
×
3833
                        }
×
3834
                }
3835

3836
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: unable to "+
2✔
3837
                        "fetch utxo for chan_id=%v, chan_point=%v: %w",
2✔
3838
                        ErrChannelSpent, scid.ToUint64(), fundingPoint, err)
2✔
3839
        }
3840

3841
        return *fundingPoint, btcutil.Amount(chanUtxo.Value), fundingPkScript,
25✔
3842
                nil
25✔
3843
}
3844

3845
// handleBadPeer takes a misbehaving peer and increases its ban score. Once
3846
// increased, it will disconnect the peer if its ban score has reached
3847
// `banThreshold` and it doesn't have a channel with us.
3848
func (d *AuthenticatedGossiper) handleBadPeer(peer lnpeer.Peer) error {
205✔
3849
        // Increment the peer's ban score for misbehavior.
205✔
3850
        d.banman.incrementBanScore(peer.PubKey())
205✔
3851

205✔
3852
        // If the peer is banned and not a channel peer, we'll disconnect them.
205✔
3853
        shouldDc, dcErr := d.ShouldDisconnect(peer.IdentityKey())
205✔
3854
        if dcErr != nil {
205✔
3855
                log.Errorf("failed to check if we should disconnect peer: %v",
×
3856
                        dcErr)
×
3857

×
3858
                return dcErr
×
3859
        }
×
3860

3861
        if shouldDc {
206✔
3862
                peer.Disconnect(ErrPeerBanned)
1✔
3863
        }
1✔
3864

3865
        return nil
205✔
3866
}
3867

3868
// makeFundingScript is used to make the funding script for both segwit v0 and
3869
// segwit v1 (taproot) channels.
3870
func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte,
3871
        features *lnwire.RawFeatureVector,
3872
        tapscriptRoot fn.Option[chainhash.Hash]) ([]byte, error) {
228✔
3873

228✔
3874
        legacyFundingScript := func() ([]byte, error) {
456✔
3875
                witnessScript, err := input.GenMultiSigScript(
228✔
3876
                        bitcoinKey1, bitcoinKey2,
228✔
3877
                )
228✔
3878
                if err != nil {
228✔
3879
                        return nil, err
×
3880
                }
×
3881
                pkScript, err := input.WitnessScriptHash(witnessScript)
228✔
3882
                if err != nil {
228✔
3883
                        return nil, err
×
3884
                }
×
3885

3886
                return pkScript, nil
228✔
3887
        }
3888

3889
        if features.IsEmpty() {
456✔
3890
                return legacyFundingScript()
228✔
3891
        }
228✔
3892

3893
        chanFeatureBits := lnwire.NewFeatureVector(features, lnwire.Features)
×
3894
        if chanFeatureBits.HasFeature(
×
3895
                lnwire.SimpleTaprootChannelsOptionalStaging,
×
3896
        ) {
×
3897

×
3898
                pubKey1, err := btcec.ParsePubKey(bitcoinKey1)
×
3899
                if err != nil {
×
3900
                        return nil, err
×
3901
                }
×
3902
                pubKey2, err := btcec.ParsePubKey(bitcoinKey2)
×
3903
                if err != nil {
×
3904
                        return nil, err
×
3905
                }
×
3906

3907
                fundingScript, _, err := input.GenTaprootFundingScript(
×
3908
                        pubKey1, pubKey2, 0, tapscriptRoot,
×
3909
                )
×
3910
                if err != nil {
×
3911
                        return nil, err
×
3912
                }
×
3913

3914
                // TODO(roasbeef): add tapscript root to gossip v1.5
3915

3916
                return fundingScript, nil
×
3917
        }
3918

3919
        return legacyFundingScript()
×
3920
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc