• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 18197857992

02 Oct 2025 03:32PM UTC coverage: 66.622% (-0.02%) from 66.646%
18197857992

Pull #10267

github

web-flow
Merge 0d9bfccfe into 1c2ff4a7e
Pull Request #10267: [g175] multi: small G175 preparations

24 of 141 new or added lines in 12 files covered. (17.02%)

64 existing lines in 20 files now uncovered.

137216 of 205963 relevant lines covered (66.62%)

21302.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.06
/discovery/gossiper.go
1
package discovery
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "strings"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcec/v2"
14
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
15
        "github.com/btcsuite/btcd/btcutil"
16
        "github.com/btcsuite/btcd/chaincfg"
17
        "github.com/btcsuite/btcd/chaincfg/chainhash"
18
        "github.com/btcsuite/btcd/txscript"
19
        "github.com/btcsuite/btcd/wire"
20
        "github.com/lightninglabs/neutrino/cache"
21
        "github.com/lightninglabs/neutrino/cache/lru"
22
        "github.com/lightningnetwork/lnd/batch"
23
        "github.com/lightningnetwork/lnd/chainntnfs"
24
        "github.com/lightningnetwork/lnd/channeldb"
25
        "github.com/lightningnetwork/lnd/fn/v2"
26
        "github.com/lightningnetwork/lnd/graph"
27
        graphdb "github.com/lightningnetwork/lnd/graph/db"
28
        "github.com/lightningnetwork/lnd/graph/db/models"
29
        "github.com/lightningnetwork/lnd/input"
30
        "github.com/lightningnetwork/lnd/keychain"
31
        "github.com/lightningnetwork/lnd/lnpeer"
32
        "github.com/lightningnetwork/lnd/lnutils"
33
        "github.com/lightningnetwork/lnd/lnwallet"
34
        "github.com/lightningnetwork/lnd/lnwallet/btcwallet"
35
        "github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
36
        "github.com/lightningnetwork/lnd/lnwire"
37
        "github.com/lightningnetwork/lnd/multimutex"
38
        "github.com/lightningnetwork/lnd/netann"
39
        "github.com/lightningnetwork/lnd/routing/route"
40
        "github.com/lightningnetwork/lnd/ticker"
41
        "golang.org/x/time/rate"
42
)
43

44
const (
45
        // DefaultMaxChannelUpdateBurst is the default maximum number of updates
46
        // for a specific channel and direction that we'll accept over an
47
        // interval.
48
        DefaultMaxChannelUpdateBurst = 10
49

50
        // DefaultChannelUpdateInterval is the default interval we'll use to
51
        // determine how often we should allow a new update for a specific
52
        // channel and direction.
53
        DefaultChannelUpdateInterval = time.Minute
54

55
        // maxPrematureUpdates tracks the max amount of premature channel
56
        // updates that we'll hold onto.
57
        maxPrematureUpdates = 100
58

59
        // maxFutureMessages tracks the max amount of future messages that
60
        // we'll hold onto.
61
        maxFutureMessages = 1000
62

63
        // DefaultSubBatchDelay is the default delay we'll use when
64
        // broadcasting the next announcement batch.
65
        DefaultSubBatchDelay = 5 * time.Second
66

67
        // maxRejectedUpdates tracks the max amount of rejected channel updates
68
        // we'll maintain. This is the global size across all peers. We'll
69
        // allocate ~3 MB max to the cache.
70
        maxRejectedUpdates = 10_000
71

72
        // DefaultProofMatureDelta specifies the default value used for
73
        // ProofMatureDelta, which is the number of confirmations needed before
74
        // processing the announcement signatures.
75
        DefaultProofMatureDelta = 6
76
)
77

78
var (
79
        // ErrGossiperShuttingDown is an error that is returned if the gossiper
80
        // is in the process of being shut down.
81
        ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
82

83
        // ErrGossipSyncerNotFound signals that we were unable to find an active
84
        // gossip syncer corresponding to a gossip query message received from
85
        // the remote peer.
86
        ErrGossipSyncerNotFound = errors.New("gossip syncer not found")
87

88
        // ErrNoFundingTransaction is returned when we are unable to find the
89
        // funding transaction described by the short channel ID on chain.
90
        ErrNoFundingTransaction = errors.New(
91
                "unable to find the funding transaction",
92
        )
93

94
        // ErrInvalidFundingOutput is returned if the channel funding output
95
        // fails validation.
96
        ErrInvalidFundingOutput = errors.New(
97
                "channel funding output validation failed",
98
        )
99

100
        // ErrChannelSpent is returned when we go to validate a channel, but
101
        // the purported funding output has actually already been spent on
102
        // chain.
103
        ErrChannelSpent = errors.New("channel output has been spent")
104

105
        // emptyPubkey is used to compare compressed pubkeys against an empty
106
        // byte array.
107
        emptyPubkey [33]byte
108
)
109

110
// optionalMsgFields is a set of optional message fields that external callers
111
// can provide that serve useful when processing a specific network
112
// announcement.
113
type optionalMsgFields struct {
114
        capacity      *btcutil.Amount
115
        channelPoint  *wire.OutPoint
116
        remoteAlias   *lnwire.ShortChannelID
117
        tapscriptRoot fn.Option[chainhash.Hash]
118
}
119

120
// apply applies the optional fields within the functional options.
121
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
50✔
122
        for _, optionalMsgField := range optionalMsgFields {
58✔
123
                optionalMsgField(f)
8✔
124
        }
8✔
125
}
126

127
// OptionalMsgField is a functional option parameter that can be used to provide
128
// external information that is not included within a network message but serves
129
// useful when processing it.
130
type OptionalMsgField func(*optionalMsgFields)
131

132
// ChannelCapacity is an optional field that lets the gossiper know of the
133
// capacity of a channel.
134
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
30✔
135
        return func(f *optionalMsgFields) {
34✔
136
                f.capacity = &capacity
4✔
137
        }
4✔
138
}
139

140
// ChannelPoint is an optional field that lets the gossiper know of the outpoint
141
// of a channel.
142
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
33✔
143
        return func(f *optionalMsgFields) {
40✔
144
                f.channelPoint = &op
7✔
145
        }
7✔
146
}
147

148
// TapscriptRoot is an optional field that lets the gossiper know of the root of
149
// the tapscript tree for a custom channel.
150
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
29✔
151
        return func(f *optionalMsgFields) {
32✔
152
                f.tapscriptRoot = root
3✔
153
        }
3✔
154
}
155

156
// RemoteAlias is an optional field that lets the gossiper know that a locally
157
// sent channel update is actually an update for the peer that should replace
158
// the ShortChannelID field with the remote's alias. This is only used for
159
// channels with peers where the option-scid-alias feature bit was negotiated.
160
// The channel update will be added to the graph under the original SCID, but
161
// will be modified and re-signed with this alias.
162
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
29✔
163
        return func(f *optionalMsgFields) {
32✔
164
                f.remoteAlias = alias
3✔
165
        }
3✔
166
}
167

168
// networkMsg couples a routing related wire message with the peer that
169
// originally sent it.
170
type networkMsg struct {
171
        peer              lnpeer.Peer
172
        source            *btcec.PublicKey
173
        msg               lnwire.Message
174
        optionalMsgFields *optionalMsgFields
175

176
        isRemote bool
177

178
        err chan error
179
}
180

181
// chanPolicyUpdateRequest is a request that is sent to the server when a caller
182
// wishes to update a particular set of channels. New ChannelUpdate messages
183
// will be crafted to be sent out during the next broadcast epoch and the fee
184
// updates committed to the lower layer.
185
type chanPolicyUpdateRequest struct {
186
        edgesToUpdate []EdgeWithInfo
187
        errChan       chan error
188
}
189

190
// PinnedSyncers is a set of node pubkeys for which we will maintain an active
191
// syncer at all times.
192
type PinnedSyncers map[route.Vertex]struct{}
193

194
// Config defines the configuration for the service. ALL elements within the
195
// configuration MUST be non-nil for the service to carry out its duties.
196
type Config struct {
197
        // ChainParams holds the chain parameters for the active network this
198
        // node is participating on.
199
        ChainParams *chaincfg.Params
200

201
        // Graph is the subsystem which is responsible for managing the
202
        // topology of lightning network. After incoming channel, node, channel
203
        // updates announcements are validated they are sent to the router in
204
        // order to be included in the LN graph.
205
        Graph graph.ChannelGraphSource
206

207
        // ChainIO represents an abstraction over a source that can query the
208
        // blockchain.
209
        ChainIO lnwallet.BlockChainIO
210

211
        // ChanSeries is an interfaces that provides access to a time series
212
        // view of the current known channel graph. Each GossipSyncer enabled
213
        // peer will utilize this in order to create and respond to channel
214
        // graph time series queries.
215
        ChanSeries ChannelGraphTimeSeries
216

217
        // Notifier is used for receiving notifications of incoming blocks.
218
        // With each new incoming block found we process previously premature
219
        // announcements.
220
        //
221
        // TODO(roasbeef): could possibly just replace this with an epoch
222
        // channel.
223
        Notifier chainntnfs.ChainNotifier
224

225
        // Broadcast broadcasts a particular set of announcements to all peers
226
        // that the daemon is connected to. If supplied, the exclude parameter
227
        // indicates that the target peer should be excluded from the
228
        // broadcast.
229
        Broadcast func(skips map[route.Vertex]struct{},
230
                msg ...lnwire.Message) error
231

232
        // NotifyWhenOnline is a function that allows the gossiper to be
233
        // notified when a certain peer comes online, allowing it to
234
        // retry sending a peer message.
235
        //
236
        // NOTE: The peerChan channel must be buffered.
237
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
238

239
        // NotifyWhenOffline is a function that allows the gossiper to be
240
        // notified when a certain peer disconnects, allowing it to request a
241
        // notification for when it reconnects.
242
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
243

244
        // FetchSelfAnnouncement retrieves our current node announcement, for
245
        // use when determining whether we should update our peers about our
246
        // presence in the network.
247
        FetchSelfAnnouncement func() lnwire.NodeAnnouncement1
248

249
        // UpdateSelfAnnouncement produces a new announcement for our node with
250
        // an updated timestamp which can be broadcast to our peers.
251
        UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement1, error)
252

253
        // ProofMatureDelta the number of confirmations which is needed before
254
        // exchange the channel announcement proofs.
255
        ProofMatureDelta uint32
256

257
        // TrickleDelay the period of trickle timer which flushes to the
258
        // network the pending batch of new announcements we've received since
259
        // the last trickle tick.
260
        TrickleDelay time.Duration
261

262
        // RetransmitTicker is a ticker that ticks with a period which
263
        // indicates that we should check if we need re-broadcast any of our
264
        // personal channels.
265
        RetransmitTicker ticker.Ticker
266

267
        // RebroadcastInterval is the maximum time we wait between sending out
268
        // channel updates for our active channels and our own node
269
        // announcement. We do this to ensure our active presence on the
270
        // network is known, and we are not being considered a zombie node or
271
        // having zombie channels.
272
        RebroadcastInterval time.Duration
273

274
        // WaitingProofStore is a persistent storage of partial channel proof
275
        // announcement messages. We use it to buffer half of the material
276
        // needed to reconstruct a full authenticated channel announcement.
277
        // Once we receive the other half the channel proof, we'll be able to
278
        // properly validate it and re-broadcast it out to the network.
279
        //
280
        // TODO(wilmer): make interface to prevent channeldb dependency.
281
        WaitingProofStore *channeldb.WaitingProofStore
282

283
        // MessageStore is a persistent storage of gossip messages which we will
284
        // use to determine which messages need to be resent for a given peer.
285
        MessageStore GossipMessageStore
286

287
        // AnnSigner is an instance of the MessageSigner interface which will
288
        // be used to manually sign any outgoing channel updates. The signer
289
        // implementation should be backed by the public key of the backing
290
        // Lightning node.
291
        //
292
        // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
293
        // here?
294
        AnnSigner lnwallet.MessageSigner
295

296
        // ScidCloser is an instance of ClosedChannelTracker that helps the
297
        // gossiper cut down on spam channel announcements for already closed
298
        // channels.
299
        ScidCloser ClosedChannelTracker
300

301
        // NumActiveSyncers is the number of peers for which we should have
302
        // active syncers with. After reaching NumActiveSyncers, any future
303
        // gossip syncers will be passive.
304
        NumActiveSyncers int
305

306
        // NoTimestampQueries will prevent the GossipSyncer from querying
307
        // timestamps of announcement messages from the peer and from replying
308
        // to timestamp queries.
309
        NoTimestampQueries bool
310

311
        // RotateTicker is a ticker responsible for notifying the SyncManager
312
        // when it should rotate its active syncers. A single active syncer with
313
        // a chansSynced state will be exchanged for a passive syncer in order
314
        // to ensure we don't keep syncing with the same peers.
315
        RotateTicker ticker.Ticker
316

317
        // HistoricalSyncTicker is a ticker responsible for notifying the
318
        // syncManager when it should attempt a historical sync with a gossip
319
        // sync peer.
320
        HistoricalSyncTicker ticker.Ticker
321

322
        // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
323
        // syncManager when it should attempt to start the next pending
324
        // activeSyncer due to the current one not completing its state machine
325
        // within the timeout.
326
        ActiveSyncerTimeoutTicker ticker.Ticker
327

328
        // MinimumBatchSize is minimum size of a sub batch of announcement
329
        // messages.
330
        MinimumBatchSize int
331

332
        // SubBatchDelay is the delay between sending sub batches of
333
        // gossip messages.
334
        SubBatchDelay time.Duration
335

336
        // IgnoreHistoricalFilters will prevent syncers from replying with
337
        // historical data when the remote peer sets a gossip_timestamp_range.
338
        // This prevents ranges with old start times from causing us to dump the
339
        // graph on connect.
340
        IgnoreHistoricalFilters bool
341

342
        // PinnedSyncers is a set of peers that will always transition to
343
        // ActiveSync upon connection. These peers will never transition to
344
        // PassiveSync.
345
        PinnedSyncers PinnedSyncers
346

347
        // MaxChannelUpdateBurst specifies the maximum number of updates for a
348
        // specific channel and direction that we'll accept over an interval.
349
        MaxChannelUpdateBurst int
350

351
        // ChannelUpdateInterval specifies the interval we'll use to determine
352
        // how often we should allow a new update for a specific channel and
353
        // direction.
354
        ChannelUpdateInterval time.Duration
355

356
        // IsAlias returns true if a given ShortChannelID is an alias for
357
        // option_scid_alias channels.
358
        IsAlias func(scid lnwire.ShortChannelID) bool
359

360
        // SignAliasUpdate is used to re-sign a channel update using the
361
        // remote's alias if the option-scid-alias feature bit was negotiated.
362
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
363
                error)
364

365
        // FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
366
        // This is used for channels that have negotiated the option-scid-alias
367
        // feature bit.
368
        FindBaseByAlias func(alias lnwire.ShortChannelID) (
369
                lnwire.ShortChannelID, error)
370

371
        // GetAlias allows the gossiper to look up the peer's alias for a given
372
        // ChannelID. This is used to sign updates for them if the channel has
373
        // no AuthProof and the option-scid-alias feature bit was negotiated.
374
        GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)
375

376
        // FindChannel allows the gossiper to find a channel that we're party
377
        // to without iterating over the entire set of open channels.
378
        FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
379
                *channeldb.OpenChannel, error)
380

381
        // IsStillZombieChannel takes the timestamps of the latest channel
382
        // updates for a channel and returns true if the channel should be
383
        // considered a zombie based on these timestamps.
384
        IsStillZombieChannel func(time.Time, time.Time) bool
385

386
        // AssumeChannelValid toggles whether the gossiper will check for
387
        // spent-ness of channel outpoints. For neutrino, this saves long
388
        // rescans from blocking initial usage of the daemon.
389
        AssumeChannelValid bool
390

391
        // MsgRateBytes is the rate limit for the number of bytes per second
392
        // that we'll allocate to outbound gossip messages.
393
        MsgRateBytes uint64
394

395
        // MsgBurstBytes is the allotted burst amount in bytes. This is the
396
        // number of starting tokens in our token bucket algorithm.
397
        MsgBurstBytes uint64
398

399
        // FilterConcurrency is the maximum number of concurrent gossip filter
400
        // applications that can be processed.
401
        FilterConcurrency int
402

403
        // BanThreshold is the score used to decide whether a given peer is
404
        // banned or not.
405
        BanThreshold uint64
406

407
        // PeerMsgRateBytes is the rate limit for the number of bytes per second
408
        // that we'll allocate to outbound gossip messages for a single peer.
409
        PeerMsgRateBytes uint64
410
}
411

412
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
413
// used to let the caller of the lru.Cache know if a message has already been
414
// processed or not.
415
type processedNetworkMsg struct {
416
        processed bool
417
        msg       *networkMsg
418
}
419

420
// cachedNetworkMsg is a wrapper around a network message that can be used with
421
// *lru.Cache.
422
//
423
// NOTE: This struct is not thread safe which means you need to assure no
424
// concurrent read write access to it and all its contents which are pointers
425
// as well.
426
type cachedNetworkMsg struct {
427
        msgs []*processedNetworkMsg
428
}
429

430
// Size returns the "size" of an entry. We return the number of items as we
431
// just want to limit the total amount of entries rather than do accurate size
432
// accounting.
433
func (c *cachedNetworkMsg) Size() (uint64, error) {
5✔
434
        return uint64(len(c.msgs)), nil
5✔
435
}
5✔
436

437
// rejectCacheKey is the cache key that we'll use to track announcements we've
438
// recently rejected.
439
type rejectCacheKey struct {
440
        gossipVersion lnwire.GossipVersion
441
        pubkey        [33]byte
442
        chanID        uint64
443
}
444

445
// newRejectCacheKey returns a new cache key for the reject cache.
446
func newRejectCacheKey(v lnwire.GossipVersion, cid uint64,
447
        pub [33]byte) rejectCacheKey {
475✔
448

475✔
449
        k := rejectCacheKey{
475✔
450
                gossipVersion: v,
475✔
451
                chanID:        cid,
475✔
452
                pubkey:        pub,
475✔
453
        }
475✔
454

475✔
455
        return k
475✔
456
}
475✔
457

458
// sourceToPub returns a serialized-compressed public key for use in the reject
459
// cache.
460
func sourceToPub(pk *btcec.PublicKey) [33]byte {
489✔
461
        var pub [33]byte
489✔
462
        copy(pub[:], pk.SerializeCompressed())
489✔
463
        return pub
489✔
464
}
489✔
465

466
// cachedReject is the empty value used to track the value for rejects.
467
type cachedReject struct {
468
}
469

470
// Size returns the "size" of an entry. We return 1 as we just want to limit
471
// the total size.
472
func (c *cachedReject) Size() (uint64, error) {
206✔
473
        return 1, nil
206✔
474
}
206✔
475

476
// AuthenticatedGossiper is a subsystem which is responsible for receiving
477
// announcements, validating them and applying the changes to router, syncing
478
// lightning network with newly connected nodes, broadcasting announcements
479
// after validation, negotiating the channel announcement proofs exchange and
480
// handling the premature announcements. All outgoing announcements are
481
// expected to be properly signed as dictated in BOLT#7, additionally, all
482
// incoming message are expected to be well formed and signed. Invalid messages
483
// will be rejected by this struct.
484
type AuthenticatedGossiper struct {
485
        // Parameters which are needed to properly handle the start and stop of
486
        // the service.
487
        started sync.Once
488
        stopped sync.Once
489

490
        // bestHeight is the height of the block at the tip of the main chain
491
        // as we know it. Accesses *MUST* be done with the gossiper's lock
492
        // held.
493
        bestHeight uint32
494

495
        // cfg is a copy of the configuration struct that the gossiper service
496
        // was initialized with.
497
        cfg *Config
498

499
        // blockEpochs encapsulates a stream of block epochs that are sent at
500
        // every new block height.
501
        blockEpochs *chainntnfs.BlockEpochEvent
502

503
        // prematureChannelUpdates is a map of ChannelUpdates we have received
504
        // that wasn't associated with any channel we know about.  We store
505
        // them temporarily, such that we can reprocess them when a
506
        // ChannelAnnouncement for the channel is received.
507
        prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]
508

509
        // banman tracks our peer's ban status.
510
        banman *banman
511

512
        // networkMsgs is a channel that carries new network broadcasted
513
        // message from outside the gossiper service to be processed by the
514
        // networkHandler.
515
        networkMsgs chan *networkMsg
516

517
        // futureMsgs is a list of premature network messages that have a block
518
        // height specified in the future. We will save them and resend it to
519
        // the chan networkMsgs once the block height has reached. The cached
520
        // map format is,
521
        //   {msgID1: msg1, msgID2: msg2, ...}
522
        futureMsgs *futureMsgCache
523

524
        // chanPolicyUpdates is a channel that requests to update the
525
        // forwarding policy of a set of channels is sent over.
526
        chanPolicyUpdates chan *chanPolicyUpdateRequest
527

528
        // selfKey is the identity public key of the backing Lightning node.
