• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13236757158

10 Feb 2025 08:39AM UTC coverage: 57.649% (-1.2%) from 58.815%
13236757158

Pull #9493

github

ziggie1984
lncli: for some cmds we don't replace the data of the response.

For some cmds it is not very practical to replace the json output
because we might pipe it into other commands. For example when
creating the route we want to pipe it into sendtoRoute.
Pull Request #9493: For some lncli cmds we should not replace the content with other data

0 of 9 new or added lines in 2 files covered. (0.0%)

19535 existing lines in 252 files now uncovered.

103517 of 179563 relevant lines covered (57.65%)

24878.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.9
/discovery/gossiper.go
1
package discovery
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
13
        "github.com/btcsuite/btcd/btcutil"
14
        "github.com/btcsuite/btcd/chaincfg/chainhash"
15
        "github.com/btcsuite/btcd/wire"
16
        "github.com/davecgh/go-spew/spew"
17
        "github.com/lightninglabs/neutrino/cache"
18
        "github.com/lightninglabs/neutrino/cache/lru"
19
        "github.com/lightningnetwork/lnd/batch"
20
        "github.com/lightningnetwork/lnd/chainntnfs"
21
        "github.com/lightningnetwork/lnd/channeldb"
22
        "github.com/lightningnetwork/lnd/fn/v2"
23
        "github.com/lightningnetwork/lnd/graph"
24
        graphdb "github.com/lightningnetwork/lnd/graph/db"
25
        "github.com/lightningnetwork/lnd/graph/db/models"
26
        "github.com/lightningnetwork/lnd/keychain"
27
        "github.com/lightningnetwork/lnd/lnpeer"
28
        "github.com/lightningnetwork/lnd/lnutils"
29
        "github.com/lightningnetwork/lnd/lnwallet"
30
        "github.com/lightningnetwork/lnd/lnwire"
31
        "github.com/lightningnetwork/lnd/multimutex"
32
        "github.com/lightningnetwork/lnd/netann"
33
        "github.com/lightningnetwork/lnd/routing/route"
34
        "github.com/lightningnetwork/lnd/ticker"
35
        "golang.org/x/time/rate"
36
)
37

38
const (
39
        // DefaultMaxChannelUpdateBurst is the default maximum number of updates
40
        // for a specific channel and direction that we'll accept over an
41
        // interval.
42
        DefaultMaxChannelUpdateBurst = 10
43

44
        // DefaultChannelUpdateInterval is the default interval we'll use to
45
        // determine how often we should allow a new update for a specific
46
        // channel and direction.
47
        DefaultChannelUpdateInterval = time.Minute
48

49
        // maxPrematureUpdates tracks the max amount of premature channel
50
        // updates that we'll hold onto.
51
        maxPrematureUpdates = 100
52

53
        // maxFutureMessages tracks the max amount of future messages that
54
        // we'll hold onto.
55
        maxFutureMessages = 1000
56

57
        // DefaultSubBatchDelay is the default delay we'll use when
58
        // broadcasting the next announcement batch.
59
        DefaultSubBatchDelay = 5 * time.Second
60

61
        // maxRejectedUpdates tracks the max amount of rejected channel updates
62
        // we'll maintain. This is the global size across all peers. We'll
63
        // allocate ~3 MB max to the cache.
64
        maxRejectedUpdates = 10_000
65

66
        // DefaultProofMatureDelta specifies the default value used for
67
        // ProofMatureDelta, which is the number of confirmations needed before
68
        // processing the announcement signatures.
69
        DefaultProofMatureDelta = 6
70
)
71

72
var (
73
        // ErrGossiperShuttingDown is an error that is returned if the gossiper
74
        // is in the process of being shut down.
75
        ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
76

77
        // ErrGossipSyncerNotFound signals that we were unable to find an active
78
        // gossip syncer corresponding to a gossip query message received from
79
        // the remote peer.
80
        ErrGossipSyncerNotFound = errors.New("gossip syncer not found")
81

82
        // emptyPubkey is used to compare compressed pubkeys against an empty
83
        // byte array.
84
        emptyPubkey [33]byte
85
)
86

87
// optionalMsgFields is a set of optional message fields that external callers
88
// can provide that serve useful when processing a specific network
89
// announcement.
90
type optionalMsgFields struct {
91
        capacity      *btcutil.Amount
92
        channelPoint  *wire.OutPoint
93
        remoteAlias   *lnwire.ShortChannelID
94
        tapscriptRoot fn.Option[chainhash.Hash]
95
}
96

97
// apply applies the optional fields within the functional options.
98
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
47✔
99
        for _, optionalMsgField := range optionalMsgFields {
52✔
100
                optionalMsgField(f)
5✔
101
        }
5✔
102
}
103

104
// OptionalMsgField is a functional option parameter that can be used to provide
105
// external information that is not included within a network message but serves
106
// useful when processing it.
107
type OptionalMsgField func(*optionalMsgFields)
108

109
// ChannelCapacity is an optional field that lets the gossiper know of the
110
// capacity of a channel.
111
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
27✔
112
        return func(f *optionalMsgFields) {
28✔
113
                f.capacity = &capacity
1✔
114
        }
1✔
115
}
116

117
// ChannelPoint is an optional field that lets the gossiper know of the outpoint
118
// of a channel.
119
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
30✔
120
        return func(f *optionalMsgFields) {
34✔
121
                f.channelPoint = &op
4✔
122
        }
4✔
123
}
124

125
// TapscriptRoot is an optional field that lets the gossiper know of the root of
126
// the tapscript tree for a custom channel.
127
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
26✔
128
        return func(f *optionalMsgFields) {
26✔
UNCOV
129
                f.tapscriptRoot = root
×
UNCOV
130
        }
×
131
}
132

133
// RemoteAlias is an optional field that lets the gossiper know that a locally
134
// sent channel update is actually an update for the peer that should replace
135
// the ShortChannelID field with the remote's alias. This is only used for
136
// channels with peers where the option-scid-alias feature bit was negotiated.
137
// The channel update will be added to the graph under the original SCID, but
138
// will be modified and re-signed with this alias.
139
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
26✔
140
        return func(f *optionalMsgFields) {
26✔
UNCOV
141
                f.remoteAlias = alias
×
UNCOV
142
        }
×
143
}
144

145
// networkMsg couples a routing related wire message with the peer that
146
// originally sent it.
147
type networkMsg struct {
148
        peer              lnpeer.Peer
149
        source            *btcec.PublicKey
150
        msg               lnwire.Message
151
        optionalMsgFields *optionalMsgFields
152

153
        isRemote bool
154

155
        err chan error
156
}
157

158
// chanPolicyUpdateRequest is a request that is sent to the server when a caller
159
// wishes to update a particular set of channels. New ChannelUpdate messages
160
// will be crafted to be sent out during the next broadcast epoch and the fee
161
// updates committed to the lower layer.
162
type chanPolicyUpdateRequest struct {
163
        edgesToUpdate []EdgeWithInfo
164
        errChan       chan error
165
}
166

167
// PinnedSyncers is a set of node pubkeys for which we will maintain an active
168
// syncer at all times.
169
type PinnedSyncers map[route.Vertex]struct{}
170

171
// Config defines the configuration for the service. ALL elements within the
172
// configuration MUST be non-nil for the service to carry out its duties.
173
type Config struct {
174
        // ChainHash is a hash that indicates which resident chain of the
175
        // AuthenticatedGossiper. Any announcements that don't match this
176
        // chain hash will be ignored.
177
        //
178
        // TODO(roasbeef): eventually make into map so can de-multiplex
179
        // incoming announcements
180
        //   * also need to do same for Notifier
181
        ChainHash chainhash.Hash
182

183
        // Graph is the subsystem which is responsible for managing the
184
        // topology of lightning network. After incoming channel, node, channel
185
        // updates announcements are validated they are sent to the router in
186
        // order to be included in the LN graph.
187
        Graph graph.ChannelGraphSource
188

189
        // ChainIO represents an abstraction over a source that can query the
190
        // blockchain.
191
        ChainIO lnwallet.BlockChainIO
192

193
        // ChanSeries is an interfaces that provides access to a time series
194
        // view of the current known channel graph. Each GossipSyncer enabled
195
        // peer will utilize this in order to create and respond to channel
196
        // graph time series queries.
197
        ChanSeries ChannelGraphTimeSeries
198

199
        // Notifier is used for receiving notifications of incoming blocks.
200
        // With each new incoming block found we process previously premature
201
        // announcements.
202
        //
203
        // TODO(roasbeef): could possibly just replace this with an epoch
204
        // channel.
205
        Notifier chainntnfs.ChainNotifier
206

207
        // Broadcast broadcasts a particular set of announcements to all peers
208
        // that the daemon is connected to. If supplied, the exclude parameter
209
        // indicates that the target peer should be excluded from the
210
        // broadcast.
211
        Broadcast func(skips map[route.Vertex]struct{},
212
                msg ...lnwire.Message) error
213

214
        // NotifyWhenOnline is a function that allows the gossiper to be
215
        // notified when a certain peer comes online, allowing it to
216
        // retry sending a peer message.
217
        //
218
        // NOTE: The peerChan channel must be buffered.
219
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
220

221
        // NotifyWhenOffline is a function that allows the gossiper to be
222
        // notified when a certain peer disconnects, allowing it to request a
223
        // notification for when it reconnects.
224
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
225

226
        // FetchSelfAnnouncement retrieves our current node announcement, for
227
        // use when determining whether we should update our peers about our
228
        // presence in the network.
229
        FetchSelfAnnouncement func() lnwire.NodeAnnouncement
230

231
        // UpdateSelfAnnouncement produces a new announcement for our node with
232
        // an updated timestamp which can be broadcast to our peers.
233
        UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement, error)
234

235
        // ProofMatureDelta the number of confirmations which is needed before
236
        // exchange the channel announcement proofs.
237
        ProofMatureDelta uint32
238

239
        // TrickleDelay the period of trickle timer which flushes to the
240
        // network the pending batch of new announcements we've received since
241
        // the last trickle tick.
242
        TrickleDelay time.Duration
243

244
        // RetransmitTicker is a ticker that ticks with a period which
245
        // indicates that we should check if we need re-broadcast any of our
246
        // personal channels.
247
        RetransmitTicker ticker.Ticker
248

249
        // RebroadcastInterval is the maximum time we wait between sending out
250
        // channel updates for our active channels and our own node
251
        // announcement. We do this to ensure our active presence on the
252
        // network is known, and we are not being considered a zombie node or
253
        // having zombie channels.
254
        RebroadcastInterval time.Duration
255

256
        // WaitingProofStore is a persistent storage of partial channel proof
257
        // announcement messages. We use it to buffer half of the material
258
        // needed to reconstruct a full authenticated channel announcement.
259
        // Once we receive the other half the channel proof, we'll be able to
260
        // properly validate it and re-broadcast it out to the network.
261
        //
262
        // TODO(wilmer): make interface to prevent channeldb dependency.
263
        WaitingProofStore *channeldb.WaitingProofStore
264

265
        // MessageStore is a persistent storage of gossip messages which we will
266
        // use to determine which messages need to be resent for a given peer.
267
        MessageStore GossipMessageStore
268

269
        // AnnSigner is an instance of the MessageSigner interface which will
270
        // be used to manually sign any outgoing channel updates. The signer
271
        // implementation should be backed by the public key of the backing
272
        // Lightning node.
273
        //
274
        // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
275
        // here?
276
        AnnSigner lnwallet.MessageSigner
277

278
        // ScidCloser is an instance of ClosedChannelTracker that helps the
279
        // gossiper cut down on spam channel announcements for already closed
280
        // channels.
281
        ScidCloser ClosedChannelTracker
282

283
        // NumActiveSyncers is the number of peers for which we should have
284
        // active syncers with. After reaching NumActiveSyncers, any future
285
        // gossip syncers will be passive.
286
        NumActiveSyncers int
287

288
        // NoTimestampQueries will prevent the GossipSyncer from querying
289
        // timestamps of announcement messages from the peer and from replying
290
        // to timestamp queries.
291
        NoTimestampQueries bool
292

293
        // RotateTicker is a ticker responsible for notifying the SyncManager
294
        // when it should rotate its active syncers. A single active syncer with
295
        // a chansSynced state will be exchanged for a passive syncer in order
296
        // to ensure we don't keep syncing with the same peers.
297
        RotateTicker ticker.Ticker
298

299
        // HistoricalSyncTicker is a ticker responsible for notifying the
300
        // syncManager when it should attempt a historical sync with a gossip
301
        // sync peer.
302
        HistoricalSyncTicker ticker.Ticker
303

304
        // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
305
        // syncManager when it should attempt to start the next pending
306
        // activeSyncer due to the current one not completing its state machine
307
        // within the timeout.
308
        ActiveSyncerTimeoutTicker ticker.Ticker
309

310
        // MinimumBatchSize is minimum size of a sub batch of announcement
311
        // messages.
312
        MinimumBatchSize int
313

314
        // SubBatchDelay is the delay between sending sub batches of
315
        // gossip messages.
316
        SubBatchDelay time.Duration
317

318
        // IgnoreHistoricalFilters will prevent syncers from replying with
319
        // historical data when the remote peer sets a gossip_timestamp_range.
320
        // This prevents ranges with old start times from causing us to dump the
321
        // graph on connect.
322
        IgnoreHistoricalFilters bool
323

324
        // PinnedSyncers is a set of peers that will always transition to
325
        // ActiveSync upon connection. These peers will never transition to
326
        // PassiveSync.
327
        PinnedSyncers PinnedSyncers
328

329
        // MaxChannelUpdateBurst specifies the maximum number of updates for a
330
        // specific channel and direction that we'll accept over an interval.
331
        MaxChannelUpdateBurst int
332

333
        // ChannelUpdateInterval specifies the interval we'll use to determine
334
        // how often we should allow a new update for a specific channel and
335
        // direction.
336
        ChannelUpdateInterval time.Duration
337

338
        // IsAlias returns true if a given ShortChannelID is an alias for
339
        // option_scid_alias channels.
340
        IsAlias func(scid lnwire.ShortChannelID) bool
341

342
        // SignAliasUpdate is used to re-sign a channel update using the
343
        // remote's alias if the option-scid-alias feature bit was negotiated.
344
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
345
                error)
346

347
        // FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
348
        // This is used for channels that have negotiated the option-scid-alias
349
        // feature bit.
350
        FindBaseByAlias func(alias lnwire.ShortChannelID) (
351
                lnwire.ShortChannelID, error)
352

353
        // GetAlias allows the gossiper to look up the peer's alias for a given
354
        // ChannelID. This is used to sign updates for them if the channel has
355
        // no AuthProof and the option-scid-alias feature bit was negotiated.
356
        GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)
357

358
        // FindChannel allows the gossiper to find a channel that we're party
359
        // to without iterating over the entire set of open channels.
360
        FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
361
                *channeldb.OpenChannel, error)
362

363
        // IsStillZombieChannel takes the timestamps of the latest channel
364
        // updates for a channel and returns true if the channel should be
365
        // considered a zombie based on these timestamps.
366
        IsStillZombieChannel func(time.Time, time.Time) bool
367
}
368

369
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
370
// used to let the caller of the lru.Cache know if a message has already been
371
// processed or not.
372
type processedNetworkMsg struct {
373
        processed bool
374
        msg       *networkMsg
375
}
376

377
// cachedNetworkMsg is a wrapper around a network message that can be used with
378
// *lru.Cache.
379
type cachedNetworkMsg struct {
380
        msgs []*processedNetworkMsg
381
}
382

383
// Size returns the "size" of an entry. We return the number of items as we
384
// just want to limit the total amount of entries rather than do accurate size
385
// accounting.
386
func (c *cachedNetworkMsg) Size() (uint64, error) {
2✔
387
        return uint64(len(c.msgs)), nil
2✔
388
}
2✔
389

390
// rejectCacheKey is the cache key that we'll use to track announcements we've
391
// recently rejected.
392
type rejectCacheKey struct {
393
        pubkey [33]byte
394
        chanID uint64
395
}
396

397
// newRejectCacheKey returns a new cache key for the reject cache.
398
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
462✔
399
        k := rejectCacheKey{
462✔
400
                chanID: cid,
462✔
401
                pubkey: pub,
462✔
402
        }
462✔
403

462✔
404
        return k
462✔
405
}
462✔
406

407
// sourceToPub returns a serialized-compressed public key for use in the reject
408
// cache.
409
func sourceToPub(pk *btcec.PublicKey) [33]byte {
476✔
410
        var pub [33]byte
476✔
411
        copy(pub[:], pk.SerializeCompressed())
476✔
412
        return pub
476✔
413
}
476✔
414

415
// cachedReject is the empty value used to track the value for rejects.
416
type cachedReject struct {
417
}
418

419
// Size returns the "size" of an entry. We return 1 as we just want to limit
420
// the total size.
421
func (c *cachedReject) Size() (uint64, error) {
203✔
422
        return 1, nil
203✔
423
}
203✔
424

425
// AuthenticatedGossiper is a subsystem which is responsible for receiving
426
// announcements, validating them and applying the changes to router, syncing
427
// lightning network with newly connected nodes, broadcasting announcements
428
// after validation, negotiating the channel announcement proofs exchange and
429
// handling the premature announcements. All outgoing announcements are
430
// expected to be properly signed as dictated in BOLT#7, additionally, all
431
// incoming message are expected to be well formed and signed. Invalid messages
432
// will be rejected by this struct.
433
type AuthenticatedGossiper struct {
434
        // Parameters which are needed to properly handle the start and stop of
435
        // the service.
436
        started sync.Once
437
        stopped sync.Once
438

439
        // bestHeight is the height of the block at the tip of the main chain
440
        // as we know it. Accesses *MUST* be done with the gossiper's lock
441
        // held.
442
        bestHeight uint32
443

444
        quit chan struct{}
445
        wg   sync.WaitGroup
446

447
        // cfg is a copy of the configuration struct that the gossiper service
448
        // was initialized with.
449
        cfg *Config
450

451
        // blockEpochs encapsulates a stream of block epochs that are sent at
452
        // every new block height.
453
        blockEpochs *chainntnfs.BlockEpochEvent
454

455
        // prematureChannelUpdates is a map of ChannelUpdates we have received
456
        // that wasn't associated with any channel we know about.  We store
457
        // them temporarily, such that we can reprocess them when a
458
        // ChannelAnnouncement for the channel is received.
459
        prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]
460

461
        // banman tracks our peer's ban status.
462
        banman *banman
463

464
        // networkMsgs is a channel that carries new network broadcasted
465
        // message from outside the gossiper service to be processed by the
466
        // networkHandler.
467
        networkMsgs chan *networkMsg
468

469
        // futureMsgs is a list of premature network messages that have a block
470
        // height specified in the future. We will save them and resend it to
471
        // the chan networkMsgs once the block height has reached. The cached
472
        // map format is,
473
        //   {msgID1: msg1, msgID2: msg2, ...}
474
        futureMsgs *futureMsgCache
475

476
        // chanPolicyUpdates is a channel that requests to update the
477
        // forwarding policy of a set of channels is sent over.
478
        chanPolicyUpdates chan *chanPolicyUpdateRequest
479

480
        // selfKey is the identity public key of the backing Lightning node.
481
        selfKey *btcec.PublicKey
482

483
        // selfKeyLoc is the locator for the identity public key of the backing
484
        // Lightning node.
485
        selfKeyLoc keychain.KeyLocator
486

487
        // channelMtx is used to restrict the database access to one
488
        // goroutine per channel ID. This is done to ensure that when
489
        // the gossiper is handling an announcement, the db state stays
490
        // consistent between when the DB is first read until it's written.
491
        channelMtx *multimutex.Mutex[uint64]
492

493
        recentRejects *lru.Cache[rejectCacheKey, *cachedReject]
494

495
        // syncMgr is a subsystem responsible for managing the gossip syncers
496
        // for peers currently connected. When a new peer is connected, the
497
        // manager will create its accompanying gossip syncer and determine
498
        // whether it should have an activeSync or passiveSync sync type based
499
        // on how many other gossip syncers are currently active. Any activeSync
500
        // gossip syncers are started in a round-robin manner to ensure we're
501
        // not syncing with multiple peers at the same time.
502
        syncMgr *SyncManager
503

504
        // reliableSender is a subsystem responsible for handling reliable
505
        // message send requests to peers. This should only be used for channels
506
        // that are unadvertised at the time of handling the message since if it
507
        // is advertised, then peers should be able to get the message from the
508
        // network.
509
        reliableSender *reliableSender
510

511
        // chanUpdateRateLimiter contains rate limiters for each direction of
512
        // a channel update we've processed. We'll use these to determine
513
        // whether we should accept a new update for a specific channel and
514
        // direction.
515
        //
516
        // NOTE: This map must be synchronized with the main
517
        // AuthenticatedGossiper lock.
518
        chanUpdateRateLimiter map[uint64][2]*rate.Limiter
519

520
        // vb is used to enforce job dependency ordering of gossip messages.
521
        vb *ValidationBarrier
522

523
        sync.Mutex
524
}
525

526
// New creates a new AuthenticatedGossiper instance, initialized with the
527
// passed configuration parameters.
528
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
27✔
529
        gossiper := &AuthenticatedGossiper{
27✔
530
                selfKey:           selfKeyDesc.PubKey,
27✔
531
                selfKeyLoc:        selfKeyDesc.KeyLocator,
27✔
532
                cfg:               &cfg,
27✔
533
                networkMsgs:       make(chan *networkMsg),
27✔
534
                futureMsgs:        newFutureMsgCache(maxFutureMessages),
27✔
535
                quit:              make(chan struct{}),
27✔
536
                chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
27✔
537
                prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
27✔
538
                        maxPrematureUpdates,
27✔
539
                ),
27✔
540
                channelMtx: multimutex.NewMutex[uint64](),
27✔
541
                recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
27✔
542
                        maxRejectedUpdates,
27✔
543
                ),
27✔
544
                chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
27✔
545
                banman:                newBanman(),
27✔
546
        }
27✔
547

27✔
548
        gossiper.vb = NewValidationBarrier(1000, gossiper.quit)
27✔
549

27✔
550
        gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
27✔
551
                ChainHash:               cfg.ChainHash,
27✔
552
                ChanSeries:              cfg.ChanSeries,
27✔
553
                RotateTicker:            cfg.RotateTicker,
27✔
554
                HistoricalSyncTicker:    cfg.HistoricalSyncTicker,
27✔
555
                NumActiveSyncers:        cfg.NumActiveSyncers,
27✔
556
                NoTimestampQueries:      cfg.NoTimestampQueries,
27✔
557
                IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
27✔
558
                BestHeight:              gossiper.latestHeight,
27✔
559
                PinnedSyncers:           cfg.PinnedSyncers,
27✔
560
                IsStillZombieChannel:    cfg.IsStillZombieChannel,
27✔
561
        })
27✔
562

27✔
563
        gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
27✔
564
                NotifyWhenOnline:  cfg.NotifyWhenOnline,
27✔
565
                NotifyWhenOffline: cfg.NotifyWhenOffline,
27✔
566
                MessageStore:      cfg.MessageStore,
27✔
567
                IsMsgStale:        gossiper.isMsgStale,
27✔
568
        })
27✔
569

27✔
570
        return gossiper
27✔
571
}
27✔
572

573
// EdgeWithInfo contains the information that is required to update an edge.
574
type EdgeWithInfo struct {
575
        // Info describes the channel.
576
        Info *models.ChannelEdgeInfo
577

578
        // Edge describes the policy in one direction of the channel.
579
        Edge *models.ChannelEdgePolicy
580
}
581

582
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
583
// specified edge updates. Updates are done in two stages: first, the
584
// AuthenticatedGossiper ensures the update has been committed by dependent
585
// sub-systems, then it signs and broadcasts new updates to the network. A
586
// mapping between outpoints and updated channel policies is returned, which is
587
// used to update the forwarding policies of the underlying links.
588
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
589
        edgesToUpdate []EdgeWithInfo) error {
1✔
590

1✔
591
        errChan := make(chan error, 1)
1✔
592
        policyUpdate := &chanPolicyUpdateRequest{
1✔
593
                edgesToUpdate: edgesToUpdate,
1✔
594
                errChan:       errChan,
1✔
595
        }
1✔
596

1✔
597
        select {
1✔
598
        case d.chanPolicyUpdates <- policyUpdate:
1✔
599
                err := <-errChan
1✔
600
                return err
1✔
601
        case <-d.quit:
×
602
                return fmt.Errorf("AuthenticatedGossiper shutting down")
×
603
        }
604
}
605

606
// Start spawns network messages handler goroutine and registers on new block
607
// notifications in order to properly handle the premature announcements.
608
func (d *AuthenticatedGossiper) Start() error {
27✔
609
        var err error
27✔
610
        d.started.Do(func() {
54✔
611
                log.Info("Authenticated Gossiper starting")
27✔
612
                err = d.start()
27✔
613
        })
27✔
614
        return err
27✔
615
}
616

617
func (d *AuthenticatedGossiper) start() error {
27✔
618
        // First we register for new notifications of newly discovered blocks.
27✔
619
        // We do this immediately so we'll later be able to consume any/all
27✔
620
        // blocks which were discovered.
27✔
621
        blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
27✔
622
        if err != nil {
27✔
623
                return err
×
624
        }
×
625
        d.blockEpochs = blockEpochs
27✔
626

27✔
627
        height, err := d.cfg.Graph.CurrentBlockHeight()
27✔
628
        if err != nil {
27✔
629
                return err
×
630
        }
×
631
        d.bestHeight = height
27✔
632

27✔
633
        // Start the reliable sender. In case we had any pending messages ready
27✔
634
        // to be sent when the gossiper was last shut down, we must continue on
27✔
635
        // our quest to deliver them to their respective peers.
27✔
636
        if err := d.reliableSender.Start(); err != nil {
27✔
637
                return err
×
638
        }
×
639

640
        d.syncMgr.Start()
27✔
641

27✔
642
        d.banman.start()
27✔
643

27✔
644
        // Start receiving blocks in its dedicated goroutine.
27✔
645
        d.wg.Add(2)
27✔
646
        go d.syncBlockHeight()
27✔
647
        go d.networkHandler()
27✔
648

27✔
649
        return nil
27✔
650
}
651

652
// syncBlockHeight syncs the best block height for the gossiper by reading
653
// blockEpochs.
654
//
655
// NOTE: must be run as a goroutine.
656
func (d *AuthenticatedGossiper) syncBlockHeight() {
27✔
657
        defer d.wg.Done()
27✔
658

27✔
659
        for {
54✔
660
                select {
27✔
661
                // A new block has arrived, so we can re-process the previously
662
                // premature announcements.
UNCOV
663
                case newBlock, ok := <-d.blockEpochs.Epochs:
×
UNCOV
664
                        // If the channel has been closed, then this indicates
×
UNCOV
665
                        // the daemon is shutting down, so we exit ourselves.
×
UNCOV
666
                        if !ok {
×
UNCOV
667
                                return
×
UNCOV
668
                        }
×
669

670
                        // Once a new block arrives, we update our running
671
                        // track of the height of the chain tip.
UNCOV
672
                        d.Lock()
×
UNCOV
673
                        blockHeight := uint32(newBlock.Height)
×
UNCOV
674
                        d.bestHeight = blockHeight
×
UNCOV
675
                        d.Unlock()
×
UNCOV
676

×
UNCOV
677
                        log.Debugf("New block: height=%d, hash=%s", blockHeight,
×
UNCOV
678
                                newBlock.Hash)
×
UNCOV
679

×
UNCOV
680
                        // Resend future messages, if any.
×
UNCOV
681
                        d.resendFutureMessages(blockHeight)
×
682

683
                case <-d.quit:
27✔
684
                        return
27✔
685
                }
686
        }
687
}
688

689
// futureMsgCache embeds a `lru.Cache` with a message counter that's served as
690
// the unique ID when saving the message.
691
type futureMsgCache struct {
692
        *lru.Cache[uint64, *cachedFutureMsg]
693

694
        // msgID is a monotonically increased integer.
695
        msgID atomic.Uint64
696
}
697

698
// nextMsgID returns a unique message ID.
699
func (f *futureMsgCache) nextMsgID() uint64 {
3✔
700
        return f.msgID.Add(1)
3✔
701
}
3✔
702

703
// newFutureMsgCache creates a new future message cache with the underlying lru
704
// cache being initialized with the specified capacity.
705
func newFutureMsgCache(capacity uint64) *futureMsgCache {
28✔
706
        // Create a new cache.
28✔
707
        cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)
28✔
708

28✔
709
        return &futureMsgCache{
28✔
710
                Cache: cache,
28✔
711
        }
28✔
712
}
28✔
713

714
// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
715
type cachedFutureMsg struct {
716
        // msg is the network message.
717
        msg *networkMsg
718

719
        // height is the block height.
720
        height uint32
721
}
722

723
// Size returns the size of the message.
724
func (c *cachedFutureMsg) Size() (uint64, error) {
4✔
725
        // Return a constant 1.
4✔
726
        return 1, nil
4✔
727
}
4✔
728

729
// resendFutureMessages takes a block height, resends all the future messages
730
// found below and equal to that height and deletes those messages found in the
731
// gossiper's futureMsgs.
UNCOV
732
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
×
UNCOV
733
        var (
×
UNCOV
734
                // msgs are the target messages.
×
UNCOV
735
                msgs []*networkMsg
×
UNCOV
736

×
UNCOV
737
                // keys are the target messages' caching keys.
×
UNCOV
738
                keys []uint64
×
UNCOV
739
        )
×
UNCOV
740

×
UNCOV
741
        // filterMsgs is the visitor used when iterating the future cache.
×
UNCOV
742
        filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
×
UNCOV
743
                if cmsg.height <= height {
×
UNCOV
744
                        msgs = append(msgs, cmsg.msg)
×
UNCOV
745
                        keys = append(keys, k)
×
UNCOV
746
                }
×
747

UNCOV
748
                return true
×
749
        }
750

751
        // Filter out the target messages.
UNCOV
752
        d.futureMsgs.Range(filterMsgs)
×
UNCOV
753

×
UNCOV
754
        // Return early if no messages found.
×
UNCOV
755
        if len(msgs) == 0 {
×
UNCOV
756
                return
×
UNCOV
757
        }
×
758

759
        // Remove the filtered messages.
UNCOV
760
        for _, key := range keys {
×
UNCOV
761
                d.futureMsgs.Delete(key)
×
UNCOV
762
        }
×
763

UNCOV
764
        log.Debugf("Resending %d network messages at height %d",
×
UNCOV
765
                len(msgs), height)
×
UNCOV
766

×
UNCOV
767
        for _, msg := range msgs {
×
UNCOV
768
                select {
×
UNCOV
769
                case d.networkMsgs <- msg:
×
770
                case <-d.quit:
×
771
                        msg.err <- ErrGossiperShuttingDown
×
772
                }
773
        }
774
}
775

776
// Stop signals any active goroutines for a graceful closure.
777
func (d *AuthenticatedGossiper) Stop() error {
28✔
778
        d.stopped.Do(func() {
55✔
779
                log.Info("Authenticated gossiper shutting down...")
27✔
780
                defer log.Debug("Authenticated gossiper shutdown complete")
27✔
781

27✔
782
                d.stop()
27✔
783
        })
27✔
784
        return nil
28✔
785
}
786

787
func (d *AuthenticatedGossiper) stop() {
27✔
788
        log.Debug("Authenticated Gossiper is stopping")
27✔
789
        defer log.Debug("Authenticated Gossiper stopped")
27✔
790

27✔
791
        // `blockEpochs` is only initialized in the start routine so we make
27✔
792
        // sure we don't panic here in the case where the `Stop` method is
27✔
793
        // called when the `Start` method does not complete.
27✔
794
        if d.blockEpochs != nil {
54✔
795
                d.blockEpochs.Cancel()
27✔
796
        }
27✔
797

798
        d.syncMgr.Stop()
27✔
799

27✔
800
        d.banman.stop()
27✔
801

27✔
802
        close(d.quit)
27✔
803
        d.wg.Wait()
27✔
804

27✔
805
        // We'll stop our reliable sender after all of the gossiper's goroutines
27✔
806
        // have exited to ensure nothing can cause it to continue executing.
27✔
807
        d.reliableSender.Stop()
27✔
808
}
809

810
// TODO(roasbeef): need method to get current gossip timestamp?
811
//  * using mtx, check time rotate forward is needed?
812

813
// ProcessRemoteAnnouncement sends a new remote announcement message along with
814
// the peer that sent the routing message. The announcement will be processed
815
// then added to a queue for batched trickled announcement to all connected
816
// peers.  Remote channel announcements should contain the announcement proof
817
// and be fully validated.
818
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
819
        peer lnpeer.Peer) chan error {
284✔
820

284✔
821
        log.Debugf("Processing remote msg %T from peer=%x", msg, peer.PubKey())
284✔
822

284✔
823
        errChan := make(chan error, 1)
284✔
824

284✔
825
        // For messages in the known set of channel series queries, we'll
284✔
826
        // dispatch the message directly to the GossipSyncer, and skip the main
284✔
827
        // processing loop.
284✔
828
        switch m := msg.(type) {
284✔
829
        case *lnwire.QueryShortChanIDs,
830
                *lnwire.QueryChannelRange,
831
                *lnwire.ReplyChannelRange,
UNCOV
832
                *lnwire.ReplyShortChanIDsEnd:
×
UNCOV
833

×
UNCOV
834
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
×
UNCOV
835
                if !ok {
×
836
                        log.Warnf("Gossip syncer for peer=%x not found",
×
837
                                peer.PubKey())
×
838

×
839
                        errChan <- ErrGossipSyncerNotFound
×
840
                        return errChan
×
841
                }
×
842

843
                // If we've found the message target, then we'll dispatch the
844
                // message directly to it.
UNCOV
845
                err := syncer.ProcessQueryMsg(m, peer.QuitSignal())
×
UNCOV
846
                if err != nil {
×
UNCOV
847
                        log.Errorf("Process query msg from peer %x got %v",
×
UNCOV
848
                                peer.PubKey(), err)
×
UNCOV
849
                }
×
850

UNCOV
851
                errChan <- err
×
UNCOV
852
                return errChan
×
853

854
        // If a peer is updating its current update horizon, then we'll dispatch
855
        // that directly to the proper GossipSyncer.
UNCOV
856
        case *lnwire.GossipTimestampRange:
×
UNCOV
857
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
×
UNCOV
858
                if !ok {
×
859
                        log.Warnf("Gossip syncer for peer=%x not found",
×
860
                                peer.PubKey())
×
861

×
862
                        errChan <- ErrGossipSyncerNotFound
×
863
                        return errChan
×
864
                }
×
865

866
                // If we've found the message target, then we'll dispatch the
867
                // message directly to it.
UNCOV
868
                if err := syncer.ApplyGossipFilter(m); err != nil {
×
869
                        log.Warnf("Unable to apply gossip filter for peer=%x: "+
×
870
                                "%v", peer.PubKey(), err)
×
871

×
872
                        errChan <- err
×
873
                        return errChan
×
874
                }
×
875

UNCOV
876
                errChan <- nil
×
UNCOV
877
                return errChan
×
878

879
        // To avoid inserting edges in the graph for our own channels that we
880
        // have already closed, we ignore such channel announcements coming
881
        // from the remote.
882
        case *lnwire.ChannelAnnouncement1:
219✔
883
                ownKey := d.selfKey.SerializeCompressed()
219✔
884
                ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement1 " +
219✔
885
                        "for own channel")
219✔
886

219✔
887
                if bytes.Equal(m.NodeID1[:], ownKey) ||
219✔
888
                        bytes.Equal(m.NodeID2[:], ownKey) {
221✔
889

2✔
890
                        log.Warn(ownErr)
2✔
891
                        errChan <- ownErr
2✔
892
                        return errChan
2✔
893
                }
2✔
894
        }
895

896
        nMsg := &networkMsg{
282✔
897
                msg:      msg,
282✔
898
                isRemote: true,
282✔
899
                peer:     peer,
282✔
900
                source:   peer.IdentityKey(),
282✔
901
                err:      errChan,
282✔
902
        }
282✔
903

282✔
904
        select {
282✔
905
        case d.networkMsgs <- nMsg:
282✔
906

907
        // If the peer that sent us this error is quitting, then we don't need
908
        // to send back an error and can return immediately.
909
        case <-peer.QuitSignal():
×
910
                return nil
×
911
        case <-d.quit:
×
912
                nMsg.err <- ErrGossiperShuttingDown
×
913
        }
914

915
        return nMsg.err
282✔
916
}
917

918
// ProcessLocalAnnouncement sends a new remote announcement message along with
919
// the peer that sent the routing message. The announcement will be processed
920
// then added to a queue for batched trickled announcement to all connected
921
// peers.  Local channel announcements don't contain the announcement proof and
922
// will not be fully validated. Once the channel proofs are received, the
923
// entire channel announcement and update messages will be re-constructed and
924
// broadcast to the rest of the network.
925
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
926
        optionalFields ...OptionalMsgField) chan error {
47✔
927

47✔
928
        optionalMsgFields := &optionalMsgFields{}
47✔
929
        optionalMsgFields.apply(optionalFields...)
47✔
930

47✔
931
        nMsg := &networkMsg{
47✔
932
                msg:               msg,
47✔
933
                optionalMsgFields: optionalMsgFields,
47✔
934
                isRemote:          false,
47✔
935
                source:            d.selfKey,
47✔
936
                err:               make(chan error, 1),
47✔
937
        }
47✔
938

47✔
939
        select {
47✔
940
        case d.networkMsgs <- nMsg:
47✔
941
        case <-d.quit:
×
942
                nMsg.err <- ErrGossiperShuttingDown
×
943
        }
944

945
        return nMsg.err
47✔
946
}
947

948
// channelUpdateID is a unique identifier for ChannelUpdate messages, as
949
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
950
// tuple.
951
type channelUpdateID struct {
952
        // channelID represents the set of data which is needed to
953
        // retrieve all necessary data to validate the channel existence.
954
        channelID lnwire.ShortChannelID
955

956
        // Flags least-significant bit must be set to 0 if the creating node
957
        // corresponds to the first node in the previously sent channel
958
        // announcement and 1 otherwise.
959
        flags lnwire.ChanUpdateChanFlags
960
}
961

962
// msgWithSenders is a wrapper struct around a message, and the set of peers
963
// that originally sent us this message. Using this struct, we can ensure that
964
// we don't re-send a message to the peer that sent it to us in the first
965
// place.
966
type msgWithSenders struct {
967
        // msg is the wire message itself.
968
        msg lnwire.Message
969

970
        // isLocal is true if this was a message that originated locally. We'll
971
        // use this to bypass our normal checks to ensure we prioritize sending
972
        // out our own updates.
973
        isLocal bool
974

975
        // sender is the set of peers that sent us this message.
976
        senders map[route.Vertex]struct{}
977
}
978

979
// mergeSyncerMap is used to merge the set of senders of a particular message
980
// with peers that we have an active GossipSyncer with. We do this to ensure
981
// that we don't broadcast messages to any peers that we have active gossip
982
// syncers for.
983
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
24✔
984
        for peerPub := range syncers {
24✔
UNCOV
985
                m.senders[peerPub] = struct{}{}
×
UNCOV
986
        }
×
987
}
988

989
// deDupedAnnouncements de-duplicates announcements that have been added to the
990
// batch. Internally, announcements are stored in three maps
991
// (one each for channel announcements, channel updates, and node
992
// announcements). These maps keep track of unique announcements and ensure no
993
// announcements are duplicated. We keep the three message types separate, such
994
// that we can send channel announcements first, then channel updates, and
995
// finally node announcements when it's time to broadcast them.
996
type deDupedAnnouncements struct {
997
        // channelAnnouncements are identified by the short channel id field.
998
        channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
999

1000
        // channelUpdates are identified by the channel update id field.
1001
        channelUpdates map[channelUpdateID]msgWithSenders
1002

1003
        // nodeAnnouncements are identified by the Vertex field.
1004
        nodeAnnouncements map[route.Vertex]msgWithSenders
1005

1006
        sync.Mutex
1007
}
1008

1009
// Reset operates on deDupedAnnouncements to reset the storage of
1010
// announcements.
1011
func (d *deDupedAnnouncements) Reset() {
29✔
1012
        d.Lock()
29✔
1013
        defer d.Unlock()
29✔
1014

29✔
1015
        d.reset()
29✔
1016
}
29✔
1017

1018
// reset is the private version of the Reset method. We have this so we can
1019
// call this method within method that are already holding the lock.
1020
func (d *deDupedAnnouncements) reset() {
316✔
1021
        // Storage of each type of announcement (channel announcements, channel
316✔
1022
        // updates, node announcements) is set to an empty map where the
316✔
1023
        // appropriate key points to the corresponding lnwire.Message.
316✔
1024
        d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
316✔
1025
        d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
316✔
1026
        d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
316✔
1027
}
316✔
1028

1029
// addMsg adds a new message to the current batch. If the message is already
1030
// present in the current batch, then this new instance replaces the latter,
1031
// and the set of senders is updated to reflect which node sent us this
1032
// message.
1033
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
86✔
1034
        log.Tracef("Adding network message: %v to batch", message.msg.MsgType())
86✔
1035

86✔
1036
        // Depending on the message type (channel announcement, channel update,
86✔
1037
        // or node announcement), the message is added to the corresponding map
86✔
1038
        // in deDupedAnnouncements. Because each identifying key can have at
86✔
1039
        // most one value, the announcements are de-duplicated, with newer ones
86✔
1040
        // replacing older ones.
86✔
1041
        switch msg := message.msg.(type) {
86✔
1042

1043
        // Channel announcements are identified by the short channel id field.
1044
        case *lnwire.ChannelAnnouncement1:
22✔
1045
                deDupKey := msg.ShortChannelID
22✔
1046
                sender := route.NewVertex(message.source)
22✔
1047

22✔
1048
                mws, ok := d.channelAnnouncements[deDupKey]
22✔
1049
                if !ok {
43✔
1050
                        mws = msgWithSenders{
21✔
1051
                                msg:     msg,
21✔
1052
                                isLocal: !message.isRemote,
21✔
1053
                                senders: make(map[route.Vertex]struct{}),
21✔
1054
                        }
21✔
1055
                        mws.senders[sender] = struct{}{}
21✔
1056

21✔
1057
                        d.channelAnnouncements[deDupKey] = mws
21✔
1058

21✔
1059
                        return
21✔
1060
                }
21✔
1061

1062
                mws.msg = msg
1✔
1063
                mws.senders[sender] = struct{}{}
1✔
1064
                d.channelAnnouncements[deDupKey] = mws
1✔
1065

1066
        // Channel updates are identified by the (short channel id,
1067
        // channelflags) tuple.
1068
        case *lnwire.ChannelUpdate1:
42✔
1069
                sender := route.NewVertex(message.source)
42✔
1070
                deDupKey := channelUpdateID{
42✔
1071
                        msg.ShortChannelID,
42✔
1072
                        msg.ChannelFlags,
42✔
1073
                }
42✔
1074

42✔
1075
                oldTimestamp := uint32(0)
42✔
1076
                mws, ok := d.channelUpdates[deDupKey]
42✔
1077
                if ok {
45✔
1078
                        // If we already have seen this message, record its
3✔
1079
                        // timestamp.
3✔
1080
                        update, ok := mws.msg.(*lnwire.ChannelUpdate1)
3✔
1081
                        if !ok {
3✔
1082
                                log.Errorf("Expected *lnwire.ChannelUpdate1, "+
×
1083
                                        "got: %T", mws.msg)
×
1084

×
1085
                                return
×
1086
                        }
×
1087

1088
                        oldTimestamp = update.Timestamp
3✔
1089
                }
1090

1091
                // If we already had this message with a strictly newer
1092
                // timestamp, then we'll just discard the message we got.
1093
                if oldTimestamp > msg.Timestamp {
43✔
1094
                        log.Debugf("Ignored outdated network message: "+
1✔
1095
                                "peer=%v, msg=%s", message.peer, msg.MsgType())
1✔
1096
                        return
1✔
1097
                }
1✔
1098

1099
                // If the message we just got is newer than what we previously
1100
                // have seen, or this is the first time we see it, then we'll
1101
                // add it to our map of announcements.
1102
                if oldTimestamp < msg.Timestamp {
81✔
1103
                        mws = msgWithSenders{
40✔
1104
                                msg:     msg,
40✔
1105
                                isLocal: !message.isRemote,
40✔
1106
                                senders: make(map[route.Vertex]struct{}),
40✔
1107
                        }
40✔
1108

40✔
1109
                        // We'll mark the sender of the message in the
40✔
1110
                        // senders map.
40✔
1111
                        mws.senders[sender] = struct{}{}
40✔
1112

40✔
1113
                        d.channelUpdates[deDupKey] = mws
40✔
1114

40✔
1115
                        return
40✔
1116
                }
40✔
1117

1118
                // Lastly, if we had seen this exact message from before, with
1119
                // the same timestamp, we'll add the sender to the map of
1120
                // senders, such that we can skip sending this message back in
1121
                // the next batch.
1122
                mws.msg = msg
1✔
1123
                mws.senders[sender] = struct{}{}
1✔
1124
                d.channelUpdates[deDupKey] = mws
1✔
1125

1126
        // Node announcements are identified by the Vertex field.  Use the
1127
        // NodeID to create the corresponding Vertex.
1128
        case *lnwire.NodeAnnouncement:
22✔
1129
                sender := route.NewVertex(message.source)
22✔
1130
                deDupKey := route.Vertex(msg.NodeID)
22✔
1131

22✔
1132
                // We do the same for node announcements as we did for channel
22✔
1133
                // updates, as they also carry a timestamp.
22✔
1134
                oldTimestamp := uint32(0)
22✔
1135
                mws, ok := d.nodeAnnouncements[deDupKey]
22✔
1136
                if ok {
27✔
1137
                        oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
5✔
1138
                }
5✔
1139

1140
                // Discard the message if it's old.
1141
                if oldTimestamp > msg.Timestamp {
22✔
UNCOV
1142
                        return
×
UNCOV
1143
                }
×
1144

1145
                // Replace if it's newer.
1146
                if oldTimestamp < msg.Timestamp {
40✔
1147
                        mws = msgWithSenders{
18✔
1148
                                msg:     msg,
18✔
1149
                                isLocal: !message.isRemote,
18✔
1150
                                senders: make(map[route.Vertex]struct{}),
18✔
1151
                        }
18✔
1152

18✔
1153
                        mws.senders[sender] = struct{}{}
18✔
1154

18✔
1155
                        d.nodeAnnouncements[deDupKey] = mws
18✔
1156

18✔
1157
                        return
18✔
1158
                }
18✔
1159

1160
                // Add to senders map if it's the same as we had.
1161
                mws.msg = msg
4✔
1162
                mws.senders[sender] = struct{}{}
4✔
1163
                d.nodeAnnouncements[deDupKey] = mws
4✔
1164
        }
1165
}
1166

1167
// AddMsgs is a helper method to add multiple messages to the announcement
1168
// batch.
1169
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
54✔
1170
        d.Lock()
54✔
1171
        defer d.Unlock()
54✔
1172

54✔
1173
        for _, msg := range msgs {
140✔
1174
                d.addMsg(msg)
86✔
1175
        }
86✔
1176
}
1177

1178
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
1179
// to broadcast next into messages that are locally sourced and those that are
1180
// sourced remotely.
1181
type msgsToBroadcast struct {
1182
        // localMsgs is the set of messages we created locally.
1183
        localMsgs []msgWithSenders
1184

1185
        // remoteMsgs is the set of messages that we received from a remote
1186
        // party.
1187
        remoteMsgs []msgWithSenders
1188
}
1189

1190
// addMsg adds a new message to the appropriate sub-slice.
1191
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
71✔
1192
        if msg.isLocal {
118✔
1193
                m.localMsgs = append(m.localMsgs, msg)
47✔
1194
        } else {
71✔
1195
                m.remoteMsgs = append(m.remoteMsgs, msg)
24✔
1196
        }
24✔
1197
}
1198

1199
// isEmpty returns true if the batch is empty.
1200
func (m *msgsToBroadcast) isEmpty() bool {
286✔
1201
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
286✔
1202
}
286✔
1203

1204
// length returns the length of the combined message set.
1205
func (m *msgsToBroadcast) length() int {
1✔
1206
        return len(m.localMsgs) + len(m.remoteMsgs)
1✔
1207
}
1✔
1208

1209
// Emit returns the set of de-duplicated announcements to be sent out during
1210
// the next announcement epoch, in the order of channel announcements, channel
1211
// updates, and node announcements. Each message emitted, contains the set of
1212
// peers that sent us the message. This way, we can ensure that we don't waste
1213
// bandwidth by re-sending a message to the peer that sent it to us in the
1214
// first place. Additionally, the set of stored messages are reset.
1215
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
287✔
1216
        d.Lock()
287✔
1217
        defer d.Unlock()
287✔
1218

287✔
1219
        // Get the total number of announcements.
287✔
1220
        numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
287✔
1221
                len(d.nodeAnnouncements)
287✔
1222

287✔
1223
        // Create an empty array of lnwire.Messages with a length equal to
287✔
1224
        // the total number of announcements.
287✔
1225
        msgs := msgsToBroadcast{
287✔
1226
                localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
287✔
1227
                remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
287✔
1228
        }
287✔
1229

287✔
1230
        // Add the channel announcements to the array first.
287✔
1231
        for _, message := range d.channelAnnouncements {
305✔
1232
                msgs.addMsg(message)
18✔
1233
        }
18✔
1234

1235
        // Then add the channel updates.
1236
        for _, message := range d.channelUpdates {
323✔
1237
                msgs.addMsg(message)
36✔
1238
        }
36✔
1239

1240
        // Finally add the node announcements.
1241
        for _, message := range d.nodeAnnouncements {
304✔
1242
                msgs.addMsg(message)
17✔
1243
        }
17✔
1244

1245
        d.reset()
287✔
1246

287✔
1247
        // Return the array of lnwire.messages.
287✔
1248
        return msgs
287✔
1249
}
1250

1251
// calculateSubBatchSize is a helper function that calculates the size to break
1252
// down the batchSize into.
1253
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
1254
        minimumBatchSize, batchSize int) int {
13✔
1255
        if subBatchDelay > totalDelay {
15✔
1256
                return batchSize
2✔
1257
        }
2✔
1258

1259
        subBatchSize := (batchSize*int(subBatchDelay) +
11✔
1260
                int(totalDelay) - 1) / int(totalDelay)
11✔
1261

11✔
1262
        if subBatchSize < minimumBatchSize {
12✔
1263
                return minimumBatchSize
1✔
1264
        }
1✔
1265

1266
        return subBatchSize
10✔
1267
}
1268

1269
// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
1270
// this variable so the function can be mocked in our test.
1271
var batchSizeCalculator = calculateSubBatchSize
1272

1273
// splitAnnouncementBatches takes an exiting list of announcements and
1274
// decomposes it into sub batches controlled by the `subBatchSize`.
1275
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
1276
        announcementBatch []msgWithSenders) [][]msgWithSenders {
69✔
1277

69✔
1278
        subBatchSize := batchSizeCalculator(
69✔
1279
                d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
69✔
1280
                d.cfg.MinimumBatchSize, len(announcementBatch),
69✔
1281
        )
69✔
1282

69✔
1283
        var splitAnnouncementBatch [][]msgWithSenders
69✔
1284

69✔
1285
        for subBatchSize < len(announcementBatch) {
190✔
1286
                // For slicing with minimal allocation
121✔
1287
                // https://github.com/golang/go/wiki/SliceTricks
121✔
1288
                announcementBatch, splitAnnouncementBatch =
121✔
1289
                        announcementBatch[subBatchSize:],
121✔
1290
                        append(splitAnnouncementBatch,
121✔
1291
                                announcementBatch[0:subBatchSize:subBatchSize])
121✔
1292
        }
121✔
1293
        splitAnnouncementBatch = append(
69✔
1294
                splitAnnouncementBatch, announcementBatch,
69✔
1295
        )
69✔
1296

69✔
1297
        return splitAnnouncementBatch
69✔
1298
}
1299

1300
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
1301
// split size, and then sends out all items to the set of target peers. Locally
1302
// generated announcements are always sent before remotely generated
1303
// announcements.
1304
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
1305
        annBatch msgsToBroadcast) {
31✔
1306

31✔
1307
        // delayNextBatch is a helper closure that blocks for `SubBatchDelay`
31✔
1308
        // duration to delay the sending of next announcement batch.
31✔
1309
        delayNextBatch := func() {
93✔
1310
                select {
62✔
1311
                case <-time.After(d.cfg.SubBatchDelay):
45✔
1312
                case <-d.quit:
17✔
1313
                        return
17✔
1314
                }
1315
        }
1316

1317
        // Fetch the local and remote announcements.
1318
        localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
31✔
1319
        remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)
31✔
1320

31✔
1321
        d.wg.Add(1)
31✔
1322
        go func() {
62✔
1323
                defer d.wg.Done()
31✔
1324

31✔
1325
                log.Debugf("Broadcasting %v new local announcements in %d "+
31✔
1326
                        "sub batches", len(annBatch.localMsgs),
31✔
1327
                        len(localBatches))
31✔
1328

31✔
1329
                // Send out the local announcements first.
31✔
1330
                for _, annBatch := range localBatches {
62✔
1331
                        d.sendLocalBatch(annBatch)
31✔
1332
                        delayNextBatch()
31✔
1333
                }
31✔
1334

1335
                log.Debugf("Broadcasting %v new remote announcements in %d "+
31✔
1336
                        "sub batches", len(annBatch.remoteMsgs),
31✔
1337
                        len(remoteBatches))
31✔
1338

31✔
1339
                // Now send the remote announcements.
31✔
1340
                for _, annBatch := range remoteBatches {
62✔
1341
                        d.sendRemoteBatch(annBatch)
31✔
1342
                        delayNextBatch()
31✔
1343
                }
31✔
1344
        }()
1345
}
1346

1347
// sendLocalBatch broadcasts a list of locally generated announcements to our
1348
// peers. For local announcements, we skip the filter and dedup logic and just
1349
// send the announcements out to all our coonnected peers.
1350
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
31✔
1351
        msgsToSend := lnutils.Map(
31✔
1352
                annBatch, func(m msgWithSenders) lnwire.Message {
74✔
1353
                        return m.msg
43✔
1354
                },
43✔
1355
        )
1356

1357
        err := d.cfg.Broadcast(nil, msgsToSend...)
31✔
1358
        if err != nil {
31✔
1359
                log.Errorf("Unable to send local batch announcements: %v", err)
×
1360
        }
×
1361
}
1362

1363
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
1364
// peers.
1365
func (d *AuthenticatedGossiper) sendRemoteBatch(annBatch []msgWithSenders) {
31✔
1366
        syncerPeers := d.syncMgr.GossipSyncers()
31✔
1367

31✔
1368
        // We'll first attempt to filter out this new message for all peers
31✔
1369
        // that have active gossip syncers active.
31✔
1370
        for pub, syncer := range syncerPeers {
31✔
UNCOV
1371
                log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
×
UNCOV
1372
                syncer.FilterGossipMsgs(annBatch...)
×
UNCOV
1373
        }
×
1374

1375
        for _, msgChunk := range annBatch {
55✔
1376
                msgChunk := msgChunk
24✔
1377

24✔
1378
                // With the syncers taken care of, we'll merge the sender map
24✔
1379
                // with the set of syncers, so we don't send out duplicate
24✔
1380
                // messages.
24✔
1381
                msgChunk.mergeSyncerMap(syncerPeers)
24✔
1382

24✔
1383
                err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
24✔
1384
                if err != nil {
24✔
1385
                        log.Errorf("Unable to send batch "+
×
1386
                                "announcements: %v", err)
×
1387
                        continue
×
1388
                }
1389
        }
1390
}
1391

1392
// networkHandler is the primary goroutine that drives this service. The roles
1393
// of this goroutine includes answering queries related to the state of the
1394
// network, syncing up newly connected peers, and also periodically
1395
// broadcasting our latest topology state to all connected peers.
1396
//
1397
// NOTE: This MUST be run as a goroutine.
1398
func (d *AuthenticatedGossiper) networkHandler() {
27✔
1399
        defer d.wg.Done()
27✔
1400

27✔
1401
        // Initialize empty deDupedAnnouncements to store announcement batch.
27✔
1402
        announcements := deDupedAnnouncements{}
27✔
1403
        announcements.Reset()
27✔
1404

27✔
1405
        d.cfg.RetransmitTicker.Resume()
27✔
1406
        defer d.cfg.RetransmitTicker.Stop()
27✔
1407

27✔
1408
        trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
27✔
1409
        defer trickleTimer.Stop()
27✔
1410

27✔
1411
        // To start, we'll first check to see if there are any stale channel or
27✔
1412
        // node announcements that we need to re-transmit.
27✔
1413
        if err := d.retransmitStaleAnns(time.Now()); err != nil {
27✔
1414
                log.Errorf("Unable to rebroadcast stale announcements: %v", err)
×
1415
        }
×
1416

1417
        for {
673✔
1418
                select {
646✔
1419
                // A new policy update has arrived. We'll commit it to the
1420
                // sub-systems below us, then craft, sign, and broadcast a new
1421
                // ChannelUpdate for the set of affected clients.
1422
                case policyUpdate := <-d.chanPolicyUpdates:
1✔
1423
                        log.Tracef("Received channel %d policy update requests",
1✔
1424
                                len(policyUpdate.edgesToUpdate))
1✔
1425

1✔
1426
                        // First, we'll now create new fully signed updates for
1✔
1427
                        // the affected channels and also update the underlying
1✔
1428
                        // graph with the new state.
1✔
1429
                        newChanUpdates, err := d.processChanPolicyUpdate(
1✔
1430
                                policyUpdate.edgesToUpdate,
1✔
1431
                        )
1✔
1432
                        policyUpdate.errChan <- err
1✔
1433
                        if err != nil {
1✔
1434
                                log.Errorf("Unable to craft policy updates: %v",
×
1435
                                        err)
×
1436
                                continue
×
1437
                        }
1438

1439
                        // Finally, with the updates committed, we'll now add
1440
                        // them to the announcement batch to be flushed at the
1441
                        // start of the next epoch.
1442
                        announcements.AddMsgs(newChanUpdates...)
1✔
1443

1444
                case announcement := <-d.networkMsgs:
331✔
1445
                        log.Tracef("Received network message: "+
331✔
1446
                                "peer=%v, msg=%s, is_remote=%v",
331✔
1447
                                announcement.peer, announcement.msg.MsgType(),
331✔
1448
                                announcement.isRemote)
331✔
1449

331✔
1450
                        switch announcement.msg.(type) {
331✔
1451
                        // Channel announcement signatures are amongst the only
1452
                        // messages that we'll process serially.
1453
                        case *lnwire.AnnounceSignatures1:
21✔
1454
                                emittedAnnouncements, _ := d.processNetworkAnnouncement(
21✔
1455
                                        announcement,
21✔
1456
                                )
21✔
1457
                                log.Debugf("Processed network message %s, "+
21✔
1458
                                        "returned len(announcements)=%v",
21✔
1459
                                        announcement.msg.MsgType(),
21✔
1460
                                        len(emittedAnnouncements))
21✔
1461

21✔
1462
                                if emittedAnnouncements != nil {
31✔
1463
                                        announcements.AddMsgs(
10✔
1464
                                                emittedAnnouncements...,
10✔
1465
                                        )
10✔
1466
                                }
10✔
1467
                                continue
21✔
1468
                        }
1469

1470
                        // If this message was recently rejected, then we won't
1471
                        // attempt to re-process it.
1472
                        if announcement.isRemote && d.isRecentlyRejectedMsg(
310✔
1473
                                announcement.msg,
310✔
1474
                                sourceToPub(announcement.source),
310✔
1475
                        ) {
311✔
1476

1✔
1477
                                announcement.err <- fmt.Errorf("recently " +
1✔
1478
                                        "rejected")
1✔
1479
                                continue
1✔
1480
                        }
1481

1482
                        // We'll set up any dependent, and wait until a free
1483
                        // slot for this job opens up, this allow us to not
1484
                        // have thousands of goroutines active.
1485
                        annJobID, err := d.vb.InitJobDependencies(
309✔
1486
                                announcement.msg,
309✔
1487
                        )
309✔
1488
                        if err != nil {
309✔
1489
                                announcement.err <- err
×
1490
                                continue
×
1491
                        }
1492

1493
                        d.wg.Add(1)
309✔
1494
                        go d.handleNetworkMessages(
309✔
1495
                                announcement, &announcements, annJobID,
309✔
1496
                        )
309✔
1497

1498
                // The trickle timer has ticked, which indicates we should
1499
                // flush to the network the pending batch of new announcements
1500
                // we've received since the last trickle tick.
1501
                case <-trickleTimer.C:
286✔
1502
                        // Emit the current batch of announcements from
286✔
1503
                        // deDupedAnnouncements.
286✔
1504
                        announcementBatch := announcements.Emit()
286✔
1505

286✔
1506
                        // If the current announcements batch is nil, then we
286✔
1507
                        // have no further work here.
286✔
1508
                        if announcementBatch.isEmpty() {
541✔
1509
                                continue
255✔
1510
                        }
1511

1512
                        // At this point, we have the set of local and remote
1513
                        // announcements we want to send out. We'll do the
1514
                        // batching as normal for both, but for local
1515
                        // announcements, we'll blast them out w/o regard for
1516
                        // our peer's policies so we ensure they propagate
1517
                        // properly.
1518
                        d.splitAndSendAnnBatch(announcementBatch)
31✔
1519

1520
                // The retransmission timer has ticked which indicates that we
1521
                // should check if we need to prune or re-broadcast any of our
1522
                // personal channels or node announcement. This addresses the
1523
                // case of "zombie" channels and channel advertisements that
1524
                // have been dropped, or not properly propagated through the
1525
                // network.
1526
                case tick := <-d.cfg.RetransmitTicker.Ticks():
1✔
1527
                        if err := d.retransmitStaleAnns(tick); err != nil {
1✔
1528
                                log.Errorf("unable to rebroadcast stale "+
×
1529
                                        "announcements: %v", err)
×
1530
                        }
×
1531

1532
                // The gossiper has been signalled to exit, to we exit our
1533
                // main loop so the wait group can be decremented.
1534
                case <-d.quit:
27✔
1535
                        return
27✔
1536
                }
1537
        }
1538
}
1539

1540
// handleNetworkMessages is responsible for waiting for dependencies for a
1541
// given network message and processing the message. Once processed, it will
1542
// signal its dependants and add the new announcements to the announce batch.
1543
//
1544
// NOTE: must be run as a goroutine.
1545
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
1546
        deDuped *deDupedAnnouncements, jobID JobID) {
309✔
1547

309✔
1548
        defer d.wg.Done()
309✔
1549
        defer d.vb.CompleteJob()
309✔
1550

309✔
1551
        // We should only broadcast this message forward if it originated from
309✔
1552
        // us or it wasn't received as part of our initial historical sync.
309✔
1553
        shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
309✔
1554

309✔
1555
        // If this message has an existing dependency, then we'll wait until
309✔
1556
        // that has been fully validated before we proceed.
309✔
1557
        err := d.vb.WaitForParents(jobID, nMsg.msg)
309✔
1558
        if err != nil {
309✔
1559
                log.Debugf("Validating network message %s got err: %v",
×
1560
                        nMsg.msg.MsgType(), err)
×
1561

×
1562
                if errors.Is(err, ErrVBarrierShuttingDown) {
×
1563
                        log.Warnf("unexpected error during validation "+
×
1564
                                "barrier shutdown: %v", err)
×
1565
                }
×
1566
                nMsg.err <- err
×
1567

×
1568
                return
×
1569
        }
1570

1571
        // Process the network announcement to determine if this is either a
1572
        // new announcement from our PoV or an edges to a prior vertex/edge we
1573
        // previously proceeded.
1574
        newAnns, allow := d.processNetworkAnnouncement(nMsg)
309✔
1575

309✔
1576
        log.Tracef("Processed network message %s, returned "+
309✔
1577
                "len(announcements)=%v, allowDependents=%v",
309✔
1578
                nMsg.msg.MsgType(), len(newAnns), allow)
309✔
1579

309✔
1580
        // If this message had any dependencies, then we can now signal them to
309✔
1581
        // continue.
309✔
1582
        err = d.vb.SignalDependents(nMsg.msg, jobID)
309✔
1583
        if err != nil {
309✔
1584
                // Something is wrong if SignalDependents returns an error.
×
1585
                log.Errorf("SignalDependents returned error for msg=%v with "+
×
1586
                        "JobID=%v", spew.Sdump(nMsg.msg), jobID)
×
1587

×
1588
                nMsg.err <- err
×
1589

×
1590
                return
×
1591
        }
×
1592

1593
        // If the announcement was accepted, then add the emitted announcements
1594
        // to our announce batch to be broadcast once the trickle timer ticks
1595
        // gain.
1596
        if newAnns != nil && shouldBroadcast {
341✔
1597
                // TODO(roasbeef): exclude peer that sent.
32✔
1598
                deDuped.AddMsgs(newAnns...)
32✔
1599
        } else if newAnns != nil {
310✔
1600
                log.Trace("Skipping broadcast of announcements received " +
1✔
1601
                        "during initial graph sync")
1✔
1602
        }
1✔
1603
}
1604

1605
// TODO(roasbeef): d/c peers that send updates not on our chain
1606

1607
// InitSyncState is called by outside sub-systems when a connection is
1608
// established to a new peer that understands how to perform channel range
1609
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
1610
// needed to handle new queries.
UNCOV
1611
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
×
UNCOV
1612
        d.syncMgr.InitSyncState(syncPeer)
×
UNCOV
1613
}
×
1614

1615
// PruneSyncState is called by outside sub-systems once a peer that we were
1616
// previously connected to has been disconnected. In this case we can stop the
1617
// existing GossipSyncer assigned to the peer and free up resources.
UNCOV
1618
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
×
UNCOV
1619
        d.syncMgr.PruneSyncState(peer)
×
UNCOV
1620
}
×
1621

1622
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
1623
// false otherwise, This avoids expensive reprocessing of the message.
1624
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
1625
        peerPub [33]byte) bool {
273✔
1626

273✔
1627
        var scid uint64
273✔
1628
        switch m := msg.(type) {
273✔
1629
        case *lnwire.ChannelUpdate1:
42✔
1630
                scid = m.ShortChannelID.ToUint64()
42✔
1631

1632
        case *lnwire.ChannelAnnouncement1:
217✔
1633
                scid = m.ShortChannelID.ToUint64()
217✔
1634

1635
        default:
14✔
1636
                return false
14✔
1637
        }
1638

1639
        _, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
259✔
1640
        return err != cache.ErrElementNotFound
259✔
1641
}
1642

1643
// retransmitStaleAnns examines all outgoing channels that the source node is
1644
// known to maintain to check to see if any of them are "stale". A channel is
1645
// stale iff, the last timestamp of its rebroadcast is older than the
1646
// RebroadcastInterval. We also check if a refreshed node announcement should
1647
// be resent.
1648
func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
28✔
1649
        // Iterate over all of our channels and check if any of them fall
28✔
1650
        // within the prune interval or re-broadcast interval.
28✔
1651
        type updateTuple struct {
28✔
1652
                info *models.ChannelEdgeInfo
28✔
1653
                edge *models.ChannelEdgePolicy
28✔
1654
        }
28✔
1655

28✔
1656
        var (
28✔
1657
                havePublicChannels bool
28✔
1658
                edgesToUpdate      []updateTuple
28✔
1659
        )
28✔
1660
        err := d.cfg.Graph.ForAllOutgoingChannels(func(
28✔
1661
                info *models.ChannelEdgeInfo,
28✔
1662
                edge *models.ChannelEdgePolicy) error {
30✔
1663

2✔
1664
                // If there's no auth proof attached to this edge, it means
2✔
1665
                // that it is a private channel not meant to be announced to
2✔
1666
                // the greater network, so avoid sending channel updates for
2✔
1667
                // this channel to not leak its
2✔
1668
                // existence.
2✔
1669
                if info.AuthProof == nil {
3✔
1670
                        log.Debugf("Skipping retransmission of channel "+
1✔
1671
                                "without AuthProof: %v", info.ChannelID)
1✔
1672
                        return nil
1✔
1673
                }
1✔
1674

1675
                // We make a note that we have at least one public channel. We
1676
                // use this to determine whether we should send a node
1677
                // announcement below.
1678
                havePublicChannels = true
1✔
1679

1✔
1680
                // If this edge has a ChannelUpdate that was created before the
1✔
1681
                // introduction of the MaxHTLC field, then we'll update this
1✔
1682
                // edge to propagate this information in the network.
1✔
1683
                if !edge.MessageFlags.HasMaxHtlc() {
1✔
1684
                        // We'll make sure we support the new max_htlc field if
×
1685
                        // not already present.
×
1686
                        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
×
1687
                        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
×
1688

×
1689
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
1690
                                info: info,
×
1691
                                edge: edge,
×
1692
                        })
×
1693
                        return nil
×
1694
                }
×
1695

1696
                timeElapsed := now.Sub(edge.LastUpdate)
1✔
1697

1✔
1698
                // If it's been longer than RebroadcastInterval since we've
1✔
1699
                // re-broadcasted the channel, add the channel to the set of
1✔
1700
                // edges we need to update.
1✔
1701
                if timeElapsed >= d.cfg.RebroadcastInterval {
2✔
1702
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
1✔
1703
                                info: info,
1✔
1704
                                edge: edge,
1✔
1705
                        })
1✔
1706
                }
1✔
1707

1708
                return nil
1✔
1709
        })
1710
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
28✔
1711
                return fmt.Errorf("unable to retrieve outgoing channels: %w",
×
1712
                        err)
×
1713
        }
×
1714

1715
        var signedUpdates []lnwire.Message
28✔
1716
        for _, chanToUpdate := range edgesToUpdate {
29✔
1717
                // Re-sign and update the channel on disk and retrieve our
1✔
1718
                // ChannelUpdate to broadcast.
1✔
1719
                chanAnn, chanUpdate, err := d.updateChannel(
1✔
1720
                        chanToUpdate.info, chanToUpdate.edge,
1✔
1721
                )
1✔
1722
                if err != nil {
1✔
1723
                        return fmt.Errorf("unable to update channel: %w", err)
×
1724
                }
×
1725

1726
                // If we have a valid announcement to transmit, then we'll send
1727
                // that along with the update.
1728
                if chanAnn != nil {
2✔
1729
                        signedUpdates = append(signedUpdates, chanAnn)
1✔
1730
                }
1✔
1731

1732
                signedUpdates = append(signedUpdates, chanUpdate)
1✔
1733
        }
1734

1735
        // If we don't have any public channels, we return as we don't want to
1736
        // broadcast anything that would reveal our existence.
1737
        if !havePublicChannels {
55✔
1738
                return nil
27✔
1739
        }
27✔
1740

1741
        // We'll also check that our NodeAnnouncement is not too old.
1742
        currentNodeAnn := d.cfg.FetchSelfAnnouncement()
1✔
1743
        timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
1✔
1744
        timeElapsed := now.Sub(timestamp)
1✔
1745

1✔
1746
        // If it's been a full day since we've re-broadcasted the
1✔
1747
        // node announcement, refresh it and resend it.
1✔
1748
        nodeAnnStr := ""
1✔
1749
        if timeElapsed >= d.cfg.RebroadcastInterval {
2✔
1750
                newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
1✔
1751
                if err != nil {
1✔
1752
                        return fmt.Errorf("unable to get refreshed node "+
×
1753
                                "announcement: %v", err)
×
1754
                }
×
1755

1756
                signedUpdates = append(signedUpdates, &newNodeAnn)
1✔
1757
                nodeAnnStr = " and our refreshed node announcement"
1✔
1758

1✔
1759
                // Before broadcasting the refreshed node announcement, add it
1✔
1760
                // to our own graph.
1✔
1761
                if err := d.addNode(&newNodeAnn); err != nil {
2✔
1762
                        log.Errorf("Unable to add refreshed node announcement "+
1✔
1763
                                "to graph: %v", err)
1✔
1764
                }
1✔
1765
        }
1766

1767
        // If we don't have any updates to re-broadcast, then we'll exit
1768
        // early.
1769
        if len(signedUpdates) == 0 {
1✔
UNCOV
1770
                return nil
×
UNCOV
1771
        }
×
1772

1773
        log.Infof("Retransmitting %v outgoing channels%v",
1✔
1774
                len(edgesToUpdate), nodeAnnStr)
1✔
1775

1✔
1776
        // With all the wire announcements properly crafted, we'll broadcast
1✔
1777
        // our known outgoing channels to all our immediate peers.
1✔
1778
        if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
1✔
1779
                return fmt.Errorf("unable to re-broadcast channels: %w", err)
×
1780
        }
×
1781

1782
        return nil
1✔
1783
}
1784

1785
// processChanPolicyUpdate generates a new set of channel updates for the
1786
// provided list of edges and updates the backing ChannelGraphSource.
1787
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
1788
        edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
1✔
1789

1✔
1790
        var chanUpdates []networkMsg
1✔
1791
        for _, edgeInfo := range edgesToUpdate {
4✔
1792
                // Now that we've collected all the channels we need to update,
3✔
1793
                // we'll re-sign and update the backing ChannelGraphSource, and
3✔
1794
                // retrieve our ChannelUpdate to broadcast.
3✔
1795
                _, chanUpdate, err := d.updateChannel(
3✔
1796
                        edgeInfo.Info, edgeInfo.Edge,
3✔
1797
                )
3✔
1798
                if err != nil {
3✔
1799
                        return nil, err
×
1800
                }
×
1801

1802
                // We'll avoid broadcasting any updates for private channels to
1803
                // avoid directly giving away their existence. Instead, we'll
1804
                // send the update directly to the remote party.
1805
                if edgeInfo.Info.AuthProof == nil {
4✔
1806
                        // If AuthProof is nil and an alias was found for this
1✔
1807
                        // ChannelID (meaning the option-scid-alias feature was
1✔
1808
                        // negotiated), we'll replace the ShortChannelID in the
1✔
1809
                        // update with the peer's alias. We do this after
1✔
1810
                        // updateChannel so that the alias isn't persisted to
1✔
1811
                        // the database.
1✔
1812
                        chanID := lnwire.NewChanIDFromOutPoint(
1✔
1813
                                edgeInfo.Info.ChannelPoint,
1✔
1814
                        )
1✔
1815

1✔
1816
                        var defaultAlias lnwire.ShortChannelID
1✔
1817
                        foundAlias, _ := d.cfg.GetAlias(chanID)
1✔
1818
                        if foundAlias != defaultAlias {
1✔
UNCOV
1819
                                chanUpdate.ShortChannelID = foundAlias
×
UNCOV
1820

×
UNCOV
1821
                                sig, err := d.cfg.SignAliasUpdate(chanUpdate)
×
UNCOV
1822
                                if err != nil {
×
1823
                                        log.Errorf("Unable to sign alias "+
×
1824
                                                "update: %v", err)
×
1825
                                        continue
×
1826
                                }
1827

UNCOV
1828
                                lnSig, err := lnwire.NewSigFromSignature(sig)
×
UNCOV
1829
                                if err != nil {
×
1830
                                        log.Errorf("Unable to create sig: %v",
×
1831
                                                err)
×
1832
                                        continue
×
1833
                                }
1834

UNCOV
1835
                                chanUpdate.Signature = lnSig
×
1836
                        }
1837

1838
                        remotePubKey := remotePubFromChanInfo(
1✔
1839
                                edgeInfo.Info, chanUpdate.ChannelFlags,
1✔
1840
                        )
1✔
1841
                        err := d.reliableSender.sendMessage(
1✔
1842
                                chanUpdate, remotePubKey,
1✔
1843
                        )
1✔
1844
                        if err != nil {
1✔
1845
                                log.Errorf("Unable to reliably send %v for "+
×
1846
                                        "channel=%v to peer=%x: %v",
×
1847
                                        chanUpdate.MsgType(),
×
1848
                                        chanUpdate.ShortChannelID,
×
1849
                                        remotePubKey, err)
×
1850
                        }
×
1851
                        continue
1✔
1852
                }
1853

1854
                // We set ourselves as the source of this message to indicate
1855
                // that we shouldn't skip any peers when sending this message.
1856
                chanUpdates = append(chanUpdates, networkMsg{
2✔
1857
                        source:   d.selfKey,
2✔
1858
                        isRemote: false,
2✔
1859
                        msg:      chanUpdate,
2✔
1860
                })
2✔
1861
        }
1862

1863
        return chanUpdates, nil
1✔
1864
}
1865

1866
// remotePubFromChanInfo returns the public key of the remote peer given a
1867
// ChannelEdgeInfo that describe a channel we have with them.
1868
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
1869
        chanFlags lnwire.ChanUpdateChanFlags) [33]byte {
12✔
1870

12✔
1871
        var remotePubKey [33]byte
12✔
1872
        switch {
12✔
1873
        case chanFlags&lnwire.ChanUpdateDirection == 0:
12✔
1874
                remotePubKey = chanInfo.NodeKey2Bytes
12✔
UNCOV
1875
        case chanFlags&lnwire.ChanUpdateDirection == 1:
×
UNCOV
1876
                remotePubKey = chanInfo.NodeKey1Bytes
×
1877
        }
1878

1879
        return remotePubKey
12✔
1880
}
1881

1882
// processRejectedEdge examines a rejected edge to see if we can extract any
1883
// new announcements from it.  An edge will get rejected if we already added
1884
// the same edge without AuthProof to the graph. If the received announcement
1885
// contains a proof, we can add this proof to our edge.  We can end up in this
1886
// situation in the case where we create a channel, but for some reason fail
1887
// to receive the remote peer's proof, while the remote peer is able to fully
1888
// assemble the proof and craft the ChannelAnnouncement.
1889
func (d *AuthenticatedGossiper) processRejectedEdge(
1890
        chanAnnMsg *lnwire.ChannelAnnouncement1,
UNCOV
1891
        proof *models.ChannelAuthProof) ([]networkMsg, error) {
×
UNCOV
1892

×
UNCOV
1893
        // First, we'll fetch the state of the channel as we know if from the
×
UNCOV
1894
        // database.
×
UNCOV
1895
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
×
UNCOV
1896
                chanAnnMsg.ShortChannelID,
×
UNCOV
1897
        )
×
UNCOV
1898
        if err != nil {
×
1899
                return nil, err
×
1900
        }
×
1901

1902
        // The edge is in the graph, and has a proof attached, then we'll just
1903
        // reject it as normal.
UNCOV
1904
        if chanInfo.AuthProof != nil {
×
UNCOV
1905
                return nil, nil
×
UNCOV
1906
        }
×
1907

1908
        // Otherwise, this means that the edge is within the graph, but it
1909
        // doesn't yet have a proper proof attached. If we did not receive
1910
        // the proof such that we now can add it, there's nothing more we
1911
        // can do.
1912
        if proof == nil {
×
1913
                return nil, nil
×
1914
        }
×
1915

1916
        // We'll then create then validate the new fully assembled
1917
        // announcement.
1918
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
×
1919
                proof, chanInfo, e1, e2,
×
1920
        )
×
1921
        if err != nil {
×
1922
                return nil, err
×
1923
        }
×
1924
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
×
1925
        if err != nil {
×
1926
                err := fmt.Errorf("assembled channel announcement proof "+
×
1927
                        "for shortChanID=%v isn't valid: %v",
×
1928
                        chanAnnMsg.ShortChannelID, err)
×
1929
                log.Error(err)
×
1930
                return nil, err
×
1931
        }
×
1932

1933
        // If everything checks out, then we'll add the fully assembled proof
1934
        // to the database.
1935
        err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
×
1936
        if err != nil {
×
1937
                err := fmt.Errorf("unable add proof to shortChanID=%v: %w",
×
1938
                        chanAnnMsg.ShortChannelID, err)
×
1939
                log.Error(err)
×
1940
                return nil, err
×
1941
        }
×
1942

1943
        // As we now have a complete channel announcement for this channel,
1944
        // we'll construct the announcement so they can be broadcast out to all
1945
        // our peers.
1946
        announcements := make([]networkMsg, 0, 3)
×
1947
        announcements = append(announcements, networkMsg{
×
1948
                source: d.selfKey,
×
1949
                msg:    chanAnn,
×
1950
        })
×
1951
        if e1Ann != nil {
×
1952
                announcements = append(announcements, networkMsg{
×
1953
                        source: d.selfKey,
×
1954
                        msg:    e1Ann,
×
1955
                })
×
1956
        }
×
1957
        if e2Ann != nil {
×
1958
                announcements = append(announcements, networkMsg{
×
1959
                        source: d.selfKey,
×
1960
                        msg:    e2Ann,
×
1961
                })
×
1962

×
1963
        }
×
1964

1965
        return announcements, nil
×
1966
}
1967

1968
// fetchPKScript fetches the output script for the given SCID.
1969
func (d *AuthenticatedGossiper) fetchPKScript(chanID *lnwire.ShortChannelID) (
1970
        []byte, error) {
×
1971

×
1972
        return lnwallet.FetchPKScriptWithQuit(d.cfg.ChainIO, chanID, d.quit)
×
1973
}
×
1974

1975
// addNode processes the given node announcement, and adds it to our channel
1976
// graph.
1977
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
1978
        op ...batch.SchedulerOption) error {
17✔
1979

17✔
1980
        if err := netann.ValidateNodeAnn(msg); err != nil {
18✔
1981
                return fmt.Errorf("unable to validate node announcement: %w",
1✔
1982
                        err)
1✔
1983
        }
1✔
1984

1985
        return d.cfg.Graph.AddNode(models.NodeFromWireAnnouncement(msg), op...)
16✔
1986
}
1987

1988
// isPremature decides whether a given network message has a block height+delta
1989
// value specified in the future. If so, the message will be added to the
1990
// future message map and be processed when the block height as reached.
1991
//
1992
// NOTE: must be used inside a lock.
1993
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
1994
        delta uint32, msg *networkMsg) bool {
279✔
1995

279✔
1996
        // The channel is already confirmed at chanID.BlockHeight so we minus
279✔
1997
        // one block. For instance, if the required confirmation for this
279✔
1998
        // channel announcement is 6, we then only need to wait for 5 more
279✔
1999
        // blocks once the funding tx is confirmed.
279✔
2000
        if delta > 0 {
279✔
UNCOV
2001
                delta--
×
UNCOV
2002
        }
×
2003

2004
        msgHeight := chanID.BlockHeight + delta
279✔
2005

279✔
2006
        // The message height is smaller or equal to our best known height,
279✔
2007
        // thus the message is mature.
279✔
2008
        if msgHeight <= d.bestHeight {
557✔
2009
                return false
278✔
2010
        }
278✔
2011

2012
        // Add the premature message to our future messages which will be
2013
        // resent once the block height has reached.
2014
        //
2015
        // Copy the networkMsgs since the old message's err chan will be
2016
        // consumed.
2017
        copied := &networkMsg{
1✔
2018
                peer:              msg.peer,
1✔
2019
                source:            msg.source,
1✔
2020
                msg:               msg.msg,
1✔
2021
                optionalMsgFields: msg.optionalMsgFields,
1✔
2022
                isRemote:          msg.isRemote,
1✔
2023
                err:               make(chan error, 1),
1✔
2024
        }
1✔
2025

1✔
2026
        // Create the cached message.
1✔
2027
        cachedMsg := &cachedFutureMsg{
1✔
2028
                msg:    copied,
1✔
2029
                height: msgHeight,
1✔
2030
        }
1✔
2031

1✔
2032
        // Increment the msg ID and add it to the cache.
1✔
2033
        nextMsgID := d.futureMsgs.nextMsgID()
1✔
2034
        _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
1✔
2035
        if err != nil {
1✔
2036
                log.Errorf("Adding future message got error: %v", err)
×
2037
        }
×
2038

2039
        log.Debugf("Network message: %v added to future messages for "+
1✔
2040
                "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
1✔
2041
                msgHeight, d.bestHeight)
1✔
2042

1✔
2043
        return true
1✔
2044
}
2045

2046
// processNetworkAnnouncement processes a new network relate authenticated
2047
// channel or node announcement or announcements proofs. If the announcement
2048
// didn't affect the internal state due to either being out of date, invalid,
2049
// or redundant, then nil is returned. Otherwise, the set of announcements will
2050
// be returned which should be broadcasted to the rest of the network. The
2051
// boolean returned indicates whether any dependents of the announcement should
2052
// attempt to be processed as well.
2053
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
2054
        nMsg *networkMsg) ([]networkMsg, bool) {
330✔
2055

330✔
2056
        // If this is a remote update, we set the scheduler option to lazily
330✔
2057
        // add it to the graph.
330✔
2058
        var schedulerOp []batch.SchedulerOption
330✔
2059
        if nMsg.isRemote {
613✔
2060
                schedulerOp = append(schedulerOp, batch.LazyAdd())
283✔
2061
        }
283✔
2062

2063
        switch msg := nMsg.msg.(type) {
330✔
2064
        // A new node announcement has arrived which either presents new
2065
        // information about a node in one of the channels we know about, or a
2066
        // updating previously advertised information.
2067
        case *lnwire.NodeAnnouncement:
24✔
2068
                return d.handleNodeAnnouncement(nMsg, msg, schedulerOp)
24✔
2069

2070
        // A new channel announcement has arrived, this indicates the
2071
        // *creation* of a new channel within the network. This only advertises
2072
        // the existence of a channel and not yet the routing policies in
2073
        // either direction of the channel.
2074
        case *lnwire.ChannelAnnouncement1:
230✔
2075
                return d.handleChanAnnouncement(nMsg, msg, schedulerOp)
230✔
2076

2077
        // A new authenticated channel edge update has arrived. This indicates
2078
        // that the directional information for an already known channel has
2079
        // been updated.
2080
        case *lnwire.ChannelUpdate1:
55✔
2081
                return d.handleChanUpdate(nMsg, msg, schedulerOp)
55✔
2082

2083
        // A new signature announcement has been received. This indicates
2084
        // willingness of nodes involved in the funding of a channel to
2085
        // announce this new channel to the rest of the world.
2086
        case *lnwire.AnnounceSignatures1:
21✔
2087
                return d.handleAnnSig(nMsg, msg)
21✔
2088

2089
        default:
×
2090
                err := errors.New("wrong type of the announcement")
×
2091
                nMsg.err <- err
×
2092
                return nil, false
×
2093
        }
2094
}
2095

2096
// processZombieUpdate determines whether the provided channel update should
2097
// resurrect a given zombie edge.
2098
//
2099
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
2100
// should be inspected.
2101
func (d *AuthenticatedGossiper) processZombieUpdate(
2102
        chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
2103
        msg *lnwire.ChannelUpdate1) error {
3✔
2104

3✔
2105
        // The least-significant bit in the flag on the channel update tells us
3✔
2106
        // which edge is being updated.
3✔
2107
        isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0
3✔
2108

3✔
2109
        // Since we've deemed the update as not stale above, before marking it
3✔
2110
        // live, we'll make sure it has been signed by the correct party. If we
3✔
2111
        // have both pubkeys, either party can resurrect the channel. If we've
3✔
2112
        // already marked this with the stricter, single-sided resurrection we
3✔
2113
        // will only have the pubkey of the node with the oldest timestamp.
3✔
2114
        var pubKey *btcec.PublicKey
3✔
2115
        switch {
3✔
2116
        case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
×
2117
                pubKey, _ = chanInfo.NodeKey1()
×
2118
        case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
2✔
2119
                pubKey, _ = chanInfo.NodeKey2()
2✔
2120
        }
2121
        if pubKey == nil {
4✔
2122
                return fmt.Errorf("incorrect pubkey to resurrect zombie "+
1✔
2123
                        "with chan_id=%v", msg.ShortChannelID)
1✔
2124
        }
1✔
2125

2126
        err := netann.VerifyChannelUpdateSignature(msg, pubKey)
2✔
2127
        if err != nil {
3✔
2128
                return fmt.Errorf("unable to verify channel "+
1✔
2129
                        "update signature: %v", err)
1✔
2130
        }
1✔
2131

2132
        // With the signature valid, we'll proceed to mark the
2133
        // edge as live and wait for the channel announcement to
2134
        // come through again.
2135
        err = d.cfg.Graph.MarkEdgeLive(scid)
1✔
2136
        switch {
1✔
2137
        case errors.Is(err, graphdb.ErrZombieEdgeNotFound):
×
2138
                log.Errorf("edge with chan_id=%v was not found in the "+
×
2139
                        "zombie index: %v", err)
×
2140

×
2141
                return nil
×
2142

2143
        case err != nil:
×
2144
                return fmt.Errorf("unable to remove edge with "+
×
2145
                        "chan_id=%v from zombie index: %v",
×
2146
                        msg.ShortChannelID, err)
×
2147

2148
        default:
1✔
2149
        }
2150

2151
        log.Debugf("Removed edge with chan_id=%v from zombie "+
1✔
2152
                "index", msg.ShortChannelID)
1✔
2153

1✔
2154
        return nil
1✔
2155
}
2156

2157
// fetchNodeAnn fetches the latest signed node announcement from our point of
2158
// view for the node with the given public key.
2159
func (d *AuthenticatedGossiper) fetchNodeAnn(
2160
        pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {
20✔
2161

20✔
2162
        node, err := d.cfg.Graph.FetchLightningNode(pubKey)
20✔
2163
        if err != nil {
26✔
2164
                return nil, err
6✔
2165
        }
6✔
2166

2167
        return node.NodeAnnouncement(true)
14✔
2168
}
2169

2170
// isMsgStale determines whether a message retrieved from the backing
2171
// MessageStore is seen as stale by the current graph.
2172
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
12✔
2173
        switch msg := msg.(type) {
12✔
2174
        case *lnwire.AnnounceSignatures1:
2✔
2175
                chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
2✔
2176
                        msg.ShortChannelID,
2✔
2177
                )
2✔
2178

2✔
2179
                // If the channel cannot be found, it is most likely a leftover
2✔
2180
                // message for a channel that was closed, so we can consider it
2✔
2181
                // stale.
2✔
2182
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
2✔
UNCOV
2183
                        return true
×
UNCOV
2184
                }
×
2185
                if err != nil {
2✔
2186
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2187
                                "%v", chanInfo.ChannelID, err)
×
2188
                        return false
×
2189
                }
×
2190

2191
                // If the proof exists in the graph, then we have successfully
2192
                // received the remote proof and assembled the full proof, so we
2193
                // can safely delete the local proof from the database.
2194
                return chanInfo.AuthProof != nil
2✔
2195

2196
        case *lnwire.ChannelUpdate1:
10✔
2197
                _, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)
10✔
2198

10✔
2199
                // If the channel cannot be found, it is most likely a leftover
10✔
2200
                // message for a channel that was closed, so we can consider it
10✔
2201
                // stale.
10✔
2202
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
10✔
UNCOV
2203
                        return true
×
UNCOV
2204
                }
×
2205
                if err != nil {
10✔
2206
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2207
                                "%v", msg.ShortChannelID, err)
×
2208
                        return false
×
2209
                }
×
2210

2211
                // Otherwise, we'll retrieve the correct policy that we
2212
                // currently have stored within our graph to check if this
2213
                // message is stale by comparing its timestamp.
2214
                var p *models.ChannelEdgePolicy
10✔
2215
                if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
20✔
2216
                        p = p1
10✔
2217
                } else {
10✔
UNCOV
2218
                        p = p2
×
UNCOV
2219
                }
×
2220

2221
                // If the policy is still unknown, then we can consider this
2222
                // policy fresh.
2223
                if p == nil {
10✔
2224
                        return false
×
2225
                }
×
2226

2227
                timestamp := time.Unix(int64(msg.Timestamp), 0)
10✔
2228
                return p.LastUpdate.After(timestamp)
10✔
2229

2230
        default:
×
2231
                // We'll make sure to not mark any unsupported messages as stale
×
2232
                // to ensure they are not removed.
×
2233
                return false
×
2234
        }
2235
}
2236

2237
// updateChannel creates a new fully signed update for the channel, and updates
2238
// the underlying graph with the new state.
2239
func (d *AuthenticatedGossiper) updateChannel(info *models.ChannelEdgeInfo,
2240
        edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
2241
        *lnwire.ChannelUpdate1, error) {
4✔
2242

4✔
2243
        // Parse the unsigned edge into a channel update.
4✔
2244
        chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)
4✔
2245

4✔
2246
        // We'll generate a new signature over a digest of the channel
4✔
2247
        // announcement itself and update the timestamp to ensure it propagate.
4✔
2248
        err := netann.SignChannelUpdate(
4✔
2249
                d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
4✔
2250
                netann.ChanUpdSetTimestamp,
4✔
2251
        )
4✔
2252
        if err != nil {
4✔
2253
                return nil, nil, err
×
2254
        }
×
2255

2256
        // Next, we'll set the new signature in place, and update the reference
2257
        // in the backing slice.
2258
        edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
4✔
2259
        edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()
4✔
2260

4✔
2261
        // To ensure that our signature is valid, we'll verify it ourself
4✔
2262
        // before committing it to the slice returned.
4✔
2263
        err = netann.ValidateChannelUpdateAnn(
4✔
2264
                d.selfKey, info.Capacity, chanUpdate,
4✔
2265
        )
4✔
2266
        if err != nil {
4✔
2267
                return nil, nil, fmt.Errorf("generated invalid channel "+
×
2268
                        "update sig: %v", err)
×
2269
        }
×
2270

2271
        // Finally, we'll write the new edge policy to disk.
2272
        if err := d.cfg.Graph.UpdateEdge(edge); err != nil {
4✔
2273
                return nil, nil, err
×
2274
        }
×
2275

2276
        // We'll also create the original channel announcement so the two can
2277
        // be broadcast along side each other (if necessary), but only if we
2278
        // have a full channel announcement for this channel.
2279
        var chanAnn *lnwire.ChannelAnnouncement1
4✔
2280
        if info.AuthProof != nil {
7✔
2281
                chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
3✔
2282
                chanAnn = &lnwire.ChannelAnnouncement1{
3✔
2283
                        ShortChannelID:  chanID,
3✔
2284
                        NodeID1:         info.NodeKey1Bytes,
3✔
2285
                        NodeID2:         info.NodeKey2Bytes,
3✔
2286
                        ChainHash:       info.ChainHash,
3✔
2287
                        BitcoinKey1:     info.BitcoinKey1Bytes,
3✔
2288
                        Features:        lnwire.NewRawFeatureVector(),
3✔
2289
                        BitcoinKey2:     info.BitcoinKey2Bytes,
3✔
2290
                        ExtraOpaqueData: info.ExtraOpaqueData,
3✔
2291
                }
3✔
2292
                chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
3✔
2293
                        info.AuthProof.NodeSig1Bytes,
3✔
2294
                )
3✔
2295
                if err != nil {
3✔
2296
                        return nil, nil, err
×
2297
                }
×
2298
                chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
3✔
2299
                        info.AuthProof.NodeSig2Bytes,
3✔
2300
                )
3✔
2301
                if err != nil {
3✔
2302
                        return nil, nil, err
×
2303
                }
×
2304
                chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
3✔
2305
                        info.AuthProof.BitcoinSig1Bytes,
3✔
2306
                )
3✔
2307
                if err != nil {
3✔
2308
                        return nil, nil, err
×
2309
                }
×
2310
                chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
3✔
2311
                        info.AuthProof.BitcoinSig2Bytes,
3✔
2312
                )
3✔
2313
                if err != nil {
3✔
2314
                        return nil, nil, err
×
2315
                }
×
2316
        }
2317

2318
        return chanAnn, chanUpdate, err
4✔
2319
}
2320

2321
// SyncManager returns the gossiper's SyncManager instance.
UNCOV
2322
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
×
UNCOV
2323
        return d.syncMgr
×
UNCOV
2324
}
×
2325

2326
// IsKeepAliveUpdate determines whether this channel update is considered a
2327
// keep-alive update based on the previous channel update processed for the same
2328
// direction.
2329
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
2330
        prev *models.ChannelEdgePolicy) bool {
14✔
2331

14✔
2332
        // Both updates should be from the same direction.
14✔
2333
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
14✔
2334
                prev.ChannelFlags&lnwire.ChanUpdateDirection {
14✔
2335

×
2336
                return false
×
2337
        }
×
2338

2339
        // The timestamp should always increase for a keep-alive update.
2340
        timestamp := time.Unix(int64(update.Timestamp), 0)
14✔
2341
        if !timestamp.After(prev.LastUpdate) {
14✔
UNCOV
2342
                return false
×
UNCOV
2343
        }
×
2344

2345
        // None of the remaining fields should change for a keep-alive update.
2346
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
14✔
UNCOV
2347
                return false
×
UNCOV
2348
        }
×
2349
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
26✔
2350
                return false
12✔
2351
        }
12✔
2352
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
2✔
UNCOV
2353
                return false
×
UNCOV
2354
        }
×
2355
        if update.TimeLockDelta != prev.TimeLockDelta {
2✔
2356
                return false
×
2357
        }
×
2358
        if update.HtlcMinimumMsat != prev.MinHTLC {
2✔
2359
                return false
×
2360
        }
×
2361
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
2✔
2362
                return false
×
2363
        }
×
2364
        if update.HtlcMaximumMsat != prev.MaxHTLC {
2✔
2365
                return false
×
2366
        }
×
2367
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
2✔
UNCOV
2368
                return false
×
UNCOV
2369
        }
×
2370
        return true
2✔
2371
}
2372

2373
// latestHeight returns the gossiper's latest height known of the chain.
UNCOV
2374
func (d *AuthenticatedGossiper) latestHeight() uint32 {
×
UNCOV
2375
        d.Lock()
×
UNCOV
2376
        defer d.Unlock()
×
UNCOV
2377
        return d.bestHeight
×
UNCOV
2378
}
×
2379

2380
// handleNodeAnnouncement processes a new node announcement.
2381
func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
2382
        nodeAnn *lnwire.NodeAnnouncement,
2383
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
24✔
2384

24✔
2385
        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
24✔
2386

24✔
2387
        log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
24✔
2388
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
24✔
2389
                nMsg.source.SerializeCompressed())
24✔
2390

24✔
2391
        // We'll quickly ask the router if it already has a newer update for
24✔
2392
        // this node so we can skip validating signatures if not required.
24✔
2393
        if d.cfg.Graph.IsStaleNode(nodeAnn.NodeID, timestamp) {
32✔
2394
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
8✔
2395
                nMsg.err <- nil
8✔
2396
                return nil, true
8✔
2397
        }
8✔
2398

2399
        if err := d.addNode(nodeAnn, ops...); err != nil {
16✔
UNCOV
2400
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
×
UNCOV
2401
                        err)
×
UNCOV
2402

×
UNCOV
2403
                if !graph.IsError(
×
UNCOV
2404
                        err,
×
UNCOV
2405
                        graph.ErrOutdated,
×
UNCOV
2406
                        graph.ErrIgnored,
×
UNCOV
2407
                ) {
×
2408

×
2409
                        log.Error(err)
×
2410
                }
×
2411

UNCOV
2412
                nMsg.err <- err
×
UNCOV
2413
                return nil, false
×
2414
        }
2415

2416
        // In order to ensure we don't leak unadvertised nodes, we'll make a
2417
        // quick check to ensure this node intends to publicly advertise itself
2418
        // to the network.
2419
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
16✔
2420
        if err != nil {
16✔
2421
                log.Errorf("Unable to determine if node %x is advertised: %v",
×
2422
                        nodeAnn.NodeID, err)
×
2423
                nMsg.err <- err
×
2424
                return nil, false
×
2425
        }
×
2426

2427
        var announcements []networkMsg
16✔
2428

16✔
2429
        // If it does, we'll add their announcement to our batch so that it can
16✔
2430
        // be broadcast to the rest of our peers.
16✔
2431
        if isPublic {
19✔
2432
                announcements = append(announcements, networkMsg{
3✔
2433
                        peer:     nMsg.peer,
3✔
2434
                        isRemote: nMsg.isRemote,
3✔
2435
                        source:   nMsg.source,
3✔
2436
                        msg:      nodeAnn,
3✔
2437
                })
3✔
2438
        } else {
16✔
2439
                log.Tracef("Skipping broadcasting node announcement for %x "+
13✔
2440
                        "due to being unadvertised", nodeAnn.NodeID)
13✔
2441
        }
13✔
2442

2443
        nMsg.err <- nil
16✔
2444
        // TODO(roasbeef): get rid of the above
16✔
2445

16✔
2446
        log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
16✔
2447
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
16✔
2448
                nMsg.source.SerializeCompressed())
16✔
2449

16✔
2450
        return announcements, true
16✔
2451
}
2452

2453
// handleChanAnnouncement processes a new channel announcement.
2454
func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
2455
        ann *lnwire.ChannelAnnouncement1,
2456
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
230✔
2457

230✔
2458
        scid := ann.ShortChannelID
230✔
2459

230✔
2460
        log.Debugf("Processing ChannelAnnouncement1: peer=%v, short_chan_id=%v",
230✔
2461
                nMsg.peer, scid.ToUint64())
230✔
2462

230✔
2463
        // We'll ignore any channel announcements that target any chain other
230✔
2464
        // than the set of chains we know of.
230✔
2465
        if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) {
230✔
2466
                err := fmt.Errorf("ignoring ChannelAnnouncement1 from chain=%v"+
×
2467
                        ", gossiper on chain=%v", ann.ChainHash,
×
2468
                        d.cfg.ChainHash)
×
2469
                log.Errorf(err.Error())
×
2470

×
2471
                key := newRejectCacheKey(
×
2472
                        scid.ToUint64(),
×
2473
                        sourceToPub(nMsg.source),
×
2474
                )
×
2475
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2476

×
2477
                nMsg.err <- err
×
2478
                return nil, false
×
2479
        }
×
2480

2481
        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
2482
        // reject the announcement. Since the router accepts alias SCIDs,
2483
        // not erroring out would be a DoS vector.
2484
        if nMsg.isRemote && d.cfg.IsAlias(scid) {
230✔
2485
                err := fmt.Errorf("ignoring remote alias channel=%v", scid)
×
2486
                log.Errorf(err.Error())
×
2487

×
2488
                key := newRejectCacheKey(
×
2489
                        scid.ToUint64(),
×
2490
                        sourceToPub(nMsg.source),
×
2491
                )
×
2492
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2493

×
2494
                nMsg.err <- err
×
2495
                return nil, false
×
2496
        }
×
2497

2498
        // If the advertised inclusionary block is beyond our knowledge of the
2499
        // chain tip, then we'll ignore it for now.
2500
        d.Lock()
230✔
2501
        if nMsg.isRemote && d.isPremature(scid, 0, nMsg) {
231✔
2502
                log.Warnf("Announcement for chan_id=(%v), is premature: "+
1✔
2503
                        "advertises height %v, only height %v is known",
1✔
2504
                        scid.ToUint64(), scid.BlockHeight, d.bestHeight)
1✔
2505
                d.Unlock()
1✔
2506
                nMsg.err <- nil
1✔
2507
                return nil, false
1✔
2508
        }
1✔
2509
        d.Unlock()
229✔
2510

229✔
2511
        // At this point, we'll now ask the router if this is a zombie/known
229✔
2512
        // edge. If so we can skip all the processing below.
229✔
2513
        if d.cfg.Graph.IsKnownEdge(scid) {
230✔
2514
                nMsg.err <- nil
1✔
2515
                return nil, true
1✔
2516
        }
1✔
2517

2518
        // Check if the channel is already closed in which case we can ignore
2519
        // it.
2520
        closed, err := d.cfg.ScidCloser.IsClosedScid(scid)
228✔
2521
        if err != nil {
228✔
2522
                log.Errorf("failed to check if scid %v is closed: %v", scid,
×
2523
                        err)
×
2524
                nMsg.err <- err
×
2525

×
2526
                return nil, false
×
2527
        }
×
2528

2529
        if closed {
229✔
2530
                err = fmt.Errorf("ignoring closed channel %v", scid)
1✔
2531
                log.Error(err)
1✔
2532

1✔
2533
                // If this is an announcement from us, we'll just ignore it.
1✔
2534
                if !nMsg.isRemote {
1✔
2535
                        nMsg.err <- err
×
2536
                        return nil, false
×
2537
                }
×
2538

2539
                // Increment the peer's ban score if they are sending closed
2540
                // channel announcements.
2541
                d.banman.incrementBanScore(nMsg.peer.PubKey())
1✔
2542

1✔
2543
                // If the peer is banned and not a channel peer, we'll
1✔
2544
                // disconnect them.
1✔
2545
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
1✔
2546
                if dcErr != nil {
1✔
2547
                        log.Errorf("failed to check if we should disconnect "+
×
2548
                                "peer: %v", dcErr)
×
2549
                        nMsg.err <- dcErr
×
2550

×
2551
                        return nil, false
×
2552
                }
×
2553

2554
                if shouldDc {
1✔
2555
                        nMsg.peer.Disconnect(ErrPeerBanned)
×
2556
                }
×
2557

2558
                nMsg.err <- err
1✔
2559

1✔
2560
                return nil, false
1✔
2561
        }
2562

2563
        // If this is a remote channel announcement, then we'll validate all
2564
        // the signatures within the proof as it should be well formed.
2565
        var proof *models.ChannelAuthProof
227✔
2566
        if nMsg.isRemote {
440✔
2567
                err := netann.ValidateChannelAnn(ann, d.fetchPKScript)
213✔
2568
                if err != nil {
213✔
2569
                        err := fmt.Errorf("unable to validate announcement: "+
×
2570
                                "%v", err)
×
2571

×
2572
                        key := newRejectCacheKey(
×
2573
                                scid.ToUint64(),
×
2574
                                sourceToPub(nMsg.source),
×
2575
                        )
×
2576
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2577

×
2578
                        log.Error(err)
×
2579
                        nMsg.err <- err
×
2580
                        return nil, false
×
2581
                }
×
2582

2583
                // If the proof checks out, then we'll save the proof itself to
2584
                // the database so we can fetch it later when gossiping with
2585
                // other nodes.
2586
                proof = &models.ChannelAuthProof{
213✔
2587
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
213✔
2588
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
213✔
2589
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
213✔
2590
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
213✔
2591
                }
213✔
2592
        }
2593

2594
        // With the proof validated (if necessary), we can now store it within
2595
        // the database for our path finding and syncing needs.
2596
        var featureBuf bytes.Buffer
227✔
2597
        if err := ann.Features.Encode(&featureBuf); err != nil {
227✔
2598
                log.Errorf("unable to encode features: %v", err)
×
2599
                nMsg.err <- err
×
2600
                return nil, false
×
2601
        }
×
2602

2603
        edge := &models.ChannelEdgeInfo{
227✔
2604
                ChannelID:        scid.ToUint64(),
227✔
2605
                ChainHash:        ann.ChainHash,
227✔
2606
                NodeKey1Bytes:    ann.NodeID1,
227✔
2607
                NodeKey2Bytes:    ann.NodeID2,
227✔
2608
                BitcoinKey1Bytes: ann.BitcoinKey1,
227✔
2609
                BitcoinKey2Bytes: ann.BitcoinKey2,
227✔
2610
                AuthProof:        proof,
227✔
2611
                Features:         featureBuf.Bytes(),
227✔
2612
                ExtraOpaqueData:  ann.ExtraOpaqueData,
227✔
2613
        }
227✔
2614

227✔
2615
        // If there were any optional message fields provided, we'll include
227✔
2616
        // them in its serialized disk representation now.
227✔
2617
        if nMsg.optionalMsgFields != nil {
241✔
2618
                if nMsg.optionalMsgFields.capacity != nil {
15✔
2619
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
1✔
2620
                }
1✔
2621
                if nMsg.optionalMsgFields.channelPoint != nil {
18✔
2622
                        cp := *nMsg.optionalMsgFields.channelPoint
4✔
2623
                        edge.ChannelPoint = cp
4✔
2624
                }
4✔
2625

2626
                // Optional tapscript root for custom channels.
2627
                edge.TapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
14✔
2628
        }
2629

2630
        log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64())
227✔
2631

227✔
2632
        // We will add the edge to the channel router. If the nodes present in
227✔
2633
        // this channel are not present in the database, a partial node will be
227✔
2634
        // added to represent each node while we wait for a node announcement.
227✔
2635
        //
227✔
2636
        // Before we add the edge to the database, we obtain the mutex for this
227✔
2637
        // channel ID. We do this to ensure no other goroutine has read the
227✔
2638
        // database and is now making decisions based on this DB state, before
227✔
2639
        // it writes to the DB.
227✔
2640
        d.channelMtx.Lock(scid.ToUint64())
227✔
2641
        err = d.cfg.Graph.AddEdge(edge, ops...)
227✔
2642
        if err != nil {
429✔
2643
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
202✔
2644
                        scid.ToUint64(), err)
202✔
2645

202✔
2646
                defer d.channelMtx.Unlock(scid.ToUint64())
202✔
2647

202✔
2648
                // If the edge was rejected due to already being known, then it
202✔
2649
                // may be the case that this new message has a fresh channel
202✔
2650
                // proof, so we'll check.
202✔
2651
                switch {
202✔
UNCOV
2652
                case graph.IsError(err, graph.ErrIgnored):
×
UNCOV
2653
                        // Attempt to process the rejected message to see if we
×
UNCOV
2654
                        // get any new announcements.
×
UNCOV
2655
                        anns, rErr := d.processRejectedEdge(ann, proof)
×
UNCOV
2656
                        if rErr != nil {
×
2657
                                key := newRejectCacheKey(
×
2658
                                        scid.ToUint64(),
×
2659
                                        sourceToPub(nMsg.source),
×
2660
                                )
×
2661
                                cr := &cachedReject{}
×
2662
                                _, _ = d.recentRejects.Put(key, cr)
×
2663

×
2664
                                nMsg.err <- rErr
×
2665
                                return nil, false
×
2666
                        }
×
2667

UNCOV
2668
                        log.Debugf("Extracted %v announcements from rejected "+
×
UNCOV
2669
                                "msgs", len(anns))
×
UNCOV
2670

×
UNCOV
2671
                        // If while processing this rejected edge, we realized
×
UNCOV
2672
                        // there's a set of announcements we could extract,
×
UNCOV
2673
                        // then we'll return those directly.
×
UNCOV
2674
                        //
×
UNCOV
2675
                        // NOTE: since this is an ErrIgnored, we can return
×
UNCOV
2676
                        // true here to signal "allow" to its dependants.
×
UNCOV
2677
                        nMsg.err <- nil
×
UNCOV
2678

×
UNCOV
2679
                        return anns, true
×
2680

2681
                case graph.IsError(
2682
                        err, graph.ErrNoFundingTransaction,
2683
                        graph.ErrInvalidFundingOutput,
2684
                ):
200✔
2685
                        key := newRejectCacheKey(
200✔
2686
                                scid.ToUint64(),
200✔
2687
                                sourceToPub(nMsg.source),
200✔
2688
                        )
200✔
2689
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
200✔
2690

200✔
2691
                        // Increment the peer's ban score. We check isRemote
200✔
2692
                        // so we don't actually ban the peer in case of a local
200✔
2693
                        // bug.
200✔
2694
                        if nMsg.isRemote {
400✔
2695
                                d.banman.incrementBanScore(nMsg.peer.PubKey())
200✔
2696
                        }
200✔
2697

2698
                case graph.IsError(err, graph.ErrChannelSpent):
1✔
2699
                        key := newRejectCacheKey(
1✔
2700
                                scid.ToUint64(),
1✔
2701
                                sourceToPub(nMsg.source),
1✔
2702
                        )
1✔
2703
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
1✔
2704

1✔
2705
                        // Since this channel has already been closed, we'll
1✔
2706
                        // add it to the graph's closed channel index such that
1✔
2707
                        // we won't attempt to do expensive validation checks
1✔
2708
                        // on it again.
1✔
2709
                        // TODO: Populate the ScidCloser by using closed
1✔
2710
                        // channel notifications.
1✔
2711
                        dbErr := d.cfg.ScidCloser.PutClosedScid(scid)
1✔
2712
                        if dbErr != nil {
1✔
2713
                                log.Errorf("failed to mark scid(%v) as "+
×
2714
                                        "closed: %v", scid, dbErr)
×
2715

×
2716
                                nMsg.err <- dbErr
×
2717

×
2718
                                return nil, false
×
2719
                        }
×
2720

2721
                        // Increment the peer's ban score. We check isRemote
2722
                        // so we don't accidentally ban ourselves in case of a
2723
                        // bug.
2724
                        if nMsg.isRemote {
2✔
2725
                                d.banman.incrementBanScore(nMsg.peer.PubKey())
1✔
2726
                        }
1✔
2727

2728
                default:
1✔
2729
                        // Otherwise, this is just a regular rejected edge.
1✔
2730
                        key := newRejectCacheKey(
1✔
2731
                                scid.ToUint64(),
1✔
2732
                                sourceToPub(nMsg.source),
1✔
2733
                        )
1✔
2734
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
1✔
2735
                }
2736

2737
                if !nMsg.isRemote {
202✔
2738
                        log.Errorf("failed to add edge for local channel: %v",
×
2739
                                err)
×
2740
                        nMsg.err <- err
×
2741

×
2742
                        return nil, false
×
2743
                }
×
2744

2745
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
202✔
2746
                if dcErr != nil {
202✔
2747
                        log.Errorf("failed to check if we should disconnect "+
×
2748
                                "peer: %v", dcErr)
×
2749
                        nMsg.err <- dcErr
×
2750

×
2751
                        return nil, false
×
2752
                }
×
2753

2754
                if shouldDc {
203✔
2755
                        nMsg.peer.Disconnect(ErrPeerBanned)
1✔
2756
                }
1✔
2757

2758
                nMsg.err <- err
202✔
2759

202✔
2760
                return nil, false
202✔
2761
        }
2762

2763
        // If err is nil, release the lock immediately.
2764
        d.channelMtx.Unlock(scid.ToUint64())
25✔
2765

25✔
2766
        log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64())
25✔
2767

25✔
2768
        // If we earlier received any ChannelUpdates for this channel, we can
25✔
2769
        // now process them, as the channel is added to the graph.
25✔
2770
        var channelUpdates []*processedNetworkMsg
25✔
2771

25✔
2772
        earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64())
25✔
2773
        if err == nil {
27✔
2774
                // There was actually an entry in the map, so we'll accumulate
2✔
2775
                // it. We don't worry about deletion, since it'll eventually
2✔
2776
                // fall out anyway.
2✔
2777
                chanMsgs := earlyChanUpdates
2✔
2778
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
2✔
2779
        }
2✔
2780

2781
        // Launch a new goroutine to handle each ChannelUpdate, this is to
2782
        // ensure we don't block here, as we can handle only one announcement
2783
        // at a time.
2784
        for _, cu := range channelUpdates {
27✔
2785
                // Skip if already processed.
2✔
2786
                if cu.processed {
2✔
UNCOV
2787
                        continue
×
2788
                }
2789

2790
                // Mark the ChannelUpdate as processed. This ensures that a
2791
                // subsequent announcement in the option-scid-alias case does
2792
                // not re-use an old ChannelUpdate.
2793
                cu.processed = true
2✔
2794

2✔
2795
                d.wg.Add(1)
2✔
2796
                go func(updMsg *networkMsg) {
4✔
2797
                        defer d.wg.Done()
2✔
2798

2✔
2799
                        switch msg := updMsg.msg.(type) {
2✔
2800
                        // Reprocess the message, making sure we return an
2801
                        // error to the original caller in case the gossiper
2802
                        // shuts down.
2803
                        case *lnwire.ChannelUpdate1:
2✔
2804
                                log.Debugf("Reprocessing ChannelUpdate for "+
2✔
2805
                                        "shortChanID=%v", scid.ToUint64())
2✔
2806

2✔
2807
                                select {
2✔
2808
                                case d.networkMsgs <- updMsg:
2✔
2809
                                case <-d.quit:
×
2810
                                        updMsg.err <- ErrGossiperShuttingDown
×
2811
                                }
2812

2813
                        // We don't expect any other message type than
2814
                        // ChannelUpdate to be in this cache.
2815
                        default:
×
2816
                                log.Errorf("Unsupported message type found "+
×
2817
                                        "among ChannelUpdates: %T", msg)
×
2818
                        }
2819
                }(cu.msg)
2820
        }
2821

2822
        // Channel announcement was successfully processed and now it might be
2823
        // broadcast to other connected nodes if it was an announcement with
2824
        // proof (remote).
2825
        var announcements []networkMsg
25✔
2826

25✔
2827
        if proof != nil {
36✔
2828
                announcements = append(announcements, networkMsg{
11✔
2829
                        peer:     nMsg.peer,
11✔
2830
                        isRemote: nMsg.isRemote,
11✔
2831
                        source:   nMsg.source,
11✔
2832
                        msg:      ann,
11✔
2833
                })
11✔
2834
        }
11✔
2835

2836
        nMsg.err <- nil
25✔
2837

25✔
2838
        log.Debugf("Processed ChannelAnnouncement1: peer=%v, short_chan_id=%v",
25✔
2839
                nMsg.peer, scid.ToUint64())
25✔
2840

25✔
2841
        return announcements, true
25✔
2842
}
2843

2844
// handleChanUpdate processes a new channel update.
2845
func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
2846
        upd *lnwire.ChannelUpdate1,
2847
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
55✔
2848

55✔
2849
        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
55✔
2850
                nMsg.peer, upd.ShortChannelID.ToUint64())
55✔
2851

55✔
2852
        // We'll ignore any channel updates that target any chain other than
55✔
2853
        // the set of chains we know of.
55✔
2854
        if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
55✔
2855
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
×
2856
                        "gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash)
×
2857
                log.Errorf(err.Error())
×
2858

×
2859
                key := newRejectCacheKey(
×
2860
                        upd.ShortChannelID.ToUint64(),
×
2861
                        sourceToPub(nMsg.source),
×
2862
                )
×
2863
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2864

×
2865
                nMsg.err <- err
×
2866
                return nil, false
×
2867
        }
×
2868

2869
        blockHeight := upd.ShortChannelID.BlockHeight
55✔
2870
        shortChanID := upd.ShortChannelID.ToUint64()
55✔
2871

55✔
2872
        // If the advertised inclusionary block is beyond our knowledge of the
55✔
2873
        // chain tip, then we'll put the announcement in limbo to be fully
55✔
2874
        // verified once we advance forward in the chain. If the update has an
55✔
2875
        // alias SCID, we'll skip the isPremature check. This is necessary
55✔
2876
        // since aliases start at block height 16_000_000.
55✔
2877
        d.Lock()
55✔
2878
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
55✔
2879
                d.isPremature(upd.ShortChannelID, 0, nMsg) {
55✔
UNCOV
2880

×
UNCOV
2881
                log.Warnf("Update announcement for short_chan_id(%v), is "+
×
UNCOV
2882
                        "premature: advertises height %v, only height %v is "+
×
UNCOV
2883
                        "known", shortChanID, blockHeight, d.bestHeight)
×
UNCOV
2884
                d.Unlock()
×
UNCOV
2885
                nMsg.err <- nil
×
UNCOV
2886
                return nil, false
×
UNCOV
2887
        }
×
2888
        d.Unlock()
55✔
2889

55✔
2890
        // Before we perform any of the expensive checks below, we'll check
55✔
2891
        // whether this update is stale or is for a zombie channel in order to
55✔
2892
        // quickly reject it.
55✔
2893
        timestamp := time.Unix(int64(upd.Timestamp), 0)
55✔
2894

55✔
2895
        // Fetch the SCID we should be using to lock the channelMtx and make
55✔
2896
        // graph queries with.
55✔
2897
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
55✔
2898
        if err != nil {
110✔
2899
                // Fallback and set the graphScid to the peer-provided SCID.
55✔
2900
                // This will occur for non-option-scid-alias channels and for
55✔
2901
                // public option-scid-alias channels after 6 confirmations.
55✔
2902
                // Once public option-scid-alias channels have 6 confs, we'll
55✔
2903
                // ignore ChannelUpdates with one of their aliases.
55✔
2904
                graphScid = upd.ShortChannelID
55✔
2905
        }
55✔
2906

2907
        if d.cfg.Graph.IsStaleEdgePolicy(
55✔
2908
                graphScid, timestamp, upd.ChannelFlags,
55✔
2909
        ) {
57✔
2910

2✔
2911
                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
2✔
2912
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
2✔
2913
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
2✔
2914
                )
2✔
2915

2✔
2916
                nMsg.err <- nil
2✔
2917
                return nil, true
2✔
2918
        }
2✔
2919

2920
        // Check that the ChanUpdate is not too far into the future, this could
2921
        // reveal some faulty implementation therefore we log an error.
2922
        if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
53✔
2923
                log.Errorf("Skewed timestamp (%v) for edge policy of "+
×
2924
                        "short_chan_id(%v), timestamp too far in the future: "+
×
2925
                        "peer=%v, msg=%s, is_remote=%v", timestamp.Unix(),
×
2926
                        shortChanID, nMsg.peer, nMsg.msg.MsgType(),
×
2927
                        nMsg.isRemote,
×
2928
                )
×
2929

×
2930
                nMsg.err <- fmt.Errorf("skewed timestamp of edge policy, "+
×
2931
                        "timestamp too far in the future: %v", timestamp.Unix())
×
2932

×
2933
                return nil, false
×
2934
        }
×
2935

2936
        // Get the node pub key as far since we don't have it in the channel
2937
        // update announcement message. We'll need this to properly verify the
2938
        // message's signature.
2939
        //
2940
        // We make sure to obtain the mutex for this channel ID before we
2941
        // access the database. This ensures the state we read from the
2942
        // database has not changed between this point and when we call
2943
        // UpdateEdge() later.
2944
        d.channelMtx.Lock(graphScid.ToUint64())
53✔
2945
        defer d.channelMtx.Unlock(graphScid.ToUint64())
53✔
2946

53✔
2947
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
53✔
2948
        switch {
53✔
2949
        // No error, break.
2950
        case err == nil:
49✔
2951
                break
49✔
2952

2953
        case errors.Is(err, graphdb.ErrZombieEdge):
3✔
2954
                err = d.processZombieUpdate(chanInfo, graphScid, upd)
3✔
2955
                if err != nil {
5✔
2956
                        log.Debug(err)
2✔
2957
                        nMsg.err <- err
2✔
2958
                        return nil, false
2✔
2959
                }
2✔
2960

2961
                // We'll fallthrough to ensure we stash the update until we
2962
                // receive its corresponding ChannelAnnouncement. This is
2963
                // needed to ensure the edge exists in the graph before
2964
                // applying the update.
2965
                fallthrough
1✔
2966
        case errors.Is(err, graphdb.ErrGraphNotFound):
1✔
2967
                fallthrough
1✔
2968
        case errors.Is(err, graphdb.ErrGraphNoEdgesFound):
1✔
2969
                fallthrough
1✔
2970
        case errors.Is(err, graphdb.ErrEdgeNotFound):
2✔
2971
                // If the edge corresponding to this ChannelUpdate was not
2✔
2972
                // found in the graph, this might be a channel in the process
2✔
2973
                // of being opened, and we haven't processed our own
2✔
2974
                // ChannelAnnouncement yet, hence it is not not found in the
2✔
2975
                // graph. This usually gets resolved after the channel proofs
2✔
2976
                // are exchanged and the channel is broadcasted to the rest of
2✔
2977
                // the network, but in case this is a private channel this
2✔
2978
                // won't ever happen. This can also happen in the case of a
2✔
2979
                // zombie channel with a fresh update for which we don't have a
2✔
2980
                // ChannelAnnouncement for since we reject them. Because of
2✔
2981
                // this, we temporarily add it to a map, and reprocess it after
2✔
2982
                // our own ChannelAnnouncement has been processed.
2✔
2983
                //
2✔
2984
                // The shortChanID may be an alias, but it is fine to use here
2✔
2985
                // since we don't have an edge in the graph and if the peer is
2✔
2986
                // not buggy, we should be able to use it once the gossiper
2✔
2987
                // receives the local announcement.
2✔
2988
                pMsg := &processedNetworkMsg{msg: nMsg}
2✔
2989

2✔
2990
                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
2✔
2991
                switch {
2✔
2992
                // Nothing in the cache yet, we can just directly insert this
2993
                // element.
2994
                case err == cache.ErrElementNotFound:
2✔
2995
                        _, _ = d.prematureChannelUpdates.Put(
2✔
2996
                                shortChanID, &cachedNetworkMsg{
2✔
2997
                                        msgs: []*processedNetworkMsg{pMsg},
2✔
2998
                                })
2✔
2999

3000
                // There's already something in the cache, so we'll combine the
3001
                // set of messages into a single value.
UNCOV
3002
                default:
×
UNCOV
3003
                        msgs := earlyMsgs.msgs
×
UNCOV
3004
                        msgs = append(msgs, pMsg)
×
UNCOV
3005
                        _, _ = d.prematureChannelUpdates.Put(
×
UNCOV
3006
                                shortChanID, &cachedNetworkMsg{
×
UNCOV
3007
                                        msgs: msgs,
×
UNCOV
3008
                                })
×
3009
                }
3010

3011
                log.Debugf("Got ChannelUpdate for edge not found in graph"+
2✔
3012
                        "(shortChanID=%v), saving for reprocessing later",
2✔
3013
                        shortChanID)
2✔
3014

2✔
3015
                // NOTE: We don't return anything on the error channel for this
2✔
3016
                // message, as we expect that will be done when this
2✔
3017
                // ChannelUpdate is later reprocessed.
2✔
3018
                return nil, false
2✔
3019

3020
        default:
×
3021
                err := fmt.Errorf("unable to validate channel update "+
×
3022
                        "short_chan_id=%v: %v", shortChanID, err)
×
3023
                log.Error(err)
×
3024
                nMsg.err <- err
×
3025

×
3026
                key := newRejectCacheKey(
×
3027
                        upd.ShortChannelID.ToUint64(),
×
3028
                        sourceToPub(nMsg.source),
×
3029
                )
×
3030
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3031

×
3032
                return nil, false
×
3033
        }
3034

3035
        // The least-significant bit in the flag on the channel update
3036
        // announcement tells us "which" side of the channels directed edge is
3037
        // being updated.
3038
        var (
49✔
3039
                pubKey       *btcec.PublicKey
49✔
3040
                edgeToUpdate *models.ChannelEdgePolicy
49✔
3041
        )
49✔
3042
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
49✔
3043
        switch direction {
49✔
3044
        case 0:
34✔
3045
                pubKey, _ = chanInfo.NodeKey1()
34✔
3046
                edgeToUpdate = e1
34✔
3047
        case 1:
15✔
3048
                pubKey, _ = chanInfo.NodeKey2()
15✔
3049
                edgeToUpdate = e2
15✔
3050
        }
3051

3052
        log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
49✔
3053
                "edge policy=%v", chanInfo.ChannelID,
49✔
3054
                pubKey.SerializeCompressed(), edgeToUpdate != nil)
49✔
3055

49✔
3056
        // Validate the channel announcement with the expected public key and
49✔
3057
        // channel capacity. In the case of an invalid channel update, we'll
49✔
3058
        // return an error to the caller and exit early.
49✔
3059
        err = netann.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
49✔
3060
        if err != nil {
53✔
3061
                rErr := fmt.Errorf("unable to validate channel update "+
4✔
3062
                        "announcement for short_chan_id=%v: %v",
4✔
3063
                        spew.Sdump(upd.ShortChannelID), err)
4✔
3064

4✔
3065
                log.Error(rErr)
4✔
3066
                nMsg.err <- rErr
4✔
3067
                return nil, false
4✔
3068
        }
4✔
3069

3070
        // If we have a previous version of the edge being updated, we'll want
3071
        // to rate limit its updates to prevent spam throughout the network.
3072
        if nMsg.isRemote && edgeToUpdate != nil {
59✔
3073
                // If it's a keep-alive update, we'll only propagate one if
14✔
3074
                // it's been a day since the previous. This follows our own
14✔
3075
                // heuristic of sending keep-alive updates after the same
14✔
3076
                // duration (see retransmitStaleAnns).
14✔
3077
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
14✔
3078
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
16✔
3079
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
3✔
3080
                                log.Debugf("Ignoring keep alive update not "+
1✔
3081
                                        "within %v period for channel %v",
1✔
3082
                                        d.cfg.RebroadcastInterval, shortChanID)
1✔
3083
                                nMsg.err <- nil
1✔
3084
                                return nil, false
1✔
3085
                        }
1✔
3086
                } else {
12✔
3087
                        // If it's not, we'll allow an update per minute with a
12✔
3088
                        // maximum burst of 10. If we haven't seen an update
12✔
3089
                        // for this channel before, we'll need to initialize a
12✔
3090
                        // rate limiter for each direction.
12✔
3091
                        //
12✔
3092
                        // Since the edge exists in the graph, we'll create a
12✔
3093
                        // rate limiter for chanInfo.ChannelID rather then the
12✔
3094
                        // SCID the peer sent. This is because there may be
12✔
3095
                        // multiple aliases for a channel and we may otherwise
12✔
3096
                        // rate-limit only a single alias of the channel,
12✔
3097
                        // instead of the whole channel.
12✔
3098
                        baseScid := chanInfo.ChannelID
12✔
3099
                        d.Lock()
12✔
3100
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
12✔
3101
                        if !ok {
13✔
3102
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
1✔
3103
                                b := d.cfg.MaxChannelUpdateBurst
1✔
3104
                                rls = [2]*rate.Limiter{
1✔
3105
                                        rate.NewLimiter(r, b),
1✔
3106
                                        rate.NewLimiter(r, b),
1✔
3107
                                }
1✔
3108
                                d.chanUpdateRateLimiter[baseScid] = rls
1✔
3109
                        }
1✔
3110
                        d.Unlock()
12✔
3111

12✔
3112
                        if !rls[direction].Allow() {
17✔
3113
                                log.Debugf("Rate limiting update for channel "+
5✔
3114
                                        "%v from direction %x", shortChanID,
5✔
3115
                                        pubKey.SerializeCompressed())
5✔
3116
                                nMsg.err <- nil
5✔
3117
                                return nil, false
5✔
3118
                        }
5✔
3119
                }
3120
        }
3121

3122
        // We'll use chanInfo.ChannelID rather than the peer-supplied
3123
        // ShortChannelID in the ChannelUpdate to avoid the router having to
3124
        // lookup the stored SCID. If we're sending the update, we'll always
3125
        // use the SCID stored in the database rather than a potentially
3126
        // different alias. This might mean that SigBytes is incorrect as it
3127
        // signs a different SCID than the database SCID, but since there will
3128
        // only be a difference if AuthProof == nil, this is fine.
3129
        update := &models.ChannelEdgePolicy{
39✔
3130
                SigBytes:                  upd.Signature.ToSignatureBytes(),
39✔
3131
                ChannelID:                 chanInfo.ChannelID,
39✔
3132
                LastUpdate:                timestamp,
39✔
3133
                MessageFlags:              upd.MessageFlags,
39✔
3134
                ChannelFlags:              upd.ChannelFlags,
39✔
3135
                TimeLockDelta:             upd.TimeLockDelta,
39✔
3136
                MinHTLC:                   upd.HtlcMinimumMsat,
39✔
3137
                MaxHTLC:                   upd.HtlcMaximumMsat,
39✔
3138
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
39✔
3139
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
39✔
3140
                ExtraOpaqueData:           upd.ExtraOpaqueData,
39✔
3141
        }
39✔
3142

39✔
3143
        if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil {
39✔
UNCOV
3144
                if graph.IsError(
×
UNCOV
3145
                        err, graph.ErrOutdated,
×
UNCOV
3146
                        graph.ErrIgnored,
×
UNCOV
3147
                ) {
×
UNCOV
3148

×
UNCOV
3149
                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
×
UNCOV
3150
                                shortChanID, err)
×
UNCOV
3151
                } else {
×
3152
                        // Since we know the stored SCID in the graph, we'll
×
3153
                        // cache that SCID.
×
3154
                        key := newRejectCacheKey(
×
3155
                                chanInfo.ChannelID,
×
3156
                                sourceToPub(nMsg.source),
×
3157
                        )
×
3158
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3159

×
3160
                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
×
3161
                                shortChanID, err)
×
3162
                }
×
3163

UNCOV
3164
                nMsg.err <- err
×
UNCOV
3165
                return nil, false
×
3166
        }
3167

3168
        // If this is a local ChannelUpdate without an AuthProof, it means it
3169
        // is an update to a channel that is not (yet) supposed to be announced
3170
        // to the greater network. However, our channel counter party will need
3171
        // to be given the update, so we'll try sending the update directly to
3172
        // the remote peer.
3173
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
50✔
3174
                if nMsg.optionalMsgFields != nil {
22✔
3175
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
11✔
3176
                        if remoteAlias != nil {
11✔
UNCOV
3177
                                // The remoteAlias field was specified, meaning
×
UNCOV
3178
                                // that we should replace the SCID in the
×
UNCOV
3179
                                // update with the remote's alias. We'll also
×
UNCOV
3180
                                // need to re-sign the channel update. This is
×
UNCOV
3181
                                // required for option-scid-alias feature-bit
×
UNCOV
3182
                                // negotiated channels.
×
UNCOV
3183
                                upd.ShortChannelID = *remoteAlias
×
UNCOV
3184

×
UNCOV
3185
                                sig, err := d.cfg.SignAliasUpdate(upd)
×
UNCOV
3186
                                if err != nil {
×
3187
                                        log.Error(err)
×
3188
                                        nMsg.err <- err
×
3189
                                        return nil, false
×
3190
                                }
×
3191

UNCOV
3192
                                lnSig, err := lnwire.NewSigFromSignature(sig)
×
UNCOV
3193
                                if err != nil {
×
3194
                                        log.Error(err)
×
3195
                                        nMsg.err <- err
×
3196
                                        return nil, false
×
3197
                                }
×
3198

UNCOV
3199
                                upd.Signature = lnSig
×
3200
                        }
3201
                }
3202

3203
                // Get our peer's public key.
3204
                remotePubKey := remotePubFromChanInfo(
11✔
3205
                        chanInfo, upd.ChannelFlags,
11✔
3206
                )
11✔
3207

11✔
3208
                log.Debugf("The message %v has no AuthProof, sending the "+
11✔
3209
                        "update to remote peer %x", upd.MsgType(), remotePubKey)
11✔
3210

11✔
3211
                // Now we'll attempt to send the channel update message
11✔
3212
                // reliably to the remote peer in the background, so that we
11✔
3213
                // don't block if the peer happens to be offline at the moment.
11✔
3214
                err := d.reliableSender.sendMessage(upd, remotePubKey)
11✔
3215
                if err != nil {
11✔
3216
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3217
                                "channel=%v to peer=%x: %v", upd.MsgType(),
×
3218
                                upd.ShortChannelID, remotePubKey, err)
×
3219
                        nMsg.err <- err
×
3220
                        return nil, false
×
3221
                }
×
3222
        }
3223

3224
        // Channel update announcement was successfully processed and now it
3225
        // can be broadcast to the rest of the network. However, we'll only
3226
        // broadcast the channel update announcement if it has an attached
3227
        // authentication proof. We also won't broadcast the update if it
3228
        // contains an alias because the network would reject this.
3229
        var announcements []networkMsg
39✔
3230
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
58✔
3231
                announcements = append(announcements, networkMsg{
19✔
3232
                        peer:     nMsg.peer,
19✔
3233
                        source:   nMsg.source,
19✔
3234
                        isRemote: nMsg.isRemote,
19✔
3235
                        msg:      upd,
19✔
3236
                })
19✔
3237
        }
19✔
3238

3239
        nMsg.err <- nil
39✔
3240

39✔
3241
        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
39✔
3242
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
39✔
3243
                timestamp)
39✔
3244
        return announcements, true
39✔
3245
}
3246

3247
// handleAnnSig processes a new announcement signatures message.
3248
func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
3249
        ann *lnwire.AnnounceSignatures1) ([]networkMsg, bool) {
21✔
3250

21✔
3251
        needBlockHeight := ann.ShortChannelID.BlockHeight +
21✔
3252
                d.cfg.ProofMatureDelta
21✔
3253
        shortChanID := ann.ShortChannelID.ToUint64()
21✔
3254

21✔
3255
        prefix := "local"
21✔
3256
        if nMsg.isRemote {
32✔
3257
                prefix = "remote"
11✔
3258
        }
11✔
3259

3260
        log.Infof("Received new %v announcement signature for %v", prefix,
21✔
3261
                ann.ShortChannelID)
21✔
3262

21✔
3263
        // By the specification, channel announcement proofs should be sent
21✔
3264
        // after some number of confirmations after channel was registered in
21✔
3265
        // bitcoin blockchain. Therefore, we check if the proof is mature.
21✔
3266
        d.Lock()
21✔
3267
        premature := d.isPremature(
21✔
3268
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
21✔
3269
        )
21✔
3270
        if premature {
21✔
UNCOV
3271
                log.Warnf("Premature proof announcement, current block height"+
×
UNCOV
3272
                        "lower than needed: %v < %v", d.bestHeight,
×
UNCOV
3273
                        needBlockHeight)
×
UNCOV
3274
                d.Unlock()
×
UNCOV
3275
                nMsg.err <- nil
×
UNCOV
3276
                return nil, false
×
UNCOV
3277
        }
×
3278
        d.Unlock()
21✔
3279

21✔
3280
        // Ensure that we know of a channel with the target channel ID before
21✔
3281
        // proceeding further.
21✔
3282
        //
21✔
3283
        // We must acquire the mutex for this channel ID before getting the
21✔
3284
        // channel from the database, to ensure what we read does not change
21✔
3285
        // before we call AddProof() later.
21✔
3286
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
21✔
3287
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
21✔
3288

21✔
3289
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
21✔
3290
                ann.ShortChannelID,
21✔
3291
        )
21✔
3292
        if err != nil {
22✔
3293
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
1✔
3294
                if err != nil {
1✔
UNCOV
3295
                        err := fmt.Errorf("unable to store the proof for "+
×
UNCOV
3296
                                "short_chan_id=%v: %v", shortChanID, err)
×
UNCOV
3297
                        log.Error(err)
×
UNCOV
3298
                        nMsg.err <- err
×
UNCOV
3299

×
UNCOV
3300
                        return nil, false
×
UNCOV
3301
                }
×
3302

3303
                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
1✔
3304
                err := d.cfg.WaitingProofStore.Add(proof)
1✔
3305
                if err != nil {
1✔
3306
                        err := fmt.Errorf("unable to store the proof for "+
×
3307
                                "short_chan_id=%v: %v", shortChanID, err)
×
3308
                        log.Error(err)
×
3309
                        nMsg.err <- err
×
3310
                        return nil, false
×
3311
                }
×
3312

3313
                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
1✔
3314
                        ", adding to waiting batch", prefix, shortChanID)
1✔
3315
                nMsg.err <- nil
1✔
3316
                return nil, false
1✔
3317
        }
3318

3319
        nodeID := nMsg.source.SerializeCompressed()
20✔
3320
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
20✔
3321
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
20✔
3322

20✔
3323
        // Ensure that channel that was retrieved belongs to the peer which
20✔
3324
        // sent the proof announcement.
20✔
3325
        if !(isFirstNode || isSecondNode) {
20✔
3326
                err := fmt.Errorf("channel that was received doesn't belong "+
×
3327
                        "to the peer which sent the proof, short_chan_id=%v",
×
3328
                        shortChanID)
×
3329
                log.Error(err)
×
3330
                nMsg.err <- err
×
3331
                return nil, false
×
3332
        }
×
3333

3334
        // If proof was sent by a local sub-system, then we'll send the
3335
        // announcement signature to the remote node so they can also
3336
        // reconstruct the full channel announcement.
3337
        if !nMsg.isRemote {
30✔
3338
                var remotePubKey [33]byte
10✔
3339
                if isFirstNode {
20✔
3340
                        remotePubKey = chanInfo.NodeKey2Bytes
10✔
3341
                } else {
10✔
UNCOV
3342
                        remotePubKey = chanInfo.NodeKey1Bytes
×
UNCOV
3343
                }
×
3344

3345
                // Since the remote peer might not be online we'll call a
3346
                // method that will attempt to deliver the proof when it comes
3347
                // online.
3348
                err := d.reliableSender.sendMessage(ann, remotePubKey)
10✔
3349
                if err != nil {
10✔
3350
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3351
                                "channel=%v to peer=%x: %v", ann.MsgType(),
×
3352
                                ann.ShortChannelID, remotePubKey, err)
×
3353
                        nMsg.err <- err
×
3354
                        return nil, false
×
3355
                }
×
3356
        }
3357

3358
        // Check if we already have the full proof for this channel.
3359
        if chanInfo.AuthProof != nil {
21✔
3360
                // If we already have the fully assembled proof, then the peer
1✔
3361
                // sending us their proof has probably not received our local
1✔
3362
                // proof yet. So be kind and send them the full proof.
1✔
3363
                if nMsg.isRemote {
2✔
3364
                        peerID := nMsg.source.SerializeCompressed()
1✔
3365
                        log.Debugf("Got AnnounceSignatures for channel with " +
1✔
3366
                                "full proof.")
1✔
3367

1✔
3368
                        d.wg.Add(1)
1✔
3369
                        go func() {
2✔
3370
                                defer d.wg.Done()
1✔
3371

1✔
3372
                                log.Debugf("Received half proof for channel "+
1✔
3373
                                        "%v with existing full proof. Sending"+
1✔
3374
                                        " full proof to peer=%x",
1✔
3375
                                        ann.ChannelID, peerID)
1✔
3376

1✔
3377
                                ca, _, _, err := netann.CreateChanAnnouncement(
1✔
3378
                                        chanInfo.AuthProof, chanInfo, e1, e2,
1✔
3379
                                )
1✔
3380
                                if err != nil {
1✔
3381
                                        log.Errorf("unable to gen ann: %v",
×
3382
                                                err)
×
3383
                                        return
×
3384
                                }
×
3385

3386
                                err = nMsg.peer.SendMessage(false, ca)
1✔
3387
                                if err != nil {
1✔
3388
                                        log.Errorf("Failed sending full proof"+
×
3389
                                                " to peer=%x: %v", peerID, err)
×
3390
                                        return
×
3391
                                }
×
3392

3393
                                log.Debugf("Full proof sent to peer=%x for "+
1✔
3394
                                        "chanID=%v", peerID, ann.ChannelID)
1✔
3395
                        }()
3396
                }
3397

3398
                log.Debugf("Already have proof for channel with chanID=%v",
1✔
3399
                        ann.ChannelID)
1✔
3400
                nMsg.err <- nil
1✔
3401
                return nil, true
1✔
3402
        }
3403

3404
        // Check that we received the opposite proof. If so, then we're now
3405
        // able to construct the full proof, and create the channel
3406
        // announcement. If we didn't receive the opposite half of the proof
3407
        // then we should store this one, and wait for the opposite to be
3408
        // received.
3409
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
19✔
3410
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
19✔
3411
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
19✔
3412
                err := fmt.Errorf("unable to get the opposite proof for "+
×
3413
                        "short_chan_id=%v: %v", shortChanID, err)
×
3414
                log.Error(err)
×
3415
                nMsg.err <- err
×
3416
                return nil, false
×
3417
        }
×
3418

3419
        if err == channeldb.ErrWaitingProofNotFound {
28✔
3420
                err := d.cfg.WaitingProofStore.Add(proof)
9✔
3421
                if err != nil {
9✔
3422
                        err := fmt.Errorf("unable to store the proof for "+
×
3423
                                "short_chan_id=%v: %v", shortChanID, err)
×
3424
                        log.Error(err)
×
3425
                        nMsg.err <- err
×
3426
                        return nil, false
×
3427
                }
×
3428

3429
                log.Infof("1/2 of channel ann proof received for "+
9✔
3430
                        "short_chan_id=%v, waiting for other half",
9✔
3431
                        shortChanID)
9✔
3432

9✔
3433
                nMsg.err <- nil
9✔
3434
                return nil, false
9✔
3435
        }
3436

3437
        // We now have both halves of the channel announcement proof, then
3438
        // we'll reconstruct the initial announcement so we can validate it
3439
        // shortly below.
3440
        var dbProof models.ChannelAuthProof
10✔
3441
        if isFirstNode {
11✔
3442
                dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
1✔
3443
                dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
1✔
3444
                dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
1✔
3445
                dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
1✔
3446
        } else {
10✔
3447
                dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
9✔
3448
                dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
9✔
3449
                dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
9✔
3450
                dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
9✔
3451
        }
9✔
3452

3453
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
10✔
3454
                &dbProof, chanInfo, e1, e2,
10✔
3455
        )
10✔
3456
        if err != nil {
10✔
3457
                log.Error(err)
×
3458
                nMsg.err <- err
×
3459
                return nil, false
×
3460
        }
×
3461

3462
        // With all the necessary components assembled validate the full
3463
        // channel announcement proof.
3464
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
10✔
3465
        if err != nil {
10✔
3466
                err := fmt.Errorf("channel announcement proof for "+
×
3467
                        "short_chan_id=%v isn't valid: %v", shortChanID, err)
×
3468

×
3469
                log.Error(err)
×
3470
                nMsg.err <- err
×
3471
                return nil, false
×
3472
        }
×
3473

3474
        // If the channel was returned by the router it means that existence of
3475
        // funding point and inclusion of nodes bitcoin keys in it already
3476
        // checked by the router. In this stage we should check that node keys
3477
        // attest to the bitcoin keys by validating the signatures of
3478
        // announcement. If proof is valid then we'll populate the channel edge
3479
        // with it, so we can announce it on peer connect.
3480
        err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
10✔
3481
        if err != nil {
10✔
3482
                err := fmt.Errorf("unable add proof to the channel chanID=%v:"+
×
3483
                        " %v", ann.ChannelID, err)
×
3484
                log.Error(err)
×
3485
                nMsg.err <- err
×
3486
                return nil, false
×
3487
        }
×
3488

3489
        err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
10✔
3490
        if err != nil {
10✔
3491
                err := fmt.Errorf("unable to remove opposite proof for the "+
×
3492
                        "channel with chanID=%v: %v", ann.ChannelID, err)
×
3493
                log.Error(err)
×
3494
                nMsg.err <- err
×
3495
                return nil, false
×
3496
        }
×
3497

3498
        // Proof was successfully created and now can announce the channel to
3499
        // the remain network.
3500
        log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
10✔
3501
                ", adding to next ann batch", shortChanID)
10✔
3502

10✔
3503
        // Assemble the necessary announcements to add to the next broadcasting
10✔
3504
        // batch.
10✔
3505
        var announcements []networkMsg
10✔
3506
        announcements = append(announcements, networkMsg{
10✔
3507
                peer:   nMsg.peer,
10✔
3508
                source: nMsg.source,
10✔
3509
                msg:    chanAnn,
10✔
3510
        })
10✔
3511
        if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
19✔
3512
                announcements = append(announcements, networkMsg{
9✔
3513
                        peer:   nMsg.peer,
9✔
3514
                        source: src,
9✔
3515
                        msg:    e1Ann,
9✔
3516
                })
9✔
3517
        }
9✔
3518
        if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
18✔
3519
                announcements = append(announcements, networkMsg{
8✔
3520
                        peer:   nMsg.peer,
8✔
3521
                        source: src,
8✔
3522
                        msg:    e2Ann,
8✔
3523
                })
8✔
3524
        }
8✔
3525

3526
        // We'll also send along the node announcements for each channel
3527
        // participant if we know of them. To ensure our node announcement
3528
        // propagates to our channel counterparty, we'll set the source for
3529
        // each announcement to the node it belongs to, otherwise we won't send
3530
        // it since the source gets skipped. This isn't necessary for channel
3531
        // updates and announcement signatures since we send those directly to
3532
        // our channel counterparty through the gossiper's reliable sender.
3533
        node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
10✔
3534
        if err != nil {
12✔
3535
                log.Debugf("Unable to fetch node announcement for %x: %v",
2✔
3536
                        chanInfo.NodeKey1Bytes, err)
2✔
3537
        } else {
10✔
3538
                if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
16✔
3539
                        announcements = append(announcements, networkMsg{
8✔
3540
                                peer:   nMsg.peer,
8✔
3541
                                source: nodeKey1,
8✔
3542
                                msg:    node1Ann,
8✔
3543
                        })
8✔
3544
                }
8✔
3545
        }
3546

3547
        node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
10✔
3548
        if err != nil {
14✔
3549
                log.Debugf("Unable to fetch node announcement for %x: %v",
4✔
3550
                        chanInfo.NodeKey2Bytes, err)
4✔
3551
        } else {
10✔
3552
                if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
12✔
3553
                        announcements = append(announcements, networkMsg{
6✔
3554
                                peer:   nMsg.peer,
6✔
3555
                                source: nodeKey2,
6✔
3556
                                msg:    node2Ann,
6✔
3557
                        })
6✔
3558
                }
6✔
3559
        }
3560

3561
        nMsg.err <- nil
10✔
3562
        return announcements, true
10✔
3563
}
3564

3565
// isBanned returns true if the peer identified by pubkey is banned for sending
3566
// invalid channel announcements.
3567
func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
205✔
3568
        return d.banman.isBanned(pubkey)
205✔
3569
}
205✔
3570

3571
// ShouldDisconnect returns true if we should disconnect the peer identified by
3572
// pubkey.
3573
func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
3574
        bool, error) {
203✔
3575

203✔
3576
        pubkeySer := pubkey.SerializeCompressed()
203✔
3577

203✔
3578
        var pubkeyBytes [33]byte
203✔
3579
        copy(pubkeyBytes[:], pubkeySer)
203✔
3580

203✔
3581
        // If the public key is banned, check whether or not this is a channel
203✔
3582
        // peer.
203✔
3583
        if d.isBanned(pubkeyBytes) {
205✔
3584
                isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
2✔
3585
                if err != nil {
2✔
3586
                        return false, err
×
3587
                }
×
3588

3589
                // We should only disconnect non-channel peers.
3590
                if !isChanPeer {
3✔
3591
                        return true, nil
1✔
3592
                }
1✔
3593
        }
3594

3595
        return false, nil
202✔
3596
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc