lightningnetwork / lnd / build 13157733617

05 Feb 2025 12:49PM UTC. Coverage: 57.712% (-1.1%) from 58.82%.

Pull Request #9447: sweep: start tracking input spending status in the fee bumper
Author: yyforyongyu (via github)

Commit: sweep: rename methods for clarity

    We now rename "third party" to "unknown" as the inputs can be spent
    via an older sweeping tx, a third party (anchor), or a remote party
    (pin). In the fee bumper we don't have the info to distinguish the
    above cases, so we leave them to be further handled by the sweeper,
    as it has more context.

83 of 87 new or added lines in 2 files covered (95.4%).
19472 existing lines in 252 files now uncovered.
103634 of 179570 relevant lines covered (57.71%).
24840.31 hits per line.

Source file: /discovery/gossiper.go (71.08% covered)
package discovery

import (
	"bytes"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/btcec/v2"
	"github.com/btcsuite/btcd/btcec/v2/ecdsa"
	"github.com/btcsuite/btcd/btcutil"
	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcd/wire"
	"github.com/davecgh/go-spew/spew"
	"github.com/lightninglabs/neutrino/cache"
	"github.com/lightninglabs/neutrino/cache/lru"
	"github.com/lightningnetwork/lnd/batch"
	"github.com/lightningnetwork/lnd/chainntnfs"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/fn/v2"
	"github.com/lightningnetwork/lnd/graph"
	graphdb "github.com/lightningnetwork/lnd/graph/db"
	"github.com/lightningnetwork/lnd/graph/db/models"
	"github.com/lightningnetwork/lnd/keychain"
	"github.com/lightningnetwork/lnd/lnpeer"
	"github.com/lightningnetwork/lnd/lnutils"
	"github.com/lightningnetwork/lnd/lnwallet"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/multimutex"
	"github.com/lightningnetwork/lnd/netann"
	"github.com/lightningnetwork/lnd/routing/route"
	"github.com/lightningnetwork/lnd/ticker"
	"golang.org/x/time/rate"
)

const (
	// DefaultMaxChannelUpdateBurst is the default maximum number of updates
	// for a specific channel and direction that we'll accept over an
	// interval.
	DefaultMaxChannelUpdateBurst = 10

	// DefaultChannelUpdateInterval is the default interval we'll use to
	// determine how often we should allow a new update for a specific
	// channel and direction.
	DefaultChannelUpdateInterval = time.Minute

	// maxPrematureUpdates tracks the max amount of premature channel
	// updates that we'll hold onto.
	maxPrematureUpdates = 100

	// maxFutureMessages tracks the max amount of future messages that
	// we'll hold onto.
	maxFutureMessages = 1000

	// DefaultSubBatchDelay is the default delay we'll use when
	// broadcasting the next announcement batch.
	DefaultSubBatchDelay = 5 * time.Second

	// maxRejectedUpdates tracks the max amount of rejected channel updates
	// we'll maintain. This is the global size across all peers. We'll
	// allocate ~3 MB max to the cache.
	maxRejectedUpdates = 10_000

	// DefaultProofMatureDelta specifies the default value used for
	// ProofMatureDelta, which is the number of confirmations needed before
	// processing the announcement signatures.
	DefaultProofMatureDelta = 6
)

var (
	// ErrGossiperShuttingDown is an error that is returned if the gossiper
	// is in the process of being shut down.
	ErrGossiperShuttingDown = errors.New("gossiper is shutting down")

	// ErrGossipSyncerNotFound signals that we were unable to find an active
	// gossip syncer corresponding to a gossip query message received from
	// the remote peer.
	ErrGossipSyncerNotFound = errors.New("gossip syncer not found")

	// emptyPubkey is used to compare compressed pubkeys against an empty
	// byte array.
	emptyPubkey [33]byte
)

// optionalMsgFields is a set of optional message fields that external callers
// can provide that are useful when processing a specific network
// announcement.
type optionalMsgFields struct {
	capacity      *btcutil.Amount
	channelPoint  *wire.OutPoint
	remoteAlias   *lnwire.ShortChannelID
	tapscriptRoot fn.Option[chainhash.Hash]
}

// apply applies the optional fields within the functional options.
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
	for _, optionalMsgField := range optionalMsgFields {
		optionalMsgField(f)
	}
}

// OptionalMsgField is a functional option parameter that can be used to
// provide external information that is not included within a network message
// but is useful when processing it.
type OptionalMsgField func(*optionalMsgFields)

// ChannelCapacity is an optional field that lets the gossiper know of the
// capacity of a channel.
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
	return func(f *optionalMsgFields) {
		f.capacity = &capacity
	}
}

// ChannelPoint is an optional field that lets the gossiper know of the
// outpoint of a channel.
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
	return func(f *optionalMsgFields) {
		f.channelPoint = &op
	}
}

// TapscriptRoot is an optional field that lets the gossiper know of the root
// of the tapscript tree for a custom channel.
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
	return func(f *optionalMsgFields) {
		f.tapscriptRoot = root
	}
}

// RemoteAlias is an optional field that lets the gossiper know that a locally
// sent channel update is actually an update for the peer that should replace
// the ShortChannelID field with the remote's alias. This is only used for
// channels with peers where the option-scid-alias feature bit was negotiated.
// The channel update will be added to the graph under the original SCID, but
// will be modified and re-signed with this alias.
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
	return func(f *optionalMsgFields) {
		f.remoteAlias = alias
	}
}

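// Example (illustrative sketch, not part of the upstream file): callers
// combine these functional options when handing a local announcement to
// the gossiper, e.g.:
//
//	errChan := gossiper.ProcessLocalAnnouncement(
//		chanAnn,
//		ChannelCapacity(btcutil.Amount(1_000_000)),
//		ChannelPoint(op),
//	)
//
// where chanAnn and op are a hypothetical announcement and funding
// outpoint. Any option that is omitted simply leaves the corresponding
// field nil.
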
// networkMsg couples a routing related wire message with the peer that
// originally sent it.
type networkMsg struct {
	peer              lnpeer.Peer
	source            *btcec.PublicKey
	msg               lnwire.Message
	optionalMsgFields *optionalMsgFields

	isRemote bool

	err chan error
}

// chanPolicyUpdateRequest is a request that is sent to the server when a caller
// wishes to update a particular set of channels. New ChannelUpdate messages
// will be crafted to be sent out during the next broadcast epoch and the fee
// updates committed to the lower layer.
type chanPolicyUpdateRequest struct {
	edgesToUpdate []EdgeWithInfo
	errChan       chan error
}

// PinnedSyncers is a set of node pubkeys for which we will maintain an active
// syncer at all times.
type PinnedSyncers map[route.Vertex]struct{}

// Config defines the configuration for the service. ALL elements within the
// configuration MUST be non-nil for the service to carry out its duties.
type Config struct {
	// ChainHash is a hash that indicates the resident chain of the
	// AuthenticatedGossiper. Any announcements that don't match this
	// chain hash will be ignored.
	//
	// TODO(roasbeef): eventually make into map so can de-multiplex
	// incoming announcements
	//   * also need to do same for Notifier
	ChainHash chainhash.Hash

	// Graph is the subsystem responsible for managing the topology of
	// the lightning network. After incoming channel, node, and channel
	// update announcements are validated, they are sent to the router to
	// be included in the LN graph.
	Graph graph.ChannelGraphSource

	// ChainIO represents an abstraction over a source that can query the
	// blockchain.
	ChainIO lnwallet.BlockChainIO

	// ChanSeries is an interface that provides access to a time series
	// view of the current known channel graph. Each GossipSyncer enabled
	// peer will utilize this in order to create and respond to channel
	// graph time series queries.
	ChanSeries ChannelGraphTimeSeries

	// Notifier is used for receiving notifications of incoming blocks.
	// With each new incoming block found we process previously premature
	// announcements.
	//
	// TODO(roasbeef): could possibly just replace this with an epoch
	// channel.
	Notifier chainntnfs.ChainNotifier

	// Broadcast broadcasts a particular set of announcements to all peers
	// that the daemon is connected to. If supplied, the exclude parameter
	// indicates that the target peer should be excluded from the
	// broadcast.
	Broadcast func(skips map[route.Vertex]struct{},
		msg ...lnwire.Message) error

	// NotifyWhenOnline is a function that allows the gossiper to be
	// notified when a certain peer comes online, allowing it to
	// retry sending a peer message.
	//
	// NOTE: The peerChan channel must be buffered.
	NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)

	// NotifyWhenOffline is a function that allows the gossiper to be
	// notified when a certain peer disconnects, allowing it to request a
	// notification for when it reconnects.
	NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}

	// FetchSelfAnnouncement retrieves our current node announcement, for
	// use when determining whether we should update our peers about our
	// presence in the network.
	FetchSelfAnnouncement func() lnwire.NodeAnnouncement

	// UpdateSelfAnnouncement produces a new announcement for our node with
	// an updated timestamp which can be broadcast to our peers.
	UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement, error)

	// ProofMatureDelta is the number of confirmations needed before
	// exchanging the channel announcement proofs.
	ProofMatureDelta uint32

	// TrickleDelay is the period of the trickle timer, which flushes to
	// the network the pending batch of new announcements we've received
	// since the last trickle tick.
	TrickleDelay time.Duration

	// RetransmitTicker is a ticker that ticks with a period which
	// indicates that we should check whether we need to re-broadcast any
	// of our personal channels.
	RetransmitTicker ticker.Ticker

	// RebroadcastInterval is the maximum time we wait between sending out
	// channel updates for our active channels and our own node
	// announcement. We do this to ensure our active presence on the
	// network is known, and we are not being considered a zombie node or
	// having zombie channels.
	RebroadcastInterval time.Duration

	// WaitingProofStore is a persistent storage of partial channel proof
	// announcement messages. We use it to buffer half of the material
	// needed to reconstruct a full authenticated channel announcement.
	// Once we receive the other half of the channel proof, we'll be able
	// to properly validate it and re-broadcast it out to the network.
	//
	// TODO(wilmer): make interface to prevent channeldb dependency.
	WaitingProofStore *channeldb.WaitingProofStore

	// MessageStore is a persistent storage of gossip messages which we
	// will use to determine which messages need to be resent for a given
	// peer.
	MessageStore GossipMessageStore

	// AnnSigner is an instance of the MessageSigner interface which will
	// be used to manually sign any outgoing channel updates. The signer
	// implementation should be backed by the public key of the backing
	// Lightning node.
	//
	// TODO(roasbeef): extract ann crafting + sign from fundingMgr into
	// here?
	AnnSigner lnwallet.MessageSigner

	// ScidCloser is an instance of ClosedChannelTracker that helps the
	// gossiper cut down on spam channel announcements for already closed
	// channels.
	ScidCloser ClosedChannelTracker

	// NumActiveSyncers is the number of peers we should have active
	// syncers with. After reaching NumActiveSyncers, any future gossip
	// syncers will be passive.
	NumActiveSyncers int

	// NoTimestampQueries will prevent the GossipSyncer from querying
	// timestamps of announcement messages from the peer, and from
	// replying to timestamp queries.
	NoTimestampQueries bool

	// RotateTicker is a ticker responsible for notifying the SyncManager
	// when it should rotate its active syncers. A single active syncer
	// with a chansSynced state will be exchanged for a passive syncer in
	// order to ensure we don't keep syncing with the same peers.
	RotateTicker ticker.Ticker

	// HistoricalSyncTicker is a ticker responsible for notifying the
	// syncManager when it should attempt a historical sync with a gossip
	// sync peer.
	HistoricalSyncTicker ticker.Ticker

	// ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
	// syncManager when it should attempt to start the next pending
	// activeSyncer due to the current one not completing its state
	// machine within the timeout.
	ActiveSyncerTimeoutTicker ticker.Ticker

	// MinimumBatchSize is the minimum size of a sub batch of announcement
	// messages.
	MinimumBatchSize int

	// SubBatchDelay is the delay between sending sub batches of
	// gossip messages.
	SubBatchDelay time.Duration

	// IgnoreHistoricalFilters will prevent syncers from replying with
	// historical data when the remote peer sets a gossip_timestamp_range.
	// This prevents ranges with old start times from causing us to dump
	// the graph on connect.
	IgnoreHistoricalFilters bool

	// PinnedSyncers is a set of peers that will always transition to
	// ActiveSync upon connection. These peers will never transition to
	// PassiveSync.
	PinnedSyncers PinnedSyncers

	// MaxChannelUpdateBurst specifies the maximum number of updates for a
	// specific channel and direction that we'll accept over an interval.
	MaxChannelUpdateBurst int

	// ChannelUpdateInterval specifies the interval we'll use to determine
	// how often we should allow a new update for a specific channel and
	// direction.
	ChannelUpdateInterval time.Duration

	// IsAlias returns true if a given ShortChannelID is an alias for
	// option_scid_alias channels.
	IsAlias func(scid lnwire.ShortChannelID) bool

	// SignAliasUpdate is used to re-sign a channel update using the
	// remote's alias if the option-scid-alias feature bit was negotiated.
	SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
		error)

	// FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
	// This is used for channels that have negotiated the option-scid-alias
	// feature bit.
	FindBaseByAlias func(alias lnwire.ShortChannelID) (
		lnwire.ShortChannelID, error)

	// GetAlias allows the gossiper to look up the peer's alias for a given
	// ChannelID. This is used to sign updates for them if the channel has
	// no AuthProof and the option-scid-alias feature bit was negotiated.
	GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)

	// FindChannel allows the gossiper to find a channel that we're party
	// to without iterating over the entire set of open channels.
	FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
		*channeldb.OpenChannel, error)

	// IsStillZombieChannel takes the timestamps of the latest channel
	// updates for a channel and returns true if the channel should be
	// considered a zombie based on these timestamps.
	IsStillZombieChannel func(time.Time, time.Time) bool
}

// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
// used to let the caller of the lru.Cache know if a message has already been
// processed or not.
type processedNetworkMsg struct {
	processed bool
	msg       *networkMsg
}

// cachedNetworkMsg is a wrapper around a network message that can be used with
// *lru.Cache.
type cachedNetworkMsg struct {
	msgs []*processedNetworkMsg
}

// Size returns the "size" of an entry. We return the number of items as we
// just want to limit the total amount of entries rather than do accurate size
// accounting.
func (c *cachedNetworkMsg) Size() (uint64, error) {
	return uint64(len(c.msgs)), nil
}

// rejectCacheKey is the cache key that we'll use to track announcements we've
// recently rejected.
type rejectCacheKey struct {
	pubkey [33]byte
	chanID uint64
}

// newRejectCacheKey returns a new cache key for the reject cache.
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
	k := rejectCacheKey{
		chanID: cid,
		pubkey: pub,
	}

	return k
}

// sourceToPub returns a serialized-compressed public key for use in the
// reject cache.
func sourceToPub(pk *btcec.PublicKey) [33]byte {
	var pub [33]byte
	copy(pub[:], pk.SerializeCompressed())
	return pub
}

// cachedReject is the empty value used to track the value for rejects.
type cachedReject struct {
}

// Size returns the "size" of an entry. We return 1 as we just want to limit
// the total size.
func (c *cachedReject) Size() (uint64, error) {
	return 1, nil
}

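// Example (illustrative sketch, not part of the upstream file): the two
// helpers above combine into the key used to remember an announcement we
// rejected from a specific peer, so repeated offers can be skipped
// cheaply:
//
//	key := newRejectCacheKey(scid.ToUint64(), sourceToPub(peerPub))
//	_, err := d.recentRejects.Get(key)
//	// A nil error means we recently rejected this (channel, peer) pair.
//
// Here scid and peerPub stand in for a hypothetical
// lnwire.ShortChannelID and *btcec.PublicKey.
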
// AuthenticatedGossiper is a subsystem which is responsible for receiving
// announcements, validating them and applying the changes to the router,
// syncing the lightning network with newly connected nodes, broadcasting
// announcements after validation, negotiating the channel announcement proofs
// exchange and handling premature announcements. All outgoing announcements
// are expected to be properly signed as dictated in BOLT#7. Additionally, all
// incoming messages are expected to be well formed and signed. Invalid
// messages will be rejected by this struct.
type AuthenticatedGossiper struct {
	// Parameters which are needed to properly handle the start and stop of
	// the service.
	started sync.Once
	stopped sync.Once

	// bestHeight is the height of the block at the tip of the main chain
	// as we know it. Accesses *MUST* be done with the gossiper's lock
	// held.
	bestHeight uint32

	quit chan struct{}
	wg   sync.WaitGroup

	// cfg is a copy of the configuration struct that the gossiper service
	// was initialized with.
	cfg *Config

	// blockEpochs encapsulates a stream of block epochs that are sent at
	// every new block height.
	blockEpochs *chainntnfs.BlockEpochEvent

	// prematureChannelUpdates is a map of ChannelUpdates we have received
	// that weren't associated with any channel we know about. We store
	// them temporarily, such that we can reprocess them when a
	// ChannelAnnouncement for the channel is received.
	prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]

	// banman tracks our peer's ban status.
	banman *banman

	// networkMsgs is a channel that carries new network broadcasted
	// messages from outside the gossiper service to be processed by the
	// networkHandler.
	networkMsgs chan *networkMsg

	// futureMsgs is a list of premature network messages that have a block
	// height specified in the future. We will save them and resend them to
	// the networkMsgs channel once the block height has been reached. The
	// cached map format is,
	//   {msgID1: msg1, msgID2: msg2, ...}
	futureMsgs *futureMsgCache

	// chanPolicyUpdates is a channel that requests to update the
	// forwarding policy of a set of channels are sent over.
	chanPolicyUpdates chan *chanPolicyUpdateRequest

	// selfKey is the identity public key of the backing Lightning node.
	selfKey *btcec.PublicKey

	// selfKeyLoc is the locator for the identity public key of the backing
	// Lightning node.
	selfKeyLoc keychain.KeyLocator

	// channelMtx is used to restrict the database access to one
	// goroutine per channel ID. This is done to ensure that when
	// the gossiper is handling an announcement, the db state stays
	// consistent between when the DB is first read until it's written.
	channelMtx *multimutex.Mutex[uint64]

	recentRejects *lru.Cache[rejectCacheKey, *cachedReject]

	// syncMgr is a subsystem responsible for managing the gossip syncers
	// for peers currently connected. When a new peer is connected, the
	// manager will create its accompanying gossip syncer and determine
	// whether it should have an activeSync or passiveSync sync type based
	// on how many other gossip syncers are currently active. Any activeSync
	// gossip syncers are started in a round-robin manner to ensure we're
	// not syncing with multiple peers at the same time.
	syncMgr *SyncManager

	// reliableSender is a subsystem responsible for handling reliable
	// message send requests to peers. This should only be used for channels
	// that are unadvertised at the time of handling the message since if it
	// is advertised, then peers should be able to get the message from the
	// network.
	reliableSender *reliableSender

	// chanUpdateRateLimiter contains rate limiters for each direction of
	// a channel update we've processed. We'll use these to determine
	// whether we should accept a new update for a specific channel and
	// direction.
	//
	// NOTE: This map must be synchronized with the main
	// AuthenticatedGossiper lock.
	chanUpdateRateLimiter map[uint64][2]*rate.Limiter

	// vb is used to enforce job dependency ordering of gossip messages.
	vb *ValidationBarrier

	sync.Mutex
}

// New creates a new AuthenticatedGossiper instance, initialized with the
// passed configuration parameters.
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
	gossiper := &AuthenticatedGossiper{
		selfKey:           selfKeyDesc.PubKey,
		selfKeyLoc:        selfKeyDesc.KeyLocator,
		cfg:               &cfg,
		networkMsgs:       make(chan *networkMsg),
		futureMsgs:        newFutureMsgCache(maxFutureMessages),
		quit:              make(chan struct{}),
		chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
		prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
			maxPrematureUpdates,
		),
		channelMtx: multimutex.NewMutex[uint64](),
		recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
			maxRejectedUpdates,
		),
		chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
		banman:                newBanman(),
	}

	gossiper.vb = NewValidationBarrier(1000, gossiper.quit)

	gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
		ChainHash:               cfg.ChainHash,
		ChanSeries:              cfg.ChanSeries,
		RotateTicker:            cfg.RotateTicker,
		HistoricalSyncTicker:    cfg.HistoricalSyncTicker,
		NumActiveSyncers:        cfg.NumActiveSyncers,
		NoTimestampQueries:      cfg.NoTimestampQueries,
		IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
		BestHeight:              gossiper.latestHeight,
		PinnedSyncers:           cfg.PinnedSyncers,
		IsStillZombieChannel:    cfg.IsStillZombieChannel,
	})

	gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
		NotifyWhenOnline:  cfg.NotifyWhenOnline,
		NotifyWhenOffline: cfg.NotifyWhenOffline,
		MessageStore:      cfg.MessageStore,
		IsMsgStale:        gossiper.isMsgStale,
	})

	return gossiper
}

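// Example (illustrative sketch, not part of the upstream file): a
// minimal lifecycle for the gossiper constructed above, assuming cfg and
// keyDesc have been fully populated by the caller:
//
//	g := New(cfg, keyDesc)
//	if err := g.Start(); err != nil {
//		// handle startup failure
//	}
//	defer func() { _ = g.Stop() }()
//
// Start and Stop are guarded by the started/stopped sync.Once fields, so
// repeated calls are safe.
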
// EdgeWithInfo contains the information that is required to update an edge.
type EdgeWithInfo struct {
	// Info describes the channel.
	Info *models.ChannelEdgeInfo

	// Edge describes the policy in one direction of the channel.
	Edge *models.ChannelEdgePolicy
}

// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
// specified edge updates. Updates are done in two stages: first, the
// AuthenticatedGossiper ensures the update has been committed by dependent
// sub-systems, then it signs and broadcasts new updates to the network.
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
	edgesToUpdate []EdgeWithInfo) error {

	errChan := make(chan error, 1)
	policyUpdate := &chanPolicyUpdateRequest{
		edgesToUpdate: edgesToUpdate,
		errChan:       errChan,
	}

	select {
	case d.chanPolicyUpdates <- policyUpdate:
		err := <-errChan
		return err
	case <-d.quit:
		return fmt.Errorf("AuthenticatedGossiper shutting down")
	}
}

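// Example (illustrative sketch, not part of the upstream file): callers
// block on the buffered errChan until the gossiper has processed the
// request, so a policy update is fully committed (or rejected) before
// this returns:
//
//	if err := d.PropagateChanPolicyUpdate(edgesToUpdate); err != nil {
//		// The new policies were not applied.
//	}
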
// Start spawns the network message handler goroutine and registers for new
// block notifications in order to properly handle premature announcements.
func (d *AuthenticatedGossiper) Start() error {
	var err error
	d.started.Do(func() {
		log.Info("Authenticated Gossiper starting")
		err = d.start()
	})
	return err
}

func (d *AuthenticatedGossiper) start() error {
	// First we register for new notifications of newly discovered blocks.
	// We do this immediately so we'll later be able to consume any/all
	// blocks which were discovered.
	blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
	if err != nil {
		return err
	}
	d.blockEpochs = blockEpochs

	height, err := d.cfg.Graph.CurrentBlockHeight()
	if err != nil {
		return err
	}
	d.bestHeight = height

	// Start the reliable sender. In case we had any pending messages ready
	// to be sent when the gossiper was last shut down, we must continue on
	// our quest to deliver them to their respective peers.
	if err := d.reliableSender.Start(); err != nil {
		return err
	}

	d.syncMgr.Start()

	d.banman.start()

	// Start receiving blocks in its dedicated goroutine.
	d.wg.Add(2)
	go d.syncBlockHeight()
	go d.networkHandler()

	return nil
}

// syncBlockHeight syncs the best block height for the gossiper by reading
// blockEpochs.
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) syncBlockHeight() {
	defer d.wg.Done()

	for {
		select {
		// A new block has arrived, so we can re-process the previously
		// premature announcements.
		case newBlock, ok := <-d.blockEpochs.Epochs:
			// If the channel has been closed, then this indicates
			// the daemon is shutting down, so we exit ourselves.
			if !ok {
				return
			}

			// Once a new block arrives, we update our running
			// track of the height of the chain tip.
			d.Lock()
			blockHeight := uint32(newBlock.Height)
			d.bestHeight = blockHeight
			d.Unlock()

			log.Debugf("New block: height=%d, hash=%s", blockHeight,
				newBlock.Hash)

			// Resend future messages, if any.
			d.resendFutureMessages(blockHeight)

		case <-d.quit:
			return
		}
	}
}

// futureMsgCache embeds a `lru.Cache` with a message counter that serves as
// the unique ID when saving the message.
type futureMsgCache struct {
	*lru.Cache[uint64, *cachedFutureMsg]

	// msgID is a monotonically increasing integer.
	msgID atomic.Uint64
}

// nextMsgID returns a unique message ID.
func (f *futureMsgCache) nextMsgID() uint64 {
	return f.msgID.Add(1)
}

// newFutureMsgCache creates a new future message cache with the underlying lru
// cache being initialized with the specified capacity.
func newFutureMsgCache(capacity uint64) *futureMsgCache {
	// Create a new cache.
	cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)

	return &futureMsgCache{
		Cache: cache,
	}
}

// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
type cachedFutureMsg struct {
	// msg is the network message.
	msg *networkMsg

	// height is the block height.
	height uint32
}

// Size returns the size of the message.
func (c *cachedFutureMsg) Size() (uint64, error) {
	// Return a constant 1.
	return 1, nil
}

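// Example (illustrative sketch, not part of the upstream file): because
// Size always reports 1, the capacity passed to newFutureMsgCache counts
// entries rather than bytes, i.e. newFutureMsgCache(maxFutureMessages)
// retains at most 1000 future messages before the least recently used
// entry is evicted. The monotonic counter supplies the cache keys:
//
//	id := d.futureMsgs.nextMsgID() // 1, 2, 3, ... over the gossiper's
//	                               // lifetime, never reused.
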
// resendFutureMessages takes a block height, resends all the future messages
// found below and equal to that height and deletes those messages found in the
// gossiper's futureMsgs.
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
	var (
		// msgs are the target messages.
		msgs []*networkMsg

		// keys are the target messages' caching keys.
		keys []uint64
	)

	// filterMsgs is the visitor used when iterating the future cache.
	filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
		if cmsg.height <= height {
			msgs = append(msgs, cmsg.msg)
			keys = append(keys, k)
		}

		return true
	}

	// Filter out the target messages.
	d.futureMsgs.Range(filterMsgs)

	// Return early if no messages found.
	if len(msgs) == 0 {
		return
	}

	// Remove the filtered messages.
	for _, key := range keys {
		d.futureMsgs.Delete(key)
	}

	log.Debugf("Resending %d network messages at height %d",
		len(msgs), height)

	for _, msg := range msgs {
		select {
		case d.networkMsgs <- msg:
		case <-d.quit:
			msg.err <- ErrGossiperShuttingDown
		}
	}
}

// Stop signals any active goroutines for a graceful closure.
func (d *AuthenticatedGossiper) Stop() error {
	d.stopped.Do(func() {
		log.Info("Authenticated gossiper shutting down...")
		defer log.Debug("Authenticated gossiper shutdown complete")

		d.stop()
	})
	return nil
}

func (d *AuthenticatedGossiper) stop() {
	log.Debug("Authenticated Gossiper is stopping")
	defer log.Debug("Authenticated Gossiper stopped")

	// `blockEpochs` is only initialized in the start routine so we make
	// sure we don't panic here in the case where the `Stop` method is
	// called when the `Start` method does not complete.
	if d.blockEpochs != nil {
		d.blockEpochs.Cancel()
	}

	d.syncMgr.Stop()

	d.banman.stop()

	close(d.quit)
	d.wg.Wait()

	// We'll stop our reliable sender after all of the gossiper's goroutines
	// have exited to ensure nothing can cause it to continue executing.
	d.reliableSender.Stop()
}

// TODO(roasbeef): need method to get current gossip timestamp?
//  * using mtx, check time rotate forward is needed?

// ProcessRemoteAnnouncement sends a new remote announcement message along with
// the peer that sent the routing message. The announcement will be processed
// then added to a queue for batched trickled announcement to all connected
// peers. Remote channel announcements should contain the announcement proof
// and be fully validated.
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
	peer lnpeer.Peer) chan error {

	log.Debugf("Processing remote msg %T from peer=%x", msg, peer.PubKey())

	errChan := make(chan error, 1)

	// For messages in the known set of channel series queries, we'll
	// dispatch the message directly to the GossipSyncer, and skip the main
	// processing loop.
	switch m := msg.(type) {
	case *lnwire.QueryShortChanIDs,
		*lnwire.QueryChannelRange,
		*lnwire.ReplyChannelRange,
		*lnwire.ReplyShortChanIDsEnd:

		syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
		if !ok {
			log.Warnf("Gossip syncer for peer=%x not found",
				peer.PubKey())

			errChan <- ErrGossipSyncerNotFound
			return errChan
		}

		// If we've found the message target, then we'll dispatch the
		// message directly to it.
		err := syncer.ProcessQueryMsg(m, peer.QuitSignal())
		if err != nil {
			log.Errorf("Process query msg from peer %x got %v",
				peer.PubKey(), err)
		}

		errChan <- err
		return errChan

	// If a peer is updating its current update horizon, then we'll dispatch
	// that directly to the proper GossipSyncer.
	case *lnwire.GossipTimestampRange:
		syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
		if !ok {
			log.Warnf("Gossip syncer for peer=%x not found",
				peer.PubKey())

			errChan <- ErrGossipSyncerNotFound
			return errChan
		}

		// If we've found the message target, then we'll dispatch the
		// message directly to it.
		if err := syncer.ApplyGossipFilter(m); err != nil {
			log.Warnf("Unable to apply gossip filter for peer=%x: "+
				"%v", peer.PubKey(), err)

			errChan <- err
			return errChan
		}

		errChan <- nil
		return errChan

	// To avoid inserting edges in the graph for our own channels that we
	// have already closed, we ignore such channel announcements coming
	// from the remote.
	case *lnwire.ChannelAnnouncement1:
		ownKey := d.selfKey.SerializeCompressed()
		ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement1 " +
			"for own channel")

		if bytes.Equal(m.NodeID1[:], ownKey) ||
			bytes.Equal(m.NodeID2[:], ownKey) {

			log.Warn(ownErr)
			errChan <- ownErr
			return errChan
		}
	}

	nMsg := &networkMsg{
		msg:      msg,
		isRemote: true,
		peer:     peer,
		source:   peer.IdentityKey(),
		err:      errChan,
	}

	select {
	case d.networkMsgs <- nMsg:

	// If the peer that sent us this message is quitting, then we don't
	// need to send back an error and can return immediately.
	case <-peer.QuitSignal():
		return nil
	case <-d.quit:
		nMsg.err <- ErrGossiperShuttingDown
	}

	return nMsg.err
}

// ProcessLocalAnnouncement sends a new local announcement message along with
// the peer that sent the routing message. The announcement will be processed
// then added to a queue for batched trickled announcement to all connected
// peers. Local channel announcements don't contain the announcement proof and
// will not be fully validated. Once the channel proofs are received, the
// entire channel announcement and update messages will be re-constructed and
// broadcast to the rest of the network.
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
	optionalFields ...OptionalMsgField) chan error {

	optionalMsgFields := &optionalMsgFields{}
	optionalMsgFields.apply(optionalFields...)

	nMsg := &networkMsg{
		msg:               msg,
		optionalMsgFields: optionalMsgFields,
		isRemote:          false,
		source:            d.selfKey,
		err:               make(chan error, 1),
	}

	select {
	case d.networkMsgs <- nMsg:
	case <-d.quit:
		nMsg.err <- ErrGossiperShuttingDown
	}

	return nMsg.err
}

// channelUpdateID is a unique identifier for ChannelUpdate messages, as
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
// tuple.
type channelUpdateID struct {
	// channelID is the data needed to retrieve everything necessary to
	// validate the channel's existence.
	channelID lnwire.ShortChannelID

	// The flags' least-significant bit must be set to 0 if the creating
	// node corresponds to the first node in the previously sent channel
	// announcement, and 1 otherwise.
	flags lnwire.ChanUpdateChanFlags
}

// msgWithSenders is a wrapper struct around a message, and the set of peers
// that originally sent us this message. Using this struct, we can ensure that
// we don't re-send a message to the peer that sent it to us in the first
// place.
type msgWithSenders struct {
	// msg is the wire message itself.
	msg lnwire.Message

	// isLocal is true if this was a message that originated locally. We'll
	// use this to bypass our normal checks to ensure we prioritize sending
	// out our own updates.
	isLocal bool

	// senders is the set of peers that sent us this message.
	senders map[route.Vertex]struct{}
}

// mergeSyncerMap is used to merge the set of senders of a particular message
// with peers that we have an active GossipSyncer with. We do this to ensure
// that we don't broadcast messages to any peers that we have active gossip
// syncers for.
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
	for peerPub := range syncers {
		m.senders[peerPub] = struct{}{}
	}
}

// deDupedAnnouncements de-duplicates announcements that have been added to the
// batch. Internally, announcements are stored in three maps
// (one each for channel announcements, channel updates, and node
// announcements). These maps keep track of unique announcements and ensure no
// announcements are duplicated. We keep the three message types separate, such
// that we can send channel announcements first, then channel updates, and
// finally node announcements when it's time to broadcast them.
type deDupedAnnouncements struct {
	// channelAnnouncements are identified by the short channel id field.
	channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders

	// channelUpdates are identified by the channel update id field.
	channelUpdates map[channelUpdateID]msgWithSenders

	// nodeAnnouncements are identified by the Vertex field.
	nodeAnnouncements map[route.Vertex]msgWithSenders

	sync.Mutex
}

// Reset operates on deDupedAnnouncements to reset the storage of
// announcements.
func (d *deDupedAnnouncements) Reset() {
	d.Lock()
	defer d.Unlock()

	d.reset()
}

// reset is the private version of the Reset method. We have this so we can
// call this method within methods that are already holding the lock.
func (d *deDupedAnnouncements) reset() {
	// Storage of each type of announcement (channel announcements, channel
	// updates, node announcements) is set to an empty map where the
	// appropriate key points to the corresponding lnwire.Message.
	d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
	d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
	d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
}

// addMsg adds a new message to the current batch. If the message is already
// present in the current batch, then this new instance replaces the latter,
// and the set of senders is updated to reflect which node sent us this
// message.
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
	log.Tracef("Adding network message: %v to batch", message.msg.MsgType())

	// Depending on the message type (channel announcement, channel update,
	// or node announcement), the message is added to the corresponding map
	// in deDupedAnnouncements. Because each identifying key can have at
	// most one value, the announcements are de-duplicated, with newer ones
	// replacing older ones.
	switch msg := message.msg.(type) {

	// Channel announcements are identified by the short channel id field.
	case *lnwire.ChannelAnnouncement1:
		deDupKey := msg.ShortChannelID
		sender := route.NewVertex(message.source)

		mws, ok := d.channelAnnouncements[deDupKey]
		if !ok {
			mws = msgWithSenders{
				msg:     msg,
				isLocal: !message.isRemote,
				senders: make(map[route.Vertex]struct{}),
			}
			mws.senders[sender] = struct{}{}

			d.channelAnnouncements[deDupKey] = mws

			return
		}

		mws.msg = msg
		mws.senders[sender] = struct{}{}
		d.channelAnnouncements[deDupKey] = mws

	// Channel updates are identified by the (short channel id,
	// channelflags) tuple.
	case *lnwire.ChannelUpdate1:
		sender := route.NewVertex(message.source)
		deDupKey := channelUpdateID{
			msg.ShortChannelID,
			msg.ChannelFlags,
		}

		oldTimestamp := uint32(0)
		mws, ok := d.channelUpdates[deDupKey]
		if ok {
			// If we already have seen this message, record its
			// timestamp.
			update, ok := mws.msg.(*lnwire.ChannelUpdate1)
			if !ok {
				log.Errorf("Expected *lnwire.ChannelUpdate1, "+
					"got: %T", mws.msg)

				return
			}

			oldTimestamp = update.Timestamp
		}

		// If we already had this message with a strictly newer
		// timestamp, then we'll just discard the message we got.
		if oldTimestamp > msg.Timestamp {
			log.Debugf("Ignored outdated network message: "+
				"peer=%v, msg=%s", message.peer, msg.MsgType())
			return
		}

		// If the message we just got is newer than what we previously
		// have seen, or this is the first time we see it, then we'll
		// add it to our map of announcements.
		if oldTimestamp < msg.Timestamp {
			mws = msgWithSenders{
				msg:     msg,
				isLocal: !message.isRemote,
				senders: make(map[route.Vertex]struct{}),
			}

			// We'll mark the sender of the message in the
			// senders map.
			mws.senders[sender] = struct{}{}

			d.channelUpdates[deDupKey] = mws

			return
		}

		// Lastly, if we had seen this exact message from before, with
		// the same timestamp, we'll add the sender to the map of
		// senders, such that we can skip sending this message back in
		// the next batch.
		mws.msg = msg
		mws.senders[sender] = struct{}{}
		d.channelUpdates[deDupKey] = mws

	// Node announcements are identified by the Vertex field. Use the
	// NodeID to create the corresponding Vertex.
	case *lnwire.NodeAnnouncement:
		sender := route.NewVertex(message.source)
		deDupKey := route.Vertex(msg.NodeID)

		// We do the same for node announcements as we did for channel
		// updates, as they also carry a timestamp.
		oldTimestamp := uint32(0)
		mws, ok := d.nodeAnnouncements[deDupKey]
		if ok {
			oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
		}

		// Discard the message if it's old.
		if oldTimestamp > msg.Timestamp {
			return
		}

		// Replace if it's newer.
		if oldTimestamp < msg.Timestamp {
			mws = msgWithSenders{
				msg:     msg,
				isLocal: !message.isRemote,
				senders: make(map[route.Vertex]struct{}),
			}

			mws.senders[sender] = struct{}{}

			d.nodeAnnouncements[deDupKey] = mws

			return
		}

		// Add to senders map if it's the same as we had.
		mws.msg = msg
		mws.senders[sender] = struct{}{}
		d.nodeAnnouncements[deDupKey] = mws
	}
}

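// Example (illustrative sketch, not part of the upstream file): for two
// ChannelUpdate1 messages sharing the same (ShortChannelID, ChannelFlags)
// key, addMsg resolves the conflict purely on timestamps:
//
//	stored: none,  incoming t=100 -> store incoming
//	stored: t=100, incoming t=90  -> drop incoming (strictly older)
//	stored: t=100, incoming t=150 -> replace, senders set starts fresh
//	stored: t=100, incoming t=100 -> keep message, merge the new sender
//
// Merging the sender on the equal-timestamp path is what lets Emit avoid
// echoing a message back to every peer that already sent it to us.
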
1167
// AddMsgs is a helper method to add multiple messages to the announcement
1168
// batch.
1169
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
54✔
1170
        d.Lock()
54✔
1171
        defer d.Unlock()
54✔
1172

54✔
1173
        for _, msg := range msgs {
140✔
1174
                d.addMsg(msg)
86✔
1175
        }
86✔
1176
}
1177

1178
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
1179
// to broadcast next into messages that are locally sourced and those that are
1180
// sourced remotely.
1181
type msgsToBroadcast struct {
1182
        // localMsgs is the set of messages we created locally.
1183
        localMsgs []msgWithSenders
1184

1185
        // remoteMsgs is the set of messages that we received from a remote
1186
        // party.
1187
        remoteMsgs []msgWithSenders
1188
}
1189

1190
// addMsg adds a new message to the appropriate sub-slice.
1191
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
71✔
1192
        if msg.isLocal {
118✔
1193
                m.localMsgs = append(m.localMsgs, msg)
47✔
1194
        } else {
71✔
1195
                m.remoteMsgs = append(m.remoteMsgs, msg)
24✔
1196
        }
24✔
1197
}
1198

1199
// isEmpty returns true if the batch is empty.
1200
func (m *msgsToBroadcast) isEmpty() bool {
286✔
1201
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
286✔
1202
}
286✔
1203

1204
// length returns the length of the combined message set.
1205
func (m *msgsToBroadcast) length() int {
1✔
1206
        return len(m.localMsgs) + len(m.remoteMsgs)
1✔
1207
}
1✔
1208

1209
// Emit returns the set of de-duplicated announcements to be sent out during
// the next announcement epoch, in the order of channel announcements, channel
// updates, and node announcements. Each message emitted contains the set of
// peers that sent us the message. This way, we can ensure that we don't waste
// bandwidth by re-sending a message to the peer that sent it to us in the
// first place. Additionally, the set of stored messages is reset.
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
	d.Lock()
	defer d.Unlock()

	// Get the total number of announcements.
	numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
		len(d.nodeAnnouncements)

	// Create an empty batch of lnwire.Messages with a capacity equal to
	// the total number of announcements.
	msgs := msgsToBroadcast{
		localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
		remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
	}

	// Add the channel announcements to the batch first.
	for _, message := range d.channelAnnouncements {
		msgs.addMsg(message)
	}

	// Then add the channel updates.
	for _, message := range d.channelUpdates {
		msgs.addMsg(message)
	}

	// Finally add the node announcements.
	for _, message := range d.nodeAnnouncements {
		msgs.addMsg(message)
	}

	d.reset()

	// Return the partitioned set of messages.
	return msgs
}

// calculateSubBatchSize is a helper function that calculates the size to break
// down the batchSize into.
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
	minimumBatchSize, batchSize int) int {
	if subBatchDelay > totalDelay {
		return batchSize
	}

	subBatchSize := (batchSize*int(subBatchDelay) +
		int(totalDelay) - 1) / int(totalDelay)

	if subBatchSize < minimumBatchSize {
		return minimumBatchSize
	}

	return subBatchSize
}

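// exampleSubBatchSize is a hypothetical sketch (not part of the gossiper)
// walking through the ceiling division above: with a 90s total trickle delay,
// a 5s sub batch delay, a minimum size of 10 and 300 queued messages, each
// sub batch carries ceil(300*5/90) = 17 messages.
func exampleSubBatchSize() int {
	return calculateSubBatchSize(
		90*time.Second, 5*time.Second, 10, 300,
	)
}
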
// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
// this variable so the function can be mocked in our test.
var batchSizeCalculator = calculateSubBatchSize

// splitAnnouncementBatches takes an existing list of announcements and
// decomposes it into sub batches controlled by the `subBatchSize`.
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
	announcementBatch []msgWithSenders) [][]msgWithSenders {

	subBatchSize := batchSizeCalculator(
		d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
		d.cfg.MinimumBatchSize, len(announcementBatch),
	)

	var splitAnnouncementBatch [][]msgWithSenders

	for subBatchSize < len(announcementBatch) {
		// For slicing with minimal allocation
		// https://github.com/golang/go/wiki/SliceTricks
		announcementBatch, splitAnnouncementBatch =
			announcementBatch[subBatchSize:],
			append(splitAnnouncementBatch,
				announcementBatch[0:subBatchSize:subBatchSize])
	}
	splitAnnouncementBatch = append(
		splitAnnouncementBatch, announcementBatch,
	)

	return splitAnnouncementBatch
}

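// exampleSplitBatches is a hypothetical illustration of the slicing loop
// above: a batch of 7 messages with a sub batch size of 3 yields sub batches
// of sizes 3, 3 and 1. The full slice expression batch[0:3:3] caps the
// capacity of each sub batch so a later append can never write into the
// backing array of its neighbor.
func exampleSplitBatches() [][]msgWithSenders {
	batch := make([]msgWithSenders, 7)
	const subBatchSize = 3

	var split [][]msgWithSenders
	for subBatchSize < len(batch) {
		batch, split = batch[subBatchSize:],
			append(split, batch[0:subBatchSize:subBatchSize])
	}

	return append(split, batch)
}
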
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
// split size, and then sends out all items to the set of target peers. Locally
// generated announcements are always sent before remotely generated
// announcements.
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
	annBatch msgsToBroadcast) {

	// delayNextBatch is a helper closure that blocks for `SubBatchDelay`
	// duration to delay the sending of the next announcement batch.
	delayNextBatch := func() {
		select {
		case <-time.After(d.cfg.SubBatchDelay):
		case <-d.quit:
			return
		}
	}

	// Fetch the local and remote announcements.
	localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
	remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)

	d.wg.Add(1)
	go func() {
		defer d.wg.Done()

		log.Debugf("Broadcasting %v new local announcements in %d "+
			"sub batches", len(annBatch.localMsgs),
			len(localBatches))

		// Send out the local announcements first.
		for _, annBatch := range localBatches {
			d.sendLocalBatch(annBatch)
			delayNextBatch()
		}

		log.Debugf("Broadcasting %v new remote announcements in %d "+
			"sub batches", len(annBatch.remoteMsgs),
			len(remoteBatches))

		// Now send the remote announcements.
		for _, annBatch := range remoteBatches {
			d.sendRemoteBatch(annBatch)
			delayNextBatch()
		}
	}()
}

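// sleepOrQuit is a hypothetical standalone version of the delayNextBatch
// closure above: it blocks for the given delay but returns early (false) if
// the quit channel closes first, so a paced broadcast never outlives
// shutdown.
func sleepOrQuit(delay time.Duration, quit <-chan struct{}) bool {
	select {
	case <-time.After(delay):
		return true
	case <-quit:
		return false
	}
}
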
// sendLocalBatch broadcasts a list of locally generated announcements to our
// peers. For local announcements, we skip the filter and dedup logic and just
// send the announcements out to all our connected peers.
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
	msgsToSend := lnutils.Map(
		annBatch, func(m msgWithSenders) lnwire.Message {
			return m.msg
		},
	)

	err := d.cfg.Broadcast(nil, msgsToSend...)
	if err != nil {
		log.Errorf("Unable to send local batch announcements: %v", err)
	}
}

// sendRemoteBatch broadcasts a list of remotely generated announcements to our
// peers.
func (d *AuthenticatedGossiper) sendRemoteBatch(annBatch []msgWithSenders) {
	syncerPeers := d.syncMgr.GossipSyncers()

	// We'll first attempt to filter out this new message for all peers
	// that have an active gossip syncer.
	for pub, syncer := range syncerPeers {
		log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
		syncer.FilterGossipMsgs(annBatch...)
	}

	for _, msgChunk := range annBatch {
		msgChunk := msgChunk

		// With the syncers taken care of, we'll merge the sender map
		// with the set of syncers, so we don't send out duplicate
		// messages.
		msgChunk.mergeSyncerMap(syncerPeers)

		err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
		if err != nil {
			log.Errorf("Unable to send batch "+
				"announcements: %v", err)
			continue
		}
	}
}

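// exampleSkipSenders is a hypothetical sketch of why the sender map is merged
// before broadcasting: given the set of peers a message already came from, we
// only relay it to peers outside that set, so a gossip message is never
// echoed back to whoever delivered it to us.
func exampleSkipSenders(all []route.Vertex,
	senders map[route.Vertex]struct{}) []route.Vertex {

	var targets []route.Vertex
	for _, peer := range all {
		if _, ok := senders[peer]; ok {
			continue
		}
		targets = append(targets, peer)
	}

	return targets
}
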
// networkHandler is the primary goroutine that drives this service. The roles
// of this goroutine include answering queries related to the state of the
// network, syncing up newly connected peers, and also periodically
// broadcasting our latest topology state to all connected peers.
//
// NOTE: This MUST be run as a goroutine.
func (d *AuthenticatedGossiper) networkHandler() {
	defer d.wg.Done()

	// Initialize empty deDupedAnnouncements to store announcement batch.
	announcements := deDupedAnnouncements{}
	announcements.Reset()

	d.cfg.RetransmitTicker.Resume()
	defer d.cfg.RetransmitTicker.Stop()

	trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
	defer trickleTimer.Stop()

	// To start, we'll first check to see if there are any stale channel or
	// node announcements that we need to re-transmit.
	if err := d.retransmitStaleAnns(time.Now()); err != nil {
		log.Errorf("Unable to rebroadcast stale announcements: %v", err)
	}

	for {
		select {
		// A new policy update has arrived. We'll commit it to the
		// sub-systems below us, then craft, sign, and broadcast a new
		// ChannelUpdate for the set of affected clients.
		case policyUpdate := <-d.chanPolicyUpdates:
			log.Tracef("Received channel %d policy update requests",
				len(policyUpdate.edgesToUpdate))

			// First, we'll now create new fully signed updates for
			// the affected channels and also update the underlying
			// graph with the new state.
			newChanUpdates, err := d.processChanPolicyUpdate(
				policyUpdate.edgesToUpdate,
			)
			policyUpdate.errChan <- err
			if err != nil {
				log.Errorf("Unable to craft policy updates: %v",
					err)
				continue
			}

			// Finally, with the updates committed, we'll now add
			// them to the announcement batch to be flushed at the
			// start of the next epoch.
			announcements.AddMsgs(newChanUpdates...)

		case announcement := <-d.networkMsgs:
			log.Tracef("Received network message: "+
				"peer=%v, msg=%s, is_remote=%v",
				announcement.peer, announcement.msg.MsgType(),
				announcement.isRemote)

			switch announcement.msg.(type) {
			// Channel announcement signatures are amongst the only
			// messages that we'll process serially.
			case *lnwire.AnnounceSignatures1:
				emittedAnnouncements, _ := d.processNetworkAnnouncement(
					announcement,
				)
				log.Debugf("Processed network message %s, "+
					"returned len(announcements)=%v",
					announcement.msg.MsgType(),
					len(emittedAnnouncements))

				if emittedAnnouncements != nil {
					announcements.AddMsgs(
						emittedAnnouncements...,
					)
				}
				continue
			}

			// If this message was recently rejected, then we won't
			// attempt to re-process it.
			if announcement.isRemote && d.isRecentlyRejectedMsg(
				announcement.msg,
				sourceToPub(announcement.source),
			) {

				announcement.err <- fmt.Errorf("recently " +
					"rejected")
				continue
			}

			// We'll set up any dependencies and wait until a free
			// slot for this job opens up. This allows us to not
			// have thousands of goroutines active.
			annJobID, err := d.vb.InitJobDependencies(
				announcement.msg,
			)
			if err != nil {
				announcement.err <- err
				continue
			}

			d.wg.Add(1)
			go d.handleNetworkMessages(
				announcement, &announcements, annJobID,
			)

		// The trickle timer has ticked, which indicates we should
		// flush to the network the pending batch of new announcements
		// we've received since the last trickle tick.
		case <-trickleTimer.C:
			// Emit the current batch of announcements from
			// deDupedAnnouncements.
			announcementBatch := announcements.Emit()

			// If the current announcement batch is empty, then we
			// have no further work here.
			if announcementBatch.isEmpty() {
				continue
			}

			// At this point, we have the set of local and remote
			// announcements we want to send out. We'll do the
			// batching as normal for both, but for local
			// announcements, we'll blast them out w/o regard for
			// our peer's policies so we ensure they propagate
			// properly.
			d.splitAndSendAnnBatch(announcementBatch)

		// The retransmission timer has ticked which indicates that we
		// should check if we need to prune or re-broadcast any of our
		// personal channels or node announcement. This addresses the
		// case of "zombie" channels and channel advertisements that
		// have been dropped, or not properly propagated through the
		// network.
		case tick := <-d.cfg.RetransmitTicker.Ticks():
			if err := d.retransmitStaleAnns(tick); err != nil {
				log.Errorf("unable to rebroadcast stale "+
					"announcements: %v", err)
			}

		// The gossiper has been signalled to exit, so we exit our
		// main loop so the wait group can be decremented.
		case <-d.quit:
			return
		}
	}
}

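// exampleTrickleLoop is a hypothetical, stripped-down version of the select
// loop above, showing its core pattern: accumulate incoming work between
// ticks, flush the pending batch on each trickle tick, and exit promptly once
// the quit channel closes.
func exampleTrickleLoop(msgs <-chan networkMsg, quit <-chan struct{},
	trickle *time.Ticker, flush func()) {

	for {
		select {
		case <-msgs:
			// Accumulate the message into the pending batch.

		case <-trickle.C:
			flush()

		case <-quit:
			return
		}
	}
}
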
// handleNetworkMessages is responsible for waiting for dependencies for a
// given network message and processing the message. Once processed, it will
// signal its dependents and add the new announcements to the announce batch.
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
	deDuped *deDupedAnnouncements, jobID JobID) {

	defer d.wg.Done()
	defer d.vb.CompleteJob()

	// We should only broadcast this message forward if it originated from
	// us or it wasn't received as part of our initial historical sync.
	shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()

	// If this message has an existing dependency, then we'll wait until
	// that has been fully validated before we proceed.
	err := d.vb.WaitForParents(jobID, nMsg.msg)
	if err != nil {
		log.Debugf("Validating network message %s got err: %v",
			nMsg.msg.MsgType(), err)

		if errors.Is(err, ErrVBarrierShuttingDown) {
			log.Warnf("unexpected error during validation "+
				"barrier shutdown: %v", err)
		}
		nMsg.err <- err

		return
	}

	// Process the network announcement to determine if this is either a
	// new announcement from our PoV or an update to a prior vertex/edge we
	// previously processed.
	newAnns, allow := d.processNetworkAnnouncement(nMsg)

	log.Tracef("Processed network message %s, returned "+
		"len(announcements)=%v, allowDependents=%v",
		nMsg.msg.MsgType(), len(newAnns), allow)

	// If this message had any dependencies, then we can now signal them to
	// continue.
	err = d.vb.SignalDependents(nMsg.msg, jobID)
	if err != nil {
		// Something is wrong if SignalDependents returns an error.
		log.Errorf("SignalDependents returned error for msg=%v with "+
			"JobID=%v", spew.Sdump(nMsg.msg), jobID)

		nMsg.err <- err

		return
	}

	// If the announcement was accepted, then add the emitted announcements
	// to our announce batch to be broadcast once the trickle timer ticks
	// again.
	if newAnns != nil && shouldBroadcast {
		// TODO(roasbeef): exclude peer that sent.
		deDuped.AddMsgs(newAnns...)
	} else if newAnns != nil {
		log.Trace("Skipping broadcast of announcements received " +
			"during initial graph sync")
	}
}

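// exampleShouldBroadcast is a hypothetical restatement of the broadcast gate
// above: locally sourced messages are always rebroadcast, while remote ones
// are only rebroadcast once the initial graph sync has completed, so we never
// replay an entire historical sync to our peers.
func exampleShouldBroadcast(isRemote, graphSynced bool) bool {
	return !isRemote || graphSynced
}
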
// TODO(roasbeef): d/c peers that send updates not on our chain

// InitSyncState is called by outside sub-systems when a connection is
// established to a new peer that understands how to perform channel range
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
// needed to handle new queries.
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
	d.syncMgr.InitSyncState(syncPeer)
}

// PruneSyncState is called by outside sub-systems once a peer that we were
// previously connected to has been disconnected. In this case we can stop the
// existing GossipSyncer assigned to the peer and free up resources.
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
	d.syncMgr.PruneSyncState(peer)
}

// isRecentlyRejectedMsg returns true if we recently rejected a message, and
// false otherwise. This avoids expensive reprocessing of the message.
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
	peerPub [33]byte) bool {

	var scid uint64
	switch m := msg.(type) {
	case *lnwire.ChannelUpdate1:
		scid = m.ShortChannelID.ToUint64()

	case *lnwire.ChannelAnnouncement1:
		scid = m.ShortChannelID.ToUint64()

	default:
		return false
	}

	_, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
	return err != cache.ErrElementNotFound
}

// retransmitStaleAnns examines all outgoing channels that the source node is
// known to maintain to check to see if any of them are "stale". A channel is
// stale iff the last timestamp of its rebroadcast is older than the
// RebroadcastInterval. We also check if a refreshed node announcement should
// be resent.
func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
	// Iterate over all of our channels and check if any of them fall
	// within the prune interval or re-broadcast interval.
	type updateTuple struct {
		info *models.ChannelEdgeInfo
		edge *models.ChannelEdgePolicy
	}

	var (
		havePublicChannels bool
		edgesToUpdate      []updateTuple
	)
	err := d.cfg.Graph.ForAllOutgoingChannels(func(
		info *models.ChannelEdgeInfo,
		edge *models.ChannelEdgePolicy) error {

		// If there's no auth proof attached to this edge, it means
		// that it is a private channel not meant to be announced to
		// the greater network, so avoid sending channel updates for
		// this channel to not leak its existence.
		if info.AuthProof == nil {
			log.Debugf("Skipping retransmission of channel "+
				"without AuthProof: %v", info.ChannelID)
			return nil
		}

		// We make a note that we have at least one public channel. We
		// use this to determine whether we should send a node
		// announcement below.
		havePublicChannels = true

		// If this edge has a ChannelUpdate that was created before the
		// introduction of the MaxHTLC field, then we'll update this
		// edge to propagate this information in the network.
		if !edge.MessageFlags.HasMaxHtlc() {
			// We'll make sure we support the new max_htlc field if
			// not already present.
			edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
			edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)

			edgesToUpdate = append(edgesToUpdate, updateTuple{
				info: info,
				edge: edge,
			})
			return nil
		}

		timeElapsed := now.Sub(edge.LastUpdate)

		// If it's been longer than RebroadcastInterval since we've
		// re-broadcasted the channel, add the channel to the set of
		// edges we need to update.
		if timeElapsed >= d.cfg.RebroadcastInterval {
			edgesToUpdate = append(edgesToUpdate, updateTuple{
				info: info,
				edge: edge,
			})
		}

		return nil
	})
	if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
		return fmt.Errorf("unable to retrieve outgoing channels: %w",
			err)
	}

	var signedUpdates []lnwire.Message
	for _, chanToUpdate := range edgesToUpdate {
		// Re-sign and update the channel on disk and retrieve our
		// ChannelUpdate to broadcast.
		chanAnn, chanUpdate, err := d.updateChannel(
			chanToUpdate.info, chanToUpdate.edge,
		)
		if err != nil {
			return fmt.Errorf("unable to update channel: %w", err)
		}

		// If we have a valid announcement to transmit, then we'll send
		// that along with the update.
		if chanAnn != nil {
			signedUpdates = append(signedUpdates, chanAnn)
		}

		signedUpdates = append(signedUpdates, chanUpdate)
	}

	// If we don't have any public channels, we return as we don't want to
	// broadcast anything that would reveal our existence.
	if !havePublicChannels {
		return nil
	}

	// We'll also check that our NodeAnnouncement is not too old.
	currentNodeAnn := d.cfg.FetchSelfAnnouncement()
	timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
	timeElapsed := now.Sub(timestamp)

	// If it's been a full day since we've re-broadcasted the
	// node announcement, refresh it and resend it.
	nodeAnnStr := ""
	if timeElapsed >= d.cfg.RebroadcastInterval {
		newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
		if err != nil {
			return fmt.Errorf("unable to get refreshed node "+
				"announcement: %v", err)
		}

		signedUpdates = append(signedUpdates, &newNodeAnn)
		nodeAnnStr = " and our refreshed node announcement"

		// Before broadcasting the refreshed node announcement, add it
		// to our own graph.
		if err := d.addNode(&newNodeAnn); err != nil {
			log.Errorf("Unable to add refreshed node announcement "+
				"to graph: %v", err)
		}
	}

	// If we don't have any updates to re-broadcast, then we'll exit
	// early.
	if len(signedUpdates) == 0 {
		return nil
	}

	log.Infof("Retransmitting %v outgoing channels%v",
		len(edgesToUpdate), nodeAnnStr)

	// With all the wire announcements properly crafted, we'll broadcast
	// our known outgoing channels to all our immediate peers.
	if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
		return fmt.Errorf("unable to re-broadcast channels: %w", err)
	}

	return nil
}

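// exampleNeedsRetransmit is a hypothetical restatement of the staleness rule
// used above: an announcement is due for retransmission once the time elapsed
// since its last update reaches the rebroadcast interval (the comments above
// suggest a full day, though the actual value comes from the config).
func exampleNeedsRetransmit(lastUpdate, now time.Time,
	rebroadcastInterval time.Duration) bool {

	return now.Sub(lastUpdate) >= rebroadcastInterval
}
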
// processChanPolicyUpdate generates a new set of channel updates for the
// provided list of edges and updates the backing ChannelGraphSource.
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
	edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {

	var chanUpdates []networkMsg
	for _, edgeInfo := range edgesToUpdate {
		// Now that we've collected all the channels we need to update,
		// we'll re-sign and update the backing ChannelGraphSource, and
		// retrieve our ChannelUpdate to broadcast.
		_, chanUpdate, err := d.updateChannel(
			edgeInfo.Info, edgeInfo.Edge,
		)
		if err != nil {
			return nil, err
		}

		// We'll avoid broadcasting any updates for private channels to
		// avoid directly giving away their existence. Instead, we'll
		// send the update directly to the remote party.
		if edgeInfo.Info.AuthProof == nil {
			// If AuthProof is nil and an alias was found for this
			// ChannelID (meaning the option-scid-alias feature was
			// negotiated), we'll replace the ShortChannelID in the
			// update with the peer's alias. We do this after
			// updateChannel so that the alias isn't persisted to
			// the database.
			chanID := lnwire.NewChanIDFromOutPoint(
				edgeInfo.Info.ChannelPoint,
			)

			var defaultAlias lnwire.ShortChannelID
			foundAlias, _ := d.cfg.GetAlias(chanID)
			if foundAlias != defaultAlias {
				chanUpdate.ShortChannelID = foundAlias

				sig, err := d.cfg.SignAliasUpdate(chanUpdate)
				if err != nil {
					log.Errorf("Unable to sign alias "+
						"update: %v", err)
					continue
				}

				lnSig, err := lnwire.NewSigFromSignature(sig)
				if err != nil {
					log.Errorf("Unable to create sig: %v",
						err)
					continue
				}

				chanUpdate.Signature = lnSig
			}

			remotePubKey := remotePubFromChanInfo(
				edgeInfo.Info, chanUpdate.ChannelFlags,
			)
			err := d.reliableSender.sendMessage(
				chanUpdate, remotePubKey,
			)
			if err != nil {
				log.Errorf("Unable to reliably send %v for "+
					"channel=%v to peer=%x: %v",
					chanUpdate.MsgType(),
					chanUpdate.ShortChannelID,
					remotePubKey, err)
			}
			continue
		}

		// We set ourselves as the source of this message to indicate
		// that we shouldn't skip any peers when sending this message.
		chanUpdates = append(chanUpdates, networkMsg{
			source:   d.selfKey,
			isRemote: false,
			msg:      chanUpdate,
		})
	}

	return chanUpdates, nil
}

// remotePubFromChanInfo returns the public key of the remote peer given a
// ChannelEdgeInfo that describes a channel we have with them.
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
	chanFlags lnwire.ChanUpdateChanFlags) [33]byte {

	var remotePubKey [33]byte
	switch {
	case chanFlags&lnwire.ChanUpdateDirection == 0:
		remotePubKey = chanInfo.NodeKey2Bytes
	case chanFlags&lnwire.ChanUpdateDirection == 1:
		remotePubKey = chanInfo.NodeKey1Bytes
	}

	return remotePubKey
}

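// exampleRemoteIsNode2 is a hypothetical illustration of the switch above:
// the least-significant bit of the channel flags names the advertising node,
// so the *remote* party is the opposite key. A direction bit of 0 means node
// 1 signed the update, making node 2 the remote peer, and vice versa for 1.
func exampleRemoteIsNode2(chanFlags lnwire.ChanUpdateChanFlags) bool {
	return chanFlags&lnwire.ChanUpdateDirection == 0
}
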
// processRejectedEdge examines a rejected edge to see if we can extract any
// new announcements from it. An edge will get rejected if we already added
// the same edge without AuthProof to the graph. If the received announcement
// contains a proof, we can add this proof to our edge. We can end up in this
// situation in the case where we create a channel, but for some reason fail
// to receive the remote peer's proof, while the remote peer is able to fully
// assemble the proof and craft the ChannelAnnouncement.
func (d *AuthenticatedGossiper) processRejectedEdge(
	chanAnnMsg *lnwire.ChannelAnnouncement1,
	proof *models.ChannelAuthProof) ([]networkMsg, error) {

	// First, we'll fetch the state of the channel as we know it from the
	// database.
	chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
		chanAnnMsg.ShortChannelID,
	)
	if err != nil {
		return nil, err
	}

	// If the edge is in the graph and already has a proof attached, then
	// we'll just reject it as normal.
	if chanInfo.AuthProof != nil {
		return nil, nil
	}

	// Otherwise, this means that the edge is within the graph, but it
	// doesn't yet have a proper proof attached. If we did not receive
	// the proof such that we now can add it, there's nothing more we
	// can do.
	if proof == nil {
		return nil, nil
	}

	// We'll then create and validate the new fully assembled
	// announcement.
	chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
		proof, chanInfo, e1, e2,
	)
	if err != nil {
		return nil, err
	}
	err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
	if err != nil {
		err := fmt.Errorf("assembled channel announcement proof "+
			"for shortChanID=%v isn't valid: %v",
			chanAnnMsg.ShortChannelID, err)
		log.Error(err)
		return nil, err
	}

	// If everything checks out, then we'll add the fully assembled proof
	// to the database.
	err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
	if err != nil {
		err := fmt.Errorf("unable to add proof to shortChanID=%v: %w",
			chanAnnMsg.ShortChannelID, err)
		log.Error(err)
		return nil, err
	}

	// As we now have a complete channel announcement for this channel,
	// we'll construct the announcements so they can be broadcast out to
	// all our peers.
	announcements := make([]networkMsg, 0, 3)
	announcements = append(announcements, networkMsg{
		source: d.selfKey,
		msg:    chanAnn,
	})
	if e1Ann != nil {
		announcements = append(announcements, networkMsg{
			source: d.selfKey,
			msg:    e1Ann,
		})
	}
	if e2Ann != nil {
		announcements = append(announcements, networkMsg{
			source: d.selfKey,
			msg:    e2Ann,
		})
	}

	return announcements, nil
}

// fetchPKScript fetches the output script for the given SCID.
func (d *AuthenticatedGossiper) fetchPKScript(chanID *lnwire.ShortChannelID) (
	[]byte, error) {

	return lnwallet.FetchPKScriptWithQuit(d.cfg.ChainIO, chanID, d.quit)
}

// addNode processes the given node announcement, and adds it to our channel
// graph.
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
	op ...batch.SchedulerOption) error {

	if err := graph.ValidateNodeAnn(msg); err != nil {
		return fmt.Errorf("unable to validate node announcement: %w",
			err)
	}

	timestamp := time.Unix(int64(msg.Timestamp), 0)
	features := lnwire.NewFeatureVector(msg.Features, lnwire.Features)
	node := &models.LightningNode{
		HaveNodeAnnouncement: true,
		LastUpdate:           timestamp,
		Addresses:            msg.Addresses,
		PubKeyBytes:          msg.NodeID,
		Alias:                msg.Alias.String(),
		AuthSigBytes:         msg.Signature.ToSignatureBytes(),
		Features:             features,
		Color:                msg.RGBColor,
		ExtraOpaqueData:      msg.ExtraOpaqueData,
	}

	return d.cfg.Graph.AddNode(node, op...)
}

// isPremature decides whether a given network message has a block height+delta
// value specified in the future. If so, the message will be added to the
// future message map and be processed when the block height has been reached.
//
// NOTE: must be used inside a lock.
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
	delta uint32, msg *networkMsg) bool {

	// The channel is already confirmed at chanID.BlockHeight, so we
	// subtract one block. For instance, if the required confirmation for
	// this channel announcement is 6, we then only need to wait for 5 more
	// blocks once the funding tx is confirmed.
	if delta > 0 {
		delta--
	}

	msgHeight := chanID.BlockHeight + delta

	// The message height is smaller or equal to our best known height,
	// thus the message is mature.
	if msgHeight <= d.bestHeight {
		return false
	}

	// Add the premature message to our future messages which will be
	// resent once the block height has been reached.
	//
	// Copy the networkMsg since the old message's err chan will be
	// consumed.
	copied := &networkMsg{
		peer:              msg.peer,
		source:            msg.source,
		msg:               msg.msg,
		optionalMsgFields: msg.optionalMsgFields,
		isRemote:          msg.isRemote,
		err:               make(chan error, 1),
	}

	// Create the cached message.
	cachedMsg := &cachedFutureMsg{
		msg:    copied,
		height: msgHeight,
	}

	// Increment the msg ID and add it to the cache.
	nextMsgID := d.futureMsgs.nextMsgID()
	_, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
	if err != nil {
		log.Errorf("Adding future message got error: %v", err)
	}

	log.Debugf("Network message: %v added to future messages for "+
		"msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
		msgHeight, d.bestHeight)

	return true
}

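// examplePrematureCheck is a hypothetical walkthrough of the height math
// above: a channel confirmed at block 800000 that requires 6 confirmations
// yields msgHeight = 800000 + (6 - 1) = 800005, and the message is premature
// exactly while our best known height is still below that target.
func examplePrematureCheck(bestHeight uint32) bool {
	var (
		confirmedAt   uint32 = 800000
		requiredConfs uint32 = 6
	)

	msgHeight := confirmedAt + (requiredConfs - 1)

	return msgHeight > bestHeight
}
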
// processNetworkAnnouncement processes a new network-related authenticated
// channel or node announcement, or announcement proofs. If the announcement
// didn't affect the internal state due to either being out of date, invalid,
// or redundant, then nil is returned. Otherwise, the set of announcements will
// be returned which should be broadcast to the rest of the network. The
// boolean returned indicates whether any dependents of the announcement should
// attempt to be processed as well.
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
	nMsg *networkMsg) ([]networkMsg, bool) {

	// If this is a remote update, we set the scheduler option to lazily
	// add it to the graph.
	var schedulerOp []batch.SchedulerOption
	if nMsg.isRemote {
		schedulerOp = append(schedulerOp, batch.LazyAdd())
	}

	switch msg := nMsg.msg.(type) {
	// A new node announcement has arrived which either presents new
	// information about a node in one of the channels we know about, or
	// updates previously advertised information.
	case *lnwire.NodeAnnouncement:
		return d.handleNodeAnnouncement(nMsg, msg, schedulerOp)

	// A new channel announcement has arrived; this indicates the
	// *creation* of a new channel within the network. This only advertises
	// the existence of a channel and not yet the routing policies in
	// either direction of the channel.
	case *lnwire.ChannelAnnouncement1:
		return d.handleChanAnnouncement(nMsg, msg, schedulerOp)

	// A new authenticated channel edge update has arrived. This indicates
	// that the directional information for an already known channel has
	// been updated.
	case *lnwire.ChannelUpdate1:
		return d.handleChanUpdate(nMsg, msg, schedulerOp)

	// A new signature announcement has been received. This indicates
	// willingness of nodes involved in the funding of a channel to
	// announce this new channel to the rest of the world.
	case *lnwire.AnnounceSignatures1:
		return d.handleAnnSig(nMsg, msg)

	default:
		err := errors.New("wrong type of the announcement")
		nMsg.err <- err
		return nil, false
	}
}

// processZombieUpdate determines whether the provided channel update should
// resurrect a given zombie edge.
//
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
// should be inspected.
func (d *AuthenticatedGossiper) processZombieUpdate(
	chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
	msg *lnwire.ChannelUpdate1) error {

	// The least-significant bit in the flag on the channel update tells us
	// which edge is being updated.
	isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0

	// Since we've deemed the update as not stale above, before marking it
	// live, we'll make sure it has been signed by the correct party. If we
	// have both pubkeys, either party can resurrect the channel. If we've
	// already marked this with the stricter, single-sided resurrection we
	// will only have the pubkey of the node with the oldest timestamp.
	var pubKey *btcec.PublicKey
	switch {
	case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
		pubKey, _ = chanInfo.NodeKey1()
	case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
		pubKey, _ = chanInfo.NodeKey2()
	}
	if pubKey == nil {
		return fmt.Errorf("incorrect pubkey to resurrect zombie "+
			"with chan_id=%v", msg.ShortChannelID)
	}

	err := netann.VerifyChannelUpdateSignature(msg, pubKey)
	if err != nil {
		return fmt.Errorf("unable to verify channel "+
			"update signature: %v", err)
	}

	// With the signature valid, we'll proceed to mark the edge as live and
	// wait for the channel announcement to come through again.
	err = d.cfg.Graph.MarkEdgeLive(scid)
	switch {
	case errors.Is(err, graphdb.ErrZombieEdgeNotFound):
		log.Errorf("edge with chan_id=%v was not found in the "+
			"zombie index: %v", msg.ShortChannelID, err)

		return nil

	case err != nil:
		return fmt.Errorf("unable to remove edge with "+
			"chan_id=%v from zombie index: %v",
			msg.ShortChannelID, err)

	default:
	}

	log.Debugf("Removed edge with chan_id=%v from zombie "+
		"index", msg.ShortChannelID)

	return nil
}

// fetchNodeAnn fetches the latest signed node announcement from our point of
// view for the node with the given public key.
func (d *AuthenticatedGossiper) fetchNodeAnn(
	pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {

	node, err := d.cfg.Graph.FetchLightningNode(pubKey)
	if err != nil {
		return nil, err
	}

	return node.NodeAnnouncement(true)
}

// isMsgStale determines whether a message retrieved from the backing
// MessageStore is seen as stale by the current graph.
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
	switch msg := msg.(type) {
	case *lnwire.AnnounceSignatures1:
		chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
			msg.ShortChannelID,
		)

		// If the channel cannot be found, it is most likely a leftover
		// message for a channel that was closed, so we can consider it
		// stale.
		if errors.Is(err, graphdb.ErrEdgeNotFound) {
			return true
		}
		if err != nil {
			log.Debugf("Unable to retrieve channel=%v from graph: "+
				"%v", msg.ShortChannelID, err)
			return false
		}

		// If the proof exists in the graph, then we have successfully
		// received the remote proof and assembled the full proof, so we
		// can safely delete the local proof from the database.
		return chanInfo.AuthProof != nil

	case *lnwire.ChannelUpdate1:
		_, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)

		// If the channel cannot be found, it is most likely a leftover
		// message for a channel that was closed, so we can consider it
		// stale.
		if errors.Is(err, graphdb.ErrEdgeNotFound) {
			return true
		}
		if err != nil {
			log.Debugf("Unable to retrieve channel=%v from graph: "+
				"%v", msg.ShortChannelID, err)
			return false
		}

		// Otherwise, we'll retrieve the correct policy that we
		// currently have stored within our graph to check if this
		// message is stale by comparing its timestamp.
		var p *models.ChannelEdgePolicy
		if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
			p = p1
		} else {
			p = p2
		}

		// If the policy is still unknown, then we can consider this
		// policy fresh.
		if p == nil {
			return false
		}

		timestamp := time.Unix(int64(msg.Timestamp), 0)
		return p.LastUpdate.After(timestamp)

	default:
		// We'll make sure to not mark any unsupported messages as stale
		// to ensure they are not removed.
		return false
	}
}

// updateChannel creates a new fully signed update for the channel, and updates
// the underlying graph with the new state.
func (d *AuthenticatedGossiper) updateChannel(info *models.ChannelEdgeInfo,
	edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
	*lnwire.ChannelUpdate1, error) {

	// Parse the unsigned edge into a channel update.
	chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)

	// We'll generate a new signature over a digest of the channel
	// announcement itself and update the timestamp to ensure it
	// propagates.
	err := netann.SignChannelUpdate(
		d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
		netann.ChanUpdSetTimestamp,
	)
	if err != nil {
		return nil, nil, err
	}

	// Next, we'll set the new signature in place, and update the reference
	// in the backing slice.
	edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
	edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()

	// To ensure that our signature is valid, we'll verify it ourselves
	// before committing it to the slice returned.
	err = netann.ValidateChannelUpdateAnn(
		d.selfKey, info.Capacity, chanUpdate,
	)
	if err != nil {
		return nil, nil, fmt.Errorf("generated invalid channel "+
			"update sig: %v", err)
	}

	// Finally, we'll write the new edge policy to disk.
	if err := d.cfg.Graph.UpdateEdge(edge); err != nil {
		return nil, nil, err
	}

	// We'll also create the original channel announcement so the two can
	// be broadcast alongside each other (if necessary), but only if we
	// have a full channel announcement for this channel.
	var chanAnn *lnwire.ChannelAnnouncement1
	if info.AuthProof != nil {
		chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
		chanAnn = &lnwire.ChannelAnnouncement1{
			ShortChannelID:  chanID,
			NodeID1:         info.NodeKey1Bytes,
			NodeID2:         info.NodeKey2Bytes,
			ChainHash:       info.ChainHash,
			BitcoinKey1:     info.BitcoinKey1Bytes,
			Features:        lnwire.NewRawFeatureVector(),
			BitcoinKey2:     info.BitcoinKey2Bytes,
			ExtraOpaqueData: info.ExtraOpaqueData,
		}
		chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
			info.AuthProof.NodeSig1Bytes,
		)
		if err != nil {
			return nil, nil, err
		}
		chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
			info.AuthProof.NodeSig2Bytes,
		)
		if err != nil {
			return nil, nil, err
		}
		chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
			info.AuthProof.BitcoinSig1Bytes,
		)
		if err != nil {
			return nil, nil, err
		}
		chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
			info.AuthProof.BitcoinSig2Bytes,
		)
		if err != nil {
			return nil, nil, err
		}
	}

	return chanAnn, chanUpdate, err
}

// SyncManager returns the gossiper's SyncManager instance.
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
	return d.syncMgr
}

// IsKeepAliveUpdate determines whether this channel update is considered a
// keep-alive update based on the previous channel update processed for the
// same direction.
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
        prev *models.ChannelEdgePolicy) bool {

        // Both updates should be from the same direction.
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
                prev.ChannelFlags&lnwire.ChanUpdateDirection {

                return false
        }

        // The timestamp should always increase for a keep-alive update.
        timestamp := time.Unix(int64(update.Timestamp), 0)
        if !timestamp.After(prev.LastUpdate) {
                return false
        }

        // None of the remaining fields should change for a keep-alive update.
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
                return false
        }
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
                return false
        }
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
                return false
        }
        if update.TimeLockDelta != prev.TimeLockDelta {
                return false
        }
        if update.HtlcMinimumMsat != prev.MinHTLC {
                return false
        }
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
                return false
        }
        if update.HtlcMaximumMsat != prev.MaxHTLC {
                return false
        }
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
                return false
        }
        return true
}

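// Sketch (illustrative only, not part of the gossiper): per the checks in
// IsKeepAliveUpdate above, a keep-alive is the previous policy re-announced
// with only the timestamp bumped. A hypothetical constructor for such an
// update, using the same field correspondences the function compares;
// ChainHash, ShortChannelID, and Signature are omitted for brevity.
func exampleKeepAliveFrom(prev *models.ChannelEdgePolicy) *lnwire.ChannelUpdate1 {
        return &lnwire.ChannelUpdate1{
                // Same direction, disabled bit, and message flags.
                ChannelFlags: prev.ChannelFlags,
                MessageFlags: prev.MessageFlags,

                // Only the timestamp moves forward.
                Timestamp: uint32(prev.LastUpdate.Add(time.Hour).Unix()),

                // Every policy field stays identical.
                BaseFee:         uint32(prev.FeeBaseMSat),
                FeeRate:         uint32(prev.FeeProportionalMillionths),
                TimeLockDelta:   prev.TimeLockDelta,
                HtlcMinimumMsat: prev.MinHTLC,
                HtlcMaximumMsat: prev.MaxHTLC,
                ExtraOpaqueData: prev.ExtraOpaqueData,
        }
}
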
// latestHeight returns the latest height of the chain known to the gossiper.
func (d *AuthenticatedGossiper) latestHeight() uint32 {
        d.Lock()
        defer d.Unlock()
        return d.bestHeight
}

// handleNodeAnnouncement processes a new node announcement.
func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
        nodeAnn *lnwire.NodeAnnouncement,
        ops []batch.SchedulerOption) ([]networkMsg, bool) {

        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)

        log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
                nMsg.source.SerializeCompressed())

        // We'll quickly ask the router if it already has a newer update for
        // this node so we can skip validating signatures if not required.
        if d.cfg.Graph.IsStaleNode(nodeAnn.NodeID, timestamp) {
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
                nMsg.err <- nil
                return nil, true
        }

        if err := d.addNode(nodeAnn, ops...); err != nil {
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
                        err)

                if !graph.IsError(
                        err,
                        graph.ErrOutdated,
                        graph.ErrIgnored,
                ) {

                        log.Error(err)
                }

                nMsg.err <- err
                return nil, false
        }

        // In order to ensure we don't leak unadvertised nodes, we'll make a
        // quick check to ensure this node intends to publicly advertise itself
        // to the network.
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
        if err != nil {
                log.Errorf("Unable to determine if node %x is advertised: %v",
                        nodeAnn.NodeID, err)
                nMsg.err <- err
                return nil, false
        }

        var announcements []networkMsg

        // If it does, we'll add their announcement to our batch so that it can
        // be broadcast to the rest of our peers.
        if isPublic {
                announcements = append(announcements, networkMsg{
                        peer:     nMsg.peer,
                        isRemote: nMsg.isRemote,
                        source:   nMsg.source,
                        msg:      nodeAnn,
                })
        } else {
                log.Tracef("Skipping broadcasting node announcement for %x "+
                        "due to being unadvertised", nodeAnn.NodeID)
        }

        nMsg.err <- nil
        // TODO(roasbeef): get rid of the above

        log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
                nMsg.source.SerializeCompressed())

        return announcements, true
}

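// Sketch (illustrative only, not part of the gossiper): the handlers in this
// file share one contract. As a rule, exactly one value is sent on nMsg.err
// before returning, which unblocks the original caller (stashed premature
// updates defer this until reprocessing), and the boolean result signals
// whether messages depending on this one may proceed. A hypothetical minimal
// handler of that shape, where exampleValidate stands in for any of the
// checks above:
func exampleHandler(nMsg *networkMsg,
        exampleValidate func(*networkMsg) error) ([]networkMsg, bool) {

        if err := exampleValidate(nMsg); err != nil {
                // Reject: deliver the error and block dependants.
                nMsg.err <- err
                return nil, false
        }

        // Accept: unblock the caller and hand the message to the broadcast
        // batch.
        nMsg.err <- nil
        return []networkMsg{*nMsg}, true
}
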
// handleChanAnnouncement processes a new channel announcement.
func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
        ann *lnwire.ChannelAnnouncement1,
        ops []batch.SchedulerOption) ([]networkMsg, bool) {

        scid := ann.ShortChannelID

        log.Debugf("Processing ChannelAnnouncement1: peer=%v, short_chan_id=%v",
                nMsg.peer, scid.ToUint64())

        // We'll ignore any channel announcements that target any chain other
        // than the set of chains we know of.
        if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) {
                err := fmt.Errorf("ignoring ChannelAnnouncement1 from chain=%v"+
                        ", gossiper on chain=%v", ann.ChainHash,
                        d.cfg.ChainHash)
                log.Errorf(err.Error())

                key := newRejectCacheKey(
                        scid.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                nMsg.err <- err
                return nil, false
        }

        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
        // reject the announcement. Since the router accepts alias SCIDs,
        // not erroring out would be a DoS vector.
        if nMsg.isRemote && d.cfg.IsAlias(scid) {
                err := fmt.Errorf("ignoring remote alias channel=%v", scid)
                log.Errorf(err.Error())

                key := newRejectCacheKey(
                        scid.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                nMsg.err <- err
                return nil, false
        }

        // If the advertised inclusionary block is beyond our knowledge of the
        // chain tip, then we'll ignore it for now.
        d.Lock()
        if nMsg.isRemote && d.isPremature(scid, 0, nMsg) {
                log.Warnf("Announcement for chan_id=(%v), is premature: "+
                        "advertises height %v, only height %v is known",
                        scid.ToUint64(), scid.BlockHeight, d.bestHeight)
                d.Unlock()
                nMsg.err <- nil
                return nil, false
        }
        d.Unlock()

        // At this point, we'll ask the router if this is a zombie/known edge.
        // If so, we can skip all the processing below.
        if d.cfg.Graph.IsKnownEdge(scid) {
                nMsg.err <- nil
                return nil, true
        }

        // Check if the channel is already closed, in which case we can ignore
        // it.
        closed, err := d.cfg.ScidCloser.IsClosedScid(scid)
        if err != nil {
                log.Errorf("failed to check if scid %v is closed: %v", scid,
                        err)
                nMsg.err <- err

                return nil, false
        }

        if closed {
                err = fmt.Errorf("ignoring closed channel %v", scid)
                log.Error(err)

                // If this is an announcement from us, we'll just ignore it.
                if !nMsg.isRemote {
                        nMsg.err <- err
                        return nil, false
                }

                // Increment the peer's ban score if they are sending closed
                // channel announcements.
                d.banman.incrementBanScore(nMsg.peer.PubKey())

                // If the peer is banned and not a channel peer, we'll
                // disconnect them.
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
                if dcErr != nil {
                        log.Errorf("failed to check if we should disconnect "+
                                "peer: %v", dcErr)
                        nMsg.err <- dcErr

                        return nil, false
                }

                if shouldDc {
                        nMsg.peer.Disconnect(ErrPeerBanned)
                }

                nMsg.err <- err

                return nil, false
        }

        // If this is a remote channel announcement, then we'll validate all
        // the signatures within the proof as it should be well formed.
        var proof *models.ChannelAuthProof
        if nMsg.isRemote {
                err := netann.ValidateChannelAnn(ann, d.fetchPKScript)
                if err != nil {
                        err := fmt.Errorf("unable to validate announcement: "+
                                "%v", err)

                        key := newRejectCacheKey(
                                scid.ToUint64(),
                                sourceToPub(nMsg.source),
                        )
                        _, _ = d.recentRejects.Put(key, &cachedReject{})

                        log.Error(err)
                        nMsg.err <- err
                        return nil, false
                }

                // If the proof checks out, then we'll save the proof itself to
                // the database so we can fetch it later when gossiping with
                // other nodes.
                proof = &models.ChannelAuthProof{
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
                }
        }

        // With the proof validated (if necessary), we can now store it within
        // the database for our path finding and syncing needs.
        var featureBuf bytes.Buffer
        if err := ann.Features.Encode(&featureBuf); err != nil {
                log.Errorf("unable to encode features: %v", err)
                nMsg.err <- err
                return nil, false
        }

        edge := &models.ChannelEdgeInfo{
                ChannelID:        scid.ToUint64(),
                ChainHash:        ann.ChainHash,
                NodeKey1Bytes:    ann.NodeID1,
                NodeKey2Bytes:    ann.NodeID2,
                BitcoinKey1Bytes: ann.BitcoinKey1,
                BitcoinKey2Bytes: ann.BitcoinKey2,
                AuthProof:        proof,
                Features:         featureBuf.Bytes(),
                ExtraOpaqueData:  ann.ExtraOpaqueData,
        }

        // If there were any optional message fields provided, we'll include
        // them in its serialized disk representation now.
        if nMsg.optionalMsgFields != nil {
                if nMsg.optionalMsgFields.capacity != nil {
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
                }
                if nMsg.optionalMsgFields.channelPoint != nil {
                        cp := *nMsg.optionalMsgFields.channelPoint
                        edge.ChannelPoint = cp
                }

                // Optional tapscript root for custom channels.
                edge.TapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
        }

        log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64())

        // We will add the edge to the channel router. If the nodes present in
        // this channel are not present in the database, a partial node will be
        // added to represent each node while we wait for a node announcement.
        //
        // Before we add the edge to the database, we obtain the mutex for this
        // channel ID. We do this to ensure no other goroutine has read the
        // database and is now making decisions based on this DB state, before
        // it writes to the DB.
        d.channelMtx.Lock(scid.ToUint64())
        err = d.cfg.Graph.AddEdge(edge, ops...)
        if err != nil {
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
                        scid.ToUint64(), err)

                defer d.channelMtx.Unlock(scid.ToUint64())

                // If the edge was rejected due to already being known, then it
                // may be the case that this new message has a fresh channel
                // proof, so we'll check.
                switch {
                case graph.IsError(err, graph.ErrIgnored):
                        // Attempt to process the rejected message to see if we
                        // get any new announcements.
                        anns, rErr := d.processRejectedEdge(ann, proof)
                        if rErr != nil {
                                key := newRejectCacheKey(
                                        scid.ToUint64(),
                                        sourceToPub(nMsg.source),
                                )
                                cr := &cachedReject{}
                                _, _ = d.recentRejects.Put(key, cr)

                                nMsg.err <- rErr
                                return nil, false
                        }

                        log.Debugf("Extracted %v announcements from rejected "+
                                "msgs", len(anns))

                        // If, while processing this rejected edge, we realized
                        // there's a set of announcements we could extract,
                        // then we'll return those directly.
                        //
                        // NOTE: since this is an ErrIgnored, we can return
                        // true here to signal "allow" to its dependants.
                        nMsg.err <- nil

                        return anns, true

                case graph.IsError(
                        err, graph.ErrNoFundingTransaction,
                        graph.ErrInvalidFundingOutput,
                ):
                        key := newRejectCacheKey(
                                scid.ToUint64(),
                                sourceToPub(nMsg.source),
                        )
                        _, _ = d.recentRejects.Put(key, &cachedReject{})

                        // Increment the peer's ban score. We check isRemote
                        // so we don't actually ban the peer in case of a local
                        // bug.
                        if nMsg.isRemote {
                                d.banman.incrementBanScore(nMsg.peer.PubKey())
                        }

                case graph.IsError(err, graph.ErrChannelSpent):
                        key := newRejectCacheKey(
                                scid.ToUint64(),
                                sourceToPub(nMsg.source),
                        )
                        _, _ = d.recentRejects.Put(key, &cachedReject{})

                        // Since this channel has already been closed, we'll
                        // add it to the graph's closed channel index such that
                        // we won't attempt to do expensive validation checks
                        // on it again.
                        // TODO: Populate the ScidCloser by using closed
                        // channel notifications.
                        dbErr := d.cfg.ScidCloser.PutClosedScid(scid)
                        if dbErr != nil {
                                log.Errorf("failed to mark scid(%v) as "+
                                        "closed: %v", scid, dbErr)

                                nMsg.err <- dbErr

                                return nil, false
                        }

                        // Increment the peer's ban score. We check isRemote
                        // so we don't accidentally ban ourselves in case of a
                        // bug.
                        if nMsg.isRemote {
                                d.banman.incrementBanScore(nMsg.peer.PubKey())
                        }

                default:
                        // Otherwise, this is just a regular rejected edge.
                        key := newRejectCacheKey(
                                scid.ToUint64(),
                                sourceToPub(nMsg.source),
                        )
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
                }

                if !nMsg.isRemote {
                        log.Errorf("failed to add edge for local channel: %v",
                                err)
                        nMsg.err <- err

                        return nil, false
                }

                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
                if dcErr != nil {
                        log.Errorf("failed to check if we should disconnect "+
                                "peer: %v", dcErr)
                        nMsg.err <- dcErr

                        return nil, false
                }

                if shouldDc {
                        nMsg.peer.Disconnect(ErrPeerBanned)
                }

                nMsg.err <- err

                return nil, false
        }

        // If err is nil, release the lock immediately.
        d.channelMtx.Unlock(scid.ToUint64())

        log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64())

        // If we earlier received any ChannelUpdates for this channel, we can
        // now process them, as the channel is added to the graph.
        var channelUpdates []*processedNetworkMsg

        earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64())
        if err == nil {
                // There was actually an entry in the map, so we'll accumulate
                // it. We don't worry about deletion, since it'll eventually
                // fall out anyway.
                chanMsgs := earlyChanUpdates
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
        }

        // Launch a new goroutine to handle each ChannelUpdate; this is to
        // ensure we don't block here, as we can handle only one announcement
        // at a time.
        for _, cu := range channelUpdates {
                // Skip if already processed.
                if cu.processed {
                        continue
                }

                // Mark the ChannelUpdate as processed. This ensures that a
                // subsequent announcement in the option-scid-alias case does
                // not re-use an old ChannelUpdate.
                cu.processed = true

                d.wg.Add(1)
                go func(updMsg *networkMsg) {
                        defer d.wg.Done()

                        switch msg := updMsg.msg.(type) {
                        // Reprocess the message, making sure we return an
                        // error to the original caller in case the gossiper
                        // shuts down.
                        case *lnwire.ChannelUpdate1:
                                log.Debugf("Reprocessing ChannelUpdate for "+
                                        "shortChanID=%v", scid.ToUint64())

                                select {
                                case d.networkMsgs <- updMsg:
                                case <-d.quit:
                                        updMsg.err <- ErrGossiperShuttingDown
                                }

                        // We don't expect any other message type than
                        // ChannelUpdate to be in this cache.
                        default:
                                log.Errorf("Unsupported message type found "+
                                        "among ChannelUpdates: %T", msg)
                        }
                }(cu.msg)
        }

        // Channel announcement was successfully processed and now it might be
        // broadcast to other connected nodes if it was an announcement with
        // proof (remote).
        var announcements []networkMsg

        if proof != nil {
                announcements = append(announcements, networkMsg{
                        peer:     nMsg.peer,
                        isRemote: nMsg.isRemote,
                        source:   nMsg.source,
                        msg:      ann,
                })
        }

        nMsg.err <- nil

        log.Debugf("Processed ChannelAnnouncement1: peer=%v, short_chan_id=%v",
                nMsg.peer, scid.ToUint64())

        return announcements, true
}

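// Sketch (illustrative only, not part of the gossiper): the read-check-write
// pattern above relies on d.channelMtx, a mutex keyed by channel ID, so that
// two goroutines handling messages for the same channel cannot interleave
// between a graph read and the subsequent write, while messages for
// different channels still proceed in parallel. The shape of that lock
// discipline, as a hypothetical helper:
func exampleLockedGraphWrite(d *AuthenticatedGossiper,
        scid lnwire.ShortChannelID) error {

        // Any read that feeds a later write for this channel happens under
        // the per-channel lock.
        d.channelMtx.Lock(scid.ToUint64())
        defer d.channelMtx.Unlock(scid.ToUint64())

        // e.g. IsKnownEdge/GetChannelByID followed by AddEdge/UpdateEdge for
        // the same scid would go here.
        return nil
}
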
// handleChanUpdate processes a new channel update.
func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
        upd *lnwire.ChannelUpdate1,
        ops []batch.SchedulerOption) ([]networkMsg, bool) {

        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v",
                nMsg.peer, upd.ShortChannelID.ToUint64())

        // We'll ignore any channel updates that target any chain other than
        // the set of chains we know of.
        if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
                        "gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash)
                log.Errorf(err.Error())

                key := newRejectCacheKey(
                        upd.ShortChannelID.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                nMsg.err <- err
                return nil, false
        }

        blockHeight := upd.ShortChannelID.BlockHeight
        shortChanID := upd.ShortChannelID.ToUint64()

        // If the advertised inclusionary block is beyond our knowledge of the
        // chain tip, then we'll put the announcement in limbo to be fully
        // verified once we advance forward in the chain. If the update has an
        // alias SCID, we'll skip the isPremature check. This is necessary
        // since aliases start at block height 16_000_000.
        d.Lock()
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
                d.isPremature(upd.ShortChannelID, 0, nMsg) {

                log.Warnf("Update announcement for short_chan_id(%v), is "+
                        "premature: advertises height %v, only height %v is "+
                        "known", shortChanID, blockHeight, d.bestHeight)
                d.Unlock()
                nMsg.err <- nil
                return nil, false
        }
        d.Unlock()

        // Before we perform any of the expensive checks below, we'll check
        // whether this update is stale or is for a zombie channel in order to
        // quickly reject it.
        timestamp := time.Unix(int64(upd.Timestamp), 0)

        // Fetch the SCID we should be using to lock the channelMtx and make
        // graph queries with.
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
        if err != nil {
                // Fallback and set the graphScid to the peer-provided SCID.
                // This will occur for non-option-scid-alias channels and for
                // public option-scid-alias channels after 6 confirmations.
                // Once public option-scid-alias channels have 6 confs, we'll
                // ignore ChannelUpdates with one of their aliases.
                graphScid = upd.ShortChannelID
        }

        if d.cfg.Graph.IsStaleEdgePolicy(
                graphScid, timestamp, upd.ChannelFlags,
        ) {

                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
                )

                nMsg.err <- nil
                return nil, true
        }

        // Check that the ChannelUpdate is not too far in the future; this
        // could reveal a faulty implementation, so we log an error.
        if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
                log.Errorf("Skewed timestamp (%v) for edge policy of "+
                        "short_chan_id(%v), timestamp too far in the future: "+
                        "peer=%v, msg=%s, is_remote=%v", timestamp.Unix(),
                        shortChanID, nMsg.peer, nMsg.msg.MsgType(),
                        nMsg.isRemote,
                )

                nMsg.err <- fmt.Errorf("skewed timestamp of edge policy, "+
                        "timestamp too far in the future: %v", timestamp.Unix())

                return nil, false
        }

        // Get the node pub key, since we don't have it in the channel update
        // announcement message. We'll need this to properly verify the
        // message's signature.
        //
        // We make sure to obtain the mutex for this channel ID before we
        // access the database. This ensures the state we read from the
        // database has not changed between this point and when we call
        // UpdateEdge() later.
        d.channelMtx.Lock(graphScid.ToUint64())
        defer d.channelMtx.Unlock(graphScid.ToUint64())

        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
        switch {
        // No error, break.
        case err == nil:
                break

        case errors.Is(err, graphdb.ErrZombieEdge):
                err = d.processZombieUpdate(chanInfo, graphScid, upd)
                if err != nil {
                        log.Debug(err)
                        nMsg.err <- err
                        return nil, false
                }

                // We'll fallthrough to ensure we stash the update until we
                // receive its corresponding ChannelAnnouncement. This is
                // needed to ensure the edge exists in the graph before
                // applying the update.
                fallthrough
        case errors.Is(err, graphdb.ErrGraphNotFound):
                fallthrough
        case errors.Is(err, graphdb.ErrGraphNoEdgesFound):
                fallthrough
        case errors.Is(err, graphdb.ErrEdgeNotFound):
                // If the edge corresponding to this ChannelUpdate was not
                // found in the graph, this might be a channel in the process
                // of being opened, and we haven't processed our own
                // ChannelAnnouncement yet, hence it is not found in the
                // graph. This usually gets resolved after the channel proofs
                // are exchanged and the channel is broadcasted to the rest of
                // the network, but in case this is a private channel this
                // won't ever happen. This can also happen in the case of a
                // zombie channel with a fresh update for which we don't have
                // a ChannelAnnouncement, since we reject them. Because of
                // this, we temporarily add it to a map, and reprocess it
                // after our own ChannelAnnouncement has been processed.
                //
                // The shortChanID may be an alias, but it is fine to use here
                // since we don't have an edge in the graph and if the peer is
                // not buggy, we should be able to use it once the gossiper
                // receives the local announcement.
                pMsg := &processedNetworkMsg{msg: nMsg}

                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
                switch {
                // Nothing in the cache yet, we can just directly insert this
                // element.
                case err == cache.ErrElementNotFound:
                        _, _ = d.prematureChannelUpdates.Put(
                                shortChanID, &cachedNetworkMsg{
                                        msgs: []*processedNetworkMsg{pMsg},
                                })

                // There's already something in the cache, so we'll combine the
                // set of messages into a single value.
                default:
                        msgs := earlyMsgs.msgs
                        msgs = append(msgs, pMsg)
                        _, _ = d.prematureChannelUpdates.Put(
                                shortChanID, &cachedNetworkMsg{
                                        msgs: msgs,
                                })
                }

                log.Debugf("Got ChannelUpdate for edge not found in graph "+
                        "(shortChanID=%v), saving for reprocessing later",
                        shortChanID)

                // NOTE: We don't return anything on the error channel for this
                // message, as we expect that will be done when this
                // ChannelUpdate is later reprocessed.
                return nil, false

        default:
                err := fmt.Errorf("unable to validate channel update "+
                        "short_chan_id=%v: %v", shortChanID, err)
                log.Error(err)
                nMsg.err <- err

                key := newRejectCacheKey(
                        upd.ShortChannelID.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                return nil, false
        }

        // The least-significant bit in the flag on the channel update
        // announcement tells us "which" side of the channel's directed edge
        // is being updated.
        var (
                pubKey       *btcec.PublicKey
                edgeToUpdate *models.ChannelEdgePolicy
        )
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
        switch direction {
        case 0:
                pubKey, _ = chanInfo.NodeKey1()
                edgeToUpdate = e1
        case 1:
                pubKey, _ = chanInfo.NodeKey2()
                edgeToUpdate = e2
        }

        log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
                "edge policy=%v", chanInfo.ChannelID,
                pubKey.SerializeCompressed(), edgeToUpdate != nil)

        // Validate the channel announcement with the expected public key and
        // channel capacity. In the case of an invalid channel update, we'll
        // return an error to the caller and exit early.
        err = netann.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
        if err != nil {
                rErr := fmt.Errorf("unable to validate channel update "+
                        "announcement for short_chan_id=%v: %v",
                        spew.Sdump(upd.ShortChannelID), err)

                log.Error(rErr)
                nMsg.err <- rErr
                return nil, false
        }

        // If we have a previous version of the edge being updated, we'll want
        // to rate limit its updates to prevent spam throughout the network.
        if nMsg.isRemote && edgeToUpdate != nil {
                // If it's a keep-alive update, we'll only propagate one if
                // it's been a day since the previous. This follows our own
                // heuristic of sending keep-alive updates after the same
                // duration (see retransmitStaleAnns).
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
                                log.Debugf("Ignoring keep alive update not "+
                                        "within %v period for channel %v",
                                        d.cfg.RebroadcastInterval, shortChanID)
                                nMsg.err <- nil
                                return nil, false
                        }
                } else {
                        // If it's not, we'll allow an update per minute with a
                        // maximum burst of 10. If we haven't seen an update
                        // for this channel before, we'll need to initialize a
                        // rate limiter for each direction.
                        //
                        // Since the edge exists in the graph, we'll create a
                        // rate limiter for chanInfo.ChannelID rather than the
                        // SCID the peer sent. This is because there may be
                        // multiple aliases for a channel and we may otherwise
                        // rate-limit only a single alias of the channel,
                        // instead of the whole channel.
                        baseScid := chanInfo.ChannelID
                        d.Lock()
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
                        if !ok {
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
                                b := d.cfg.MaxChannelUpdateBurst
                                rls = [2]*rate.Limiter{
                                        rate.NewLimiter(r, b),
                                        rate.NewLimiter(r, b),
                                }
                                d.chanUpdateRateLimiter[baseScid] = rls
                        }
                        d.Unlock()

                        if !rls[direction].Allow() {
                                log.Debugf("Rate limiting update for channel "+
                                        "%v from direction %x", shortChanID,
                                        pubKey.SerializeCompressed())
                                nMsg.err <- nil
                                return nil, false
                        }
                }
        }

        // We'll use chanInfo.ChannelID rather than the peer-supplied
        // ShortChannelID in the ChannelUpdate to avoid the router having to
        // look up the stored SCID. If we're sending the update, we'll always
        // use the SCID stored in the database rather than a potentially
        // different alias. This might mean that SigBytes is incorrect as it
        // signs a different SCID than the database SCID, but since there will
        // only be a difference if AuthProof == nil, this is fine.
        update := &models.ChannelEdgePolicy{
                SigBytes:                  upd.Signature.ToSignatureBytes(),
                ChannelID:                 chanInfo.ChannelID,
                LastUpdate:                timestamp,
                MessageFlags:              upd.MessageFlags,
                ChannelFlags:              upd.ChannelFlags,
                TimeLockDelta:             upd.TimeLockDelta,
                MinHTLC:                   upd.HtlcMinimumMsat,
                MaxHTLC:                   upd.HtlcMaximumMsat,
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
                ExtraOpaqueData:           upd.ExtraOpaqueData,
        }

        if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil {
                if graph.IsError(
                        err, graph.ErrOutdated,
                        graph.ErrIgnored,
                ) {

                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
                                shortChanID, err)
                } else {
                        // Since we know the stored SCID in the graph, we'll
                        // cache that SCID.
                        key := newRejectCacheKey(
                                chanInfo.ChannelID,
                                sourceToPub(nMsg.source),
                        )
                        _, _ = d.recentRejects.Put(key, &cachedReject{})

                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
                                shortChanID, err)
                }

                nMsg.err <- err
                return nil, false
        }

        // If this is a local ChannelUpdate without an AuthProof, it means it
        // is an update to a channel that is not (yet) supposed to be announced
        // to the greater network. However, our channel counterparty will need
        // to be given the update, so we'll try sending the update directly to
        // the remote peer.
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
                if nMsg.optionalMsgFields != nil {
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
                        if remoteAlias != nil {
                                // The remoteAlias field was specified, meaning
                                // that we should replace the SCID in the
                                // update with the remote's alias. We'll also
                                // need to re-sign the channel update. This is
                                // required for option-scid-alias feature-bit
                                // negotiated channels.
                                upd.ShortChannelID = *remoteAlias

                                sig, err := d.cfg.SignAliasUpdate(upd)
                                if err != nil {
                                        log.Error(err)
                                        nMsg.err <- err
                                        return nil, false
                                }

                                lnSig, err := lnwire.NewSigFromSignature(sig)
                                if err != nil {
                                        log.Error(err)
                                        nMsg.err <- err
                                        return nil, false
                                }

                                upd.Signature = lnSig
                        }
                }

                // Get our peer's public key.
                remotePubKey := remotePubFromChanInfo(
                        chanInfo, upd.ChannelFlags,
                )

                log.Debugf("The message %v has no AuthProof, sending the "+
                        "update to remote peer %x", upd.MsgType(), remotePubKey)

                // Now we'll attempt to send the channel update message
                // reliably to the remote peer in the background, so that we
                // don't block if the peer happens to be offline at the moment.
                err := d.reliableSender.sendMessage(upd, remotePubKey)
                if err != nil {
                        err := fmt.Errorf("unable to reliably send %v for "+
                                "channel=%v to peer=%x: %v", upd.MsgType(),
                                upd.ShortChannelID, remotePubKey, err)
                        nMsg.err <- err
                        return nil, false
                }
        }

        // Channel update announcement was successfully processed and now it
        // can be broadcast to the rest of the network. However, we'll only
        // broadcast the channel update announcement if it has an attached
        // authentication proof. We also won't broadcast the update if it
        // contains an alias because the network would reject this.
        var announcements []networkMsg
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
                announcements = append(announcements, networkMsg{
                        peer:     nMsg.peer,
                        source:   nMsg.source,
                        isRemote: nMsg.isRemote,
                        msg:      upd,
                })
        }

        nMsg.err <- nil

        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
                timestamp)
        return announcements, true
}

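// Sketch (illustrative only, not part of the gossiper): the per-channel rate
// limiting above keeps one token-bucket limiter per direction. With the
// package defaults (DefaultChannelUpdateInterval of one minute and
// DefaultMaxChannelUpdateBurst of 10), a peer can burst up to ten
// non-keep-alive updates per direction before being throttled to one per
// minute as the bucket refills.
func exampleNewUpdateLimiters() [2]*rate.Limiter {
        // rate.Every converts an interval into a refill rate: one token per
        // DefaultChannelUpdateInterval.
        r := rate.Every(DefaultChannelUpdateInterval)

        // One limiter per direction bit of the channel update.
        return [2]*rate.Limiter{
                rate.NewLimiter(r, DefaultMaxChannelUpdateBurst),
                rate.NewLimiter(r, DefaultMaxChannelUpdateBurst),
        }
}
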
// handleAnnSig processes a new announcement signatures message.
3262
func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
3263
        ann *lnwire.AnnounceSignatures1) ([]networkMsg, bool) {
21✔
3264

21✔
3265
        needBlockHeight := ann.ShortChannelID.BlockHeight +
21✔
3266
                d.cfg.ProofMatureDelta
21✔
3267
        shortChanID := ann.ShortChannelID.ToUint64()
21✔
3268

21✔
3269
        prefix := "local"
21✔
3270
        if nMsg.isRemote {
32✔
3271
                prefix = "remote"
11✔
3272
        }
11✔
3273

3274
        log.Infof("Received new %v announcement signature for %v", prefix,
21✔
3275
                ann.ShortChannelID)
21✔
3276

21✔
3277
        // By the specification, channel announcement proofs should be sent
21✔
3278
        // after some number of confirmations after channel was registered in
21✔
3279
        // bitcoin blockchain. Therefore, we check if the proof is mature.
21✔
3280
        d.Lock()
21✔
3281
        premature := d.isPremature(
21✔
3282
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
21✔
3283
        )
21✔
3284
        if premature {
21✔
UNCOV
3285
                log.Warnf("Premature proof announcement, current block height"+
×
UNCOV
3286
                        "lower than needed: %v < %v", d.bestHeight,
×
UNCOV
3287
                        needBlockHeight)
×
UNCOV
3288
                d.Unlock()
×
UNCOV
3289
                nMsg.err <- nil
×
UNCOV
3290
                return nil, false
×
UNCOV
3291
        }
×
3292
        d.Unlock()
21✔
3293

21✔
3294
        // Ensure that we know of a channel with the target channel ID before
21✔
3295
        // proceeding further.
21✔
3296
        //
21✔
3297
        // We must acquire the mutex for this channel ID before getting the
21✔
3298
        // channel from the database, to ensure what we read does not change
21✔
3299
        // before we call AddProof() later.
21✔
3300
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
21✔
3301
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
21✔
3302

21✔
3303
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
21✔
3304
                ann.ShortChannelID,
21✔
3305
        )
21✔
3306
        if err != nil {
22✔
3307
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
1✔
3308
                if err != nil {
1✔
UNCOV
3309
                        err := fmt.Errorf("unable to store the proof for "+
×
UNCOV
3310
                                "short_chan_id=%v: %v", shortChanID, err)
×
UNCOV
3311
                        log.Error(err)
×
UNCOV
3312
                        nMsg.err <- err
×
UNCOV
3313

×
UNCOV
3314
                        return nil, false
×
UNCOV
3315
                }
×
3316

3317
                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
1✔
3318
                err := d.cfg.WaitingProofStore.Add(proof)
1✔
3319
                if err != nil {
1✔
3320
                        err := fmt.Errorf("unable to store the proof for "+
×
3321
                                "short_chan_id=%v: %v", shortChanID, err)
×
3322
                        log.Error(err)
×
3323
                        nMsg.err <- err
×
3324
                        return nil, false
×
3325
                }
×
3326

3327
                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
1✔
3328
                        ", adding to waiting batch", prefix, shortChanID)
1✔
3329
                nMsg.err <- nil
1✔
3330
                return nil, false
1✔
3331
        }
3332

3333
        nodeID := nMsg.source.SerializeCompressed()
20✔
3334
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
20✔
3335
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
20✔
3336

20✔
3337
        // Ensure that channel that was retrieved belongs to the peer which
20✔
3338
        // sent the proof announcement.
20✔
3339
        if !(isFirstNode || isSecondNode) {
20✔
3340
                err := fmt.Errorf("channel that was received doesn't belong "+
×
3341
                        "to the peer which sent the proof, short_chan_id=%v",
×
3342
                        shortChanID)
×
3343
                log.Error(err)
×
3344
                nMsg.err <- err
×
3345
                return nil, false
×
3346
        }
×
3347

3348
        // If proof was sent by a local sub-system, then we'll send the
3349
        // announcement signature to the remote node so they can also
3350
        // reconstruct the full channel announcement.
3351
        if !nMsg.isRemote {
30✔
3352
                var remotePubKey [33]byte
10✔
3353
                if isFirstNode {
20✔
3354
                        remotePubKey = chanInfo.NodeKey2Bytes
10✔
3355
                } else {
10✔
UNCOV
3356
                        remotePubKey = chanInfo.NodeKey1Bytes
×
UNCOV
3357
                }
×
3358

3359
                // Since the remote peer might not be online we'll call a
3360
                // method that will attempt to deliver the proof when it comes
3361
                // online.
3362
                err := d.reliableSender.sendMessage(ann, remotePubKey)
10✔
3363
                if err != nil {
10✔
3364
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3365
                                "channel=%v to peer=%x: %v", ann.MsgType(),
×
3366
                                ann.ShortChannelID, remotePubKey, err)
×
3367
                        nMsg.err <- err
×
3368
                        return nil, false
×
3369
                }
×
3370
        }
3371

3372
        // Check if we already have the full proof for this channel.
3373
        if chanInfo.AuthProof != nil {
21✔
3374
                // If we already have the fully assembled proof, then the peer
1✔
3375
                // sending us their proof has probably not received our local
1✔
3376
                // proof yet. So be kind and send them the full proof.
1✔
3377
                if nMsg.isRemote {
2✔
3378
                        peerID := nMsg.source.SerializeCompressed()
1✔
3379
                        log.Debugf("Got AnnounceSignatures for channel with " +
1✔
3380
                                "full proof.")
1✔
3381

1✔
3382
                        d.wg.Add(1)
1✔
3383
                        go func() {
2✔
3384
                                defer d.wg.Done()
1✔
3385

1✔
3386
                                log.Debugf("Received half proof for channel "+
1✔
3387
                                        "%v with existing full proof. Sending"+
1✔
3388
                                        " full proof to peer=%x",
1✔
3389
                                        ann.ChannelID, peerID)
1✔
3390

1✔
3391
                                ca, _, _, err := netann.CreateChanAnnouncement(
1✔
3392
                                        chanInfo.AuthProof, chanInfo, e1, e2,
1✔
3393
                                )
1✔
3394
                                if err != nil {
1✔
3395
                                        log.Errorf("unable to gen ann: %v",
×
3396
                                                err)
×
3397
                                        return
×
3398
                                }
×
3399

3400
                                err = nMsg.peer.SendMessage(false, ca)
1✔
3401
                                if err != nil {
1✔
3402
                                        log.Errorf("Failed sending full proof"+
×
3403
                                                " to peer=%x: %v", peerID, err)
×
3404
                                        return
×
3405
                                }
×
3406

3407
                                log.Debugf("Full proof sent to peer=%x for "+
1✔
3408
                                        "chanID=%v", peerID, ann.ChannelID)
1✔
3409
                        }()
3410
                }
3411

3412
                log.Debugf("Already have proof for channel with chanID=%v",
1✔
3413
                        ann.ChannelID)
1✔
3414
                nMsg.err <- nil
1✔
3415
                return nil, true
1✔
3416
        }
3417

3418
        // Check that we received the opposite proof. If so, then we're now
3419
        // able to construct the full proof, and create the channel
3420
        // announcement. If we didn't receive the opposite half of the proof
3421
        // then we should store this one, and wait for the opposite to be
3422
        // received.
3423
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
19✔
3424
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
19✔
3425
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
19✔
3426
                err := fmt.Errorf("unable to get the opposite proof for "+
×
3427
                        "short_chan_id=%v: %v", shortChanID, err)
×
3428
                log.Error(err)
×
3429
                nMsg.err <- err
×
3430
                return nil, false
×
3431
        }
×
3432

3433
        if err == channeldb.ErrWaitingProofNotFound {
28✔
3434
                err := d.cfg.WaitingProofStore.Add(proof)
9✔
3435
                if err != nil {
9✔
3436
                        err := fmt.Errorf("unable to store the proof for "+
×
3437
                                "short_chan_id=%v: %v", shortChanID, err)
×
3438
                        log.Error(err)
×
3439
                        nMsg.err <- err
×
3440
                        return nil, false
×
3441
                }
×
3442

3443
                log.Infof("1/2 of channel ann proof received for "+
9✔
3444
                        "short_chan_id=%v, waiting for other half",
9✔
3445
                        shortChanID)
9✔
3446

9✔
3447
                nMsg.err <- nil
9✔
3448
                return nil, false
9✔
3449
        }
3450

3451
        // We now have both halves of the channel announcement proof, then
3452
        // we'll reconstruct the initial announcement so we can validate it
3453
        // shortly below.
3454
        var dbProof models.ChannelAuthProof
10✔
3455
        if isFirstNode {
11✔
3456
                dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
1✔
3457
                dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
1✔
3458
                dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
1✔
3459
                dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
1✔
3460
        } else {
10✔
3461
                dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
9✔
3462
                dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
9✔
3463
                dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
9✔
3464
                dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
9✔
3465
        }
9✔
3466

3467
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
10✔
3468
                &dbProof, chanInfo, e1, e2,
10✔
3469
        )
10✔
3470
        if err != nil {
10✔
3471
                log.Error(err)
×
3472
                nMsg.err <- err
×
3473
                return nil, false
×
3474
        }
×
3475

3476
        // With all the necessary components assembled, validate the full
        // channel announcement proof.
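        // This verifies the node and bitcoin key signatures over the
        // assembled announcement; the fetchPKScript callback lets the
        // validator look up the funding output's script when needed.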
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
        if err != nil {
                err := fmt.Errorf("channel announcement proof for "+
                        "short_chan_id=%v isn't valid: %v", shortChanID, err)

                log.Error(err)
                nMsg.err <- err
                return nil, false
        }

        // If the channel was returned by the router, it means that the
        // existence of the funding point and the inclusion of the nodes'
        // bitcoin keys in it were already checked by the router. At this
        // stage we should check that the node keys attest to the bitcoin
        // keys by validating the signatures of the announcement. If the
        // proof is valid, then we'll populate the channel edge with it, so
        // we can announce it on peer connect.
        err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
        if err != nil {
                err := fmt.Errorf("unable to add proof to the channel "+
                        "chanID=%v: %v", ann.ChannelID, err)
                log.Error(err)
                nMsg.err <- err
                return nil, false
        }

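        // With the full proof persisted to the graph, the stored half is no
        // longer needed, so clean it out of the waiting proof store.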
        err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
        if err != nil {
                err := fmt.Errorf("unable to remove opposite proof for the "+
                        "channel with chanID=%v: %v", ann.ChannelID, err)
                log.Error(err)
                nMsg.err <- err
                return nil, false
        }

        // The proof was successfully assembled, so we can now announce the
        // channel to the rest of the network.
        log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
                ", adding to next ann batch", shortChanID)

        // Assemble the necessary announcements to add to the next
        // broadcasting batch.
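        // Each entry records the peer it arrived from and its source node;
        // the broadcaster uses these to avoid echoing an announcement back
        // to the peer that sent it.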
        var announcements []networkMsg
        announcements = append(announcements, networkMsg{
                peer:   nMsg.peer,
                source: nMsg.source,
                msg:    chanAnn,
        })
        if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
                announcements = append(announcements, networkMsg{
                        peer:   nMsg.peer,
                        source: src,
                        msg:    e1Ann,
                })
        }
        if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
                announcements = append(announcements, networkMsg{
                        peer:   nMsg.peer,
                        source: src,
                        msg:    e2Ann,
                })
        }

        // We'll also send along the node announcements for each channel
        // participant if we know of them. To ensure our node announcement
        // propagates to our channel counterparty, we'll set the source for
        // each announcement to the node it belongs to, otherwise we won't
        // send it since the source gets skipped. This isn't necessary for
        // channel updates and announcement signatures since we send those
        // directly to our channel counterparty through the gossiper's
        // reliable sender.
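        // A missing node announcement isn't fatal here: we may simply not
        // have received one for that node yet, so we log at debug level and
        // continue with the announcements gathered so far.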
        node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
        if err != nil {
                log.Debugf("Unable to fetch node announcement for %x: %v",
                        chanInfo.NodeKey1Bytes, err)
        } else {
                if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
                        announcements = append(announcements, networkMsg{
                                peer:   nMsg.peer,
                                source: nodeKey1,
                                msg:    node1Ann,
                        })
                }
        }

        node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
        if err != nil {
                log.Debugf("Unable to fetch node announcement for %x: %v",
                        chanInfo.NodeKey2Bytes, err)
        } else {
                if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
                        announcements = append(announcements, networkMsg{
                                peer:   nMsg.peer,
                                source: nodeKey2,
                                msg:    node2Ann,
                        })
                }
        }

        nMsg.err <- nil
        return announcements, true
}

// isBanned returns true if the peer identified by pubkey is banned for
// sending invalid channel announcements.
func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
        return d.banman.isBanned(pubkey)
}

// ShouldDisconnect returns true if we should disconnect the peer identified
// by pubkey.
func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
        bool, error) {

        pubkeySer := pubkey.SerializeCompressed()

        var pubkeyBytes [33]byte
        copy(pubkeyBytes[:], pubkeySer)

        // If the public key is banned, check whether or not this is a
        // channel peer.
        if d.isBanned(pubkeyBytes) {
                isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
                if err != nil {
                        return false, err
                }

                // We should only disconnect non-channel peers.
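                // Banned channel peers stay connected: we still share open
                // channels with them and need the connection to operate
                // those channels.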
                if !isChanPeer {
                        return true, nil
                }
        }

        return false, nil
}