529
        selfKey *btcec.PublicKey
530

531
        // selfKeyLoc is the locator for the identity public key of the backing
532
        // Lightning node.
533
        selfKeyLoc keychain.KeyLocator
534

535
        // channelMtx is used to restrict the database access to one
536
        // goroutine per channel ID. This is done to ensure that when
537
        // the gossiper is handling an announcement, the db state stays
538
        // consistent between when the DB is first read until it's written.
539
        channelMtx *multimutex.Mutex[uint64]
540

541
        recentRejects *lru.Cache[rejectCacheKey, *cachedReject]
542

543
        // syncMgr is a subsystem responsible for managing the gossip syncers
544
        // for peers currently connected. When a new peer is connected, the
545
        // manager will create its accompanying gossip syncer and determine
546
        // whether it should have an activeSync or passiveSync sync type based
547
        // on how many other gossip syncers are currently active. Any activeSync
548
        // gossip syncers are started in a round-robin manner to ensure we're
549
        // not syncing with multiple peers at the same time.
550
        syncMgr *SyncManager
551

552
        // reliableSender is a subsystem responsible for handling reliable
553
        // message send requests to peers. This should only be used for channels
554
        // that are unadvertised at the time of handling the message since if it
555
        // is advertised, then peers should be able to get the message from the
556
        // network.
557
        reliableSender *reliableSender
558

559
        // chanUpdateRateLimiter contains rate limiters for each direction of
560
        // a channel update we've processed. We'll use these to determine
561
        // whether we should accept a new update for a specific channel and
562
        // direction.
563
        //
564
        // NOTE: This map must be synchronized with the main
565
        // AuthenticatedGossiper lock.
566
        chanUpdateRateLimiter map[uint64][2]*rate.Limiter
567

568
        // vb is used to enforce job dependency ordering of gossip messages.
569
        vb *ValidationBarrier
570

571
        sync.Mutex
572

573
        cancel fn.Option[context.CancelFunc]
574
        quit   chan struct{}
575
        wg     sync.WaitGroup
576
}
577

578
// New creates a new AuthenticatedGossiper instance, initialized with the
579
// passed configuration parameters.
580
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
33✔
581
        gossiper := &AuthenticatedGossiper{
33✔
582
                selfKey:           selfKeyDesc.PubKey,
33✔
583
                selfKeyLoc:        selfKeyDesc.KeyLocator,
33✔
584
                cfg:               &cfg,
33✔
585
                networkMsgs:       make(chan *networkMsg),
33✔
586
                futureMsgs:        newFutureMsgCache(maxFutureMessages),
33✔
587
                quit:              make(chan struct{}),
33✔
588
                chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
33✔
589
                prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
33✔
590
                        maxPrematureUpdates,
33✔
591
                ),
33✔
592
                channelMtx: multimutex.NewMutex[uint64](),
33✔
593
                recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
33✔
594
                        maxRejectedUpdates,
33✔
595
                ),
33✔
596
                chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
33✔
597
                banman:                newBanman(cfg.BanThreshold),
33✔
598
        }
33✔
599

33✔
600
        gossiper.vb = NewValidationBarrier(1000, gossiper.quit)
33✔
601

33✔
602
        gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
33✔
603
                ChainHash:                *cfg.ChainParams.GenesisHash,
33✔
604
                ChanSeries:               cfg.ChanSeries,
33✔
605
                RotateTicker:             cfg.RotateTicker,
33✔
606
                HistoricalSyncTicker:     cfg.HistoricalSyncTicker,
33✔
607
                NumActiveSyncers:         cfg.NumActiveSyncers,
33✔
608
                NoTimestampQueries:       cfg.NoTimestampQueries,
33✔
609
                IgnoreHistoricalFilters:  cfg.IgnoreHistoricalFilters,
33✔
610
                BestHeight:               gossiper.latestHeight,
33✔
611
                PinnedSyncers:            cfg.PinnedSyncers,
33✔
612
                IsStillZombieChannel:     cfg.IsStillZombieChannel,
33✔
613
                AllotedMsgBytesPerSecond: cfg.MsgRateBytes,
33✔
614
                AllotedMsgBytesBurst:     cfg.MsgBurstBytes,
33✔
615
                FilterConcurrency:        cfg.FilterConcurrency,
33✔
616
                PeerMsgBytesPerSecond:    cfg.PeerMsgRateBytes,
33✔
617
        })
33✔
618

33✔
619
        gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
33✔
620
                NotifyWhenOnline:  cfg.NotifyWhenOnline,
33✔
621
                NotifyWhenOffline: cfg.NotifyWhenOffline,
33✔
622
                MessageStore:      cfg.MessageStore,
33✔
623
                IsMsgStale:        gossiper.isMsgStale,
33✔
624
        })
33✔
625

33✔
626
        return gossiper
33✔
627
}
33✔
628

629
// EdgeWithInfo contains the information that is required to update an edge.
630
type EdgeWithInfo struct {
631
        // Info describes the channel.
632
        Info *models.ChannelEdgeInfo
633

634
        // Edge describes the policy in one direction of the channel.
635
        Edge *models.ChannelEdgePolicy
636
}
637

638
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
639
// specified edge updates. Updates are done in two stages: first, the
640
// AuthenticatedGossiper ensures the update has been committed by dependent
641
// sub-systems, then it signs and broadcasts new updates to the network. A
642
// mapping between outpoints and updated channel policies is returned, which is
643
// used to update the forwarding policies of the underlying links.
644
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
645
        edgesToUpdate []EdgeWithInfo) error {
4✔
646

4✔
647
        errChan := make(chan error, 1)
4✔
648
        policyUpdate := &chanPolicyUpdateRequest{
4✔
649
                edgesToUpdate: edgesToUpdate,
4✔
650
                errChan:       errChan,
4✔
651
        }
4✔
652

4✔
653
        select {
4✔
654
        case d.chanPolicyUpdates <- policyUpdate:
4✔
655
                err := <-errChan
4✔
656
                return err
4✔
657
        case <-d.quit:
×
658
                return fmt.Errorf("AuthenticatedGossiper shutting down")
×
659
        }
660
}
661

662
// Start spawns network messages handler goroutine and registers on new block
663
// notifications in order to properly handle the premature announcements.
664
func (d *AuthenticatedGossiper) Start() error {
33✔
665
        var err error
33✔
666
        d.started.Do(func() {
66✔
667
                ctx, cancel := context.WithCancel(context.Background())
33✔
668
                d.cancel = fn.Some(cancel)
33✔
669

33✔
670
                log.Info("Authenticated Gossiper starting")
33✔
671
                err = d.start(ctx)
33✔
672
        })
33✔
673
        return err
33✔
674
}
675

676
func (d *AuthenticatedGossiper) start(ctx context.Context) error {
33✔
677
        // First we register for new notifications of newly discovered blocks.
33✔
678
        // We do this immediately so we'll later be able to consume any/all
33✔
679
        // blocks which were discovered.
33✔
680
        blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
33✔
681
        if err != nil {
33✔
682
                return err
×
683
        }
×
684
        d.blockEpochs = blockEpochs
33✔
685

33✔
686
        height, err := d.cfg.Graph.CurrentBlockHeight()
33✔
687
        if err != nil {
33✔
688
                return err
×
689
        }
×
690
        d.bestHeight = height
33✔
691

33✔
692
        // Start the reliable sender. In case we had any pending messages ready
33✔
693
        // to be sent when the gossiper was last shut down, we must continue on
33✔
694
        // our quest to deliver them to their respective peers.
33✔
695
        if err := d.reliableSender.Start(); err != nil {
33✔
696
                return err
×
697
        }
×
698

699
        d.syncMgr.Start()
33✔
700

33✔
701
        d.banman.start()
33✔
702

33✔
703
        // Start receiving blocks in its dedicated goroutine.
33✔
704
        d.wg.Add(2)
33✔
705
        go d.syncBlockHeight()
33✔
706
        go d.networkHandler(ctx)
33✔
707

33✔
708
        return nil
33✔
709
}
710

711
// syncBlockHeight syncs the best block height for the gossiper by reading
712
// blockEpochs.
713
//
714
// NOTE: must be run as a goroutine.
715
func (d *AuthenticatedGossiper) syncBlockHeight() {
33✔
716
        defer d.wg.Done()
33✔
717

33✔
718
        for {
66✔
719
                select {
33✔
720
                // A new block has arrived, so we can re-process the previously
721
                // premature announcements.
722
                case newBlock, ok := <-d.blockEpochs.Epochs:
3✔
723
                        // If the channel has been closed, then this indicates
3✔
724
                        // the daemon is shutting down, so we exit ourselves.
3✔
725
                        if !ok {
6✔
726
                                return
3✔
727
                        }
3✔
728

729
                        // Once a new block arrives, we update our running
730
                        // track of the height of the chain tip.
731
                        d.Lock()
3✔
732
                        blockHeight := uint32(newBlock.Height)
3✔
733
                        d.bestHeight = blockHeight
3✔
734
                        d.Unlock()
3✔
735

3✔
736
                        log.Debugf("New block: height=%d, hash=%s", blockHeight,
3✔
737
                                newBlock.Hash)
3✔
738

3✔
739
                        // Resend future messages, if any.
3✔
740
                        d.resendFutureMessages(blockHeight)
3✔
741

742
                case <-d.quit:
30✔
743
                        return
30✔
744
                }
745
        }
746
}
747

748
// futureMsgCache embeds a `lru.Cache` with a message counter that's served as
749
// the unique ID when saving the message.
750
type futureMsgCache struct {
751
        *lru.Cache[uint64, *cachedFutureMsg]
752

753
        // msgID is a monotonically increased integer.
754
        msgID atomic.Uint64
755
}
756

757
// nextMsgID returns a unique message ID.
758
func (f *futureMsgCache) nextMsgID() uint64 {
6✔
759
        return f.msgID.Add(1)
6✔
760
}
6✔
761

762
// newFutureMsgCache creates a new future message cache with the underlying lru
763
// cache being initialized with the specified capacity.
764
func newFutureMsgCache(capacity uint64) *futureMsgCache {
34✔
765
        // Create a new cache.
34✔
766
        cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)
34✔
767

34✔
768
        return &futureMsgCache{
34✔
769
                Cache: cache,
34✔
770
        }
34✔
771
}
34✔
772

773
// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
774
type cachedFutureMsg struct {
775
        // msg is the network message.
776
        msg *networkMsg
777

778
        // height is the block height.
779
        height uint32
780
}
781

782
// Size returns the size of the message.
783
func (c *cachedFutureMsg) Size() (uint64, error) {
7✔
784
        // Return a constant 1.
7✔
785
        return 1, nil
7✔
786
}
7✔
787

788
// resendFutureMessages takes a block height, resends all the future messages
789
// found below and equal to that height and deletes those messages found in the
790
// gossiper's futureMsgs.
791
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
3✔
792
        var (
3✔
793
                // msgs are the target messages.
3✔
794
                msgs []*networkMsg
3✔
795

3✔
796
                // keys are the target messages' caching keys.
3✔
797
                keys []uint64
3✔
798
        )
3✔
799

3✔
800
        // filterMsgs is the visitor used when iterating the future cache.
3✔
801
        filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
6✔
802
                if cmsg.height <= height {
6✔
803
                        msgs = append(msgs, cmsg.msg)
3✔
804
                        keys = append(keys, k)
3✔
805
                }
3✔
806

807
                return true
3✔
808
        }
809

810
        // Filter out the target messages.
811
        d.futureMsgs.Range(filterMsgs)
3✔
812

3✔
813
        // Return early if no messages found.
3✔
814
        if len(msgs) == 0 {
6✔
815
                return
3✔
816
        }
3✔
817

818
        // Remove the filtered messages.
819
        for _, key := range keys {
6✔
820
                d.futureMsgs.Delete(key)
3✔
821
        }
3✔
822

823
        log.Debugf("Resending %d network messages at height %d",
3✔
824
                len(msgs), height)
3✔
825

3✔
826
        for _, msg := range msgs {
6✔
827
                select {
3✔
828
                case d.networkMsgs <- msg:
3✔
829
                case <-d.quit:
×
830
                        msg.err <- ErrGossiperShuttingDown
×
831
                }
832
        }
833
}
834

835
// Stop signals any active goroutines for a graceful closure.
836
func (d *AuthenticatedGossiper) Stop() error {
34✔
837
        d.stopped.Do(func() {
67✔
838
                log.Info("Authenticated gossiper shutting down...")
33✔
839
                defer log.Debug("Authenticated gossiper shutdown complete")
33✔
840

33✔
841
                d.stop()
33✔
842
        })
33✔
843
        return nil
34✔
844
}
845

846
func (d *AuthenticatedGossiper) stop() {
33✔
847
        log.Debug("Authenticated Gossiper is stopping")
33✔
848
        defer log.Debug("Authenticated Gossiper stopped")
33✔
849

33✔
850
        // `blockEpochs` is only initialized in the start routine so we make
33✔
851
        // sure we don't panic here in the case where the `Stop` method is
33✔
852
        // called when the `Start` method does not complete.
33✔
853
        if d.blockEpochs != nil {
66✔
854
                d.blockEpochs.Cancel()
33✔
855
        }
33✔
856

857
        d.syncMgr.Stop()
33✔
858

33✔
859
        d.banman.stop()
33✔
860

33✔
861
        d.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
66✔
862
        close(d.quit)
33✔
863
        d.wg.Wait()
33✔
864

33✔
865
        // We'll stop our reliable sender after all of the gossiper's goroutines
33✔
866
        // have exited to ensure nothing can cause it to continue executing.
33✔
867
        d.reliableSender.Stop()
33✔
868
}
869

870
// TODO(roasbeef): need method to get current gossip timestamp?
871
//  * using mtx, check time rotate forward is needed?
872

873
// ProcessRemoteAnnouncement sends a new remote announcement message along with
874
// the peer that sent the routing message. The announcement will be processed
875
// then added to a queue for batched trickled announcement to all connected
876
// peers.  Remote channel announcements should contain the announcement proof
877
// and be fully validated.
878
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context,
879
        msg lnwire.Message, peer lnpeer.Peer) chan error {
294✔
880

294✔
881
        errChan := make(chan error, 1)
294✔
882

294✔
883
        // For messages in the known set of channel series queries, we'll
294✔
884
        // dispatch the message directly to the GossipSyncer, and skip the main
294✔
885
        // processing loop.
294✔
886
        switch m := msg.(type) {
294✔
887
        case *lnwire.QueryShortChanIDs,
888
                *lnwire.QueryChannelRange,
889
                *lnwire.ReplyChannelRange,
890
                *lnwire.ReplyShortChanIDsEnd:
3✔
891

3✔
892
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
3✔
893
                if !ok {
3✔
894
                        log.Warnf("Gossip syncer for peer=%x not found",
×
895
                                peer.PubKey())
×
896

×
897
                        errChan <- ErrGossipSyncerNotFound
×
898
                        return errChan
×
899
                }
×
900

901
                // If we've found the message target, then we'll dispatch the
902
                // message directly to it.
903
                err := syncer.ProcessQueryMsg(m, peer.QuitSignal())
3✔
904
                if err != nil {
3✔
905
                        log.Errorf("Process query msg from peer %x got %v",
×
906
                                peer.PubKey(), err)
×
907
                }
×
908

909
                errChan <- err
3✔
910
                return errChan
3✔
911

912
        // If a peer is updating its current update horizon, then we'll dispatch
913
        // that directly to the proper GossipSyncer.
914
        case *lnwire.GossipTimestampRange:
3✔
915
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
3✔
916
                if !ok {
3✔
917
                        log.Warnf("Gossip syncer for peer=%x not found",
×
918
                                peer.PubKey())
×
919

×
920
                        errChan <- ErrGossipSyncerNotFound
×
921
                        return errChan
×
922
                }
×
923

924
                // Queue the message for asynchronous processing to prevent
925
                // blocking the gossiper when rate limiting is active.
926
                if !syncer.QueueTimestampRange(m) {
3✔
927
                        log.Warnf("Unable to queue gossip filter for peer=%x: "+
×
928
                                "queue full", peer.PubKey())
×
929

×
930
                        // Return nil to indicate we've handled the message,
×
931
                        // even though it was dropped. This prevents the peer
×
932
                        // from being disconnected.
×
933
                        errChan <- nil
×
934
                        return errChan
×
935
                }
×
936

937
                errChan <- nil
3✔
938
                return errChan
3✔
939

940
        // To avoid inserting edges in the graph for our own channels that we
941
        // have already closed, we ignore such channel announcements coming
942
        // from the remote.
943
        case *lnwire.ChannelAnnouncement1:
223✔
944
                ownKey := d.selfKey.SerializeCompressed()
223✔
945
                ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement1 " +
223✔
946
                        "for own channel")
223✔
947

223✔
948
                if bytes.Equal(m.NodeID1[:], ownKey) ||
223✔
949
                        bytes.Equal(m.NodeID2[:], ownKey) {
228✔
950

5✔
951
                        log.Warn(ownErr)
5✔
952
                        errChan <- ownErr
5✔
953
                        return errChan
5✔
954
                }
5✔
955
        }
956

957
        nMsg := &networkMsg{
292✔
958
                msg:      msg,
292✔
959
                isRemote: true,
292✔
960
                peer:     peer,
292✔
961
                source:   peer.IdentityKey(),
292✔
962
                err:      errChan,
292✔
963
        }
292✔
964

292✔
965
        select {
292✔
966
        case d.networkMsgs <- nMsg:
292✔
967

968
        // If the peer that sent us this error is quitting, then we don't need
969
        // to send back an error and can return immediately.
970
        // TODO(elle): the peer should now just rely on canceling the passed
971
        //  context.
972
        case <-peer.QuitSignal():
×
973
                return nil
×
974
        case <-ctx.Done():
×
975
                return nil
×
976
        case <-d.quit:
×
977
                nMsg.err <- ErrGossiperShuttingDown
×
978
        }
979

980
        return nMsg.err
292✔
981
}
982

983
// ProcessLocalAnnouncement sends a new remote announcement message along with
984
// the peer that sent the routing message. The announcement will be processed
985
// then added to a queue for batched trickled announcement to all connected
986
// peers.  Local channel announcements don't contain the announcement proof and
987
// will not be fully validated. Once the channel proofs are received, the
988
// entire channel announcement and update messages will be re-constructed and
989
// broadcast to the rest of the network.
990
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
991
        optionalFields ...OptionalMsgField) chan error {
50✔
992

50✔
993
        optionalMsgFields := &optionalMsgFields{}
50✔
994
        optionalMsgFields.apply(optionalFields...)
50✔
995

50✔
996
        nMsg := &networkMsg{
50✔
997
                msg:               msg,
50✔
998
                optionalMsgFields: optionalMsgFields,
50✔
999
                isRemote:          false,
50✔
1000
                source:            d.selfKey,
50✔
1001
                err:               make(chan error, 1),
50✔
1002
        }
50✔
1003

50✔
1004
        select {
50✔
1005
        case d.networkMsgs <- nMsg:
50✔
1006
        case <-d.quit:
×
1007
                nMsg.err <- ErrGossiperShuttingDown
×
1008
        }
1009

1010
        return nMsg.err
50✔
1011
}
1012

1013
// channelUpdateID is a unique identifier for ChannelUpdate messages, as
1014
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
1015
// tuple.
1016
type channelUpdateID struct {
1017
        // channelID represents the set of data which is needed to
1018
        // retrieve all necessary data to validate the channel existence.
1019
        channelID lnwire.ShortChannelID
1020

1021
        // Flags least-significant bit must be set to 0 if the creating node
1022
        // corresponds to the first node in the previously sent channel
1023
        // announcement and 1 otherwise.
1024
        flags lnwire.ChanUpdateChanFlags
1025
}
1026

1027
// msgWithSenders is a wrapper struct around a message, and the set of peers
1028
// that originally sent us this message. Using this struct, we can ensure that
1029
// we don't re-send a message to the peer that sent it to us in the first
1030
// place.
1031
type msgWithSenders struct {
1032
        // msg is the wire message itself.
1033
        msg lnwire.Message
1034

1035
        // isLocal is true if this was a message that originated locally. We'll
1036
        // use this to bypass our normal checks to ensure we prioritize sending
1037
        // out our own updates.
1038
        isLocal bool
1039

1040
        // sender is the set of peers that sent us this message.
1041
        senders map[route.Vertex]struct{}
1042
}
1043

1044
// mergeSyncerMap is used to merge the set of senders of a particular message
1045
// with peers that we have an active GossipSyncer with. We do this to ensure
1046
// that we don't broadcast messages to any peers that we have active gossip
1047
// syncers for.
1048
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
32✔
1049
        for peerPub := range syncers {
35✔
1050
                m.senders[peerPub] = struct{}{}
3✔
1051
        }
3✔
1052
}
1053

1054
// deDupedAnnouncements de-duplicates announcements that have been added to the
1055
// batch. Internally, announcements are stored in three maps
1056
// (one each for channel announcements, channel updates, and node
1057
// announcements). These maps keep track of unique announcements and ensure no
1058
// announcements are duplicated. We keep the three message types separate, such
1059
// that we can send channel announcements first, then channel updates, and
1060
// finally node announcements when it's time to broadcast them.
1061
type deDupedAnnouncements struct {
1062
        // channelAnnouncements are identified by the short channel id field.
1063
        channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
1064

1065
        // channelUpdates are identified by the channel update id field.
1066
        channelUpdates map[channelUpdateID]msgWithSenders
1067

1068
        // nodeAnnouncements are identified by the Vertex field.
1069
        nodeAnnouncements map[route.Vertex]msgWithSenders
1070

1071
        sync.Mutex
1072
}
1073

1074
// Reset operates on deDupedAnnouncements to reset the storage of
1075
// announcements.
1076
func (d *deDupedAnnouncements) Reset() {
35✔
1077
        d.Lock()
35✔
1078
        defer d.Unlock()
35✔
1079

35✔
1080
        d.reset()
35✔
1081
}
35✔
1082

1083
// reset is the private version of the Reset method. We have this so we can
1084
// call this method within method that are already holding the lock.
1085
func (d *deDupedAnnouncements) reset() {
332✔
1086
        // Storage of each type of announcement (channel announcements, channel
332✔
1087
        // updates, node announcements) is set to an empty map where the
332✔
1088
        // appropriate key points to the corresponding lnwire.Message.
332✔
1089
        d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
332✔
1090
        d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
332✔
1091
        d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
332✔
1092
}
332✔
1093

1094
// addMsg adds a new message to the current batch. If the message is already
1095
// present in the current batch, then this new instance replaces the latter,
1096
// and the set of senders is updated to reflect which node sent us this
1097
// message.
1098
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
94✔
1099
        log.Tracef("Adding network message: %v to batch", message.msg.MsgType())
94✔
1100

94✔
1101
        // Depending on the message type (channel announcement, channel update,
94✔
1102
        // or node announcement), the message is added to the corresponding map
94✔
1103
        // in deDupedAnnouncements. Because each identifying key can have at
94✔
1104
        // most one value, the announcements are de-duplicated, with newer ones
94✔
1105
        // replacing older ones.
94✔
1106
        switch msg := message.msg.(type) {
94✔
1107

1108
        // Channel announcements are identified by the short channel id field.
1109
        case *lnwire.ChannelAnnouncement1:
26✔
1110
                deDupKey := msg.ShortChannelID
26✔
1111
                sender := route.NewVertex(message.source)
26✔
1112

26✔
1113
                mws, ok := d.channelAnnouncements[deDupKey]
26✔
1114
                if !ok {
51✔
1115
                        mws = msgWithSenders{
25✔
1116
                                msg:     msg,
25✔
1117
                                isLocal: !message.isRemote,
25✔
1118
                                senders: make(map[route.Vertex]struct{}),
25✔
1119
                        }
25✔
1120
                        mws.senders[sender] = struct{}{}
25✔
1121

25✔
1122
                        d.channelAnnouncements[deDupKey] = mws
25✔
1123

25✔
1124
                        return
25✔
1125
                }
25✔
1126

1127
                mws.msg = msg
1✔
1128
                mws.senders[sender] = struct{}{}
1✔
1129
                d.channelAnnouncements[deDupKey] = mws
1✔
1130

1131
        // Channel updates are identified by the (short channel id,
1132
        // channelflags) tuple.
1133
        case *lnwire.ChannelUpdate1:
49✔
1134
                sender := route.NewVertex(message.source)
49✔
1135
                deDupKey := channelUpdateID{
49✔
1136
                        msg.ShortChannelID,
49✔
1137
                        msg.ChannelFlags,
49✔
1138
                }
49✔
1139

49✔
1140
                oldTimestamp := uint32(0)
49✔
1141
                mws, ok := d.channelUpdates[deDupKey]
49✔
1142
                if ok {
52✔
1143
                        // If we already have seen this message, record its
3✔
1144
                        // timestamp.
3✔
1145
                        update, ok := mws.msg.(*lnwire.ChannelUpdate1)
3✔
1146
                        if !ok {
3✔
1147
                                log.Errorf("Expected *lnwire.ChannelUpdate1, "+
×
1148
                                        "got: %T", mws.msg)
×
1149

×
1150
                                return
×
1151
                        }
×
1152

1153
                        oldTimestamp = update.Timestamp
3✔
1154
                }
1155

1156
                // If we already had this message with a strictly newer
1157
                // timestamp, then we'll just discard the message we got.
1158
                if oldTimestamp > msg.Timestamp {
50✔
1159
                        log.Debugf("Ignored outdated network message: "+
1✔
1160
                                "peer=%v, msg=%s", message.peer, msg.MsgType())
1✔
1161
                        return
1✔
1162
                }
1✔
1163

1164
                // If the message we just got is newer than what we previously
1165
                // have seen, or this is the first time we see it, then we'll
1166
                // add it to our map of announcements.
1167
                if oldTimestamp < msg.Timestamp {
95✔
1168
                        mws = msgWithSenders{
47✔
1169
                                msg:     msg,
47✔
1170
                                isLocal: !message.isRemote,
47✔
1171
                                senders: make(map[route.Vertex]struct{}),
47✔
1172
                        }
47✔
1173

47✔
1174
                        // We'll mark the sender of the message in the
47✔
1175
                        // senders map.
47✔
1176
                        mws.senders[sender] = struct{}{}
47✔
1177

47✔
1178
                        d.channelUpdates[deDupKey] = mws
47✔
1179

47✔
1180
                        return
47✔
1181
                }
47✔
1182

1183
                // Lastly, if we had seen this exact message from before, with
1184
                // the same timestamp, we'll add the sender to the map of
1185
                // senders, such that we can skip sending this message back in
1186
                // the next batch.
1187
                mws.msg = msg
1✔
1188
                mws.senders[sender] = struct{}{}
1✔
1189
                d.channelUpdates[deDupKey] = mws
1✔
1190

1191
        // Node announcements are identified by the Vertex field.  Use the
1192
        // NodeID to create the corresponding Vertex.
1193
        case *lnwire.NodeAnnouncement1:
25✔
1194
                sender := route.NewVertex(message.source)
25✔
1195
                deDupKey := route.Vertex(msg.NodeID)
25✔
1196

25✔
1197
                // We do the same for node announcements as we did for channel
25✔
1198
                // updates, as they also carry a timestamp.
25✔
1199
                oldTimestamp := uint32(0)
25✔
1200
                mws, ok := d.nodeAnnouncements[deDupKey]
25✔
1201
                if ok {
33✔
1202
                        ann, _ := mws.msg.(*lnwire.NodeAnnouncement1)
8✔
1203
                        oldTimestamp = ann.Timestamp
8✔
1204
                }
8✔
1205

1206
                // Discard the message if it's old.
1207
                if oldTimestamp > msg.Timestamp {
28✔
1208
                        return
3✔
1209
                }
3✔
1210

1211
                // Replace if it's newer.
1212
                if oldTimestamp < msg.Timestamp {
46✔
1213
                        mws = msgWithSenders{
21✔
1214
                                msg:     msg,
21✔
1215
                                isLocal: !message.isRemote,
21✔
1216
                                senders: make(map[route.Vertex]struct{}),
21✔
1217
                        }
21✔
1218

21✔
1219
                        mws.senders[sender] = struct{}{}
21✔
1220

21✔
1221
                        d.nodeAnnouncements[deDupKey] = mws
21✔
1222

21✔
1223
                        return
21✔
1224
                }
21✔
1225

1226
                // Add to senders map if it's the same as we had.
1227
                mws.msg = msg
7✔
1228
                mws.senders[sender] = struct{}{}
7✔
1229
                d.nodeAnnouncements[deDupKey] = mws
7✔
1230
        }
1231
}
1232

1233
// AddMsgs is a helper method to add multiple messages to the announcement
1234
// batch.
1235
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
62✔
1236
        d.Lock()
62✔
1237
        defer d.Unlock()
62✔
1238

62✔
1239
        for _, msg := range msgs {
156✔
1240
                d.addMsg(msg)
94✔
1241
        }
94✔
1242
}
1243

1244
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
1245
// to broadcast next into messages that are locally sourced and those that are
1246
// sourced remotely.
1247
type msgsToBroadcast struct {
1248
        // localMsgs is the set of messages we created locally.
1249
        localMsgs []msgWithSenders
1250

1251
        // remoteMsgs is the set of messages that we received from a remote
1252
        // party.
1253
        remoteMsgs []msgWithSenders
1254
}
1255

1256
// addMsg adds a new message to the appropriate sub-slice.
1257
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
79✔
1258
        if msg.isLocal {
129✔
1259
                m.localMsgs = append(m.localMsgs, msg)
50✔
1260
        } else {
82✔
1261
                m.remoteMsgs = append(m.remoteMsgs, msg)
32✔
1262
        }
32✔
1263
}
1264

1265
// isEmpty returns true if the batch is empty.
1266
func (m *msgsToBroadcast) isEmpty() bool {
299✔
1267
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
299✔
1268
}
299✔
1269

1270
// length returns the length of the combined message set.
1271
func (m *msgsToBroadcast) length() int {
1✔
1272
        return len(m.localMsgs) + len(m.remoteMsgs)
1✔
1273
}
1✔
1274

1275
// Emit returns the set of de-duplicated announcements to be sent out during
1276
// the next announcement epoch, in the order of channel announcements, channel
1277
// updates, and node announcements. Each message emitted, contains the set of
1278
// peers that sent us the message. This way, we can ensure that we don't waste
1279
// bandwidth by re-sending a message to the peer that sent it to us in the
1280
// first place. Additionally, the set of stored messages are reset.
1281
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
300✔
1282
        d.Lock()
300✔
1283
        defer d.Unlock()
300✔
1284

300✔
1285
        // Get the total number of announcements.
300✔
1286
        numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
300✔
1287
                len(d.nodeAnnouncements)
300✔
1288

300✔
1289
        // Create an empty array of lnwire.Messages with a length equal to
300✔
1290
        // the total number of announcements.
300✔
1291
        msgs := msgsToBroadcast{
300✔
1292
                localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
300✔
1293
                remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
300✔
1294
        }
300✔
1295

300✔
1296
        // Add the channel announcements to the array first.
300✔
1297
        for _, message := range d.channelAnnouncements {
322✔
1298
                msgs.addMsg(message)
22✔
1299
        }
22✔
1300

1301
        // Then add the channel updates.
1302
        for _, message := range d.channelUpdates {
343✔
1303
                msgs.addMsg(message)
43✔
1304
        }
43✔
1305

1306
        // Finally add the node announcements.
1307
        for _, message := range d.nodeAnnouncements {
320✔
1308
                msgs.addMsg(message)
20✔
1309
        }
20✔
1310

1311
        d.reset()
300✔
1312

300✔
1313
        // Return the array of lnwire.messages.
300✔
1314
        return msgs
300✔
1315
}
1316

1317
// calculateSubBatchSize is a helper function that calculates the size to break
1318
// down the batchSize into.
1319
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
1320
        minimumBatchSize, batchSize int) int {
16✔
1321
        if subBatchDelay > totalDelay {
18✔
1322
                return batchSize
2✔
1323
        }
2✔
1324

1325
        subBatchSize := (batchSize*int(subBatchDelay) +
14✔
1326
                int(totalDelay) - 1) / int(totalDelay)
14✔
1327

14✔
1328
        if subBatchSize < minimumBatchSize {
18✔
1329
                return minimumBatchSize
4✔
1330
        }
4✔
1331

1332
        return subBatchSize
10✔
1333
}
1334

1335
// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
1336
// this variable so the function can be mocked in our test.
1337
var batchSizeCalculator = calculateSubBatchSize
1338

1339
// splitAnnouncementBatches takes an exiting list of announcements and
1340
// decomposes it into sub batches controlled by the `subBatchSize`.
1341
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
1342
        announcementBatch []msgWithSenders) [][]msgWithSenders {
78✔
1343

78✔
1344
        subBatchSize := batchSizeCalculator(
78✔
1345
                d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
78✔
1346
                d.cfg.MinimumBatchSize, len(announcementBatch),
78✔
1347
        )
78✔
1348

78✔
1349
        var splitAnnouncementBatch [][]msgWithSenders
78✔
1350

78✔
1351
        for subBatchSize < len(announcementBatch) {
202✔
1352
                // For slicing with minimal allocation
124✔
1353
                // https://github.com/golang/go/wiki/SliceTricks
124✔
1354
                announcementBatch, splitAnnouncementBatch =
124✔
1355
                        announcementBatch[subBatchSize:],
124✔
1356
                        append(splitAnnouncementBatch,
124✔
1357
                                announcementBatch[0:subBatchSize:subBatchSize])
124✔
1358
        }
124✔
1359
        splitAnnouncementBatch = append(
78✔
1360
                splitAnnouncementBatch, announcementBatch,
78✔
1361
        )
78✔
1362

78✔
1363
        return splitAnnouncementBatch
78✔
1364
}
1365

1366
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
1367
// split size, and then sends out all items to the set of target peers. Locally
1368
// generated announcements are always sent before remotely generated
1369
// announcements.
1370
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(ctx context.Context,
1371
        annBatch msgsToBroadcast) {
37✔
1372

37✔
1373
        // delayNextBatch is a helper closure that blocks for `SubBatchDelay`
37✔
1374
        // duration to delay the sending of next announcement batch.
37✔
1375
        delayNextBatch := func() {
108✔
1376
                select {
71✔
1377
                case <-time.After(d.cfg.SubBatchDelay):
54✔
1378
                case <-d.quit:
17✔
1379
                        return
17✔
1380
                }
1381
        }
1382

1383
        // Fetch the local and remote announcements.
1384
        localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
37✔
1385
        remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)
37✔
1386

37✔
1387
        d.wg.Add(1)
37✔
1388
        go func() {
74✔
1389
                defer d.wg.Done()
37✔
1390

37✔
1391
                log.Debugf("Broadcasting %v new local announcements in %d "+
37✔
1392
                        "sub batches", len(annBatch.localMsgs),
37✔
1393
                        len(localBatches))
37✔
1394

37✔
1395
                // Send out the local announcements first.
37✔
1396
                for _, annBatch := range localBatches {
74✔
1397
                        d.sendLocalBatch(annBatch)
37✔
1398
                        delayNextBatch()
37✔
1399
                }
37✔
1400

1401
                log.Debugf("Broadcasting %v new remote announcements in %d "+
37✔
1402
                        "sub batches", len(annBatch.remoteMsgs),
37✔
1403
                        len(remoteBatches))
37✔
1404

37✔
1405
                // Now send the remote announcements.
37✔
1406
                for _, annBatch := range remoteBatches {
74✔
1407
                        d.sendRemoteBatch(ctx, annBatch)
37✔
1408
                        delayNextBatch()
37✔
1409
                }
37✔
1410
        }()
1411
}
1412

1413
// sendLocalBatch broadcasts a list of locally generated announcements to our
1414
// peers. For local announcements, we skip the filter and dedup logic and just
1415
// send the announcements out to all our coonnected peers.
1416
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
37✔
1417
        msgsToSend := lnutils.Map(
37✔
1418
                annBatch, func(m msgWithSenders) lnwire.Message {
83✔
1419
                        return m.msg
46✔
1420
                },
46✔
1421
        )
1422

1423
        err := d.cfg.Broadcast(nil, msgsToSend...)
37✔
1424
        if err != nil {
37✔
1425
                log.Errorf("Unable to send local batch announcements: %v", err)
×
1426
        }
×
1427
}
1428

1429
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
1430
// peers.
1431
func (d *AuthenticatedGossiper) sendRemoteBatch(ctx context.Context,
1432
        annBatch []msgWithSenders) {
37✔
1433

37✔
1434
        syncerPeers := d.syncMgr.GossipSyncers()
37✔
1435

37✔
1436
        // We'll first attempt to filter out this new message for all peers
37✔
1437
        // that have active gossip syncers active.
37✔
1438
        for pub, syncer := range syncerPeers {
40✔
1439
                log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
3✔
1440
                syncer.FilterGossipMsgs(ctx, annBatch...)
3✔
1441
        }
3✔
1442

1443
        for _, msgChunk := range annBatch {
69✔
1444
                msgChunk := msgChunk
32✔
1445

32✔
1446
                // With the syncers taken care of, we'll merge the sender map
32✔
1447
                // with the set of syncers, so we don't send out duplicate
32✔
1448
                // messages.
32✔
1449
                msgChunk.mergeSyncerMap(syncerPeers)
32✔
1450

32✔
1451
                err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
32✔
1452
                if err != nil {
32✔
1453
                        log.Errorf("Unable to send batch "+
×
1454
                                "announcements: %v", err)
×
1455
                        continue
×
1456
                }
1457
        }
1458
}
1459

1460
// networkHandler is the primary goroutine that drives this service. The roles
1461
// of this goroutine includes answering queries related to the state of the
1462
// network, syncing up newly connected peers, and also periodically
1463
// broadcasting our latest topology state to all connected peers.
1464
//
1465
// NOTE: This MUST be run as a goroutine.
1466
func (d *AuthenticatedGossiper) networkHandler(ctx context.Context) {
33✔
1467
        defer d.wg.Done()
33✔
1468

33✔
1469
        // Initialize empty deDupedAnnouncements to store announcement batch.
33✔
1470
        announcements := deDupedAnnouncements{}
33✔
1471
        announcements.Reset()
33✔
1472

33✔
1473
        d.cfg.RetransmitTicker.Resume()
33✔
1474
        defer d.cfg.RetransmitTicker.Stop()
33✔
1475

33✔
1476
        trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
33✔
1477
        defer trickleTimer.Stop()
33✔
1478

33✔
1479
        // To start, we'll first check to see if there are any stale channel or
33✔
1480
        // node announcements that we need to re-transmit.
33✔
1481
        if err := d.retransmitStaleAnns(ctx, time.Now()); err != nil {
33✔
1482
                log.Errorf("Unable to rebroadcast stale announcements: %v", err)
×
1483
        }
×
1484

1485
        for {
702✔
1486
                select {
669✔
1487
                // A new policy update has arrived. We'll commit it to the
1488
                // sub-systems below us, then craft, sign, and broadcast a new
1489
                // ChannelUpdate for the set of affected clients.
1490
                case policyUpdate := <-d.chanPolicyUpdates:
4✔
1491
                        log.Tracef("Received channel %d policy update requests",
4✔
1492
                                len(policyUpdate.edgesToUpdate))
4✔
1493

4✔
1494
                        // First, we'll now create new fully signed updates for
4✔
1495
                        // the affected channels and also update the underlying
4✔
1496
                        // graph with the new state.
4✔
1497
                        newChanUpdates, err := d.processChanPolicyUpdate(
4✔
1498
                                ctx, policyUpdate.edgesToUpdate,
4✔
1499
                        )
4✔
1500
                        policyUpdate.errChan <- err
4✔
1501
                        if err != nil {
4✔
1502
                                log.Errorf("Unable to craft policy updates: %v",
×
1503
                                        err)
×
1504
                                continue
×
1505
                        }
1506

1507
                        // Finally, with the updates committed, we'll now add
1508
                        // them to the announcement batch to be flushed at the
1509
                        // start of the next epoch.
1510
                        announcements.AddMsgs(newChanUpdates...)
4✔
1511

1512
                case announcement := <-d.networkMsgs:
341✔
1513
                        log.Tracef("Received network message: "+
341✔
1514
                                "peer=%v, msg=%s, is_remote=%v",
341✔
1515
                                announcement.peer, announcement.msg.MsgType(),
341✔
1516
                                announcement.isRemote)
341✔
1517

341✔
1518
                        switch announcement.msg.(type) {
341✔
1519
                        // Channel announcement signatures are amongst the only
1520
                        // messages that we'll process serially.
1521
                        case *lnwire.AnnounceSignatures1:
24✔
1522
                                emittedAnnouncements, _ := d.processNetworkAnnouncement(
24✔
1523
                                        ctx, announcement,
24✔
1524
                                )
24✔
1525
                                log.Debugf("Processed network message %s, "+
24✔
1526
                                        "returned len(announcements)=%v",
24✔
1527
                                        announcement.msg.MsgType(),
24✔
1528
                                        len(emittedAnnouncements))
24✔
1529

24✔
1530
                                if emittedAnnouncements != nil {
37✔
1531
                                        announcements.AddMsgs(
13✔
1532
                                                emittedAnnouncements...,
13✔
1533
                                        )
13✔
1534
                                }
13✔
1535
                                continue
24✔
1536
                        }
1537

1538
                        // If this message was recently rejected, then we won't
1539
                        // attempt to re-process it.
1540
                        if announcement.isRemote && d.isRecentlyRejectedMsg(
320✔
1541
                                announcement.msg,
320✔
1542
                                sourceToPub(announcement.source),
320✔
1543
                        ) {
321✔
1544

1✔
1545
                                announcement.err <- fmt.Errorf("recently " +
1✔
1546
                                        "rejected")
1✔
1547
                                continue
1✔
1548
                        }
1549

1550
                        // We'll set up any dependent, and wait until a free
1551
                        // slot for this job opens up, this allow us to not
1552
                        // have thousands of goroutines active.
1553
                        annJobID, err := d.vb.InitJobDependencies(
319✔
1554
                                announcement.msg,
319✔
1555
                        )
319✔
1556
                        if err != nil {
319✔
1557
                                announcement.err <- err
×
1558
                                continue
×
1559
                        }
1560

1561
                        d.wg.Add(1)
319✔
1562
                        go d.handleNetworkMessages(
319✔
1563
                                ctx, announcement, &announcements, annJobID,
319✔
1564
                        )
319✔
1565

1566
                // The trickle timer has ticked, which indicates we should
1567
                // flush to the network the pending batch of new announcements
1568
                // we've received since the last trickle tick.
1569
                case <-trickleTimer.C:
299✔
1570
                        // Emit the current batch of announcements from
299✔
1571
                        // deDupedAnnouncements.
299✔
1572
                        announcementBatch := announcements.Emit()
299✔
1573

299✔
1574
                        // If the current announcements batch is nil, then we
299✔
1575
                        // have no further work here.
299✔
1576
                        if announcementBatch.isEmpty() {
564✔
1577
                                continue
265✔
1578
                        }
1579

1580
                        // At this point, we have the set of local and remote
1581
                        // announcements we want to send out. We'll do the
1582
                        // batching as normal for both, but for local
1583
                        // announcements, we'll blast them out w/o regard for
1584
                        // our peer's policies so we ensure they propagate
1585
                        // properly.
1586
                        d.splitAndSendAnnBatch(ctx, announcementBatch)
37✔
1587

1588
                // The retransmission timer has ticked which indicates that we
1589
                // should check if we need to prune or re-broadcast any of our
1590
                // personal channels or node announcement. This addresses the
1591
                // case of "zombie" channels and channel advertisements that
1592
                // have been dropped, or not properly propagated through the
1593
                // network.
1594
                case tick := <-d.cfg.RetransmitTicker.Ticks():
1✔
1595
                        if err := d.retransmitStaleAnns(ctx, tick); err != nil {
1✔
1596
                                log.Errorf("unable to rebroadcast stale "+
×
1597
                                        "announcements: %v", err)
×
1598
                        }
×
1599

1600
                // The gossiper has been signalled to exit, to we exit our
1601
                // main loop so the wait group can be decremented.
1602
                case <-d.quit:
33✔
1603
                        return
33✔
1604
                }
1605
        }
1606
}
1607

1608
// handleNetworkMessages is responsible for waiting for dependencies for a
1609
// given network message and processing the message. Once processed, it will
1610
// signal its dependants and add the new announcements to the announce batch.
1611
//
1612
// NOTE: must be run as a goroutine.
1613
func (d *AuthenticatedGossiper) handleNetworkMessages(ctx context.Context,
1614
        nMsg *networkMsg, deDuped *deDupedAnnouncements, jobID JobID) {
319✔
1615

319✔
1616
        defer d.wg.Done()
319✔
1617
        defer d.vb.CompleteJob()
319✔
1618

319✔
1619
        // We should only broadcast this message forward if it originated from
319✔
1620
        // us or it wasn't received as part of our initial historical sync.
319✔
1621
        shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
319✔
1622

319✔
1623
        // If this message has an existing dependency, then we'll wait until
319✔
1624
        // that has been fully validated before we proceed.
319✔
1625
        err := d.vb.WaitForParents(jobID, nMsg.msg)
319✔
1626
        if err != nil {
319✔
1627
                log.Debugf("Validating network message %s got err: %v",
×
1628
                        nMsg.msg.MsgType(), err)
×
1629

×
1630
                if errors.Is(err, ErrVBarrierShuttingDown) {
×
1631
                        log.Warnf("unexpected error during validation "+
×
1632
                                "barrier shutdown: %v", err)
×
1633
                }
×
1634
                nMsg.err <- err
×
1635

×
1636
                return
×
1637
        }
1638

1639
        // Process the network announcement to determine if this is either a
1640
        // new announcement from our PoV or an edges to a prior vertex/edge we
1641
        // previously proceeded.
1642
        newAnns, allow := d.processNetworkAnnouncement(ctx, nMsg)
319✔
1643

319✔
1644
        log.Tracef("Processed network message %s, returned "+
319✔
1645
                "len(announcements)=%v, allowDependents=%v",
319✔
1646
                nMsg.msg.MsgType(), len(newAnns), allow)
319✔
1647

319✔
1648
        // If this message had any dependencies, then we can now signal them to
319✔
1649
        // continue.
319✔
1650
        err = d.vb.SignalDependents(nMsg.msg, jobID)
319✔
1651
        if err != nil {
319✔
1652
                // Something is wrong if SignalDependents returns an error.
×
1653
                log.Errorf("SignalDependents returned error for msg=%v with "+
×
1654
                        "JobID=%v", lnutils.SpewLogClosure(nMsg.msg), jobID)
×
1655

×
1656
                nMsg.err <- err
×
1657

×
1658
                return
×
1659
        }
×
1660

1661
        // If the announcement was accepted, then add the emitted announcements
1662
        // to our announce batch to be broadcast once the trickle timer ticks
1663
        // gain.
1664
        if newAnns != nil && shouldBroadcast {
359✔
1665
                // TODO(roasbeef): exclude peer that sent.
40✔
1666
                deDuped.AddMsgs(newAnns...)
40✔
1667
        } else if newAnns != nil {
326✔
1668
                log.Trace("Skipping broadcast of announcements received " +
4✔
1669
                        "during initial graph sync")
4✔
1670
        }
4✔
1671
}
1672

1673
// TODO(roasbeef): d/c peers that send updates not on our chain
1674

1675
// InitSyncState is called by outside sub-systems when a connection is
1676
// established to a new peer that understands how to perform channel range
1677
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
1678
// needed to handle new queries.
1679
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
3✔
1680
        d.syncMgr.InitSyncState(syncPeer)
3✔
1681
}
3✔
1682

1683
// PruneSyncState is called by outside sub-systems once a peer that we were
1684
// previously connected to has been disconnected. In this case we can stop the
1685
// existing GossipSyncer assigned to the peer and free up resources.
1686
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
3✔
1687
        d.syncMgr.PruneSyncState(peer)
3✔
1688
}
3✔
1689

1690
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
1691
// false otherwise, This avoids expensive reprocessing of the message.
1692
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
1693
        peerPub [33]byte) bool {
283✔
1694

283✔
1695
        // We only cache rejections for gossip messages. So if it is not
283✔
1696
        // a gossip message, we return false.
283✔
1697
        gMsg, ok := msg.(lnwire.GossipMessage)
283✔
1698
        if !ok {
283✔
NEW
1699
                return false
×
NEW
1700
        }
×
1701

1702
        var scid uint64
283✔
1703
        switch m := gMsg.(type) {
283✔
1704
        case *lnwire.ChannelUpdate1:
51✔
1705
                scid = m.ShortChannelID.ToUint64()
51✔
1706

1707
        case *lnwire.ChannelAnnouncement1:
221✔
1708
                scid = m.ShortChannelID.ToUint64()
221✔
1709

1710
        default:
17✔
1711
                return false
17✔
1712
        }
1713

1714
        _, err := d.recentRejects.Get(newRejectCacheKey(
269✔
1715
                gMsg.GossipVersion(), scid, peerPub,
269✔
1716
        ))
269✔
1717

269✔
1718
        return !errors.Is(err, cache.ErrElementNotFound)
269✔
1719
}
1720

1721
// retransmitStaleAnns examines all outgoing channels that the source node is
1722
// known to maintain to check to see if any of them are "stale". A channel is
1723
// stale iff, the last timestamp of its rebroadcast is older than the
1724
// RebroadcastInterval. We also check if a refreshed node announcement should
1725
// be resent.
1726
func (d *AuthenticatedGossiper) retransmitStaleAnns(ctx context.Context,
1727
        now time.Time) error {
34✔
1728

34✔
1729
        // Iterate over all of our channels and check if any of them fall
34✔
1730
        // within the prune interval or re-broadcast interval.
34✔
1731
        type updateTuple struct {
34✔
1732
                info *models.ChannelEdgeInfo
34✔
1733
                edge *models.ChannelEdgePolicy
34✔
1734
        }
34✔
1735

34✔
1736
        var (
34✔
1737
                havePublicChannels bool
34✔
1738
                edgesToUpdate      []updateTuple
34✔
1739
        )
34✔
1740
        err := d.cfg.Graph.ForAllOutgoingChannels(ctx, func(
34✔
1741
                info *models.ChannelEdgeInfo,
34✔
1742
                edge *models.ChannelEdgePolicy) error {
39✔
1743

5✔
1744
                // If there's no auth proof attached to this edge, it means
5✔
1745
                // that it is a private channel not meant to be announced to
5✔
1746
                // the greater network, so avoid sending channel updates for
5✔
1747
                // this channel to not leak its
5✔
1748
                // existence.
5✔
1749
                if info.AuthProof == nil {
9✔
1750
                        log.Debugf("Skipping retransmission of channel "+
4✔
1751
                                "without AuthProof: %v", info.ChannelID)
4✔
1752
                        return nil
4✔
1753
                }
4✔
1754

1755
                // We make a note that we have at least one public channel. We
1756
                // use this to determine whether we should send a node
1757
                // announcement below.
1758
                havePublicChannels = true
4✔
1759

4✔
1760
                // If this edge has a ChannelUpdate that was created before the
4✔
1761
                // introduction of the MaxHTLC field, then we'll update this
4✔
1762
                // edge to propagate this information in the network.
4✔
1763
                if !edge.MessageFlags.HasMaxHtlc() {
4✔
1764
                        // We'll make sure we support the new max_htlc field if
×
1765
                        // not already present.
×
1766
                        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
×
1767
                        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
×
1768

×
1769
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
1770
                                info: info,
×
1771
                                edge: edge,
×
1772
                        })
×
1773
                        return nil
×
1774
                }
×
1775

1776
                timeElapsed := now.Sub(edge.LastUpdate)
4✔
1777

4✔
1778
                // If it's been longer than RebroadcastInterval since we've
4✔
1779
                // re-broadcasted the channel, add the channel to the set of
4✔
1780
                // edges we need to update.
4✔
1781
                if timeElapsed >= d.cfg.RebroadcastInterval {
5✔
1782
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
1✔
1783
                                info: info,
1✔
1784
                                edge: edge,
1✔
1785
                        })
1✔
1786
                }
1✔
1787

1788
                return nil
4✔
1789
        }, func() {
3✔
1790
                havePublicChannels = false
3✔
1791
                edgesToUpdate = nil
3✔
1792
        })
3✔
1793
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
34✔
1794
                return fmt.Errorf("unable to retrieve outgoing channels: %w",
×
1795
                        err)
×
1796
        }
×
1797

1798
        var signedUpdates []lnwire.Message
34✔
1799
        for _, chanToUpdate := range edgesToUpdate {
35✔
1800
                // Re-sign and update the channel on disk and retrieve our
1✔
1801
                // ChannelUpdate to broadcast.
1✔
1802
                chanAnn, chanUpdate, err := d.updateChannel(
1✔
1803
                        ctx, chanToUpdate.info, chanToUpdate.edge,
1✔
1804
                )
1✔
1805
                if err != nil {
1✔
1806
                        return fmt.Errorf("unable to update channel: %w", err)
×
1807
                }
×
1808

1809
                // If we have a valid announcement to transmit, then we'll send
1810
                // that along with the update.
1811
                if chanAnn != nil {
2✔
1812
                        signedUpdates = append(signedUpdates, chanAnn)
1✔
1813
                }
1✔
1814

1815
                signedUpdates = append(signedUpdates, chanUpdate)
1✔
1816
        }
1817

1818
        // If we don't have any public channels, we return as we don't want to
1819
        // broadcast anything that would reveal our existence.
1820
        if !havePublicChannels {
67✔
1821
                return nil
33✔
1822
        }
33✔
1823

1824
        // We'll also check that our NodeAnnouncement1 is not too old.
1825
        currentNodeAnn := d.cfg.FetchSelfAnnouncement()
4✔
1826
        timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
4✔
1827
        timeElapsed := now.Sub(timestamp)
4✔
1828

4✔
1829
        // If it's been a full day since we've re-broadcasted the
4✔
1830
        // node announcement, refresh it and resend it.
4✔
1831
        nodeAnnStr := ""
4✔
1832
        if timeElapsed >= d.cfg.RebroadcastInterval {
5✔
1833
                newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
1✔
1834
                if err != nil {
1✔
1835
                        return fmt.Errorf("unable to get refreshed node "+
×
1836
                                "announcement: %v", err)
×
1837
                }
×
1838

1839
                signedUpdates = append(signedUpdates, &newNodeAnn)
1✔
1840
                nodeAnnStr = " and our refreshed node announcement"
1✔
1841

1✔
1842
                // Before broadcasting the refreshed node announcement, add it
1✔
1843
                // to our own graph.
1✔
1844
                if err := d.addNode(ctx, &newNodeAnn); err != nil {
2✔
1845
                        log.Errorf("Unable to add refreshed node announcement "+
1✔
1846
                                "to graph: %v", err)
1✔
1847
                }
1✔
1848
        }
1849

1850
        // If we don't have any updates to re-broadcast, then we'll exit
1851
        // early.
1852
        if len(signedUpdates) == 0 {
7✔
1853
                return nil
3✔
1854
        }
3✔
1855

1856
        log.Infof("Retransmitting %v outgoing channels%v",
1✔
1857
                len(edgesToUpdate), nodeAnnStr)
1✔
1858

1✔
1859
        // With all the wire announcements properly crafted, we'll broadcast
1✔
1860
        // our known outgoing channels to all our immediate peers.
1✔
1861
        if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
1✔
1862
                return fmt.Errorf("unable to re-broadcast channels: %w", err)
×
1863
        }
×
1864

1865
        return nil
1✔
1866
}
1867

1868
// processChanPolicyUpdate generates a new set of channel updates for the
1869
// provided list of edges and updates the backing ChannelGraphSource.
1870
func (d *AuthenticatedGossiper) processChanPolicyUpdate(ctx context.Context,
1871
        edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
4✔
1872

4✔
1873
        var chanUpdates []networkMsg
4✔
1874
        for _, edgeInfo := range edgesToUpdate {
10✔
1875
                // Now that we've collected all the channels we need to update,
6✔
1876
                // we'll re-sign and update the backing ChannelGraphSource, and
6✔
1877
                // retrieve our ChannelUpdate to broadcast.
6✔
1878
                _, chanUpdate, err := d.updateChannel(
6✔
1879
                        ctx, edgeInfo.Info, edgeInfo.Edge,
6✔
1880
                )
6✔
1881
                if err != nil {
6✔
1882
                        return nil, err
×
1883
                }
×
1884

1885
                // We'll avoid broadcasting any updates for private channels to
1886
                // avoid directly giving away their existence. Instead, we'll
1887
                // send the update directly to the remote party.
1888
                if edgeInfo.Info.AuthProof == nil {
10✔
1889
                        // If AuthProof is nil and an alias was found for this
4✔
1890
                        // ChannelID (meaning the option-scid-alias feature was
4✔
1891
                        // negotiated), we'll replace the ShortChannelID in the
4✔
1892
                        // update with the peer's alias. We do this after
4✔
1893
                        // updateChannel so that the alias isn't persisted to
4✔
1894
                        // the database.
4✔
1895
                        chanID := lnwire.NewChanIDFromOutPoint(
4✔
1896
                                edgeInfo.Info.ChannelPoint,
4✔
1897
                        )
4✔
1898

4✔
1899
                        var defaultAlias lnwire.ShortChannelID
4✔
1900
                        foundAlias, _ := d.cfg.GetAlias(chanID)
4✔
1901
                        if foundAlias != defaultAlias {
7✔
1902
                                chanUpdate.ShortChannelID = foundAlias
3✔
1903

3✔
1904
                                sig, err := d.cfg.SignAliasUpdate(chanUpdate)
3✔
1905
                                if err != nil {
3✔
1906
                                        log.Errorf("Unable to sign alias "+
×
1907
                                                "update: %v", err)
×
1908
                                        continue
×
1909
                                }
1910

1911
                                lnSig, err := lnwire.NewSigFromSignature(sig)
3✔
1912
                                if err != nil {
3✔
1913
                                        log.Errorf("Unable to create sig: %v",
×
1914
                                                err)
×
1915
                                        continue
×
1916
                                }
1917

1918
                                chanUpdate.Signature = lnSig
3✔
1919
                        }
1920

1921
                        remotePubKey := remotePubFromChanInfo(
4✔
1922
                                edgeInfo.Info, chanUpdate.ChannelFlags,
4✔
1923
                        )
4✔
1924
                        err := d.reliableSender.sendMessage(
4✔
1925
                                ctx, chanUpdate, remotePubKey,
4✔
1926
                        )
4✔
1927
                        if err != nil {
4✔
1928
                                log.Errorf("Unable to reliably send %v for "+
×
1929
                                        "channel=%v to peer=%x: %v",
×
1930
                                        chanUpdate.MsgType(),
×
1931
                                        chanUpdate.ShortChannelID,
×
1932
                                        remotePubKey, err)
×
1933
                        }
×
1934
                        continue
4✔
1935
                }
1936

1937
                // We set ourselves as the source of this message to indicate
1938
                // that we shouldn't skip any peers when sending this message.
1939
                chanUpdates = append(chanUpdates, networkMsg{
5✔
1940
                        source:   d.selfKey,
5✔
1941
                        isRemote: false,
5✔
1942
                        msg:      chanUpdate,
5✔
1943
                })
5✔
1944
        }
1945

1946
        return chanUpdates, nil
4✔
1947
}
1948

1949
// remotePubFromChanInfo returns the public key of the remote peer given a
1950
// ChannelEdgeInfo that describe a channel we have with them.
1951
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
1952
        chanFlags lnwire.ChanUpdateChanFlags) [33]byte {
15✔
1953

15✔
1954
        var remotePubKey [33]byte
15✔
1955
        switch {
15✔
1956
        case chanFlags&lnwire.ChanUpdateDirection == 0:
15✔
1957
                remotePubKey = chanInfo.NodeKey2Bytes
15✔
1958
        case chanFlags&lnwire.ChanUpdateDirection == 1:
3✔
1959
                remotePubKey = chanInfo.NodeKey1Bytes
3✔
1960
        }
1961

1962
        return remotePubKey
15✔
1963
}
1964

1965
// processRejectedEdge examines a rejected edge to see if we can extract any
1966
// new announcements from it.  An edge will get rejected if we already added
1967
// the same edge without AuthProof to the graph. If the received announcement
1968
// contains a proof, we can add this proof to our edge.  We can end up in this
1969
// situation in the case where we create a channel, but for some reason fail
1970
// to receive the remote peer's proof, while the remote peer is able to fully
1971
// assemble the proof and craft the ChannelAnnouncement.
1972
func (d *AuthenticatedGossiper) processRejectedEdge(_ context.Context,
1973
        chanAnnMsg *lnwire.ChannelAnnouncement1,
1974
        proof *models.ChannelAuthProof) ([]networkMsg, error) {
3✔
1975

3✔
1976
        // First, we'll fetch the state of the channel as we know if from the
3✔
1977
        // database.
3✔
1978
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
3✔
1979
                chanAnnMsg.ShortChannelID,
3✔
1980
        )
3✔
1981
        if err != nil {
3✔
1982
                return nil, err
×
1983
        }
×
1984

1985
        // The edge is in the graph, and has a proof attached, then we'll just
1986
        // reject it as normal.
1987
        if chanInfo.AuthProof != nil {
6✔
1988
                return nil, nil
3✔
1989
        }
3✔
1990

1991
        // Otherwise, this means that the edge is within the graph, but it
1992
        // doesn't yet have a proper proof attached. If we did not receive
1993
        // the proof such that we now can add it, there's nothing more we
1994
        // can do.
1995
        if proof == nil {
×
1996
                return nil, nil
×
1997
        }
×
1998

1999
        // We'll then create then validate the new fully assembled
2000
        // announcement.
2001
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
×
2002
                proof, chanInfo, e1, e2,
×
2003
        )
×
2004
        if err != nil {
×
2005
                return nil, err
×
2006
        }
×
2007
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
×
2008
        if err != nil {
×
2009
                err := fmt.Errorf("assembled channel announcement proof "+
×
2010
                        "for shortChanID=%v isn't valid: %v",
×
2011
                        chanAnnMsg.ShortChannelID, err)
×
2012
                log.Error(err)
×
2013
                return nil, err
×
2014
        }
×
2015

2016
        // If everything checks out, then we'll add the fully assembled proof
2017
        // to the database.
2018
        err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
×
2019
        if err != nil {
×
2020
                err := fmt.Errorf("unable add proof to shortChanID=%v: %w",
×
2021
                        chanAnnMsg.ShortChannelID, err)
×
2022
                log.Error(err)
×
2023
                return nil, err
×
2024
        }
×
2025

2026
        // As we now have a complete channel announcement for this channel,
2027
        // we'll construct the announcement so they can be broadcast out to all
2028
        // our peers.
2029
        announcements := make([]networkMsg, 0, 3)
×
2030
        announcements = append(announcements, networkMsg{
×
2031
                source: d.selfKey,
×
2032
                msg:    chanAnn,
×
2033
        })
×
2034
        if e1Ann != nil {
×
2035
                announcements = append(announcements, networkMsg{
×
2036
                        source: d.selfKey,
×
2037
                        msg:    e1Ann,
×
2038
                })
×
2039
        }
×
2040
        if e2Ann != nil {
×
2041
                announcements = append(announcements, networkMsg{
×
2042
                        source: d.selfKey,
×
2043
                        msg:    e2Ann,
×
2044
                })
×
2045

×
2046
        }
×
2047

2048
        return announcements, nil
×
2049
}
2050

2051
// fetchPKScript fetches the output script for the given SCID.
2052
func (d *AuthenticatedGossiper) fetchPKScript(chanID lnwire.ShortChannelID) (
2053
        txscript.ScriptClass, btcutil.Address, error) {
×
2054

×
2055
        pkScript, err := lnwallet.FetchPKScriptWithQuit(
×
2056
                d.cfg.ChainIO, chanID, d.quit,
×
2057
        )
×
2058
        if err != nil {
×
2059
                return txscript.WitnessUnknownTy, nil, err
×
2060
        }
×
2061

2062
        scriptClass, addrs, _, err := txscript.ExtractPkScriptAddrs(
×
2063
                pkScript, d.cfg.ChainParams,
×
2064
        )
×
2065
        if err != nil {
×
2066
                return txscript.WitnessUnknownTy, nil, err
×
2067
        }
×
2068

2069
        if len(addrs) != 1 {
×
2070
                return txscript.WitnessUnknownTy, nil, fmt.Errorf("expected "+
×
2071
                        "1 address, got: %d", len(addrs))
×
2072
        }
×
2073

2074
        return scriptClass, addrs[0], nil
×
2075
}
2076

2077
// addNode processes the given node announcement, and adds it to our channel
2078
// graph.
2079
func (d *AuthenticatedGossiper) addNode(ctx context.Context,
2080
        msg *lnwire.NodeAnnouncement1, op ...batch.SchedulerOption) error {
20✔
2081

20✔
2082
        if err := netann.ValidateNodeAnn(msg); err != nil {
21✔
2083
                return fmt.Errorf("unable to validate node announcement: %w",
1✔
2084
                        err)
1✔
2085
        }
1✔
2086

2087
        return d.cfg.Graph.AddNode(
19✔
2088
                ctx, models.NodeFromWireAnnouncement(msg), op...,
19✔
2089
        )
19✔
2090
}
2091

2092
// isPremature decides whether a given network message has a block height+delta
2093
// value specified in the future. If so, the message will be added to the
2094
// future message map and be processed when the block height as reached.
2095
//
2096
// NOTE: must be used inside a lock.
2097
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
2098
        delta uint32, msg *networkMsg) bool {
292✔
2099

292✔
2100
        // The channel is already confirmed at chanID.BlockHeight so we minus
292✔
2101
        // one block. For instance, if the required confirmation for this
292✔
2102
        // channel announcement is 6, we then only need to wait for 5 more
292✔
2103
        // blocks once the funding tx is confirmed.
292✔
2104
        if delta > 0 {
295✔
2105
                delta--
3✔
2106
        }
3✔
2107

2108
        msgHeight := chanID.BlockHeight + delta
292✔
2109

292✔
2110
        // The message height is smaller or equal to our best known height,
292✔
2111
        // thus the message is mature.
292✔
2112
        if msgHeight <= d.bestHeight {
583✔
2113
                return false
291✔
2114
        }
291✔
2115

2116
        // Add the premature message to our future messages which will be
2117
        // resent once the block height has reached.
2118
        //
2119
        // Copy the networkMsgs since the old message's err chan will be
2120
        // consumed.
2121
        copied := &networkMsg{
4✔
2122
                peer:              msg.peer,
4✔
2123
                source:            msg.source,
4✔
2124
                msg:               msg.msg,
4✔
2125
                optionalMsgFields: msg.optionalMsgFields,
4✔
2126
                isRemote:          msg.isRemote,
4✔
2127
                err:               make(chan error, 1),
4✔
2128
        }
4✔
2129

4✔
2130
        // Create the cached message.
4✔
2131
        cachedMsg := &cachedFutureMsg{
4✔
2132
                msg:    copied,
4✔
2133
                height: msgHeight,
4✔
2134
        }
4✔
2135

4✔
2136
        // Increment the msg ID and add it to the cache.
4✔
2137
        nextMsgID := d.futureMsgs.nextMsgID()
4✔
2138
        _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
4✔
2139
        if err != nil {
4✔
2140
                log.Errorf("Adding future message got error: %v", err)
×
2141
        }
×
2142

2143
        log.Debugf("Network message: %v added to future messages for "+
4✔
2144
                "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
4✔
2145
                msgHeight, d.bestHeight)
4✔
2146

4✔
2147
        return true
4✔
2148
}
2149

2150
// processNetworkAnnouncement processes a new network relate authenticated
2151
// channel or node announcement or announcements proofs. If the announcement
2152
// didn't affect the internal state due to either being out of date, invalid,
2153
// or redundant, then nil is returned. Otherwise, the set of announcements will
2154
// be returned which should be broadcasted to the rest of the network. The
2155
// boolean returned indicates whether any dependents of the announcement should
2156
// attempt to be processed as well.
2157
func (d *AuthenticatedGossiper) processNetworkAnnouncement(ctx context.Context,
2158
        nMsg *networkMsg) ([]networkMsg, bool) {
340✔
2159

340✔
2160
        // If this is a remote update, we set the scheduler option to lazily
340✔
2161
        // add it to the graph.
340✔
2162
        var schedulerOp []batch.SchedulerOption
340✔
2163
        if nMsg.isRemote {
633✔
2164
                schedulerOp = append(schedulerOp, batch.LazyAdd())
293✔
2165
        }
293✔
2166

2167
        switch msg := nMsg.msg.(type) {
340✔
2168
        // A new node announcement has arrived which either presents new
2169
        // information about a node in one of the channels we know about, or a
2170
        // updating previously advertised information.
2171
        case *lnwire.NodeAnnouncement1:
27✔
2172
                return d.handleNodeAnnouncement(ctx, nMsg, msg, schedulerOp)
27✔
2173

2174
        // A new channel announcement has arrived, this indicates the
2175
        // *creation* of a new channel within the network. This only advertises
2176
        // the existence of a channel and not yet the routing policies in
2177
        // either direction of the channel.
2178
        case *lnwire.ChannelAnnouncement1:
234✔
2179
                return d.handleChanAnnouncement(ctx, nMsg, msg, schedulerOp...)
234✔
2180

2181
        // A new authenticated channel edge update has arrived. This indicates
2182
        // that the directional information for an already known channel has
2183
        // been updated.
2184
        case *lnwire.ChannelUpdate1:
64✔
2185
                return d.handleChanUpdate(ctx, nMsg, msg, schedulerOp)
64✔
2186

2187
        // A new signature announcement has been received. This indicates
2188
        // willingness of nodes involved in the funding of a channel to
2189
        // announce this new channel to the rest of the world.
2190
        case *lnwire.AnnounceSignatures1:
24✔
2191
                return d.handleAnnSig(ctx, nMsg, msg)
24✔
2192

2193
        default:
×
2194
                err := errors.New("wrong type of the announcement")
×
2195
                nMsg.err <- err
×
2196
                return nil, false
×
2197
        }
2198
}
2199

2200
// processZombieUpdate determines whether the provided channel update should
2201
// resurrect a given zombie edge.
2202
//
2203
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
2204
// should be inspected.
2205
func (d *AuthenticatedGossiper) processZombieUpdate(_ context.Context,
2206
        chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
2207
        msg *lnwire.ChannelUpdate1) error {
3✔
2208

3✔
2209
        // The least-significant bit in the flag on the channel update tells us
3✔
2210
        // which edge is being updated.
3✔
2211
        isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0
3✔
2212

3✔
2213
        // Since we've deemed the update as not stale above, before marking it
3✔
2214
        // live, we'll make sure it has been signed by the correct party. If we
3✔
2215
        // have both pubkeys, either party can resurrect the channel. If we've
3✔
2216
        // already marked this with the stricter, single-sided resurrection we
3✔
2217
        // will only have the pubkey of the node with the oldest timestamp.
3✔
2218
        var pubKey *btcec.PublicKey
3✔
2219
        switch {
3✔
2220
        case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
×
2221
                pubKey, _ = chanInfo.NodeKey1()
×
2222
        case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
2✔
2223
                pubKey, _ = chanInfo.NodeKey2()
2✔
2224
        }
2225
        if pubKey == nil {
4✔
2226
                return fmt.Errorf("incorrect pubkey to resurrect zombie "+
1✔
2227
                        "with chan_id=%v", msg.ShortChannelID)
1✔
2228
        }
1✔
2229

2230
        err := netann.VerifyChannelUpdateSignature(msg, pubKey)
2✔
2231
        if err != nil {
3✔
2232
                return fmt.Errorf("unable to verify channel "+
1✔
2233
                        "update signature: %v", err)
1✔
2234
        }
1✔
2235

2236
        // With the signature valid, we'll proceed to mark the
2237
        // edge as live and wait for the channel announcement to
2238
        // come through again.
2239
        err = d.cfg.Graph.MarkEdgeLive(scid)
1✔
2240
        switch {
1✔
2241
        case errors.Is(err, graphdb.ErrZombieEdgeNotFound):
×
2242
                log.Errorf("edge with chan_id=%v was not found in the "+
×
2243
                        "zombie index: %v", err)
×
2244

×
2245
                return nil
×
2246

2247
        case err != nil:
×
2248
                return fmt.Errorf("unable to remove edge with "+
×
2249
                        "chan_id=%v from zombie index: %v",
×
2250
                        msg.ShortChannelID, err)
×
2251

2252
        default:
1✔
2253
        }
2254

2255
        log.Debugf("Removed edge with chan_id=%v from zombie "+
1✔
2256
                "index", msg.ShortChannelID)
1✔
2257

1✔
2258
        return nil
1✔
2259
}
2260

2261
// fetchNodeAnn fetches the latest signed node announcement from our point of
2262
// view for the node with the given public key. It also validates the node
2263
// announcement fields and returns an error if they are invalid to prevent
2264
// forwarding invalid node announcements to our peers.
2265
func (d *AuthenticatedGossiper) fetchNodeAnn(ctx context.Context,
2266
        pubKey [33]byte) (*lnwire.NodeAnnouncement1, error) {
23✔
2267

23✔
2268
        node, err := d.cfg.Graph.FetchNode(ctx, pubKey)
23✔
2269
        if err != nil {
29✔
2270
                return nil, err
6✔
2271
        }
6✔
2272

2273
        nodeAnn, err := node.NodeAnnouncement(true)
17✔
2274
        if err != nil {
20✔
2275
                return nil, err
3✔
2276
        }
3✔
2277

2278
        return nodeAnn, netann.ValidateNodeAnnFields(nodeAnn)
17✔
2279
}
2280

2281
// isMsgStale determines whether a message retrieved from the backing
2282
// MessageStore is seen as stale by the current graph.
2283
func (d *AuthenticatedGossiper) isMsgStale(_ context.Context,
2284
        msg lnwire.Message) bool {
15✔
2285

15✔
2286
        switch msg := msg.(type) {
15✔
2287
        case *lnwire.AnnounceSignatures1:
5✔
2288
                chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
5✔
2289
                        msg.ShortChannelID,
5✔
2290
                )
5✔
2291

5✔
2292
                // If the channel cannot be found, it is most likely a leftover
5✔
2293
                // message for a channel that was closed, so we can consider it
5✔
2294
                // stale.
5✔
2295
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
8✔
2296
                        return true
3✔
2297
                }
3✔
2298
                if err != nil {
5✔
2299
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2300
                                "%v", msg.ShortChannelID, err)
×
2301
                        return false
×
2302
                }
×
2303

2304
                // If the proof exists in the graph, then we have successfully
2305
                // received the remote proof and assembled the full proof, so we
2306
                // can safely delete the local proof from the database.
2307
                return chanInfo.AuthProof != nil
5✔
2308

2309
        case *lnwire.ChannelUpdate1:
13✔
2310
                _, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)
13✔
2311

13✔
2312
                // If the channel cannot be found, it is most likely a leftover
13✔
2313
                // message for a channel that was closed, so we can consider it
13✔
2314
                // stale.
13✔
2315
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
16✔
2316
                        return true
3✔
2317
                }
3✔
2318
                if err != nil {
13✔
2319
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2320
                                "%v", msg.ShortChannelID, err)
×
2321
                        return false
×
2322
                }
×
2323

2324
                // Otherwise, we'll retrieve the correct policy that we
2325
                // currently have stored within our graph to check if this
2326
                // message is stale by comparing its timestamp.
2327
                var p *models.ChannelEdgePolicy
13✔
2328
                if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
26✔
2329
                        p = p1
13✔
2330
                } else {
16✔
2331
                        p = p2
3✔
2332
                }
3✔
2333

2334
                // If the policy is still unknown, then we can consider this
2335
                // policy fresh.
2336
                if p == nil {
13✔
2337
                        return false
×
2338
                }
×
2339

2340
                timestamp := time.Unix(int64(msg.Timestamp), 0)
13✔
2341
                return p.LastUpdate.After(timestamp)
13✔
2342

2343
        default:
×
2344
                // We'll make sure to not mark any unsupported messages as stale
×
2345
                // to ensure they are not removed.
×
2346
                return false
×
2347
        }
2348
}
2349

2350
// updateChannel creates a new fully signed update for the channel, and updates
2351
// the underlying graph with the new state.
2352
func (d *AuthenticatedGossiper) updateChannel(ctx context.Context,
2353
        info *models.ChannelEdgeInfo,
2354
        edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
2355
        *lnwire.ChannelUpdate1, error) {
7✔
2356

7✔
2357
        // Parse the unsigned edge into a channel update.
7✔
2358
        chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)
7✔
2359

7✔
2360
        // We'll generate a new signature over a digest of the channel
7✔
2361
        // announcement itself and update the timestamp to ensure it propagate.
7✔
2362
        err := netann.SignChannelUpdate(
7✔
2363
                d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
7✔
2364
                netann.ChanUpdSetTimestamp,
7✔
2365
        )
7✔
2366
        if err != nil {
7✔
2367
                return nil, nil, err
×
2368
        }
×
2369

2370
        // Next, we'll set the new signature in place, and update the reference
2371
        // in the backing slice.
2372
        edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
7✔
2373
        edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()
7✔
2374

7✔
2375
        // To ensure that our signature is valid, we'll verify it ourself
7✔
2376
        // before committing it to the slice returned.
7✔
2377
        err = netann.ValidateChannelUpdateAnn(
7✔
2378
                d.selfKey, info.Capacity, chanUpdate,
7✔
2379
        )
7✔
2380
        if err != nil {
7✔
2381
                return nil, nil, fmt.Errorf("generated invalid channel "+
×
2382
                        "update sig: %v", err)
×
2383
        }
×
2384

2385
        // Finally, we'll write the new edge policy to disk.
2386
        if err := d.cfg.Graph.UpdateEdge(ctx, edge); err != nil {
7✔
2387
                return nil, nil, err
×
2388
        }
×
2389

2390
        // We'll also create the original channel announcement so the two can
2391
        // be broadcast along side each other (if necessary), but only if we
2392
        // have a full channel announcement for this channel.
2393
        var chanAnn *lnwire.ChannelAnnouncement1
7✔
2394
        if info.AuthProof != nil {
13✔
2395
                chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
6✔
2396
                chanAnn = &lnwire.ChannelAnnouncement1{
6✔
2397
                        ShortChannelID:  chanID,
6✔
2398
                        NodeID1:         info.NodeKey1Bytes,
6✔
2399
                        NodeID2:         info.NodeKey2Bytes,
6✔
2400
                        ChainHash:       info.ChainHash,
6✔
2401
                        BitcoinKey1:     info.BitcoinKey1Bytes,
6✔
2402
                        Features:        lnwire.NewRawFeatureVector(),
6✔
2403
                        BitcoinKey2:     info.BitcoinKey2Bytes,
6✔
2404
                        ExtraOpaqueData: info.ExtraOpaqueData,
6✔
2405
                }
6✔
2406
                chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
6✔
2407
                        info.AuthProof.NodeSig1Bytes,
6✔
2408
                )
6✔
2409
                if err != nil {
6✔
2410
                        return nil, nil, err
×
2411
                }
×
2412
                chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
6✔
2413
                        info.AuthProof.NodeSig2Bytes,
6✔
2414
                )
6✔
2415
                if err != nil {
6✔
2416
                        return nil, nil, err
×
2417
                }
×
2418
                chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
6✔
2419
                        info.AuthProof.BitcoinSig1Bytes,
6✔
2420
                )
6✔
2421
                if err != nil {
6✔
2422
                        return nil, nil, err
×
2423
                }
×
2424
                chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
6✔
2425
                        info.AuthProof.BitcoinSig2Bytes,
6✔
2426
                )
6✔
2427
                if err != nil {
6✔
2428
                        return nil, nil, err
×
2429
                }
×
2430
        }
2431

2432
        return chanAnn, chanUpdate, err
7✔
2433
}
2434

2435
// SyncManager returns the gossiper's SyncManager instance.
2436
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
3✔
2437
        return d.syncMgr
3✔
2438
}
3✔
2439

2440
// IsKeepAliveUpdate determines whether this channel update is considered a
2441
// keep-alive update based on the previous channel update processed for the same
2442
// direction.
2443
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
2444
        prev *models.ChannelEdgePolicy) bool {
20✔
2445

20✔
2446
        // Both updates should be from the same direction.
20✔
2447
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
20✔
2448
                prev.ChannelFlags&lnwire.ChanUpdateDirection {
20✔
2449

×
2450
                return false
×
2451
        }
×
2452

2453
        // The timestamp should always increase for a keep-alive update.
2454
        timestamp := time.Unix(int64(update.Timestamp), 0)
20✔
2455
        if !timestamp.After(prev.LastUpdate) {
20✔
2456
                return false
×
2457
        }
×
2458

2459
        // None of the remaining fields should change for a keep-alive update.
2460
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
23✔
2461
                return false
3✔
2462
        }
3✔
2463
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
38✔
2464
                return false
18✔
2465
        }
18✔
2466
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
8✔
2467
                return false
3✔
2468
        }
3✔
2469
        if update.TimeLockDelta != prev.TimeLockDelta {
5✔
2470
                return false
×
2471
        }
×
2472
        if update.HtlcMinimumMsat != prev.MinHTLC {
5✔
2473
                return false
×
2474
        }
×
2475
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
5✔
2476
                return false
×
2477
        }
×
2478
        if update.HtlcMaximumMsat != prev.MaxHTLC {
5✔
2479
                return false
×
2480
        }
×
2481
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
8✔
2482
                return false
3✔
2483
        }
3✔
2484
        return true
5✔
2485
}
2486

2487
// latestHeight returns the gossiper's latest height known of the chain.
2488
func (d *AuthenticatedGossiper) latestHeight() uint32 {
3✔
2489
        d.Lock()
3✔
2490
        defer d.Unlock()
3✔
2491
        return d.bestHeight
3✔
2492
}
3✔
2493

2494
// handleNodeAnnouncement processes a new node announcement.
2495
func (d *AuthenticatedGossiper) handleNodeAnnouncement(ctx context.Context,
2496
        nMsg *networkMsg, nodeAnn *lnwire.NodeAnnouncement1,
2497
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
27✔
2498

27✔
2499
        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
27✔
2500

27✔
2501
        log.Debugf("Processing NodeAnnouncement1: peer=%v, timestamp=%v, "+
27✔
2502
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
27✔
2503
                nMsg.source.SerializeCompressed())
27✔
2504

27✔
2505
        // We'll quickly ask the router if it already has a newer update for
27✔
2506
        // this node so we can skip validating signatures if not required.
27✔
2507
        if d.cfg.Graph.IsStaleNode(ctx, nodeAnn.NodeID, timestamp) {
38✔
2508
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
11✔
2509
                nMsg.err <- nil
11✔
2510
                return nil, true
11✔
2511
        }
11✔
2512

2513
        if err := d.addNode(ctx, nodeAnn, ops...); err != nil {
22✔
2514
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
3✔
2515
                        err)
3✔
2516

3✔
2517
                if !graph.IsError(
3✔
2518
                        err,
3✔
2519
                        graph.ErrOutdated,
3✔
2520
                        graph.ErrIgnored,
3✔
2521
                ) {
3✔
2522

×
2523
                        log.Error(err)
×
2524
                }
×
2525

2526
                nMsg.err <- err
3✔
2527
                return nil, false
3✔
2528
        }
2529

2530
        // In order to ensure we don't leak unadvertised nodes, we'll make a
2531
        // quick check to ensure this node intends to publicly advertise itself
2532
        // to the network.
2533
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
19✔
2534
        if err != nil {
19✔
2535
                log.Errorf("Unable to determine if node %x is advertised: %v",
×
2536
                        nodeAnn.NodeID, err)
×
2537
                nMsg.err <- err
×
2538
                return nil, false
×
2539
        }
×
2540

2541
        var announcements []networkMsg
19✔
2542

19✔
2543
        // If it does, we'll add their announcement to our batch so that it can
19✔
2544
        // be broadcast to the rest of our peers.
19✔
2545
        if isPublic {
25✔
2546
                announcements = append(announcements, networkMsg{
6✔
2547
                        peer:     nMsg.peer,
6✔
2548
                        isRemote: nMsg.isRemote,
6✔
2549
                        source:   nMsg.source,
6✔
2550
                        msg:      nodeAnn,
6✔
2551
                })
6✔
2552
        } else {
22✔
2553
                log.Tracef("Skipping broadcasting node announcement for %x "+
16✔
2554
                        "due to being unadvertised", nodeAnn.NodeID)
16✔
2555
        }
16✔
2556

2557
        nMsg.err <- nil
19✔
2558
        // TODO(roasbeef): get rid of the above
19✔
2559

19✔
2560
        log.Debugf("Processed NodeAnnouncement1: peer=%v, timestamp=%v, "+
19✔
2561
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
19✔
2562
                nMsg.source.SerializeCompressed())
19✔
2563

19✔
2564
        return announcements, true
19✔
2565
}
2566

2567
// handleChanAnnouncement processes a new channel announcement.
2568
//
2569
//nolint:funlen
2570
func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
2571
        nMsg *networkMsg, ann *lnwire.ChannelAnnouncement1,
2572
        ops ...batch.SchedulerOption) ([]networkMsg, bool) {
237✔
2573

237✔
2574
        scid := ann.ShortChannelID
237✔
2575
        chainHash := d.cfg.ChainParams.GenesisHash
237✔
2576

237✔
2577
        log.Debugf("Processing ChannelAnnouncement1: peer=%v, short_chan_id=%v",
237✔
2578
                nMsg.peer, scid.ToUint64())
237✔
2579

237✔
2580
        // We'll ignore any channel announcements that target any chain other
237✔
2581
        // than the set of chains we know of.
237✔
2582
        if !bytes.Equal(ann.ChainHash[:], chainHash[:]) {
237✔
2583
                err := fmt.Errorf("ignoring ChannelAnnouncement1 from chain=%v"+
×
2584
                        ", gossiper on chain=%v", ann.ChainHash, chainHash)
×
2585
                log.Errorf(err.Error())
×
2586

×
2587
                key := newRejectCacheKey(
×
NEW
2588
                        ann.GossipVersion(),
×
2589
                        scid.ToUint64(),
×
2590
                        sourceToPub(nMsg.source),
×
2591
                )
×
2592
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2593

×
2594
                nMsg.err <- err
×
2595
                return nil, false
×
2596
        }
×
2597

2598
        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
2599
        // reject the announcement. Since the router accepts alias SCIDs,
2600
        // not erroring out would be a DoS vector.
2601
        if nMsg.isRemote && d.cfg.IsAlias(scid) {
237✔
2602
                err := fmt.Errorf("ignoring remote alias channel=%v", scid)
×
2603
                log.Errorf(err.Error())
×
2604

×
2605
                key := newRejectCacheKey(
×
NEW
2606
                        ann.GossipVersion(),
×
2607
                        scid.ToUint64(),
×
2608
                        sourceToPub(nMsg.source),
×
2609
                )
×
2610
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2611

×
2612
                nMsg.err <- err
×
2613
                return nil, false
×
2614
        }
×
2615

2616
        // If the advertised inclusionary block is beyond our knowledge of the
2617
        // chain tip, then we'll ignore it for now.
2618
        d.Lock()
237✔
2619
        if nMsg.isRemote && d.isPremature(scid, 0, nMsg) {
238✔
2620
                log.Warnf("Announcement for chan_id=(%v), is premature: "+
1✔
2621
                        "advertises height %v, only height %v is known",
1✔
2622
                        scid.ToUint64(), scid.BlockHeight, d.bestHeight)
1✔
2623
                d.Unlock()
1✔
2624
                nMsg.err <- nil
1✔
2625
                return nil, false
1✔
2626
        }
1✔
2627
        d.Unlock()
236✔
2628

236✔
2629
        // At this point, we'll now ask the router if this is a zombie/known
236✔
2630
        // edge. If so we can skip all the processing below.
236✔
2631
        if d.cfg.Graph.IsKnownEdge(scid) {
240✔
2632
                nMsg.err <- nil
4✔
2633
                return nil, true
4✔
2634
        }
4✔
2635

2636
        // Check if the channel is already closed in which case we can ignore
2637
        // it.
2638
        closed, err := d.cfg.ScidCloser.IsClosedScid(scid)
235✔
2639
        if err != nil {
235✔
2640
                log.Errorf("failed to check if scid %v is closed: %v", scid,
×
2641
                        err)
×
2642
                nMsg.err <- err
×
2643

×
2644
                return nil, false
×
2645
        }
×
2646

2647
        if closed {
236✔
2648
                err = fmt.Errorf("ignoring closed channel %v", scid)
1✔
2649

1✔
2650
                // If this is an announcement from us, we'll just ignore it.
1✔
2651
                if !nMsg.isRemote {
1✔
2652
                        nMsg.err <- err
×
2653
                        return nil, false
×
2654
                }
×
2655

2656
                log.Warnf("Increasing ban score for peer=%v due to outdated "+
1✔
2657
                        "channel announcement for channel %v", nMsg.peer, scid)
1✔
2658

1✔
2659
                // Increment the peer's ban score if they are sending closed
1✔
2660
                // channel announcements.
1✔
2661
                dcErr := d.handleBadPeer(nMsg.peer)
1✔
2662
                if dcErr != nil {
1✔
2663
                        err = dcErr
×
2664
                }
×
2665

2666
                nMsg.err <- err
1✔
2667

1✔
2668
                return nil, false
1✔
2669
        }
2670

2671
        // If this is a remote channel announcement, then we'll validate all
2672
        // the signatures within the proof as it should be well formed.
2673
        var proof *models.ChannelAuthProof
234✔
2674
        if nMsg.isRemote {
454✔
2675
                err := netann.ValidateChannelAnn(ann, d.fetchPKScript)
220✔
2676
                if err != nil {
220✔
2677
                        err := fmt.Errorf("unable to validate announcement: "+
×
2678
                                "%v", err)
×
2679

×
2680
                        key := newRejectCacheKey(
×
NEW
2681
                                ann.GossipVersion(),
×
2682
                                scid.ToUint64(),
×
2683
                                sourceToPub(nMsg.source),
×
2684
                        )
×
2685
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2686

×
2687
                        log.Error(err)
×
2688
                        nMsg.err <- err
×
2689
                        return nil, false
×
2690
                }
×
2691

2692
                // If the proof checks out, then we'll save the proof itself to
2693
                // the database so we can fetch it later when gossiping with
2694
                // other nodes.
2695
                proof = &models.ChannelAuthProof{
220✔
2696
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
220✔
2697
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
220✔
2698
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
220✔
2699
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
220✔
2700
                }
220✔
2701
        }
2702

2703
        // With the proof validated (if necessary), we can now store it within
2704
        // the database for our path finding and syncing needs.
2705
        edge := &models.ChannelEdgeInfo{
234✔
2706
                ChannelID:        scid.ToUint64(),
234✔
2707
                ChainHash:        ann.ChainHash,
234✔
2708
                NodeKey1Bytes:    ann.NodeID1,
234✔
2709
                NodeKey2Bytes:    ann.NodeID2,
234✔
2710
                BitcoinKey1Bytes: ann.BitcoinKey1,
234✔
2711
                BitcoinKey2Bytes: ann.BitcoinKey2,
234✔
2712
                AuthProof:        proof,
234✔
2713
                Features: lnwire.NewFeatureVector(
234✔
2714
                        ann.Features, lnwire.Features,
234✔
2715
                ),
234✔
2716
                ExtraOpaqueData: ann.ExtraOpaqueData,
234✔
2717
        }
234✔
2718

234✔
2719
        // If there were any optional message fields provided, we'll include
234✔
2720
        // them in its serialized disk representation now.
234✔
2721
        var tapscriptRoot fn.Option[chainhash.Hash]
234✔
2722
        if nMsg.optionalMsgFields != nil {
251✔
2723
                if nMsg.optionalMsgFields.capacity != nil {
21✔
2724
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
4✔
2725
                }
4✔
2726
                if nMsg.optionalMsgFields.channelPoint != nil {
24✔
2727
                        cp := *nMsg.optionalMsgFields.channelPoint
7✔
2728
                        edge.ChannelPoint = cp
7✔
2729
                }
7✔
2730

2731
                // Optional tapscript root for custom channels.
2732
                tapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
17✔
2733
        }
2734

2735
        // Before we start validation or add the edge to the database, we obtain
2736
        // the mutex for this channel ID. We do this to ensure no other
2737
        // goroutine has read the database and is now making decisions based on
2738
        // this DB state, before it writes to the DB. It also ensures that we
2739
        // don't perform the expensive validation check on the same channel
2740
        // announcement at the same time.
2741
        d.channelMtx.Lock(scid.ToUint64())
234✔
2742

234✔
2743
        // If AssumeChannelValid is present, then we are unable to perform any
234✔
2744
        // of the expensive checks below, so we'll short-circuit our path
234✔
2745
        // straight to adding the edge to our graph. If the passed
234✔
2746
        // ShortChannelID is an alias, then we'll skip validation as it will
234✔
2747
        // not map to a legitimate tx. This is not a DoS vector as only we can
234✔
2748
        // add an alias ChannelAnnouncement from the gossiper.
234✔
2749
        if !(d.cfg.AssumeChannelValid || d.cfg.IsAlias(scid)) {
466✔
2750
                op, capacity, script, err := d.validateFundingTransaction(
232✔
2751
                        ctx, ann, tapscriptRoot,
232✔
2752
                )
232✔
2753
                if err != nil {
436✔
2754
                        defer d.channelMtx.Unlock(scid.ToUint64())
204✔
2755

204✔
2756
                        switch {
204✔
2757
                        case errors.Is(err, ErrNoFundingTransaction),
2758
                                errors.Is(err, ErrInvalidFundingOutput):
202✔
2759

202✔
2760
                                key := newRejectCacheKey(
202✔
2761
                                        ann.GossipVersion(),
202✔
2762
                                        scid.ToUint64(),
202✔
2763
                                        sourceToPub(nMsg.source),
202✔
2764
                                )
202✔
2765
                                _, _ = d.recentRejects.Put(
202✔
2766
                                        key, &cachedReject{},
202✔
2767
                                )
202✔
2768

2769
                        case errors.Is(err, ErrChannelSpent):
2✔
2770
                                key := newRejectCacheKey(
2✔
2771
                                        ann.GossipVersion(),
2✔
2772
                                        scid.ToUint64(),
2✔
2773
                                        sourceToPub(nMsg.source),
2✔
2774
                                )
2✔
2775
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
2✔
2776

2✔
2777
                                // Since this channel has already been closed,
2✔
2778
                                // we'll add it to the graph's closed channel
2✔
2779
                                // index such that we won't attempt to do
2✔
2780
                                // expensive validation checks on it again.
2✔
2781
                                // TODO: Populate the ScidCloser by using closed
2✔
2782
                                // channel notifications.
2✔
2783
                                dbErr := d.cfg.ScidCloser.PutClosedScid(scid)
2✔
2784
                                if dbErr != nil {
2✔
2785
                                        log.Errorf("failed to mark scid(%v) "+
×
2786
                                                "as closed: %v", scid, dbErr)
×
2787

×
2788
                                        nMsg.err <- dbErr
×
2789

×
2790
                                        return nil, false
×
2791
                                }
×
2792

2793
                        default:
×
2794
                                // Otherwise, this is just a regular rejected
×
2795
                                // edge. We won't increase the ban score for the
×
2796
                                // remote peer.
×
2797
                                key := newRejectCacheKey(
×
NEW
2798
                                        ann.GossipVersion(),
×
2799
                                        scid.ToUint64(),
×
2800
                                        sourceToPub(nMsg.source),
×
2801
                                )
×
2802
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2803

×
2804
                                nMsg.err <- err
×
2805

×
2806
                                return nil, false
×
2807
                        }
2808

2809
                        if !nMsg.isRemote {
204✔
2810
                                log.Errorf("failed to add edge for local "+
×
2811
                                        "channel: %v", err)
×
2812
                                nMsg.err <- err
×
2813

×
2814
                                return nil, false
×
2815
                        }
×
2816

2817
                        log.Warnf("Increasing ban score for peer=%v due to "+
204✔
2818
                                "invalid channel announcement for channel %v",
204✔
2819
                                nMsg.peer, scid)
204✔
2820

204✔
2821
                        // Increment the peer's ban score if they are sending
204✔
2822
                        // us invalid channel announcements.
204✔
2823
                        dcErr := d.handleBadPeer(nMsg.peer)
204✔
2824
                        if dcErr != nil {
204✔
2825
                                err = dcErr
×
2826
                        }
×
2827

2828
                        nMsg.err <- err
204✔
2829

204✔
2830
                        return nil, false
204✔
2831
                }
2832

2833
                edge.FundingScript = fn.Some(script)
28✔
2834

28✔
2835
                // TODO(roasbeef): this is a hack, needs to be removed after
28✔
2836
                //  commitment fees are dynamic.
28✔
2837
                edge.Capacity = capacity
28✔
2838
                edge.ChannelPoint = op
28✔
2839
        }
2840

2841
        log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64())
30✔
2842

30✔
2843
        // We will add the edge to the channel router. If the nodes present in
30✔
2844
        // this channel are not present in the database, a partial node will be
30✔
2845
        // added to represent each node while we wait for a node announcement.
30✔
2846
        err = d.cfg.Graph.AddEdge(ctx, edge, ops...)
30✔
2847
        if err != nil {
34✔
2848
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
4✔
2849
                        scid.ToUint64(), err)
4✔
2850

4✔
2851
                defer d.channelMtx.Unlock(scid.ToUint64())
4✔
2852

4✔
2853
                // If the edge was rejected due to already being known, then it
4✔
2854
                // may be the case that this new message has a fresh channel
4✔
2855
                // proof, so we'll check.
4✔
2856
                if graph.IsError(err, graph.ErrIgnored) {
7✔
2857
                        // Attempt to process the rejected message to see if we
3✔
2858
                        // get any new announcements.
3✔
2859
                        anns, rErr := d.processRejectedEdge(ctx, ann, proof)
3✔
2860
                        if rErr != nil {
3✔
2861
                                key := newRejectCacheKey(
×
NEW
2862
                                        ann.GossipVersion(),
×
2863
                                        scid.ToUint64(),
×
2864
                                        sourceToPub(nMsg.source),
×
2865
                                )
×
2866
                                cr := &cachedReject{}
×
2867
                                _, _ = d.recentRejects.Put(key, cr)
×
2868

×
2869
                                nMsg.err <- rErr
×
2870

×
2871
                                return nil, false
×
2872
                        }
×
2873

2874
                        log.Debugf("Extracted %v announcements from rejected "+
3✔
2875
                                "msgs", len(anns))
3✔
2876

3✔
2877
                        // If while processing this rejected edge, we realized
3✔
2878
                        // there's a set of announcements we could extract,
3✔
2879
                        // then we'll return those directly.
3✔
2880
                        //
3✔
2881
                        // NOTE: since this is an ErrIgnored, we can return
3✔
2882
                        // true here to signal "allow" to its dependants.
3✔
2883
                        nMsg.err <- nil
3✔
2884

3✔
2885
                        return anns, true
3✔
2886
                }
2887

2888
                // Otherwise, this is just a regular rejected edge.
2889
                key := newRejectCacheKey(
1✔
2890
                        ann.GossipVersion(),
1✔
2891
                        scid.ToUint64(),
1✔
2892
                        sourceToPub(nMsg.source),
1✔
2893
                )
1✔
2894
                _, _ = d.recentRejects.Put(key, &cachedReject{})
1✔
2895

1✔
2896
                if !nMsg.isRemote {
1✔
2897
                        log.Errorf("failed to add edge for local channel: %v",
×
2898
                                err)
×
2899
                        nMsg.err <- err
×
2900

×
2901
                        return nil, false
×
2902
                }
×
2903

2904
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
1✔
2905
                if dcErr != nil {
1✔
2906
                        log.Errorf("failed to check if we should disconnect "+
×
2907
                                "peer: %v", dcErr)
×
2908
                        nMsg.err <- dcErr
×
2909

×
2910
                        return nil, false
×
2911
                }
×
2912

2913
                if shouldDc {
1✔
2914
                        nMsg.peer.Disconnect(ErrPeerBanned)
×
2915
                }
×
2916

2917
                nMsg.err <- err
1✔
2918

1✔
2919
                return nil, false
1✔
2920
        }
2921

2922
        // If err is nil, release the lock immediately.
2923
        d.channelMtx.Unlock(scid.ToUint64())
29✔
2924

29✔
2925
        log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64())
29✔
2926

29✔
2927
        // If we earlier received any ChannelUpdates for this channel, we can
29✔
2928
        // now process them, as the channel is added to the graph.
29✔
2929
        var channelUpdates []*processedNetworkMsg
29✔
2930

29✔
2931
        earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64())
29✔
2932
        if err == nil {
34✔
2933
                // There was actually an entry in the map, so we'll accumulate
5✔
2934
                // it. We don't worry about deletion, since it'll eventually
5✔
2935
                // fall out anyway.
5✔
2936
                chanMsgs := earlyChanUpdates
5✔
2937
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
5✔
2938
        }
5✔
2939

2940
        // Launch a new goroutine to handle each ChannelUpdate, this is to
2941
        // ensure we don't block here, as we can handle only one announcement
2942
        // at a time.
2943
        for _, cu := range channelUpdates {
34✔
2944
                // Skip if already processed.
5✔
2945
                if cu.processed {
6✔
2946
                        continue
1✔
2947
                }
2948

2949
                // Mark the ChannelUpdate as processed. This ensures that a
2950
                // subsequent announcement in the option-scid-alias case does
2951
                // not re-use an old ChannelUpdate.
2952
                cu.processed = true
5✔
2953

5✔
2954
                d.wg.Add(1)
5✔
2955
                go func(updMsg *networkMsg) {
10✔
2956
                        defer d.wg.Done()
5✔
2957

5✔
2958
                        switch msg := updMsg.msg.(type) {
5✔
2959
                        // Reprocess the message, making sure we return an
2960
                        // error to the original caller in case the gossiper
2961
                        // shuts down.
2962
                        case *lnwire.ChannelUpdate1:
5✔
2963
                                log.Debugf("Reprocessing ChannelUpdate for "+
5✔
2964
                                        "shortChanID=%v", scid.ToUint64())
5✔
2965

5✔
2966
                                select {
5✔
2967
                                case d.networkMsgs <- updMsg:
5✔
2968
                                case <-d.quit:
×
2969
                                        updMsg.err <- ErrGossiperShuttingDown
×
2970
                                }
2971

2972
                        // We don't expect any other message type than
2973
                        // ChannelUpdate to be in this cache.
2974
                        default:
×
2975
                                log.Errorf("Unsupported message type found "+
×
2976
                                        "among ChannelUpdates: %T", msg)
×
2977
                        }
2978
                }(cu.msg)
2979
        }
2980

2981
        // Channel announcement was successfully processed and now it might be
2982
        // broadcast to other connected nodes if it was an announcement with
2983
        // proof (remote).
2984
        var announcements []networkMsg
29✔
2985

29✔
2986
        if proof != nil {
44✔
2987
                announcements = append(announcements, networkMsg{
15✔
2988
                        peer:     nMsg.peer,
15✔
2989
                        isRemote: nMsg.isRemote,
15✔
2990
                        source:   nMsg.source,
15✔
2991
                        msg:      ann,
15✔
2992
                })
15✔
2993
        }
15✔
2994

2995
        nMsg.err <- nil
29✔
2996

29✔
2997
        log.Debugf("Processed ChannelAnnouncement1: peer=%v, short_chan_id=%v",
29✔
2998
                nMsg.peer, scid.ToUint64())
29✔
2999

29✔
3000
        return announcements, true
29✔
3001
}
3002

3003
// handleChanUpdate processes a new channel update.
3004
//
3005
//nolint:funlen
3006
func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context,
3007
        nMsg *networkMsg, upd *lnwire.ChannelUpdate1,
3008
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
64✔
3009

64✔
3010
        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
64✔
3011
                nMsg.peer, upd.ShortChannelID.ToUint64())
64✔
3012

64✔
3013
        chainHash := d.cfg.ChainParams.GenesisHash
64✔
3014

64✔
3015
        // We'll ignore any channel updates that target any chain other than
64✔
3016
        // the set of chains we know of.
64✔
3017
        if !bytes.Equal(upd.ChainHash[:], chainHash[:]) {
64✔
3018
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
×
3019
                        "gossiper on chain=%v", upd.ChainHash, chainHash)
×
3020
                log.Errorf(err.Error())
×
3021

×
3022
                key := newRejectCacheKey(
×
NEW
3023
                        upd.GossipVersion(),
×
3024
                        upd.ShortChannelID.ToUint64(),
×
3025
                        sourceToPub(nMsg.source),
×
3026
                )
×
3027
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3028

×
3029
                nMsg.err <- err
×
3030
                return nil, false
×
3031
        }
×
3032

3033
        blockHeight := upd.ShortChannelID.BlockHeight
64✔
3034
        shortChanID := upd.ShortChannelID.ToUint64()
64✔
3035

64✔
3036
        // If the advertised inclusionary block is beyond our knowledge of the
64✔
3037
        // chain tip, then we'll put the announcement in limbo to be fully
64✔
3038
        // verified once we advance forward in the chain. If the update has an
64✔
3039
        // alias SCID, we'll skip the isPremature check. This is necessary
64✔
3040
        // since aliases start at block height 16_000_000.
64✔
3041
        d.Lock()
64✔
3042
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
64✔
3043
                d.isPremature(upd.ShortChannelID, 0, nMsg) {
64✔
UNCOV
3044

×
UNCOV
3045
                log.Warnf("Update announcement for short_chan_id(%v), is "+
×
UNCOV
3046
                        "premature: advertises height %v, only height %v is "+
×
UNCOV
3047
                        "known", shortChanID, blockHeight, d.bestHeight)
×
UNCOV
3048
                d.Unlock()
×
UNCOV
3049
                nMsg.err <- nil
×
UNCOV
3050
                return nil, false
×
UNCOV
3051
        }
×
3052
        d.Unlock()
64✔
3053

64✔
3054
        // Before we perform any of the expensive checks below, we'll check
64✔
3055
        // whether this update is stale or is for a zombie channel in order to
64✔
3056
        // quickly reject it.
64✔
3057
        timestamp := time.Unix(int64(upd.Timestamp), 0)
64✔
3058

64✔
3059
        // Fetch the SCID we should be using to lock the channelMtx and make
64✔
3060
        // graph queries with.
64✔
3061
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
64✔
3062
        if err != nil {
128✔
3063
                // Fallback and set the graphScid to the peer-provided SCID.
64✔
3064
                // This will occur for non-option-scid-alias channels and for
64✔
3065
                // public option-scid-alias channels after 6 confirmations.
64✔
3066
                // Once public option-scid-alias channels have 6 confs, we'll
64✔
3067
                // ignore ChannelUpdates with one of their aliases.
64✔
3068
                graphScid = upd.ShortChannelID
64✔
3069
        }
64✔
3070

3071
        // We make sure to obtain the mutex for this channel ID before we access
3072
        // the database. This ensures the state we read from the database has
3073
        // not changed between this point and when we call UpdateEdge() later.
3074
        d.channelMtx.Lock(graphScid.ToUint64())
64✔
3075
        defer d.channelMtx.Unlock(graphScid.ToUint64())
64✔
3076

64✔
3077
        if d.cfg.Graph.IsStaleEdgePolicy(
64✔
3078
                graphScid, timestamp, upd.ChannelFlags,
64✔
3079
        ) {
70✔
3080

6✔
3081
                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
6✔
3082
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
6✔
3083
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
6✔
3084
                )
6✔
3085

6✔
3086
                nMsg.err <- nil
6✔
3087
                return nil, true
6✔
3088
        }
6✔
3089

3090
        // Check that the ChanUpdate is not too far into the future, this could
3091
        // reveal some faulty implementation therefore we log an error.
3092
        if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
61✔
3093
                err := fmt.Errorf("skewed timestamp of edge policy, "+
×
3094
                        "timestamp too far in the future: %v", timestamp.Unix())
×
3095

×
3096
                // If this is a channel_update from us, we'll just ignore it.
×
3097
                if !nMsg.isRemote {
×
3098
                        nMsg.err <- err
×
3099
                        return nil, false
×
3100
                }
×
3101

3102
                log.Errorf("Increasing ban score for peer=%v due to bad "+
×
3103
                        "channel_update with short_chan_id(%v): timestamp(%v) "+
×
3104
                        "too far in the future", nMsg.peer, shortChanID,
×
3105
                        timestamp.Unix())
×
3106

×
3107
                // Increment the peer's ban score if they are skewed channel
×
3108
                // updates.
×
3109
                dcErr := d.handleBadPeer(nMsg.peer)
×
3110
                if dcErr != nil {
×
3111
                        err = dcErr
×
3112
                }
×
3113

3114
                nMsg.err <- err
×
3115

×
3116
                return nil, false
×
3117
        }
3118

3119
        // Get the node pub key as far since we don't have it in the channel
3120
        // update announcement message. We'll need this to properly verify the
3121
        // message's signature.
3122
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
61✔
3123
        switch {
61✔
3124
        // No error, break.
3125
        case err == nil:
57✔
3126
                break
57✔
3127

3128
        case errors.Is(err, graphdb.ErrZombieEdge):
3✔
3129
                err = d.processZombieUpdate(ctx, chanInfo, graphScid, upd)
3✔
3130
                if err != nil {
5✔
3131
                        log.Debug(err)
2✔
3132
                        nMsg.err <- err
2✔
3133
                        return nil, false
2✔
3134
                }
2✔
3135

3136
                // We'll fallthrough to ensure we stash the update until we
3137
                // receive its corresponding ChannelAnnouncement. This is
3138
                // needed to ensure the edge exists in the graph before
3139
                // applying the update.
3140
                fallthrough
1✔
3141
        case errors.Is(err, graphdb.ErrGraphNotFound):
1✔
3142
                fallthrough
1✔
3143
        case errors.Is(err, graphdb.ErrGraphNoEdgesFound):
1✔
3144
                fallthrough
1✔
3145
        case errors.Is(err, graphdb.ErrEdgeNotFound):
5✔
3146
                // If the edge corresponding to this ChannelUpdate was not
5✔
3147
                // found in the graph, this might be a channel in the process
5✔
3148
                // of being opened, and we haven't processed our own
5✔
3149
                // ChannelAnnouncement yet, hence it is not not found in the
5✔
3150
                // graph. This usually gets resolved after the channel proofs
5✔
3151
                // are exchanged and the channel is broadcasted to the rest of
5✔
3152
                // the network, but in case this is a private channel this
5✔
3153
                // won't ever happen. This can also happen in the case of a
5✔
3154
                // zombie channel with a fresh update for which we don't have a
5✔
3155
                // ChannelAnnouncement for since we reject them. Because of
5✔
3156
                // this, we temporarily add it to a map, and reprocess it after
5✔
3157
                // our own ChannelAnnouncement has been processed.
5✔
3158
                //
5✔
3159
                // The shortChanID may be an alias, but it is fine to use here
5✔
3160
                // since we don't have an edge in the graph and if the peer is
5✔
3161
                // not buggy, we should be able to use it once the gossiper
5✔
3162
                // receives the local announcement.
5✔
3163
                pMsg := &processedNetworkMsg{msg: nMsg}
5✔
3164

5✔
3165
                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
5✔
3166
                switch {
5✔
3167
                // Nothing in the cache yet, we can just directly insert this
3168
                // element.
3169
                case err == cache.ErrElementNotFound:
5✔
3170
                        _, _ = d.prematureChannelUpdates.Put(
5✔
3171
                                shortChanID, &cachedNetworkMsg{
5✔
3172
                                        msgs: []*processedNetworkMsg{pMsg},
5✔
3173
                                })
5✔
3174

3175
                // There's already something in the cache, so we'll combine the
3176
                // set of messages into a single value.
3177
                default:
3✔
3178
                        msgs := earlyMsgs.msgs
3✔
3179
                        msgs = append(msgs, pMsg)
3✔
3180
                        _, _ = d.prematureChannelUpdates.Put(
3✔
3181
                                shortChanID, &cachedNetworkMsg{
3✔
3182
                                        msgs: msgs,
3✔
3183
                                })
3✔
3184
                }
3185

3186
                log.Debugf("Got ChannelUpdate for edge not found in graph"+
5✔
3187
                        "(shortChanID=%v), saving for reprocessing later",
5✔
3188
                        shortChanID)
5✔
3189

5✔
3190
                // NOTE: We don't return anything on the error channel for this
5✔
3191
                // message, as we expect that will be done when this
5✔
3192
                // ChannelUpdate is later reprocessed. This might never happen
5✔
3193
                // if the corresponding ChannelAnnouncement is never received
5✔
3194
                // or the LRU cache is filled up and the entry is evicted.
5✔
3195
                return nil, false
5✔
3196

3197
        default:
×
3198
                err := fmt.Errorf("unable to validate channel update "+
×
3199
                        "short_chan_id=%v: %v", shortChanID, err)
×
3200
                log.Error(err)
×
3201
                nMsg.err <- err
×
3202

×
3203
                key := newRejectCacheKey(
×
NEW
3204
                        upd.GossipVersion(),
×
3205
                        upd.ShortChannelID.ToUint64(),
×
3206
                        sourceToPub(nMsg.source),
×
3207
                )
×
3208
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3209

×
3210
                return nil, false
×
3211
        }
3212

3213
        // The least-significant bit in the flag on the channel update
3214
        // announcement tells us "which" side of the channels directed edge is
3215
        // being updated.
3216
        var (
57✔
3217
                pubKey       *btcec.PublicKey
57✔
3218
                edgeToUpdate *models.ChannelEdgePolicy
57✔
3219
        )
57✔
3220
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
57✔
3221
        switch direction {
57✔
3222
        case 0:
41✔
3223
                pubKey, _ = chanInfo.NodeKey1()
41✔
3224
                edgeToUpdate = e1
41✔
3225
        case 1:
19✔
3226
                pubKey, _ = chanInfo.NodeKey2()
19✔
3227
                edgeToUpdate = e2
19✔
3228
        }
3229

3230
        log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
57✔
3231
                "edge policy=%v", chanInfo.ChannelID,
57✔
3232
                pubKey.SerializeCompressed(), edgeToUpdate != nil)
57✔
3233

57✔
3234
        // Validate the channel announcement with the expected public key and
57✔
3235
        // channel capacity. In the case of an invalid channel update, we'll
57✔
3236
        // return an error to the caller and exit early.
57✔
3237
        err = netann.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
57✔
3238
        if err != nil {
61✔
3239
                rErr := fmt.Errorf("unable to validate channel update "+
4✔
3240
                        "announcement for short_chan_id=%v: %v",
4✔
3241
                        lnutils.SpewLogClosure(upd.ShortChannelID), err)
4✔
3242

4✔
3243
                log.Error(rErr)
4✔
3244
                nMsg.err <- rErr
4✔
3245
                return nil, false
4✔
3246
        }
4✔
3247

3248
        // If we have a previous version of the edge being updated, we'll want
3249
        // to rate limit its updates to prevent spam throughout the network.
3250
        if nMsg.isRemote && edgeToUpdate != nil {
73✔
3251
                // If it's a keep-alive update, we'll only propagate one if
20✔
3252
                // it's been a day since the previous. This follows our own
20✔
3253
                // heuristic of sending keep-alive updates after the same
20✔
3254
                // duration (see retransmitStaleAnns).
20✔
3255
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
20✔
3256
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
25✔
3257
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
9✔
3258
                                log.Debugf("Ignoring keep alive update not "+
4✔
3259
                                        "within %v period for channel %v",
4✔
3260
                                        d.cfg.RebroadcastInterval, shortChanID)
4✔
3261
                                nMsg.err <- nil
4✔
3262
                                return nil, false
4✔
3263
                        }
4✔
3264
                } else {
18✔
3265
                        // If it's not, we'll allow an update per minute with a
18✔
3266
                        // maximum burst of 10. If we haven't seen an update
18✔
3267
                        // for this channel before, we'll need to initialize a
18✔
3268
                        // rate limiter for each direction.
18✔
3269
                        //
18✔
3270
                        // Since the edge exists in the graph, we'll create a
18✔
3271
                        // rate limiter for chanInfo.ChannelID rather then the
18✔
3272
                        // SCID the peer sent. This is because there may be
18✔
3273
                        // multiple aliases for a channel and we may otherwise
18✔
3274
                        // rate-limit only a single alias of the channel,
18✔
3275
                        // instead of the whole channel.
18✔
3276
                        baseScid := chanInfo.ChannelID
18✔
3277
                        d.Lock()
18✔
3278
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
18✔
3279
                        if !ok {
23✔
3280
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
5✔
3281
                                b := d.cfg.MaxChannelUpdateBurst
5✔
3282
                                rls = [2]*rate.Limiter{
5✔
3283
                                        rate.NewLimiter(r, b),
5✔
3284
                                        rate.NewLimiter(r, b),
5✔
3285
                                }
5✔
3286
                                d.chanUpdateRateLimiter[baseScid] = rls
5✔
3287
                        }
5✔
3288
                        d.Unlock()
18✔
3289

18✔
3290
                        if !rls[direction].Allow() {
27✔
3291
                                log.Debugf("Rate limiting update for channel "+
9✔
3292
                                        "%v from direction %x", shortChanID,
9✔
3293
                                        pubKey.SerializeCompressed())
9✔
3294
                                nMsg.err <- nil
9✔
3295
                                return nil, false
9✔
3296
                        }
9✔
3297
                }
3298
        }
3299

3300
        // We'll use chanInfo.ChannelID rather than the peer-supplied
3301
        // ShortChannelID in the ChannelUpdate to avoid the router having to
3302
        // lookup the stored SCID. If we're sending the update, we'll always
3303
        // use the SCID stored in the database rather than a potentially
3304
        // different alias. This might mean that SigBytes is incorrect as it
3305
        // signs a different SCID than the database SCID, but since there will
3306
        // only be a difference if AuthProof == nil, this is fine.
3307
        update := &models.ChannelEdgePolicy{
46✔
3308
                SigBytes:                  upd.Signature.ToSignatureBytes(),
46✔
3309
                ChannelID:                 chanInfo.ChannelID,
46✔
3310
                LastUpdate:                timestamp,
46✔
3311
                MessageFlags:              upd.MessageFlags,
46✔
3312
                ChannelFlags:              upd.ChannelFlags,
46✔
3313
                TimeLockDelta:             upd.TimeLockDelta,
46✔
3314
                MinHTLC:                   upd.HtlcMinimumMsat,
46✔
3315
                MaxHTLC:                   upd.HtlcMaximumMsat,
46✔
3316
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
46✔
3317
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
46✔
3318
                InboundFee:                upd.InboundFee.ValOpt(),
46✔
3319
                ExtraOpaqueData:           upd.ExtraOpaqueData,
46✔
3320
        }
46✔
3321

46✔
3322
        if err := d.cfg.Graph.UpdateEdge(ctx, update, ops...); err != nil {
46✔
3323
                if graph.IsError(
×
3324
                        err, graph.ErrOutdated,
×
3325
                        graph.ErrIgnored,
×
3326
                ) {
×
3327

×
3328
                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
×
3329
                                shortChanID, err)
×
3330
                } else {
×
3331
                        // Since we know the stored SCID in the graph, we'll
×
3332
                        // cache that SCID.
×
3333
                        key := newRejectCacheKey(
×
NEW
3334
                                upd.GossipVersion(),
×
3335
                                chanInfo.ChannelID,
×
3336
                                sourceToPub(nMsg.source),
×
3337
                        )
×
3338
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3339

×
3340
                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
×
3341
                                shortChanID, err)
×
3342
                }
×
3343

3344
                nMsg.err <- err
×
3345
                return nil, false
×
3346
        }
3347

3348
        // If this is a local ChannelUpdate without an AuthProof, it means it
3349
        // is an update to a channel that is not (yet) supposed to be announced
3350
        // to the greater network. However, our channel counter party will need
3351
        // to be given the update, so we'll try sending the update directly to
3352
        // the remote peer.
3353
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
60✔
3354
                if nMsg.optionalMsgFields != nil {
28✔
3355
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
14✔
3356
                        if remoteAlias != nil {
17✔
3357
                                // The remoteAlias field was specified, meaning
3✔
3358
                                // that we should replace the SCID in the
3✔
3359
                                // update with the remote's alias. We'll also
3✔
3360
                                // need to re-sign the channel update. This is
3✔
3361
                                // required for option-scid-alias feature-bit
3✔
3362
                                // negotiated channels.
3✔
3363
                                upd.ShortChannelID = *remoteAlias
3✔
3364

3✔
3365
                                sig, err := d.cfg.SignAliasUpdate(upd)
3✔
3366
                                if err != nil {
3✔
3367
                                        log.Error(err)
×
3368
                                        nMsg.err <- err
×
3369
                                        return nil, false
×
3370
                                }
×
3371

3372
                                lnSig, err := lnwire.NewSigFromSignature(sig)
3✔
3373
                                if err != nil {
3✔
3374
                                        log.Error(err)
×
3375
                                        nMsg.err <- err
×
3376
                                        return nil, false
×
3377
                                }
×
3378

3379
                                upd.Signature = lnSig
3✔
3380
                        }
3381
                }
3382

3383
                // Get our peer's public key.
3384
                remotePubKey := remotePubFromChanInfo(
14✔
3385
                        chanInfo, upd.ChannelFlags,
14✔
3386
                )
14✔
3387

14✔
3388
                log.Debugf("The message %v has no AuthProof, sending the "+
14✔
3389
                        "update to remote peer %x", upd.MsgType(), remotePubKey)
14✔
3390

14✔
3391
                // Now we'll attempt to send the channel update message
14✔
3392
                // reliably to the remote peer in the background, so that we
14✔
3393
                // don't block if the peer happens to be offline at the moment.
14✔
3394
                err := d.reliableSender.sendMessage(ctx, upd, remotePubKey)
14✔
3395
                if err != nil {
14✔
3396
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3397
                                "channel=%v to peer=%x: %v", upd.MsgType(),
×
3398
                                upd.ShortChannelID, remotePubKey, err)
×
3399
                        nMsg.err <- err
×
3400
                        return nil, false
×
3401
                }
×
3402
        }
3403

3404
        // Channel update announcement was successfully processed and now it
3405
        // can be broadcast to the rest of the network. However, we'll only
3406
        // broadcast the channel update announcement if it has an attached
3407
        // authentication proof. We also won't broadcast the update if it
3408
        // contains an alias because the network would reject this.
3409
        var announcements []networkMsg
46✔
3410
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
72✔
3411
                announcements = append(announcements, networkMsg{
26✔
3412
                        peer:     nMsg.peer,
26✔
3413
                        source:   nMsg.source,
26✔
3414
                        isRemote: nMsg.isRemote,
26✔
3415
                        msg:      upd,
26✔
3416
                })
26✔
3417
        }
26✔
3418

3419
        nMsg.err <- nil
46✔
3420

46✔
3421
        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
46✔
3422
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
46✔
3423
                timestamp)
46✔
3424
        return announcements, true
46✔
3425
}
3426

3427
// handleAnnSig processes a new announcement signatures message.
3428
//
3429
//nolint:funlen
3430
func (d *AuthenticatedGossiper) handleAnnSig(ctx context.Context,
3431
        nMsg *networkMsg, ann *lnwire.AnnounceSignatures1) ([]networkMsg,
3432
        bool) {
24✔
3433

24✔
3434
        needBlockHeight := ann.ShortChannelID.BlockHeight +
24✔
3435
                d.cfg.ProofMatureDelta
24✔
3436
        shortChanID := ann.ShortChannelID.ToUint64()
24✔
3437

24✔
3438
        prefix := "local"
24✔
3439
        if nMsg.isRemote {
38✔
3440
                prefix = "remote"
14✔
3441
        }
14✔
3442

3443
        log.Infof("Received new %v announcement signature for %v", prefix,
24✔
3444
                ann.ShortChannelID)
24✔
3445

24✔
3446
        // By the specification, channel announcement proofs should be sent
24✔
3447
        // after some number of confirmations after channel was registered in
24✔
3448
        // bitcoin blockchain. Therefore, we check if the proof is mature.
24✔
3449
        d.Lock()
24✔
3450
        premature := d.isPremature(
24✔
3451
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
24✔
3452
        )
24✔
3453
        if premature {
27✔
3454
                log.Warnf("Premature proof announcement, current block height"+
3✔
3455
                        "lower than needed: %v < %v", d.bestHeight,
3✔
3456
                        needBlockHeight)
3✔
3457
                d.Unlock()
3✔
3458
                nMsg.err <- nil
3✔
3459
                return nil, false
3✔
3460
        }
3✔
3461
        d.Unlock()
24✔
3462

24✔
3463
        // Ensure that we know of a channel with the target channel ID before
24✔
3464
        // proceeding further.
24✔
3465
        //
24✔
3466
        // We must acquire the mutex for this channel ID before getting the
24✔
3467
        // channel from the database, to ensure what we read does not change
24✔
3468
        // before we call AddProof() later.
24✔
3469
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
24✔
3470
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
24✔
3471

24✔
3472
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
24✔
3473
                ann.ShortChannelID,
24✔
3474
        )
24✔
3475
        if err != nil {
28✔
3476
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
4✔
3477
                if err != nil {
7✔
3478
                        err := fmt.Errorf("unable to store the proof for "+
3✔
3479
                                "short_chan_id=%v: %v", shortChanID, err)
3✔
3480
                        log.Error(err)
3✔
3481
                        nMsg.err <- err
3✔
3482

3✔
3483
                        return nil, false
3✔
3484
                }
3✔
3485

3486
                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
4✔
3487
                err := d.cfg.WaitingProofStore.Add(proof)
4✔
3488
                if err != nil {
4✔
3489
                        err := fmt.Errorf("unable to store the proof for "+
×
3490
                                "short_chan_id=%v: %v", shortChanID, err)
×
3491
                        log.Error(err)
×
3492
                        nMsg.err <- err
×
3493
                        return nil, false
×
3494
                }
×
3495

3496
                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
4✔
3497
                        ", adding to waiting batch", prefix, shortChanID)
4✔
3498
                nMsg.err <- nil
4✔
3499
                return nil, false
4✔
3500
        }
3501

3502
        nodeID := nMsg.source.SerializeCompressed()
23✔
3503
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
23✔
3504
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
23✔
3505

23✔
3506
        // Ensure that channel that was retrieved belongs to the peer which
23✔
3507
        // sent the proof announcement.
23✔
3508
        if !(isFirstNode || isSecondNode) {
23✔
3509
                err := fmt.Errorf("channel that was received doesn't belong "+
×
3510
                        "to the peer which sent the proof, short_chan_id=%v",
×
3511
                        shortChanID)
×
3512
                log.Error(err)
×
3513
                nMsg.err <- err
×
3514
                return nil, false
×
3515
        }
×
3516

3517
        // If proof was sent by a local sub-system, then we'll send the
3518
        // announcement signature to the remote node so they can also
3519
        // reconstruct the full channel announcement.
3520
        if !nMsg.isRemote {
36✔
3521
                var remotePubKey [33]byte
13✔
3522
                if isFirstNode {
26✔
3523
                        remotePubKey = chanInfo.NodeKey2Bytes
13✔
3524
                } else {
16✔
3525
                        remotePubKey = chanInfo.NodeKey1Bytes
3✔
3526
                }
3✔
3527

3528
                // Since the remote peer might not be online we'll call a
3529
                // method that will attempt to deliver the proof when it comes
3530
                // online.
3531
                err := d.reliableSender.sendMessage(ctx, ann, remotePubKey)
13✔
3532
                if err != nil {
13✔
3533
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3534
                                "channel=%v to peer=%x: %v", ann.MsgType(),
×
3535
                                ann.ShortChannelID, remotePubKey, err)
×
3536
                        nMsg.err <- err
×
3537
                        return nil, false
×
3538
                }
×
3539
        }
3540

3541
        // Check if we already have the full proof for this channel.
3542
        if chanInfo.AuthProof != nil {
27✔
3543
                // If we already have the fully assembled proof, then the peer
4✔
3544
                // sending us their proof has probably not received our local
4✔
3545
                // proof yet. So be kind and send them the full proof.
4✔
3546
                if nMsg.isRemote {
8✔
3547
                        peerID := nMsg.source.SerializeCompressed()
4✔
3548
                        log.Debugf("Got AnnounceSignatures for channel with " +
4✔
3549
                                "full proof.")
4✔
3550

4✔
3551
                        d.wg.Add(1)
4✔
3552
                        go func() {
8✔
3553
                                defer d.wg.Done()
4✔
3554

4✔
3555
                                log.Debugf("Received half proof for channel "+
4✔
3556
                                        "%v with existing full proof. Sending"+
4✔
3557
                                        " full proof to peer=%x",
4✔
3558
                                        ann.ChannelID, peerID)
4✔
3559

4✔
3560
                                ca, _, _, err := netann.CreateChanAnnouncement(
4✔
3561
                                        chanInfo.AuthProof, chanInfo, e1, e2,
4✔
3562
                                )
4✔
3563
                                if err != nil {
4✔
3564
                                        log.Errorf("unable to gen ann: %v",
×
3565
                                                err)
×
3566
                                        return
×
3567
                                }
×
3568

3569
                                err = nMsg.peer.SendMessage(false, ca)
4✔
3570
                                if err != nil {
4✔
3571
                                        log.Errorf("Failed sending full proof"+
×
3572
                                                " to peer=%x: %v", peerID, err)
×
3573
                                        return
×
3574
                                }
×
3575

3576
                                log.Debugf("Full proof sent to peer=%x for "+
4✔
3577
                                        "chanID=%v", peerID, ann.ChannelID)
4✔
3578
                        }()
3579
                }
3580

3581
                log.Debugf("Already have proof for channel with chanID=%v",
4✔
3582
                        ann.ChannelID)
4✔
3583
                nMsg.err <- nil
4✔
3584
                return nil, true
4✔
3585
        }
3586

3587
        // Check that we received the opposite proof. If so, then we're now
3588
        // able to construct the full proof, and create the channel
3589
        // announcement. If we didn't receive the opposite half of the proof
3590
        // then we should store this one, and wait for the opposite to be
3591
        // received.
3592
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
22✔
3593
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
22✔
3594
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
22✔
3595
                err := fmt.Errorf("unable to get the opposite proof for "+
×
3596
                        "short_chan_id=%v: %v", shortChanID, err)
×
3597
                log.Error(err)
×
3598
                nMsg.err <- err
×
3599
                return nil, false
×
3600
        }
×
3601

3602
        if err == channeldb.ErrWaitingProofNotFound {
34✔
3603
                err := d.cfg.WaitingProofStore.Add(proof)
12✔
3604
                if err != nil {
12✔
3605
                        err := fmt.Errorf("unable to store the proof for "+
×
3606
                                "short_chan_id=%v: %v", shortChanID, err)
×
3607
                        log.Error(err)
×
3608
                        nMsg.err <- err
×
3609
                        return nil, false
×
3610
                }
×
3611

3612
                log.Infof("1/2 of channel ann proof received for "+
12✔
3613
                        "short_chan_id=%v, waiting for other half",
12✔
3614
                        shortChanID)
12✔
3615

12✔
3616
                nMsg.err <- nil
12✔
3617
                return nil, false
12✔
3618
        }
3619

3620
        // We now have both halves of the channel announcement proof, then
3621
        // we'll reconstruct the initial announcement so we can validate it
3622
        // shortly below.
3623
        var dbProof models.ChannelAuthProof
13✔
3624
        if isFirstNode {
17✔
3625
                dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
4✔
3626
                dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
4✔
3627
                dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
4✔
3628
                dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
4✔
3629
        } else {
16✔
3630
                dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
12✔
3631
                dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
12✔
3632
                dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
12✔
3633
                dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
12✔
3634
        }
12✔
3635

3636
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
13✔
3637
                &dbProof, chanInfo, e1, e2,
13✔
3638
        )
13✔
3639
        if err != nil {
13✔
3640
                log.Error(err)
×
3641
                nMsg.err <- err
×
3642
                return nil, false
×
3643
        }
×
3644

3645
        // With all the necessary components assembled validate the full
3646
        // channel announcement proof.
3647
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
13✔
3648
        if err != nil {
13✔
3649
                err := fmt.Errorf("channel announcement proof for "+
×
3650
                        "short_chan_id=%v isn't valid: %v", shortChanID, err)
×
3651

×
3652
                log.Error(err)
×
3653
                nMsg.err <- err
×
3654
                return nil, false
×
3655
        }
×
3656

3657
        // If the channel was returned by the router it means that existence of
3658
        // funding point and inclusion of nodes bitcoin keys in it already
3659
        // checked by the router. In this stage we should check that node keys
3660
        // attest to the bitcoin keys by validating the signatures of
3661
        // announcement. If proof is valid then we'll populate the channel edge
3662
        // with it, so we can announce it on peer connect.
3663
        err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
13✔
3664
        if err != nil {
13✔
3665
                err := fmt.Errorf("unable add proof to the channel chanID=%v:"+
×
3666
                        " %v", ann.ChannelID, err)
×
3667
                log.Error(err)
×
3668
                nMsg.err <- err
×
3669
                return nil, false
×
3670
        }
×
3671

3672
        err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
13✔
3673
        if err != nil {
13✔
3674
                err := fmt.Errorf("unable to remove opposite proof for the "+
×
3675
                        "channel with chanID=%v: %v", ann.ChannelID, err)
×
3676
                log.Error(err)
×
3677
                nMsg.err <- err
×
3678
                return nil, false
×
3679
        }
×
3680

3681
        // Proof was successfully created and now can announce the channel to
3682
        // the remain network.
3683
        log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
13✔
3684
                ", adding to next ann batch", shortChanID)
13✔
3685

13✔
3686
        // Assemble the necessary announcements to add to the next broadcasting
13✔
3687
        // batch.
13✔
3688
        var announcements []networkMsg
13✔
3689
        announcements = append(announcements, networkMsg{
13✔
3690
                peer:   nMsg.peer,
13✔
3691
                source: nMsg.source,
13✔
3692
                msg:    chanAnn,
13✔
3693
        })
13✔
3694
        if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
25✔
3695
                announcements = append(announcements, networkMsg{
12✔
3696
                        peer:   nMsg.peer,
12✔
3697
                        source: src,
12✔
3698
                        msg:    e1Ann,
12✔
3699
                })
12✔
3700
        }
12✔
3701
        if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
24✔
3702
                announcements = append(announcements, networkMsg{
11✔
3703
                        peer:   nMsg.peer,
11✔
3704
                        source: src,
11✔
3705
                        msg:    e2Ann,
11✔
3706
                })
11✔
3707
        }
11✔
3708

3709
        // We'll also send along the node announcements for each channel
3710
        // participant if we know of them. To ensure our node announcement
3711
        // propagates to our channel counterparty, we'll set the source for
3712
        // each announcement to the node it belongs to, otherwise we won't send
3713
        // it since the source gets skipped. This isn't necessary for channel
3714
        // updates and announcement signatures since we send those directly to
3715
        // our channel counterparty through the gossiper's reliable sender.
3716
        node1Ann, err := d.fetchNodeAnn(ctx, chanInfo.NodeKey1Bytes)
13✔
3717
        if err != nil {
18✔
3718
                log.Debugf("Unable to fetch node announcement for %x: %v",
5✔
3719
                        chanInfo.NodeKey1Bytes, err)
5✔
3720
        } else {
16✔
3721
                if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
22✔
3722
                        announcements = append(announcements, networkMsg{
11✔
3723
                                peer:   nMsg.peer,
11✔
3724
                                source: nodeKey1,
11✔
3725
                                msg:    node1Ann,
11✔
3726
                        })
11✔
3727
                }
11✔
3728
        }
3729

3730
        node2Ann, err := d.fetchNodeAnn(ctx, chanInfo.NodeKey2Bytes)
13✔
3731
        if err != nil {
20✔
3732
                log.Debugf("Unable to fetch node announcement for %x: %v",
7✔
3733
                        chanInfo.NodeKey2Bytes, err)
7✔
3734
        } else {
16✔
3735
                if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
18✔
3736
                        announcements = append(announcements, networkMsg{
9✔
3737
                                peer:   nMsg.peer,
9✔
3738
                                source: nodeKey2,
9✔
3739
                                msg:    node2Ann,
9✔
3740
                        })
9✔
3741
                }
9✔
3742
        }
3743

3744
        nMsg.err <- nil
13✔
3745
        return announcements, true
13✔
3746
}
3747

3748
// isBanned returns true if the peer identified by pubkey is banned for sending
3749
// invalid channel announcements.
3750
func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
211✔
3751
        return d.banman.isBanned(pubkey)
211✔
3752
}
211✔
3753

3754
// ShouldDisconnect returns true if we should disconnect the peer identified by
3755
// pubkey.
3756
func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
3757
        bool, error) {
209✔
3758

209✔
3759
        pubkeySer := pubkey.SerializeCompressed()
209✔
3760

209✔
3761
        var pubkeyBytes [33]byte
209✔
3762
        copy(pubkeyBytes[:], pubkeySer)
209✔
3763

209✔
3764
        // If the public key is banned, check whether or not this is a channel
209✔
3765
        // peer.
209✔
3766
        if d.isBanned(pubkeyBytes) {
211✔
3767
                isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
2✔
3768
                if err != nil {
2✔
3769
                        return false, err
×
3770
                }
×
3771

3772
                // We should only disconnect non-channel peers.
3773
                if !isChanPeer {
3✔
3774
                        return true, nil
1✔
3775
                }
1✔
3776
        }
3777

3778
        return false, nil
208✔
3779
}
3780

3781
// validateFundingTransaction fetches the channel announcements claimed funding
3782
// transaction from chain to ensure that it exists, is not spent and matches
3783
// the channel announcement proof. The transaction's outpoint and value are
3784
// returned if we can glean them from the work done in this method.
3785
func (d *AuthenticatedGossiper) validateFundingTransaction(_ context.Context,
3786
        ann *lnwire.ChannelAnnouncement1,
3787
        tapscriptRoot fn.Option[chainhash.Hash]) (wire.OutPoint, btcutil.Amount,
3788
        []byte, error) {
232✔
3789

232✔
3790
        scid := ann.ShortChannelID
232✔
3791

232✔
3792
        // Before we can add the channel to the channel graph, we need to obtain
232✔
3793
        // the full funding outpoint that's encoded within the channel ID.
232✔
3794
        fundingTx, err := lnwallet.FetchFundingTxWrapper(
232✔
3795
                d.cfg.ChainIO, scid, d.quit,
232✔
3796
        )
232✔
3797
        if err != nil {
233✔
3798
                //nolint:ll
1✔
3799
                //
1✔
3800
                // In order to ensure we don't erroneously mark a channel as a
1✔
3801
                // zombie due to an RPC failure, we'll attempt to string match
1✔
3802
                // for the relevant errors.
1✔
3803
                //
1✔
3804
                // * btcd:
1✔
3805
                //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316
1✔
3806
                //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086
1✔
3807
                // * bitcoind:
1✔
3808
                //    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770
1✔
3809
                //     * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954
1✔
3810
                switch {
1✔
3811
                case strings.Contains(err.Error(), "not found"):
1✔
3812
                        fallthrough
1✔
3813

3814
                case strings.Contains(err.Error(), "out of range"):
1✔
3815
                        // If the funding transaction isn't found at all, then
1✔
3816
                        // we'll mark the edge itself as a zombie so we don't
1✔
3817
                        // continue to request it. We use the "zero key" for
1✔
3818
                        // both node pubkeys so this edge can't be resurrected.
1✔
3819
                        zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
1✔
3820
                        if zErr != nil {
1✔
3821
                                return wire.OutPoint{}, 0, nil, zErr
×
3822
                        }
×
3823

3824
                default:
×
3825
                }
3826

3827
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
1✔
3828
                        ErrNoFundingTransaction, err)
1✔
3829
        }
3830

3831
        // Recreate witness output to be sure that declared in channel edge
3832
        // bitcoin keys and channel value corresponds to the reality.
3833
        fundingPkScript, err := makeFundingScript(
231✔
3834
                ann.BitcoinKey1[:], ann.BitcoinKey2[:], ann.Features,
231✔
3835
                tapscriptRoot,
231✔
3836
        )
231✔
3837
        if err != nil {
231✔
3838
                return wire.OutPoint{}, 0, nil, err
×
3839
        }
×
3840

3841
        // Next we'll validate that this channel is actually well formed. If
3842
        // this check fails, then this channel either doesn't exist, or isn't
3843
        // the one that was meant to be created according to the passed channel
3844
        // proofs.
3845
        fundingPoint, err := chanvalidate.Validate(
231✔
3846
                &chanvalidate.Context{
231✔
3847
                        Locator: &chanvalidate.ShortChanIDChanLocator{
231✔
3848
                                ID: scid,
231✔
3849
                        },
231✔
3850
                        MultiSigPkScript: fundingPkScript,
231✔
3851
                        FundingTx:        fundingTx,
231✔
3852
                },
231✔
3853
        )
231✔
3854
        if err != nil {
432✔
3855
                // Mark the edge as a zombie so we won't try to re-validate it
201✔
3856
                // on start up.
201✔
3857
                zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
201✔
3858
                if zErr != nil {
201✔
3859
                        return wire.OutPoint{}, 0, nil, zErr
×
3860
                }
×
3861

3862
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
201✔
3863
                        ErrInvalidFundingOutput, err)
201✔
3864
        }
3865

3866
        // Now that we have the funding outpoint of the channel, ensure
3867
        // that it hasn't yet been spent. If so, then this channel has
3868
        // been closed so we'll ignore it.
3869
        chanUtxo, err := d.cfg.ChainIO.GetUtxo(
30✔
3870
                fundingPoint, fundingPkScript, scid.BlockHeight, d.quit,
30✔
3871
        )
30✔
3872
        if err != nil {
32✔
3873
                if errors.Is(err, btcwallet.ErrOutputSpent) {
4✔
3874
                        zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
2✔
3875
                        if zErr != nil {
2✔
3876
                                return wire.OutPoint{}, 0, nil, zErr
×
3877
                        }
×
3878
                }
3879

3880
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: unable to "+
2✔
3881
                        "fetch utxo for chan_id=%v, chan_point=%v: %w",
2✔
3882
                        ErrChannelSpent, scid.ToUint64(), fundingPoint, err)
2✔
3883
        }
3884

3885
        return *fundingPoint, btcutil.Amount(chanUtxo.Value), fundingPkScript,
28✔
3886
                nil
28✔
3887
}
3888

3889
// handleBadPeer takes a misbehaving peer and increases its ban score. Once
3890
// increased, it will disconnect the peer if its ban score has reached
3891
// `banThreshold` and it doesn't have a channel with us.
3892
func (d *AuthenticatedGossiper) handleBadPeer(peer lnpeer.Peer) error {
205✔
3893
        // Increment the peer's ban score for misbehavior.
205✔
3894
        d.banman.incrementBanScore(peer.PubKey())
205✔
3895

205✔
3896
        // If the peer is banned and not a channel peer, we'll disconnect them.
205✔
3897
        shouldDc, dcErr := d.ShouldDisconnect(peer.IdentityKey())
205✔
3898
        if dcErr != nil {
205✔
3899
                log.Errorf("failed to check if we should disconnect peer: %v",
×
3900
                        dcErr)
×
3901

×
3902
                return dcErr
×
3903
        }
×
3904

3905
        if shouldDc {
206✔
3906
                peer.Disconnect(ErrPeerBanned)
1✔
3907
        }
1✔
3908

3909
        return nil
205✔
3910
}
3911

3912
// makeFundingScript is used to make the funding script for both segwit v0 and
3913
// segwit v1 (taproot) channels.
3914
func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte,
3915
        features *lnwire.RawFeatureVector,
3916
        tapscriptRoot fn.Option[chainhash.Hash]) ([]byte, error) {
231✔
3917

231✔
3918
        legacyFundingScript := func() ([]byte, error) {
462✔
3919
                witnessScript, err := input.GenMultiSigScript(
231✔
3920
                        bitcoinKey1, bitcoinKey2,
231✔
3921
                )
231✔
3922
                if err != nil {
231✔
3923
                        return nil, err
×
3924
                }
×
3925
                pkScript, err := input.WitnessScriptHash(witnessScript)
231✔
3926
                if err != nil {
231✔
3927
                        return nil, err
×
3928
                }
×
3929

3930
                return pkScript, nil
231✔
3931
        }
3932

3933
        if features.IsEmpty() {
462✔
3934
                return legacyFundingScript()
231✔
3935
        }
231✔
3936

3937
        chanFeatureBits := lnwire.NewFeatureVector(features, lnwire.Features)
3✔
3938
        if chanFeatureBits.HasFeature(
3✔
3939
                lnwire.SimpleTaprootChannelsOptionalStaging,
3✔
3940
        ) {
6✔
3941

3✔
3942
                pubKey1, err := btcec.ParsePubKey(bitcoinKey1)
3✔
3943
                if err != nil {
3✔
3944
                        return nil, err
×
3945
                }
×
3946
                pubKey2, err := btcec.ParsePubKey(bitcoinKey2)
3✔
3947
                if err != nil {
3✔
3948
                        return nil, err
×
3949
                }
×
3950

3951
                fundingScript, _, err := input.GenTaprootFundingScript(
3✔
3952
                        pubKey1, pubKey2, 0, tapscriptRoot,
3✔
3953
                )
3✔
3954
                if err != nil {
3✔
3955
                        return nil, err
×
3956
                }
×
3957

3958
                // TODO(roasbeef): add tapscript root to gossip v1.5
3959

3960
                return fundingScript, nil
3✔
3961
        }
3962

3963
        return legacyFundingScript()
×
3964
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc