• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12984608230

27 Jan 2025 08:12AM UTC coverage: 58.777% (+0.003%) from 58.774%
12984608230

push

github

web-flow
Merge pull request #9445 from yyforyongyu/itest-flake

itest: fix flake in `testAnchorThirdPartySpend`

136025 of 231424 relevant lines covered (58.78%)

19256.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.83
/discovery/gossiper.go
1
package discovery
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
13
        "github.com/btcsuite/btcd/btcutil"
14
        "github.com/btcsuite/btcd/chaincfg/chainhash"
15
        "github.com/btcsuite/btcd/wire"
16
        "github.com/davecgh/go-spew/spew"
17
        "github.com/lightninglabs/neutrino/cache"
18
        "github.com/lightninglabs/neutrino/cache/lru"
19
        "github.com/lightningnetwork/lnd/batch"
20
        "github.com/lightningnetwork/lnd/chainntnfs"
21
        "github.com/lightningnetwork/lnd/channeldb"
22
        "github.com/lightningnetwork/lnd/fn/v2"
23
        "github.com/lightningnetwork/lnd/graph"
24
        graphdb "github.com/lightningnetwork/lnd/graph/db"
25
        "github.com/lightningnetwork/lnd/graph/db/models"
26
        "github.com/lightningnetwork/lnd/keychain"
27
        "github.com/lightningnetwork/lnd/lnpeer"
28
        "github.com/lightningnetwork/lnd/lnutils"
29
        "github.com/lightningnetwork/lnd/lnwallet"
30
        "github.com/lightningnetwork/lnd/lnwire"
31
        "github.com/lightningnetwork/lnd/multimutex"
32
        "github.com/lightningnetwork/lnd/netann"
33
        "github.com/lightningnetwork/lnd/routing/route"
34
        "github.com/lightningnetwork/lnd/ticker"
35
        "golang.org/x/time/rate"
36
)
37

38
const (
	// DefaultMaxChannelUpdateBurst is the default cap on how many updates
	// for one channel and direction are accepted within a single interval.
	DefaultMaxChannelUpdateBurst = 10

	// DefaultChannelUpdateInterval is the default spacing used to decide
	// how often a new update for one channel and direction is allowed.
	DefaultChannelUpdateInterval = time.Minute

	// maxPrematureUpdates bounds the number of premature channel updates
	// that are retained.
	maxPrematureUpdates = 100

	// maxFutureMessages bounds the number of future messages that are
	// retained.
	maxFutureMessages = 1000

	// DefaultSubBatchDelay is the default pause before the next
	// announcement batch is broadcast.
	DefaultSubBatchDelay = 5 * time.Second

	// maxRejectedUpdates bounds the number of rejected channel updates that
	// are remembered. The limit is global across all peers, and the cache
	// is budgeted at roughly 3 MB.
	maxRejectedUpdates = 10_000

	// DefaultProofMatureDelta is the default for ProofMatureDelta, i.e. the
	// number of confirmations required before announcement signatures are
	// processed.
	DefaultProofMatureDelta = 6
)
71

72
var (
	// ErrGossiperShuttingDown is returned when the gossiper is in the
	// middle of shutting down.
	ErrGossiperShuttingDown = errors.New("gossiper is shutting down")

	// ErrGossipSyncerNotFound indicates that no active gossip syncer could
	// be found for a gossip query message received from the remote peer.
	ErrGossipSyncerNotFound = errors.New("gossip syncer not found")

	// emptyPubkey is an all-zero array that compressed pubkeys are
	// compared against to detect an unset key.
	emptyPubkey [33]byte
)
86

87
// optionalMsgFields is a set of optional message fields that external callers
88
// can provide that serve useful when processing a specific network
89
// announcement.
90
type optionalMsgFields struct {
91
        capacity      *btcutil.Amount
92
        channelPoint  *wire.OutPoint
93
        remoteAlias   *lnwire.ShortChannelID
94
        tapscriptRoot fn.Option[chainhash.Hash]
95
}
96

97
// apply applies the optional fields within the functional options.
98
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
50✔
99
        for _, optionalMsgField := range optionalMsgFields {
58✔
100
                optionalMsgField(f)
8✔
101
        }
8✔
102
}
103

104
// OptionalMsgField is a functional option parameter that can be used to provide
105
// external information that is not included within a network message but serves
106
// useful when processing it.
107
type OptionalMsgField func(*optionalMsgFields)
108

109
// ChannelCapacity is an optional field that lets the gossiper know of the
110
// capacity of a channel.
111
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
30✔
112
        return func(f *optionalMsgFields) {
34✔
113
                f.capacity = &capacity
4✔
114
        }
4✔
115
}
116

117
// ChannelPoint is an optional field that lets the gossiper know of the outpoint
118
// of a channel.
119
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
33✔
120
        return func(f *optionalMsgFields) {
40✔
121
                f.channelPoint = &op
7✔
122
        }
7✔
123
}
124

125
// TapscriptRoot is an optional field that lets the gossiper know of the root of
126
// the tapscript tree for a custom channel.
127
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
29✔
128
        return func(f *optionalMsgFields) {
32✔
129
                f.tapscriptRoot = root
3✔
130
        }
3✔
131
}
132

133
// RemoteAlias is an optional field that lets the gossiper know that a locally
134
// sent channel update is actually an update for the peer that should replace
135
// the ShortChannelID field with the remote's alias. This is only used for
136
// channels with peers where the option-scid-alias feature bit was negotiated.
137
// The channel update will be added to the graph under the original SCID, but
138
// will be modified and re-signed with this alias.
139
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
29✔
140
        return func(f *optionalMsgFields) {
32✔
141
                f.remoteAlias = alias
3✔
142
        }
3✔
143
}
144

145
// networkMsg couples a routing related wire message with the peer that
146
// originally sent it.
147
type networkMsg struct {
148
        peer              lnpeer.Peer
149
        source            *btcec.PublicKey
150
        msg               lnwire.Message
151
        optionalMsgFields *optionalMsgFields
152

153
        isRemote bool
154

155
        err chan error
156
}
157

158
// chanPolicyUpdateRequest is a request that is sent to the server when a caller
159
// wishes to update a particular set of channels. New ChannelUpdate messages
160
// will be crafted to be sent out during the next broadcast epoch and the fee
161
// updates committed to the lower layer.
162
type chanPolicyUpdateRequest struct {
163
        edgesToUpdate []EdgeWithInfo
164
        errChan       chan error
165
}
166

167
// PinnedSyncers is a set of node pubkeys for which we will maintain an active
168
// syncer at all times.
169
type PinnedSyncers map[route.Vertex]struct{}
170

171
// Config defines the configuration for the service. ALL elements within the
172
// configuration MUST be non-nil for the service to carry out its duties.
173
type Config struct {
174
        // ChainHash is a hash that indicates which resident chain of the
175
        // AuthenticatedGossiper. Any announcements that don't match this
176
        // chain hash will be ignored.
177
        //
178
        // TODO(roasbeef): eventually make into map so can de-multiplex
179
        // incoming announcements
180
        //   * also need to do same for Notifier
181
        ChainHash chainhash.Hash
182

183
        // Graph is the subsystem which is responsible for managing the
184
        // topology of lightning network. After incoming channel, node, channel
185
        // updates announcements are validated they are sent to the router in
186
        // order to be included in the LN graph.
187
        Graph graph.ChannelGraphSource
188

189
        // ChainIO represents an abstraction over a source that can query the
190
        // blockchain.
191
        ChainIO lnwallet.BlockChainIO
192

193
        // ChanSeries is an interfaces that provides access to a time series
194
        // view of the current known channel graph. Each GossipSyncer enabled
195
        // peer will utilize this in order to create and respond to channel
196
        // graph time series queries.
197
        ChanSeries ChannelGraphTimeSeries
198

199
        // Notifier is used for receiving notifications of incoming blocks.
200
        // With each new incoming block found we process previously premature
201
        // announcements.
202
        //
203
        // TODO(roasbeef): could possibly just replace this with an epoch
204
        // channel.
205
        Notifier chainntnfs.ChainNotifier
206

207
        // Broadcast broadcasts a particular set of announcements to all peers
208
        // that the daemon is connected to. If supplied, the exclude parameter
209
        // indicates that the target peer should be excluded from the
210
        // broadcast.
211
        Broadcast func(skips map[route.Vertex]struct{},
212
                msg ...lnwire.Message) error
213

214
        // NotifyWhenOnline is a function that allows the gossiper to be
215
        // notified when a certain peer comes online, allowing it to
216
        // retry sending a peer message.
217
        //
218
        // NOTE: The peerChan channel must be buffered.
219
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
220

221
        // NotifyWhenOffline is a function that allows the gossiper to be
222
        // notified when a certain peer disconnects, allowing it to request a
223
        // notification for when it reconnects.
224
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
225

226
        // FetchSelfAnnouncement retrieves our current node announcement, for
227
        // use when determining whether we should update our peers about our
228
        // presence in the network.
229
        FetchSelfAnnouncement func() lnwire.NodeAnnouncement
230

231
        // UpdateSelfAnnouncement produces a new announcement for our node with
232
        // an updated timestamp which can be broadcast to our peers.
233
        UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement, error)
234

235
        // ProofMatureDelta the number of confirmations which is needed before
236
        // exchange the channel announcement proofs.
237
        ProofMatureDelta uint32
238

239
        // TrickleDelay the period of trickle timer which flushes to the
240
        // network the pending batch of new announcements we've received since
241
        // the last trickle tick.
242
        TrickleDelay time.Duration
243

244
        // RetransmitTicker is a ticker that ticks with a period which
245
        // indicates that we should check if we need re-broadcast any of our
246
        // personal channels.
247
        RetransmitTicker ticker.Ticker
248

249
        // RebroadcastInterval is the maximum time we wait between sending out
250
        // channel updates for our active channels and our own node
251
        // announcement. We do this to ensure our active presence on the
252
        // network is known, and we are not being considered a zombie node or
253
        // having zombie channels.
254
        RebroadcastInterval time.Duration
255

256
        // WaitingProofStore is a persistent storage of partial channel proof
257
        // announcement messages. We use it to buffer half of the material
258
        // needed to reconstruct a full authenticated channel announcement.
259
        // Once we receive the other half the channel proof, we'll be able to
260
        // properly validate it and re-broadcast it out to the network.
261
        //
262
        // TODO(wilmer): make interface to prevent channeldb dependency.
263
        WaitingProofStore *channeldb.WaitingProofStore
264

265
        // MessageStore is a persistent storage of gossip messages which we will
266
        // use to determine which messages need to be resent for a given peer.
267
        MessageStore GossipMessageStore
268

269
        // AnnSigner is an instance of the MessageSigner interface which will
270
        // be used to manually sign any outgoing channel updates. The signer
271
        // implementation should be backed by the public key of the backing
272
        // Lightning node.
273
        //
274
        // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
275
        // here?
276
        AnnSigner lnwallet.MessageSigner
277

278
        // ScidCloser is an instance of ClosedChannelTracker that helps the
279
        // gossiper cut down on spam channel announcements for already closed
280
        // channels.
281
        ScidCloser ClosedChannelTracker
282

283
        // NumActiveSyncers is the number of peers for which we should have
284
        // active syncers with. After reaching NumActiveSyncers, any future
285
        // gossip syncers will be passive.
286
        NumActiveSyncers int
287

288
        // NoTimestampQueries will prevent the GossipSyncer from querying
289
        // timestamps of announcement messages from the peer and from replying
290
        // to timestamp queries.
291
        NoTimestampQueries bool
292

293
        // RotateTicker is a ticker responsible for notifying the SyncManager
294
        // when it should rotate its active syncers. A single active syncer with
295
        // a chansSynced state will be exchanged for a passive syncer in order
296
        // to ensure we don't keep syncing with the same peers.
297
        RotateTicker ticker.Ticker
298

299
        // HistoricalSyncTicker is a ticker responsible for notifying the
300
        // syncManager when it should attempt a historical sync with a gossip
301
        // sync peer.
302
        HistoricalSyncTicker ticker.Ticker
303

304
        // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
305
        // syncManager when it should attempt to start the next pending
306
        // activeSyncer due to the current one not completing its state machine
307
        // within the timeout.
308
        ActiveSyncerTimeoutTicker ticker.Ticker
309

310
        // MinimumBatchSize is minimum size of a sub batch of announcement
311
        // messages.
312
        MinimumBatchSize int
313

314
        // SubBatchDelay is the delay between sending sub batches of
315
        // gossip messages.
316
        SubBatchDelay time.Duration
317

318
        // IgnoreHistoricalFilters will prevent syncers from replying with
319
        // historical data when the remote peer sets a gossip_timestamp_range.
320
        // This prevents ranges with old start times from causing us to dump the
321
        // graph on connect.
322
        IgnoreHistoricalFilters bool
323

324
        // PinnedSyncers is a set of peers that will always transition to
325
        // ActiveSync upon connection. These peers will never transition to
326
        // PassiveSync.
327
        PinnedSyncers PinnedSyncers
328

329
        // MaxChannelUpdateBurst specifies the maximum number of updates for a
330
        // specific channel and direction that we'll accept over an interval.
331
        MaxChannelUpdateBurst int
332

333
        // ChannelUpdateInterval specifies the interval we'll use to determine
334
        // how often we should allow a new update for a specific channel and
335
        // direction.
336
        ChannelUpdateInterval time.Duration
337

338
        // IsAlias returns true if a given ShortChannelID is an alias for
339
        // option_scid_alias channels.
340
        IsAlias func(scid lnwire.ShortChannelID) bool
341

342
        // SignAliasUpdate is used to re-sign a channel update using the
343
        // remote's alias if the option-scid-alias feature bit was negotiated.
344
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
345
                error)
346

347
        // FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
348
        // This is used for channels that have negotiated the option-scid-alias
349
        // feature bit.
350
        FindBaseByAlias func(alias lnwire.ShortChannelID) (
351
                lnwire.ShortChannelID, error)
352

353
        // GetAlias allows the gossiper to look up the peer's alias for a given
354
        // ChannelID. This is used to sign updates for them if the channel has
355
        // no AuthProof and the option-scid-alias feature bit was negotiated.
356
        GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)
357

358
        // FindChannel allows the gossiper to find a channel that we're party
359
        // to without iterating over the entire set of open channels.
360
        FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
361
                *channeldb.OpenChannel, error)
362

363
        // IsStillZombieChannel takes the timestamps of the latest channel
364
        // updates for a channel and returns true if the channel should be
365
        // considered a zombie based on these timestamps.
366
        IsStillZombieChannel func(time.Time, time.Time) bool
367
}
368

369
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
370
// used to let the caller of the lru.Cache know if a message has already been
371
// processed or not.
372
type processedNetworkMsg struct {
373
        processed bool
374
        msg       *networkMsg
375
}
376

377
// cachedNetworkMsg is a wrapper around a network message that can be used with
378
// *lru.Cache.
379
type cachedNetworkMsg struct {
380
        msgs []*processedNetworkMsg
381
}
382

383
// Size returns the "size" of an entry. We return the number of items as we
384
// just want to limit the total amount of entries rather than do accurate size
385
// accounting.
386
func (c *cachedNetworkMsg) Size() (uint64, error) {
5✔
387
        return uint64(len(c.msgs)), nil
5✔
388
}
5✔
389

390
// rejectCacheKey is the key used to track recently rejected announcements in
// the reject cache.
type rejectCacheKey struct {
	pubkey [33]byte
	chanID uint64
}

// newRejectCacheKey builds a reject cache key from a channel ID and a
// compressed public key.
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
	return rejectCacheKey{
		pubkey: pub,
		chanID: cid,
	}
}
406

407
// sourceToPub returns a serialized-compressed public key for use in the reject
408
// cache.
409
func sourceToPub(pk *btcec.PublicKey) [33]byte {
479✔
410
        var pub [33]byte
479✔
411
        copy(pub[:], pk.SerializeCompressed())
479✔
412
        return pub
479✔
413
}
479✔
414

415
// cachedReject is the empty value stored for each entry of the reject cache.
type cachedReject struct {
}

// Size reports the "size" of the entry. Every entry counts as 1, since the
// goal is only to cap the total number of entries. The returned error is
// always nil.
func (c *cachedReject) Size() (uint64, error) {
	const entrySize uint64 = 1

	return entrySize, nil
}
424

425
// AuthenticatedGossiper is a subsystem which is responsible for receiving
426
// announcements, validating them and applying the changes to router, syncing
427
// lightning network with newly connected nodes, broadcasting announcements
428
// after validation, negotiating the channel announcement proofs exchange and
429
// handling the premature announcements. All outgoing announcements are
430
// expected to be properly signed as dictated in BOLT#7, additionally, all
431
// incoming message are expected to be well formed and signed. Invalid messages
432
// will be rejected by this struct.
433
type AuthenticatedGossiper struct {
434
        // Parameters which are needed to properly handle the start and stop of
435
        // the service.
436
        started sync.Once
437
        stopped sync.Once
438

439
        // bestHeight is the height of the block at the tip of the main chain
440
        // as we know it. Accesses *MUST* be done with the gossiper's lock
441
        // held.
442
        bestHeight uint32
443

444
        quit chan struct{}
445
        wg   sync.WaitGroup
446

447
        // cfg is a copy of the configuration struct that the gossiper service
448
        // was initialized with.
449
        cfg *Config
450

451
        // blockEpochs encapsulates a stream of block epochs that are sent at
452
        // every new block height.
453
        blockEpochs *chainntnfs.BlockEpochEvent
454

455
        // prematureChannelUpdates is a map of ChannelUpdates we have received
456
        // that wasn't associated with any channel we know about.  We store
457
        // them temporarily, such that we can reprocess them when a
458
        // ChannelAnnouncement for the channel is received.
459
        prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]
460

461
        // banman tracks our peer's ban status.
462
        banman *banman
463

464
        // networkMsgs is a channel that carries new network broadcasted
465
        // message from outside the gossiper service to be processed by the
466
        // networkHandler.
467
        networkMsgs chan *networkMsg
468

469
        // futureMsgs is a list of premature network messages that have a block
470
        // height specified in the future. We will save them and resend it to
471
        // the chan networkMsgs once the block height has reached. The cached
472
        // map format is,
473
        //   {msgID1: msg1, msgID2: msg2, ...}
474
        futureMsgs *futureMsgCache
475

476
        // chanPolicyUpdates is a channel that requests to update the
477
        // forwarding policy of a set of channels is sent over.
478
        chanPolicyUpdates chan *chanPolicyUpdateRequest
479

480
        // selfKey is the identity public key of the backing Lightning node.
481
        selfKey *btcec.PublicKey
482

483
        // selfKeyLoc is the locator for the identity public key of the backing
484
        // Lightning node.
485
        selfKeyLoc keychain.KeyLocator
486

487
        // channelMtx is used to restrict the database access to one
488
        // goroutine per channel ID. This is done to ensure that when
489
        // the gossiper is handling an announcement, the db state stays
490
        // consistent between when the DB is first read until it's written.
491
        channelMtx *multimutex.Mutex[uint64]
492

493
        recentRejects *lru.Cache[rejectCacheKey, *cachedReject]
494

495
        // syncMgr is a subsystem responsible for managing the gossip syncers
496
        // for peers currently connected. When a new peer is connected, the
497
        // manager will create its accompanying gossip syncer and determine
498
        // whether it should have an activeSync or passiveSync sync type based
499
        // on how many other gossip syncers are currently active. Any activeSync
500
        // gossip syncers are started in a round-robin manner to ensure we're
501
        // not syncing with multiple peers at the same time.
502
        syncMgr *SyncManager
503

504
        // reliableSender is a subsystem responsible for handling reliable
505
        // message send requests to peers. This should only be used for channels
506
        // that are unadvertised at the time of handling the message since if it
507
        // is advertised, then peers should be able to get the message from the
508
        // network.
509
        reliableSender *reliableSender
510

511
        // chanUpdateRateLimiter contains rate limiters for each direction of
512
        // a channel update we've processed. We'll use these to determine
513
        // whether we should accept a new update for a specific channel and
514
        // direction.
515
        //
516
        // NOTE: This map must be synchronized with the main
517
        // AuthenticatedGossiper lock.
518
        chanUpdateRateLimiter map[uint64][2]*rate.Limiter
519

520
        // vb is used to enforce job dependency ordering of gossip messages.
521
        vb *ValidationBarrier
522

523
        sync.Mutex
524
}
525

526
// New creates a new AuthenticatedGossiper instance, initialized with the
527
// passed configuration parameters.
528
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
30✔
529
        gossiper := &AuthenticatedGossiper{
30✔
530
                selfKey:           selfKeyDesc.PubKey,
30✔
531
                selfKeyLoc:        selfKeyDesc.KeyLocator,
30✔
532
                cfg:               &cfg,
30✔
533
                networkMsgs:       make(chan *networkMsg),
30✔
534
                futureMsgs:        newFutureMsgCache(maxFutureMessages),
30✔
535
                quit:              make(chan struct{}),
30✔
536
                chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
30✔
537
                prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
30✔
538
                        maxPrematureUpdates,
30✔
539
                ),
30✔
540
                channelMtx: multimutex.NewMutex[uint64](),
30✔
541
                recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
30✔
542
                        maxRejectedUpdates,
30✔
543
                ),
30✔
544
                chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
30✔
545
                banman:                newBanman(),
30✔
546
        }
30✔
547

30✔
548
        gossiper.vb = NewValidationBarrier(1000, gossiper.quit)
30✔
549

30✔
550
        gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
30✔
551
                ChainHash:               cfg.ChainHash,
30✔
552
                ChanSeries:              cfg.ChanSeries,
30✔
553
                RotateTicker:            cfg.RotateTicker,
30✔
554
                HistoricalSyncTicker:    cfg.HistoricalSyncTicker,
30✔
555
                NumActiveSyncers:        cfg.NumActiveSyncers,
30✔
556
                NoTimestampQueries:      cfg.NoTimestampQueries,
30✔
557
                IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
30✔
558
                BestHeight:              gossiper.latestHeight,
30✔
559
                PinnedSyncers:           cfg.PinnedSyncers,
30✔
560
                IsStillZombieChannel:    cfg.IsStillZombieChannel,
30✔
561
        })
30✔
562

30✔
563
        gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
30✔
564
                NotifyWhenOnline:  cfg.NotifyWhenOnline,
30✔
565
                NotifyWhenOffline: cfg.NotifyWhenOffline,
30✔
566
                MessageStore:      cfg.MessageStore,
30✔
567
                IsMsgStale:        gossiper.isMsgStale,
30✔
568
        })
30✔
569

30✔
570
        return gossiper
30✔
571
}
30✔
572

573
// EdgeWithInfo contains the information that is required to update an edge.
574
type EdgeWithInfo struct {
575
        // Info describes the channel.
576
        Info *models.ChannelEdgeInfo
577

578
        // Edge describes the policy in one direction of the channel.
579
        Edge *models.ChannelEdgePolicy
580
}
581

582
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
583
// specified edge updates. Updates are done in two stages: first, the
584
// AuthenticatedGossiper ensures the update has been committed by dependent
585
// sub-systems, then it signs and broadcasts new updates to the network. A
586
// mapping between outpoints and updated channel policies is returned, which is
587
// used to update the forwarding policies of the underlying links.
588
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
589
        edgesToUpdate []EdgeWithInfo) error {
4✔
590

4✔
591
        errChan := make(chan error, 1)
4✔
592
        policyUpdate := &chanPolicyUpdateRequest{
4✔
593
                edgesToUpdate: edgesToUpdate,
4✔
594
                errChan:       errChan,
4✔
595
        }
4✔
596

4✔
597
        select {
4✔
598
        case d.chanPolicyUpdates <- policyUpdate:
4✔
599
                err := <-errChan
4✔
600
                return err
4✔
601
        case <-d.quit:
×
602
                return fmt.Errorf("AuthenticatedGossiper shutting down")
×
603
        }
604
}
605

606
// Start spawns network messages handler goroutine and registers on new block
607
// notifications in order to properly handle the premature announcements.
608
func (d *AuthenticatedGossiper) Start() error {
30✔
609
        var err error
30✔
610
        d.started.Do(func() {
60✔
611
                log.Info("Authenticated Gossiper starting")
30✔
612
                err = d.start()
30✔
613
        })
30✔
614
        return err
30✔
615
}
616

617
func (d *AuthenticatedGossiper) start() error {
30✔
618
        // First we register for new notifications of newly discovered blocks.
30✔
619
        // We do this immediately so we'll later be able to consume any/all
30✔
620
        // blocks which were discovered.
30✔
621
        blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
30✔
622
        if err != nil {
30✔
623
                return err
×
624
        }
×
625
        d.blockEpochs = blockEpochs
30✔
626

30✔
627
        height, err := d.cfg.Graph.CurrentBlockHeight()
30✔
628
        if err != nil {
30✔
629
                return err
×
630
        }
×
631
        d.bestHeight = height
30✔
632

30✔
633
        // Start the reliable sender. In case we had any pending messages ready
30✔
634
        // to be sent when the gossiper was last shut down, we must continue on
30✔
635
        // our quest to deliver them to their respective peers.
30✔
636
        if err := d.reliableSender.Start(); err != nil {
30✔
637
                return err
×
638
        }
×
639

640
        d.syncMgr.Start()
30✔
641

30✔
642
        d.banman.start()
30✔
643

30✔
644
        // Start receiving blocks in its dedicated goroutine.
30✔
645
        d.wg.Add(2)
30✔
646
        go d.syncBlockHeight()
30✔
647
        go d.networkHandler()
30✔
648

30✔
649
        return nil
30✔
650
}
651

652
// syncBlockHeight syncs the best block height for the gossiper by reading
653
// blockEpochs.
654
//
655
// NOTE: must be run as a goroutine.
656
func (d *AuthenticatedGossiper) syncBlockHeight() {
30✔
657
        defer d.wg.Done()
30✔
658

30✔
659
        for {
60✔
660
                select {
30✔
661
                // A new block has arrived, so we can re-process the previously
662
                // premature announcements.
663
                case newBlock, ok := <-d.blockEpochs.Epochs:
3✔
664
                        // If the channel has been closed, then this indicates
3✔
665
                        // the daemon is shutting down, so we exit ourselves.
3✔
666
                        if !ok {
6✔
667
                                return
3✔
668
                        }
3✔
669

670
                        // Once a new block arrives, we update our running
671
                        // track of the height of the chain tip.
672
                        d.Lock()
3✔
673
                        blockHeight := uint32(newBlock.Height)
3✔
674
                        d.bestHeight = blockHeight
3✔
675
                        d.Unlock()
3✔
676

3✔
677
                        log.Debugf("New block: height=%d, hash=%s", blockHeight,
3✔
678
                                newBlock.Hash)
3✔
679

3✔
680
                        // Resend future messages, if any.
3✔
681
                        d.resendFutureMessages(blockHeight)
3✔
682

683
                case <-d.quit:
27✔
684
                        return
27✔
685
                }
686
        }
687
}
688

689
// futureMsgCache embeds a `lru.Cache` with a message counter that's served as
690
// the unique ID when saving the message.
691
type futureMsgCache struct {
692
        *lru.Cache[uint64, *cachedFutureMsg]
693

694
        // msgID is a monotonically increased integer.
695
        msgID atomic.Uint64
696
}
697

698
// nextMsgID returns a unique message ID.
699
func (f *futureMsgCache) nextMsgID() uint64 {
6✔
700
        return f.msgID.Add(1)
6✔
701
}
6✔
702

703
// newFutureMsgCache creates a new future message cache with the underlying lru
704
// cache being initialized with the specified capacity.
705
func newFutureMsgCache(capacity uint64) *futureMsgCache {
31✔
706
        // Create a new cache.
31✔
707
        cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)
31✔
708

31✔
709
        return &futureMsgCache{
31✔
710
                Cache: cache,
31✔
711
        }
31✔
712
}
31✔
713

714
// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
715
type cachedFutureMsg struct {
716
        // msg is the network message.
717
        msg *networkMsg
718

719
        // height is the block height.
720
        height uint32
721
}
722

723
// Size returns the size of the message.
724
func (c *cachedFutureMsg) Size() (uint64, error) {
7✔
725
        // Return a constant 1.
7✔
726
        return 1, nil
7✔
727
}
7✔
728

729
// resendFutureMessages takes a block height, resends all the future messages
730
// found below and equal to that height and deletes those messages found in the
731
// gossiper's futureMsgs.
732
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
3✔
733
        var (
3✔
734
                // msgs are the target messages.
3✔
735
                msgs []*networkMsg
3✔
736

3✔
737
                // keys are the target messages' caching keys.
3✔
738
                keys []uint64
3✔
739
        )
3✔
740

3✔
741
        // filterMsgs is the visitor used when iterating the future cache.
3✔
742
        filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
6✔
743
                if cmsg.height <= height {
6✔
744
                        msgs = append(msgs, cmsg.msg)
3✔
745
                        keys = append(keys, k)
3✔
746
                }
3✔
747

748
                return true
3✔
749
        }
750

751
        // Filter out the target messages.
752
        d.futureMsgs.Range(filterMsgs)
3✔
753

3✔
754
        // Return early if no messages found.
3✔
755
        if len(msgs) == 0 {
6✔
756
                return
3✔
757
        }
3✔
758

759
        // Remove the filtered messages.
760
        for _, key := range keys {
6✔
761
                d.futureMsgs.Delete(key)
3✔
762
        }
3✔
763

764
        log.Debugf("Resending %d network messages at height %d",
3✔
765
                len(msgs), height)
3✔
766

3✔
767
        for _, msg := range msgs {
6✔
768
                select {
3✔
769
                case d.networkMsgs <- msg:
3✔
770
                case <-d.quit:
×
771
                        msg.err <- ErrGossiperShuttingDown
×
772
                }
773
        }
774
}
775

776
// Stop signals any active goroutines for a graceful closure.
777
func (d *AuthenticatedGossiper) Stop() error {
31✔
778
        d.stopped.Do(func() {
61✔
779
                log.Info("Authenticated gossiper shutting down...")
30✔
780
                defer log.Debug("Authenticated gossiper shutdown complete")
30✔
781

30✔
782
                d.stop()
30✔
783
        })
30✔
784
        return nil
31✔
785
}
786

787
func (d *AuthenticatedGossiper) stop() {
30✔
788
        log.Debug("Authenticated Gossiper is stopping")
30✔
789
        defer log.Debug("Authenticated Gossiper stopped")
30✔
790

30✔
791
        // `blockEpochs` is only initialized in the start routine so we make
30✔
792
        // sure we don't panic here in the case where the `Stop` method is
30✔
793
        // called when the `Start` method does not complete.
30✔
794
        if d.blockEpochs != nil {
60✔
795
                d.blockEpochs.Cancel()
30✔
796
        }
30✔
797

798
        d.syncMgr.Stop()
30✔
799

30✔
800
        d.banman.stop()
30✔
801

30✔
802
        close(d.quit)
30✔
803
        d.wg.Wait()
30✔
804

30✔
805
        // We'll stop our reliable sender after all of the gossiper's goroutines
30✔
806
        // have exited to ensure nothing can cause it to continue executing.
30✔
807
        d.reliableSender.Stop()
30✔
808
}
809

810
// TODO(roasbeef): need method to get current gossip timestamp?
811
//  * using mtx, check time rotate forward is needed?
812

813
// ProcessRemoteAnnouncement sends a new remote announcement message along with
814
// the peer that sent the routing message. The announcement will be processed
815
// then added to a queue for batched trickled announcement to all connected
816
// peers.  Remote channel announcements should contain the announcement proof
817
// and be fully validated.
818
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
819
        peer lnpeer.Peer) chan error {
287✔
820

287✔
821
        log.Debugf("Processing remote msg %T from peer=%x", msg, peer.PubKey())
287✔
822

287✔
823
        errChan := make(chan error, 1)
287✔
824

287✔
825
        // For messages in the known set of channel series queries, we'll
287✔
826
        // dispatch the message directly to the GossipSyncer, and skip the main
287✔
827
        // processing loop.
287✔
828
        switch m := msg.(type) {
287✔
829
        case *lnwire.QueryShortChanIDs,
830
                *lnwire.QueryChannelRange,
831
                *lnwire.ReplyChannelRange,
832
                *lnwire.ReplyShortChanIDsEnd:
3✔
833

3✔
834
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
3✔
835
                if !ok {
3✔
836
                        log.Warnf("Gossip syncer for peer=%x not found",
×
837
                                peer.PubKey())
×
838

×
839
                        errChan <- ErrGossipSyncerNotFound
×
840
                        return errChan
×
841
                }
×
842

843
                // If we've found the message target, then we'll dispatch the
844
                // message directly to it.
845
                err := syncer.ProcessQueryMsg(m, peer.QuitSignal())
3✔
846
                if err != nil {
3✔
847
                        log.Errorf("Process query msg from peer %x got %v",
×
848
                                peer.PubKey(), err)
×
849
                }
×
850

851
                errChan <- err
3✔
852
                return errChan
3✔
853

854
        // If a peer is updating its current update horizon, then we'll dispatch
855
        // that directly to the proper GossipSyncer.
856
        case *lnwire.GossipTimestampRange:
3✔
857
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
3✔
858
                if !ok {
3✔
859
                        log.Warnf("Gossip syncer for peer=%x not found",
×
860
                                peer.PubKey())
×
861

×
862
                        errChan <- ErrGossipSyncerNotFound
×
863
                        return errChan
×
864
                }
×
865

866
                // If we've found the message target, then we'll dispatch the
867
                // message directly to it.
868
                if err := syncer.ApplyGossipFilter(m); err != nil {
3✔
869
                        log.Warnf("Unable to apply gossip filter for peer=%x: "+
×
870
                                "%v", peer.PubKey(), err)
×
871

×
872
                        errChan <- err
×
873
                        return errChan
×
874
                }
×
875

876
                errChan <- nil
3✔
877
                return errChan
3✔
878

879
        // To avoid inserting edges in the graph for our own channels that we
880
        // have already closed, we ignore such channel announcements coming
881
        // from the remote.
882
        case *lnwire.ChannelAnnouncement1:
222✔
883
                ownKey := d.selfKey.SerializeCompressed()
222✔
884
                ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement1 " +
222✔
885
                        "for own channel")
222✔
886

222✔
887
                if bytes.Equal(m.NodeID1[:], ownKey) ||
222✔
888
                        bytes.Equal(m.NodeID2[:], ownKey) {
227✔
889

5✔
890
                        log.Warn(ownErr)
5✔
891
                        errChan <- ownErr
5✔
892
                        return errChan
5✔
893
                }
5✔
894
        }
895

896
        nMsg := &networkMsg{
285✔
897
                msg:      msg,
285✔
898
                isRemote: true,
285✔
899
                peer:     peer,
285✔
900
                source:   peer.IdentityKey(),
285✔
901
                err:      errChan,
285✔
902
        }
285✔
903

285✔
904
        select {
285✔
905
        case d.networkMsgs <- nMsg:
285✔
906

907
        // If the peer that sent us this error is quitting, then we don't need
908
        // to send back an error and can return immediately.
909
        case <-peer.QuitSignal():
×
910
                return nil
×
911
        case <-d.quit:
×
912
                nMsg.err <- ErrGossiperShuttingDown
×
913
        }
914

915
        return nMsg.err
285✔
916
}
917

918
// ProcessLocalAnnouncement sends a new remote announcement message along with
919
// the peer that sent the routing message. The announcement will be processed
920
// then added to a queue for batched trickled announcement to all connected
921
// peers.  Local channel announcements don't contain the announcement proof and
922
// will not be fully validated. Once the channel proofs are received, the
923
// entire channel announcement and update messages will be re-constructed and
924
// broadcast to the rest of the network.
925
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
926
        optionalFields ...OptionalMsgField) chan error {
50✔
927

50✔
928
        optionalMsgFields := &optionalMsgFields{}
50✔
929
        optionalMsgFields.apply(optionalFields...)
50✔
930

50✔
931
        nMsg := &networkMsg{
50✔
932
                msg:               msg,
50✔
933
                optionalMsgFields: optionalMsgFields,
50✔
934
                isRemote:          false,
50✔
935
                source:            d.selfKey,
50✔
936
                err:               make(chan error, 1),
50✔
937
        }
50✔
938

50✔
939
        select {
50✔
940
        case d.networkMsgs <- nMsg:
50✔
941
        case <-d.quit:
×
942
                nMsg.err <- ErrGossiperShuttingDown
×
943
        }
944

945
        return nMsg.err
50✔
946
}
947

948
// channelUpdateID is a unique identifier for ChannelUpdate messages, as
949
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
950
// tuple.
951
type channelUpdateID struct {
952
        // channelID represents the set of data which is needed to
953
        // retrieve all necessary data to validate the channel existence.
954
        channelID lnwire.ShortChannelID
955

956
        // Flags least-significant bit must be set to 0 if the creating node
957
        // corresponds to the first node in the previously sent channel
958
        // announcement and 1 otherwise.
959
        flags lnwire.ChanUpdateChanFlags
960
}
961

962
// msgWithSenders is a wrapper struct around a message, and the set of peers
963
// that originally sent us this message. Using this struct, we can ensure that
964
// we don't re-send a message to the peer that sent it to us in the first
965
// place.
966
type msgWithSenders struct {
967
        // msg is the wire message itself.
968
        msg lnwire.Message
969

970
        // isLocal is true if this was a message that originated locally. We'll
971
        // use this to bypass our normal checks to ensure we prioritize sending
972
        // out our own updates.
973
        isLocal bool
974

975
        // sender is the set of peers that sent us this message.
976
        senders map[route.Vertex]struct{}
977
}
978

979
// mergeSyncerMap is used to merge the set of senders of a particular message
980
// with peers that we have an active GossipSyncer with. We do this to ensure
981
// that we don't broadcast messages to any peers that we have active gossip
982
// syncers for.
983
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
27✔
984
        for peerPub := range syncers {
30✔
985
                m.senders[peerPub] = struct{}{}
3✔
986
        }
3✔
987
}
988

989
// deDupedAnnouncements de-duplicates announcements that have been added to the
990
// batch. Internally, announcements are stored in three maps
991
// (one each for channel announcements, channel updates, and node
992
// announcements). These maps keep track of unique announcements and ensure no
993
// announcements are duplicated. We keep the three message types separate, such
994
// that we can send channel announcements first, then channel updates, and
995
// finally node announcements when it's time to broadcast them.
996
type deDupedAnnouncements struct {
997
        // channelAnnouncements are identified by the short channel id field.
998
        channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
999

1000
        // channelUpdates are identified by the channel update id field.
1001
        channelUpdates map[channelUpdateID]msgWithSenders
1002

1003
        // nodeAnnouncements are identified by the Vertex field.
1004
        nodeAnnouncements map[route.Vertex]msgWithSenders
1005

1006
        sync.Mutex
1007
}
1008

1009
// Reset operates on deDupedAnnouncements to reset the storage of
1010
// announcements.
1011
func (d *deDupedAnnouncements) Reset() {
32✔
1012
        d.Lock()
32✔
1013
        defer d.Unlock()
32✔
1014

32✔
1015
        d.reset()
32✔
1016
}
32✔
1017

1018
// reset is the private version of the Reset method. We have this so we can
1019
// call this method within method that are already holding the lock.
1020
func (d *deDupedAnnouncements) reset() {
319✔
1021
        // Storage of each type of announcement (channel announcements, channel
319✔
1022
        // updates, node announcements) is set to an empty map where the
319✔
1023
        // appropriate key points to the corresponding lnwire.Message.
319✔
1024
        d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
319✔
1025
        d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
319✔
1026
        d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
319✔
1027
}
319✔
1028

1029
// addMsg adds a new message to the current batch. If the message is already
1030
// present in the current batch, then this new instance replaces the latter,
1031
// and the set of senders is updated to reflect which node sent us this
1032
// message.
1033
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
89✔
1034
        log.Tracef("Adding network message: %v to batch", message.msg.MsgType())
89✔
1035

89✔
1036
        // Depending on the message type (channel announcement, channel update,
89✔
1037
        // or node announcement), the message is added to the corresponding map
89✔
1038
        // in deDupedAnnouncements. Because each identifying key can have at
89✔
1039
        // most one value, the announcements are de-duplicated, with newer ones
89✔
1040
        // replacing older ones.
89✔
1041
        switch msg := message.msg.(type) {
89✔
1042

1043
        // Channel announcements are identified by the short channel id field.
1044
        case *lnwire.ChannelAnnouncement1:
25✔
1045
                deDupKey := msg.ShortChannelID
25✔
1046
                sender := route.NewVertex(message.source)
25✔
1047

25✔
1048
                mws, ok := d.channelAnnouncements[deDupKey]
25✔
1049
                if !ok {
49✔
1050
                        mws = msgWithSenders{
24✔
1051
                                msg:     msg,
24✔
1052
                                isLocal: !message.isRemote,
24✔
1053
                                senders: make(map[route.Vertex]struct{}),
24✔
1054
                        }
24✔
1055
                        mws.senders[sender] = struct{}{}
24✔
1056

24✔
1057
                        d.channelAnnouncements[deDupKey] = mws
24✔
1058

24✔
1059
                        return
24✔
1060
                }
24✔
1061

1062
                mws.msg = msg
1✔
1063
                mws.senders[sender] = struct{}{}
1✔
1064
                d.channelAnnouncements[deDupKey] = mws
1✔
1065

1066
        // Channel updates are identified by the (short channel id,
1067
        // channelflags) tuple.
1068
        case *lnwire.ChannelUpdate1:
45✔
1069
                sender := route.NewVertex(message.source)
45✔
1070
                deDupKey := channelUpdateID{
45✔
1071
                        msg.ShortChannelID,
45✔
1072
                        msg.ChannelFlags,
45✔
1073
                }
45✔
1074

45✔
1075
                oldTimestamp := uint32(0)
45✔
1076
                mws, ok := d.channelUpdates[deDupKey]
45✔
1077
                if ok {
48✔
1078
                        // If we already have seen this message, record its
3✔
1079
                        // timestamp.
3✔
1080
                        update, ok := mws.msg.(*lnwire.ChannelUpdate1)
3✔
1081
                        if !ok {
3✔
1082
                                log.Errorf("Expected *lnwire.ChannelUpdate1, "+
×
1083
                                        "got: %T", mws.msg)
×
1084

×
1085
                                return
×
1086
                        }
×
1087

1088
                        oldTimestamp = update.Timestamp
3✔
1089
                }
1090

1091
                // If we already had this message with a strictly newer
1092
                // timestamp, then we'll just discard the message we got.
1093
                if oldTimestamp > msg.Timestamp {
46✔
1094
                        log.Debugf("Ignored outdated network message: "+
1✔
1095
                                "peer=%v, msg=%s", message.peer, msg.MsgType())
1✔
1096
                        return
1✔
1097
                }
1✔
1098

1099
                // If the message we just got is newer than what we previously
1100
                // have seen, or this is the first time we see it, then we'll
1101
                // add it to our map of announcements.
1102
                if oldTimestamp < msg.Timestamp {
87✔
1103
                        mws = msgWithSenders{
43✔
1104
                                msg:     msg,
43✔
1105
                                isLocal: !message.isRemote,
43✔
1106
                                senders: make(map[route.Vertex]struct{}),
43✔
1107
                        }
43✔
1108

43✔
1109
                        // We'll mark the sender of the message in the
43✔
1110
                        // senders map.
43✔
1111
                        mws.senders[sender] = struct{}{}
43✔
1112

43✔
1113
                        d.channelUpdates[deDupKey] = mws
43✔
1114

43✔
1115
                        return
43✔
1116
                }
43✔
1117

1118
                // Lastly, if we had seen this exact message from before, with
1119
                // the same timestamp, we'll add the sender to the map of
1120
                // senders, such that we can skip sending this message back in
1121
                // the next batch.
1122
                mws.msg = msg
1✔
1123
                mws.senders[sender] = struct{}{}
1✔
1124
                d.channelUpdates[deDupKey] = mws
1✔
1125

1126
        // Node announcements are identified by the Vertex field.  Use the
1127
        // NodeID to create the corresponding Vertex.
1128
        case *lnwire.NodeAnnouncement:
25✔
1129
                sender := route.NewVertex(message.source)
25✔
1130
                deDupKey := route.Vertex(msg.NodeID)
25✔
1131

25✔
1132
                // We do the same for node announcements as we did for channel
25✔
1133
                // updates, as they also carry a timestamp.
25✔
1134
                oldTimestamp := uint32(0)
25✔
1135
                mws, ok := d.nodeAnnouncements[deDupKey]
25✔
1136
                if ok {
33✔
1137
                        oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
8✔
1138
                }
8✔
1139

1140
                // Discard the message if it's old.
1141
                if oldTimestamp > msg.Timestamp {
28✔
1142
                        return
3✔
1143
                }
3✔
1144

1145
                // Replace if it's newer.
1146
                if oldTimestamp < msg.Timestamp {
46✔
1147
                        mws = msgWithSenders{
21✔
1148
                                msg:     msg,
21✔
1149
                                isLocal: !message.isRemote,
21✔
1150
                                senders: make(map[route.Vertex]struct{}),
21✔
1151
                        }
21✔
1152

21✔
1153
                        mws.senders[sender] = struct{}{}
21✔
1154

21✔
1155
                        d.nodeAnnouncements[deDupKey] = mws
21✔
1156

21✔
1157
                        return
21✔
1158
                }
21✔
1159

1160
                // Add to senders map if it's the same as we had.
1161
                mws.msg = msg
7✔
1162
                mws.senders[sender] = struct{}{}
7✔
1163
                d.nodeAnnouncements[deDupKey] = mws
7✔
1164
        }
1165
}
1166

1167
// AddMsgs is a helper method to add multiple messages to the announcement
1168
// batch.
1169
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
57✔
1170
        d.Lock()
57✔
1171
        defer d.Unlock()
57✔
1172

57✔
1173
        for _, msg := range msgs {
146✔
1174
                d.addMsg(msg)
89✔
1175
        }
89✔
1176
}
1177

1178
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
1179
// to broadcast next into messages that are locally sourced and those that are
1180
// sourced remotely.
1181
type msgsToBroadcast struct {
1182
        // localMsgs is the set of messages we created locally.
1183
        localMsgs []msgWithSenders
1184

1185
        // remoteMsgs is the set of messages that we received from a remote
1186
        // party.
1187
        remoteMsgs []msgWithSenders
1188
}
1189

1190
// addMsg adds a new message to the appropriate sub-slice.
1191
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
74✔
1192
        if msg.isLocal {
124✔
1193
                m.localMsgs = append(m.localMsgs, msg)
50✔
1194
        } else {
77✔
1195
                m.remoteMsgs = append(m.remoteMsgs, msg)
27✔
1196
        }
27✔
1197
}
1198

1199
// isEmpty returns true if the batch is empty.
1200
func (m *msgsToBroadcast) isEmpty() bool {
289✔
1201
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
289✔
1202
}
289✔
1203

1204
// length returns the length of the combined message set.
1205
func (m *msgsToBroadcast) length() int {
1✔
1206
        return len(m.localMsgs) + len(m.remoteMsgs)
1✔
1207
}
1✔
1208

1209
// Emit returns the set of de-duplicated announcements to be sent out during
1210
// the next announcement epoch, in the order of channel announcements, channel
1211
// updates, and node announcements. Each message emitted, contains the set of
1212
// peers that sent us the message. This way, we can ensure that we don't waste
1213
// bandwidth by re-sending a message to the peer that sent it to us in the
1214
// first place. Additionally, the set of stored messages are reset.
1215
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
290✔
1216
        d.Lock()
290✔
1217
        defer d.Unlock()
290✔
1218

290✔
1219
        // Get the total number of announcements.
290✔
1220
        numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
290✔
1221
                len(d.nodeAnnouncements)
290✔
1222

290✔
1223
        // Create an empty array of lnwire.Messages with a length equal to
290✔
1224
        // the total number of announcements.
290✔
1225
        msgs := msgsToBroadcast{
290✔
1226
                localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
290✔
1227
                remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
290✔
1228
        }
290✔
1229

290✔
1230
        // Add the channel announcements to the array first.
290✔
1231
        for _, message := range d.channelAnnouncements {
311✔
1232
                msgs.addMsg(message)
21✔
1233
        }
21✔
1234

1235
        // Then add the channel updates.
1236
        for _, message := range d.channelUpdates {
329✔
1237
                msgs.addMsg(message)
39✔
1238
        }
39✔
1239

1240
        // Finally add the node announcements.
1241
        for _, message := range d.nodeAnnouncements {
310✔
1242
                msgs.addMsg(message)
20✔
1243
        }
20✔
1244

1245
        d.reset()
290✔
1246

290✔
1247
        // Return the array of lnwire.messages.
290✔
1248
        return msgs
290✔
1249
}
1250

1251
// calculateSubBatchSize is a helper function that calculates the size to break
// down the batchSize into. The result is batchSize scaled by the ratio
// subBatchDelay/totalDelay, rounded up, and raised to minimumBatchSize if it
// would otherwise be smaller.
//
// The batch is not broken down (batchSize is returned as is) when
// subBatchDelay exceeds totalDelay, or when totalDelay is not positive, in
// which case there is no window to spread the batch over and the division
// below would be by zero.
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
	minimumBatchSize, batchSize int) int {

	if totalDelay <= 0 || subBatchDelay > totalDelay {
		return batchSize
	}

	// Durations are nanosecond counts, so do the arithmetic in int64 to
	// avoid truncation on platforms where int is 32 bits wide. Since
	// subBatchDelay <= totalDelay here, the quotient never exceeds
	// batchSize and converts back to int safely.
	total := int64(totalDelay)
	sub := int64(subBatchDelay)

	// Adding total-1 before dividing rounds the quotient up.
	subBatchSize := int((int64(batchSize)*sub + total - 1) / total)

	if subBatchSize < minimumBatchSize {
		return minimumBatchSize
	}

	return subBatchSize
}
1268

1269
// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
1270
// this variable so the function can be mocked in our test.
1271
var batchSizeCalculator = calculateSubBatchSize
1272

1273
// splitAnnouncementBatches takes an exiting list of announcements and
1274
// decomposes it into sub batches controlled by the `subBatchSize`.
1275
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
1276
        announcementBatch []msgWithSenders) [][]msgWithSenders {
72✔
1277

72✔
1278
        subBatchSize := batchSizeCalculator(
72✔
1279
                d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
72✔
1280
                d.cfg.MinimumBatchSize, len(announcementBatch),
72✔
1281
        )
72✔
1282

72✔
1283
        var splitAnnouncementBatch [][]msgWithSenders
72✔
1284

72✔
1285
        for subBatchSize < len(announcementBatch) {
196✔
1286
                // For slicing with minimal allocation
124✔
1287
                // https://github.com/golang/go/wiki/SliceTricks
124✔
1288
                announcementBatch, splitAnnouncementBatch =
124✔
1289
                        announcementBatch[subBatchSize:],
124✔
1290
                        append(splitAnnouncementBatch,
124✔
1291
                                announcementBatch[0:subBatchSize:subBatchSize])
124✔
1292
        }
124✔
1293
        splitAnnouncementBatch = append(
72✔
1294
                splitAnnouncementBatch, announcementBatch,
72✔
1295
        )
72✔
1296

72✔
1297
        return splitAnnouncementBatch
72✔
1298
}
1299

1300
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
1301
// split size, and then sends out all items to the set of target peers. Locally
1302
// generated announcements are always sent before remotely generated
1303
// announcements.
1304
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
1305
        annBatch msgsToBroadcast) {
34✔
1306

34✔
1307
        // delayNextBatch is a helper closure that blocks for `SubBatchDelay`
34✔
1308
        // duration to delay the sending of next announcement batch.
34✔
1309
        delayNextBatch := func() {
99✔
1310
                select {
65✔
1311
                case <-time.After(d.cfg.SubBatchDelay):
48✔
1312
                case <-d.quit:
17✔
1313
                        return
17✔
1314
                }
1315
        }
1316

1317
        // Fetch the local and remote announcements.
1318
        localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
34✔
1319
        remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)
34✔
1320

34✔
1321
        d.wg.Add(1)
34✔
1322
        go func() {
68✔
1323
                defer d.wg.Done()
34✔
1324

34✔
1325
                log.Debugf("Broadcasting %v new local announcements in %d "+
34✔
1326
                        "sub batches", len(annBatch.localMsgs),
34✔
1327
                        len(localBatches))
34✔
1328

34✔
1329
                // Send out the local announcements first.
34✔
1330
                for _, annBatch := range localBatches {
68✔
1331
                        d.sendLocalBatch(annBatch)
34✔
1332
                        delayNextBatch()
34✔
1333
                }
34✔
1334

1335
                log.Debugf("Broadcasting %v new remote announcements in %d "+
34✔
1336
                        "sub batches", len(annBatch.remoteMsgs),
34✔
1337
                        len(remoteBatches))
34✔
1338

34✔
1339
                // Now send the remote announcements.
34✔
1340
                for _, annBatch := range remoteBatches {
68✔
1341
                        d.sendRemoteBatch(annBatch)
34✔
1342
                        delayNextBatch()
34✔
1343
                }
34✔
1344
        }()
1345
}
1346

1347
// sendLocalBatch broadcasts a list of locally generated announcements to our
1348
// peers. For local announcements, we skip the filter and dedup logic and just
1349
// send the announcements out to all our coonnected peers.
1350
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
34✔
1351
        msgsToSend := lnutils.Map(
34✔
1352
                annBatch, func(m msgWithSenders) lnwire.Message {
80✔
1353
                        return m.msg
46✔
1354
                },
46✔
1355
        )
1356

1357
        err := d.cfg.Broadcast(nil, msgsToSend...)
34✔
1358
        if err != nil {
34✔
1359
                log.Errorf("Unable to send local batch announcements: %v", err)
×
1360
        }
×
1361
}
1362

1363
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
1364
// peers.
1365
func (d *AuthenticatedGossiper) sendRemoteBatch(annBatch []msgWithSenders) {
34✔
1366
        syncerPeers := d.syncMgr.GossipSyncers()
34✔
1367

34✔
1368
        // We'll first attempt to filter out this new message for all peers
34✔
1369
        // that have active gossip syncers active.
34✔
1370
        for pub, syncer := range syncerPeers {
37✔
1371
                log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
3✔
1372
                syncer.FilterGossipMsgs(annBatch...)
3✔
1373
        }
3✔
1374

1375
        for _, msgChunk := range annBatch {
61✔
1376
                msgChunk := msgChunk
27✔
1377

27✔
1378
                // With the syncers taken care of, we'll merge the sender map
27✔
1379
                // with the set of syncers, so we don't send out duplicate
27✔
1380
                // messages.
27✔
1381
                msgChunk.mergeSyncerMap(syncerPeers)
27✔
1382

27✔
1383
                err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
27✔
1384
                if err != nil {
27✔
1385
                        log.Errorf("Unable to send batch "+
×
1386
                                "announcements: %v", err)
×
1387
                        continue
×
1388
                }
1389
        }
1390
}
1391

1392
// networkHandler is the primary goroutine that drives this service. The roles
1393
// of this goroutine includes answering queries related to the state of the
1394
// network, syncing up newly connected peers, and also periodically
1395
// broadcasting our latest topology state to all connected peers.
1396
//
1397
// NOTE: This MUST be run as a goroutine.
1398
func (d *AuthenticatedGossiper) networkHandler() {
30✔
1399
        defer d.wg.Done()
30✔
1400

30✔
1401
        // Initialize empty deDupedAnnouncements to store announcement batch.
30✔
1402
        announcements := deDupedAnnouncements{}
30✔
1403
        announcements.Reset()
30✔
1404

30✔
1405
        d.cfg.RetransmitTicker.Resume()
30✔
1406
        defer d.cfg.RetransmitTicker.Stop()
30✔
1407

30✔
1408
        trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
30✔
1409
        defer trickleTimer.Stop()
30✔
1410

30✔
1411
        // To start, we'll first check to see if there are any stale channel or
30✔
1412
        // node announcements that we need to re-transmit.
30✔
1413
        if err := d.retransmitStaleAnns(time.Now()); err != nil {
30✔
1414
                log.Errorf("Unable to rebroadcast stale announcements: %v", err)
×
1415
        }
×
1416

1417
        for {
679✔
1418
                select {
649✔
1419
                // A new policy update has arrived. We'll commit it to the
1420
                // sub-systems below us, then craft, sign, and broadcast a new
1421
                // ChannelUpdate for the set of affected clients.
1422
                case policyUpdate := <-d.chanPolicyUpdates:
4✔
1423
                        log.Tracef("Received channel %d policy update requests",
4✔
1424
                                len(policyUpdate.edgesToUpdate))
4✔
1425

4✔
1426
                        // First, we'll now create new fully signed updates for
4✔
1427
                        // the affected channels and also update the underlying
4✔
1428
                        // graph with the new state.
4✔
1429
                        newChanUpdates, err := d.processChanPolicyUpdate(
4✔
1430
                                policyUpdate.edgesToUpdate,
4✔
1431
                        )
4✔
1432
                        policyUpdate.errChan <- err
4✔
1433
                        if err != nil {
4✔
1434
                                log.Errorf("Unable to craft policy updates: %v",
×
1435
                                        err)
×
1436
                                continue
×
1437
                        }
1438

1439
                        // Finally, with the updates committed, we'll now add
1440
                        // them to the announcement batch to be flushed at the
1441
                        // start of the next epoch.
1442
                        announcements.AddMsgs(newChanUpdates...)
4✔
1443

1444
                case announcement := <-d.networkMsgs:
334✔
1445
                        log.Tracef("Received network message: "+
334✔
1446
                                "peer=%v, msg=%s, is_remote=%v",
334✔
1447
                                announcement.peer, announcement.msg.MsgType(),
334✔
1448
                                announcement.isRemote)
334✔
1449

334✔
1450
                        switch announcement.msg.(type) {
334✔
1451
                        // Channel announcement signatures are amongst the only
1452
                        // messages that we'll process serially.
1453
                        case *lnwire.AnnounceSignatures1:
24✔
1454
                                emittedAnnouncements, _ := d.processNetworkAnnouncement(
24✔
1455
                                        announcement,
24✔
1456
                                )
24✔
1457
                                log.Debugf("Processed network message %s, "+
24✔
1458
                                        "returned len(announcements)=%v",
24✔
1459
                                        announcement.msg.MsgType(),
24✔
1460
                                        len(emittedAnnouncements))
24✔
1461

24✔
1462
                                if emittedAnnouncements != nil {
37✔
1463
                                        announcements.AddMsgs(
13✔
1464
                                                emittedAnnouncements...,
13✔
1465
                                        )
13✔
1466
                                }
13✔
1467
                                continue
24✔
1468
                        }
1469

1470
                        // If this message was recently rejected, then we won't
1471
                        // attempt to re-process it.
1472
                        if announcement.isRemote && d.isRecentlyRejectedMsg(
313✔
1473
                                announcement.msg,
313✔
1474
                                sourceToPub(announcement.source),
313✔
1475
                        ) {
314✔
1476

1✔
1477
                                announcement.err <- fmt.Errorf("recently " +
1✔
1478
                                        "rejected")
1✔
1479
                                continue
1✔
1480
                        }
1481

1482
                        // We'll set up any dependent, and wait until a free
1483
                        // slot for this job opens up, this allow us to not
1484
                        // have thousands of goroutines active.
1485
                        annJobID, err := d.vb.InitJobDependencies(
312✔
1486
                                announcement.msg,
312✔
1487
                        )
312✔
1488
                        if err != nil {
312✔
1489
                                announcement.err <- err
×
1490
                                continue
×
1491
                        }
1492

1493
                        d.wg.Add(1)
312✔
1494
                        go d.handleNetworkMessages(
312✔
1495
                                announcement, &announcements, annJobID,
312✔
1496
                        )
312✔
1497

1498
                // The trickle timer has ticked, which indicates we should
1499
                // flush to the network the pending batch of new announcements
1500
                // we've received since the last trickle tick.
1501
                case <-trickleTimer.C:
289✔
1502
                        // Emit the current batch of announcements from
289✔
1503
                        // deDupedAnnouncements.
289✔
1504
                        announcementBatch := announcements.Emit()
289✔
1505

289✔
1506
                        // If the current announcements batch is nil, then we
289✔
1507
                        // have no further work here.
289✔
1508
                        if announcementBatch.isEmpty() {
547✔
1509
                                continue
258✔
1510
                        }
1511

1512
                        // At this point, we have the set of local and remote
1513
                        // announcements we want to send out. We'll do the
1514
                        // batching as normal for both, but for local
1515
                        // announcements, we'll blast them out w/o regard for
1516
                        // our peer's policies so we ensure they propagate
1517
                        // properly.
1518
                        d.splitAndSendAnnBatch(announcementBatch)
34✔
1519

1520
                // The retransmission timer has ticked which indicates that we
1521
                // should check if we need to prune or re-broadcast any of our
1522
                // personal channels or node announcement. This addresses the
1523
                // case of "zombie" channels and channel advertisements that
1524
                // have been dropped, or not properly propagated through the
1525
                // network.
1526
                case tick := <-d.cfg.RetransmitTicker.Ticks():
1✔
1527
                        if err := d.retransmitStaleAnns(tick); err != nil {
1✔
1528
                                log.Errorf("unable to rebroadcast stale "+
×
1529
                                        "announcements: %v", err)
×
1530
                        }
×
1531

1532
                // The gossiper has been signalled to exit, to we exit our
1533
                // main loop so the wait group can be decremented.
1534
                case <-d.quit:
30✔
1535
                        return
30✔
1536
                }
1537
        }
1538
}
1539

1540
// handleNetworkMessages is responsible for waiting for dependencies for a
1541
// given network message and processing the message. Once processed, it will
1542
// signal its dependants and add the new announcements to the announce batch.
1543
//
1544
// NOTE: must be run as a goroutine.
1545
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
1546
        deDuped *deDupedAnnouncements, jobID JobID) {
312✔
1547

312✔
1548
        defer d.wg.Done()
312✔
1549
        defer d.vb.CompleteJob()
312✔
1550

312✔
1551
        // We should only broadcast this message forward if it originated from
312✔
1552
        // us or it wasn't received as part of our initial historical sync.
312✔
1553
        shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
312✔
1554

312✔
1555
        // If this message has an existing dependency, then we'll wait until
312✔
1556
        // that has been fully validated before we proceed.
312✔
1557
        err := d.vb.WaitForParents(jobID, nMsg.msg)
312✔
1558
        if err != nil {
312✔
1559
                log.Debugf("Validating network message %s got err: %v",
×
1560
                        nMsg.msg.MsgType(), err)
×
1561

×
1562
                if errors.Is(err, ErrVBarrierShuttingDown) {
×
1563
                        log.Warnf("unexpected error during validation "+
×
1564
                                "barrier shutdown: %v", err)
×
1565
                }
×
1566
                nMsg.err <- err
×
1567

×
1568
                return
×
1569
        }
1570

1571
        // Process the network announcement to determine if this is either a
1572
        // new announcement from our PoV or an edges to a prior vertex/edge we
1573
        // previously proceeded.
1574
        newAnns, allow := d.processNetworkAnnouncement(nMsg)
312✔
1575

312✔
1576
        log.Tracef("Processed network message %s, returned "+
312✔
1577
                "len(announcements)=%v, allowDependents=%v",
312✔
1578
                nMsg.msg.MsgType(), len(newAnns), allow)
312✔
1579

312✔
1580
        // If this message had any dependencies, then we can now signal them to
312✔
1581
        // continue.
312✔
1582
        err = d.vb.SignalDependents(nMsg.msg, jobID)
312✔
1583
        if err != nil {
312✔
1584
                // Something is wrong if SignalDependents returns an error.
×
1585
                log.Errorf("SignalDependents returned error for msg=%v with "+
×
1586
                        "JobID=%v", spew.Sdump(nMsg.msg), jobID)
×
1587

×
1588
                nMsg.err <- err
×
1589

×
1590
                return
×
1591
        }
×
1592

1593
        // If the announcement was accepted, then add the emitted announcements
1594
        // to our announce batch to be broadcast once the trickle timer ticks
1595
        // gain.
1596
        if newAnns != nil && shouldBroadcast {
347✔
1597
                // TODO(roasbeef): exclude peer that sent.
35✔
1598
                deDuped.AddMsgs(newAnns...)
35✔
1599
        } else if newAnns != nil {
319✔
1600
                log.Trace("Skipping broadcast of announcements received " +
4✔
1601
                        "during initial graph sync")
4✔
1602
        }
4✔
1603
}
1604

1605
// TODO(roasbeef): d/c peers that send updates not on our chain
1606

1607
// InitSyncState is called by outside sub-systems when a connection is
1608
// established to a new peer that understands how to perform channel range
1609
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
1610
// needed to handle new queries.
1611
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
3✔
1612
        d.syncMgr.InitSyncState(syncPeer)
3✔
1613
}
3✔
1614

1615
// PruneSyncState is called by outside sub-systems once a peer that we were
1616
// previously connected to has been disconnected. In this case we can stop the
1617
// existing GossipSyncer assigned to the peer and free up resources.
1618
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
3✔
1619
        d.syncMgr.PruneSyncState(peer)
3✔
1620
}
3✔
1621

1622
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
1623
// false otherwise, This avoids expensive reprocessing of the message.
1624
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
1625
        peerPub [33]byte) bool {
276✔
1626

276✔
1627
        var scid uint64
276✔
1628
        switch m := msg.(type) {
276✔
1629
        case *lnwire.ChannelUpdate1:
45✔
1630
                scid = m.ShortChannelID.ToUint64()
45✔
1631

1632
        case *lnwire.ChannelAnnouncement1:
220✔
1633
                scid = m.ShortChannelID.ToUint64()
220✔
1634

1635
        default:
17✔
1636
                return false
17✔
1637
        }
1638

1639
        _, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
262✔
1640
        return err != cache.ErrElementNotFound
262✔
1641
}
1642

1643
// retransmitStaleAnns examines all outgoing channels that the source node is
1644
// known to maintain to check to see if any of them are "stale". A channel is
1645
// stale iff, the last timestamp of its rebroadcast is older than the
1646
// RebroadcastInterval. We also check if a refreshed node announcement should
1647
// be resent.
1648
func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
31✔
1649
        // Iterate over all of our channels and check if any of them fall
31✔
1650
        // within the prune interval or re-broadcast interval.
31✔
1651
        type updateTuple struct {
31✔
1652
                info *models.ChannelEdgeInfo
31✔
1653
                edge *models.ChannelEdgePolicy
31✔
1654
        }
31✔
1655

31✔
1656
        var (
31✔
1657
                havePublicChannels bool
31✔
1658
                edgesToUpdate      []updateTuple
31✔
1659
        )
31✔
1660
        err := d.cfg.Graph.ForAllOutgoingChannels(func(
31✔
1661
                info *models.ChannelEdgeInfo,
31✔
1662
                edge *models.ChannelEdgePolicy) error {
36✔
1663

5✔
1664
                // If there's no auth proof attached to this edge, it means
5✔
1665
                // that it is a private channel not meant to be announced to
5✔
1666
                // the greater network, so avoid sending channel updates for
5✔
1667
                // this channel to not leak its
5✔
1668
                // existence.
5✔
1669
                if info.AuthProof == nil {
9✔
1670
                        log.Debugf("Skipping retransmission of channel "+
4✔
1671
                                "without AuthProof: %v", info.ChannelID)
4✔
1672
                        return nil
4✔
1673
                }
4✔
1674

1675
                // We make a note that we have at least one public channel. We
1676
                // use this to determine whether we should send a node
1677
                // announcement below.
1678
                havePublicChannels = true
4✔
1679

4✔
1680
                // If this edge has a ChannelUpdate that was created before the
4✔
1681
                // introduction of the MaxHTLC field, then we'll update this
4✔
1682
                // edge to propagate this information in the network.
4✔
1683
                if !edge.MessageFlags.HasMaxHtlc() {
4✔
1684
                        // We'll make sure we support the new max_htlc field if
×
1685
                        // not already present.
×
1686
                        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
×
1687
                        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
×
1688

×
1689
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
1690
                                info: info,
×
1691
                                edge: edge,
×
1692
                        })
×
1693
                        return nil
×
1694
                }
×
1695

1696
                timeElapsed := now.Sub(edge.LastUpdate)
4✔
1697

4✔
1698
                // If it's been longer than RebroadcastInterval since we've
4✔
1699
                // re-broadcasted the channel, add the channel to the set of
4✔
1700
                // edges we need to update.
4✔
1701
                if timeElapsed >= d.cfg.RebroadcastInterval {
5✔
1702
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
1✔
1703
                                info: info,
1✔
1704
                                edge: edge,
1✔
1705
                        })
1✔
1706
                }
1✔
1707

1708
                return nil
4✔
1709
        })
1710
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
31✔
1711
                return fmt.Errorf("unable to retrieve outgoing channels: %w",
×
1712
                        err)
×
1713
        }
×
1714

1715
        var signedUpdates []lnwire.Message
31✔
1716
        for _, chanToUpdate := range edgesToUpdate {
32✔
1717
                // Re-sign and update the channel on disk and retrieve our
1✔
1718
                // ChannelUpdate to broadcast.
1✔
1719
                chanAnn, chanUpdate, err := d.updateChannel(
1✔
1720
                        chanToUpdate.info, chanToUpdate.edge,
1✔
1721
                )
1✔
1722
                if err != nil {
1✔
1723
                        return fmt.Errorf("unable to update channel: %w", err)
×
1724
                }
×
1725

1726
                // If we have a valid announcement to transmit, then we'll send
1727
                // that along with the update.
1728
                if chanAnn != nil {
2✔
1729
                        signedUpdates = append(signedUpdates, chanAnn)
1✔
1730
                }
1✔
1731

1732
                signedUpdates = append(signedUpdates, chanUpdate)
1✔
1733
        }
1734

1735
        // If we don't have any public channels, we return as we don't want to
1736
        // broadcast anything that would reveal our existence.
1737
        if !havePublicChannels {
61✔
1738
                return nil
30✔
1739
        }
30✔
1740

1741
        // We'll also check that our NodeAnnouncement is not too old.
1742
        currentNodeAnn := d.cfg.FetchSelfAnnouncement()
4✔
1743
        timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
4✔
1744
        timeElapsed := now.Sub(timestamp)
4✔
1745

4✔
1746
        // If it's been a full day since we've re-broadcasted the
4✔
1747
        // node announcement, refresh it and resend it.
4✔
1748
        nodeAnnStr := ""
4✔
1749
        if timeElapsed >= d.cfg.RebroadcastInterval {
5✔
1750
                newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
1✔
1751
                if err != nil {
1✔
1752
                        return fmt.Errorf("unable to get refreshed node "+
×
1753
                                "announcement: %v", err)
×
1754
                }
×
1755

1756
                signedUpdates = append(signedUpdates, &newNodeAnn)
1✔
1757
                nodeAnnStr = " and our refreshed node announcement"
1✔
1758

1✔
1759
                // Before broadcasting the refreshed node announcement, add it
1✔
1760
                // to our own graph.
1✔
1761
                if err := d.addNode(&newNodeAnn); err != nil {
2✔
1762
                        log.Errorf("Unable to add refreshed node announcement "+
1✔
1763
                                "to graph: %v", err)
1✔
1764
                }
1✔
1765
        }
1766

1767
        // If we don't have any updates to re-broadcast, then we'll exit
1768
        // early.
1769
        if len(signedUpdates) == 0 {
7✔
1770
                return nil
3✔
1771
        }
3✔
1772

1773
        log.Infof("Retransmitting %v outgoing channels%v",
1✔
1774
                len(edgesToUpdate), nodeAnnStr)
1✔
1775

1✔
1776
        // With all the wire announcements properly crafted, we'll broadcast
1✔
1777
        // our known outgoing channels to all our immediate peers.
1✔
1778
        if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
1✔
1779
                return fmt.Errorf("unable to re-broadcast channels: %w", err)
×
1780
        }
×
1781

1782
        return nil
1✔
1783
}
1784

1785
// processChanPolicyUpdate generates a new set of channel updates for the
1786
// provided list of edges and updates the backing ChannelGraphSource.
1787
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
1788
        edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
4✔
1789

4✔
1790
        var chanUpdates []networkMsg
4✔
1791
        for _, edgeInfo := range edgesToUpdate {
10✔
1792
                // Now that we've collected all the channels we need to update,
6✔
1793
                // we'll re-sign and update the backing ChannelGraphSource, and
6✔
1794
                // retrieve our ChannelUpdate to broadcast.
6✔
1795
                _, chanUpdate, err := d.updateChannel(
6✔
1796
                        edgeInfo.Info, edgeInfo.Edge,
6✔
1797
                )
6✔
1798
                if err != nil {
6✔
1799
                        return nil, err
×
1800
                }
×
1801

1802
                // We'll avoid broadcasting any updates for private channels to
1803
                // avoid directly giving away their existence. Instead, we'll
1804
                // send the update directly to the remote party.
1805
                if edgeInfo.Info.AuthProof == nil {
10✔
1806
                        // If AuthProof is nil and an alias was found for this
4✔
1807
                        // ChannelID (meaning the option-scid-alias feature was
4✔
1808
                        // negotiated), we'll replace the ShortChannelID in the
4✔
1809
                        // update with the peer's alias. We do this after
4✔
1810
                        // updateChannel so that the alias isn't persisted to
4✔
1811
                        // the database.
4✔
1812
                        chanID := lnwire.NewChanIDFromOutPoint(
4✔
1813
                                edgeInfo.Info.ChannelPoint,
4✔
1814
                        )
4✔
1815

4✔
1816
                        var defaultAlias lnwire.ShortChannelID
4✔
1817
                        foundAlias, _ := d.cfg.GetAlias(chanID)
4✔
1818
                        if foundAlias != defaultAlias {
7✔
1819
                                chanUpdate.ShortChannelID = foundAlias
3✔
1820

3✔
1821
                                sig, err := d.cfg.SignAliasUpdate(chanUpdate)
3✔
1822
                                if err != nil {
3✔
1823
                                        log.Errorf("Unable to sign alias "+
×
1824
                                                "update: %v", err)
×
1825
                                        continue
×
1826
                                }
1827

1828
                                lnSig, err := lnwire.NewSigFromSignature(sig)
3✔
1829
                                if err != nil {
3✔
1830
                                        log.Errorf("Unable to create sig: %v",
×
1831
                                                err)
×
1832
                                        continue
×
1833
                                }
1834

1835
                                chanUpdate.Signature = lnSig
3✔
1836
                        }
1837

1838
                        remotePubKey := remotePubFromChanInfo(
4✔
1839
                                edgeInfo.Info, chanUpdate.ChannelFlags,
4✔
1840
                        )
4✔
1841
                        err := d.reliableSender.sendMessage(
4✔
1842
                                chanUpdate, remotePubKey,
4✔
1843
                        )
4✔
1844
                        if err != nil {
4✔
1845
                                log.Errorf("Unable to reliably send %v for "+
×
1846
                                        "channel=%v to peer=%x: %v",
×
1847
                                        chanUpdate.MsgType(),
×
1848
                                        chanUpdate.ShortChannelID,
×
1849
                                        remotePubKey, err)
×
1850
                        }
×
1851
                        continue
4✔
1852
                }
1853

1854
                // We set ourselves as the source of this message to indicate
1855
                // that we shouldn't skip any peers when sending this message.
1856
                chanUpdates = append(chanUpdates, networkMsg{
5✔
1857
                        source:   d.selfKey,
5✔
1858
                        isRemote: false,
5✔
1859
                        msg:      chanUpdate,
5✔
1860
                })
5✔
1861
        }
1862

1863
        return chanUpdates, nil
4✔
1864
}
1865

1866
// remotePubFromChanInfo returns the public key of the remote peer given a
1867
// ChannelEdgeInfo that describe a channel we have with them.
1868
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
1869
        chanFlags lnwire.ChanUpdateChanFlags) [33]byte {
15✔
1870

15✔
1871
        var remotePubKey [33]byte
15✔
1872
        switch {
15✔
1873
        case chanFlags&lnwire.ChanUpdateDirection == 0:
15✔
1874
                remotePubKey = chanInfo.NodeKey2Bytes
15✔
1875
        case chanFlags&lnwire.ChanUpdateDirection == 1:
3✔
1876
                remotePubKey = chanInfo.NodeKey1Bytes
3✔
1877
        }
1878

1879
        return remotePubKey
15✔
1880
}
1881

1882
// processRejectedEdge examines a rejected edge to see if we can extract any
1883
// new announcements from it.  An edge will get rejected if we already added
1884
// the same edge without AuthProof to the graph. If the received announcement
1885
// contains a proof, we can add this proof to our edge.  We can end up in this
1886
// situation in the case where we create a channel, but for some reason fail
1887
// to receive the remote peer's proof, while the remote peer is able to fully
1888
// assemble the proof and craft the ChannelAnnouncement.
1889
func (d *AuthenticatedGossiper) processRejectedEdge(
1890
        chanAnnMsg *lnwire.ChannelAnnouncement1,
1891
        proof *models.ChannelAuthProof) ([]networkMsg, error) {
3✔
1892

3✔
1893
        // First, we'll fetch the state of the channel as we know if from the
3✔
1894
        // database.
3✔
1895
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
3✔
1896
                chanAnnMsg.ShortChannelID,
3✔
1897
        )
3✔
1898
        if err != nil {
3✔
1899
                return nil, err
×
1900
        }
×
1901

1902
        // The edge is in the graph, and has a proof attached, then we'll just
1903
        // reject it as normal.
1904
        if chanInfo.AuthProof != nil {
6✔
1905
                return nil, nil
3✔
1906
        }
3✔
1907

1908
        // Otherwise, this means that the edge is within the graph, but it
1909
        // doesn't yet have a proper proof attached. If we did not receive
1910
        // the proof such that we now can add it, there's nothing more we
1911
        // can do.
1912
        if proof == nil {
×
1913
                return nil, nil
×
1914
        }
×
1915

1916
        // We'll then create then validate the new fully assembled
1917
        // announcement.
1918
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
×
1919
                proof, chanInfo, e1, e2,
×
1920
        )
×
1921
        if err != nil {
×
1922
                return nil, err
×
1923
        }
×
1924
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
×
1925
        if err != nil {
×
1926
                err := fmt.Errorf("assembled channel announcement proof "+
×
1927
                        "for shortChanID=%v isn't valid: %v",
×
1928
                        chanAnnMsg.ShortChannelID, err)
×
1929
                log.Error(err)
×
1930
                return nil, err
×
1931
        }
×
1932

1933
        // If everything checks out, then we'll add the fully assembled proof
1934
        // to the database.
1935
        err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
×
1936
        if err != nil {
×
1937
                err := fmt.Errorf("unable add proof to shortChanID=%v: %w",
×
1938
                        chanAnnMsg.ShortChannelID, err)
×
1939
                log.Error(err)
×
1940
                return nil, err
×
1941
        }
×
1942

1943
        // As we now have a complete channel announcement for this channel,
1944
        // we'll construct the announcement so they can be broadcast out to all
1945
        // our peers.
1946
        announcements := make([]networkMsg, 0, 3)
×
1947
        announcements = append(announcements, networkMsg{
×
1948
                source: d.selfKey,
×
1949
                msg:    chanAnn,
×
1950
        })
×
1951
        if e1Ann != nil {
×
1952
                announcements = append(announcements, networkMsg{
×
1953
                        source: d.selfKey,
×
1954
                        msg:    e1Ann,
×
1955
                })
×
1956
        }
×
1957
        if e2Ann != nil {
×
1958
                announcements = append(announcements, networkMsg{
×
1959
                        source: d.selfKey,
×
1960
                        msg:    e2Ann,
×
1961
                })
×
1962

×
1963
        }
×
1964

1965
        return announcements, nil
×
1966
}
1967

1968
// fetchPKScript fetches the output script for the given SCID.
1969
func (d *AuthenticatedGossiper) fetchPKScript(chanID *lnwire.ShortChannelID) (
1970
        []byte, error) {
×
1971

×
1972
        return lnwallet.FetchPKScriptWithQuit(d.cfg.ChainIO, chanID, d.quit)
×
1973
}
×
1974

1975
// addNode processes the given node announcement, and adds it to our channel
1976
// graph.
1977
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
1978
        op ...batch.SchedulerOption) error {
20✔
1979

20✔
1980
        if err := graph.ValidateNodeAnn(msg); err != nil {
21✔
1981
                return fmt.Errorf("unable to validate node announcement: %w",
1✔
1982
                        err)
1✔
1983
        }
1✔
1984

1985
        timestamp := time.Unix(int64(msg.Timestamp), 0)
19✔
1986
        features := lnwire.NewFeatureVector(msg.Features, lnwire.Features)
19✔
1987
        node := &models.LightningNode{
19✔
1988
                HaveNodeAnnouncement: true,
19✔
1989
                LastUpdate:           timestamp,
19✔
1990
                Addresses:            msg.Addresses,
19✔
1991
                PubKeyBytes:          msg.NodeID,
19✔
1992
                Alias:                msg.Alias.String(),
19✔
1993
                AuthSigBytes:         msg.Signature.ToSignatureBytes(),
19✔
1994
                Features:             features,
19✔
1995
                Color:                msg.RGBColor,
19✔
1996
                ExtraOpaqueData:      msg.ExtraOpaqueData,
19✔
1997
        }
19✔
1998

19✔
1999
        return d.cfg.Graph.AddNode(node, op...)
19✔
2000
}
2001

2002
// isPremature decides whether a given network message has a block height+delta
2003
// value specified in the future. If so, the message will be added to the
2004
// future message map and be processed when the block height as reached.
2005
//
2006
// NOTE: must be used inside a lock.
2007
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
2008
        delta uint32, msg *networkMsg) bool {
282✔
2009

282✔
2010
        // The channel is already confirmed at chanID.BlockHeight so we minus
282✔
2011
        // one block. For instance, if the required confirmation for this
282✔
2012
        // channel announcement is 6, we then only need to wait for 5 more
282✔
2013
        // blocks once the funding tx is confirmed.
282✔
2014
        if delta > 0 {
285✔
2015
                delta--
3✔
2016
        }
3✔
2017

2018
        msgHeight := chanID.BlockHeight + delta
282✔
2019

282✔
2020
        // The message height is smaller or equal to our best known height,
282✔
2021
        // thus the message is mature.
282✔
2022
        if msgHeight <= d.bestHeight {
563✔
2023
                return false
281✔
2024
        }
281✔
2025

2026
        // Add the premature message to our future messages which will be
2027
        // resent once the block height has reached.
2028
        //
2029
        // Copy the networkMsgs since the old message's err chan will be
2030
        // consumed.
2031
        copied := &networkMsg{
4✔
2032
                peer:              msg.peer,
4✔
2033
                source:            msg.source,
4✔
2034
                msg:               msg.msg,
4✔
2035
                optionalMsgFields: msg.optionalMsgFields,
4✔
2036
                isRemote:          msg.isRemote,
4✔
2037
                err:               make(chan error, 1),
4✔
2038
        }
4✔
2039

4✔
2040
        // Create the cached message.
4✔
2041
        cachedMsg := &cachedFutureMsg{
4✔
2042
                msg:    copied,
4✔
2043
                height: msgHeight,
4✔
2044
        }
4✔
2045

4✔
2046
        // Increment the msg ID and add it to the cache.
4✔
2047
        nextMsgID := d.futureMsgs.nextMsgID()
4✔
2048
        _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
4✔
2049
        if err != nil {
4✔
2050
                log.Errorf("Adding future message got error: %v", err)
×
2051
        }
×
2052

2053
        log.Debugf("Network message: %v added to future messages for "+
4✔
2054
                "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
4✔
2055
                msgHeight, d.bestHeight)
4✔
2056

4✔
2057
        return true
4✔
2058
}
2059

2060
// processNetworkAnnouncement processes a new network relate authenticated
2061
// channel or node announcement or announcements proofs. If the announcement
2062
// didn't affect the internal state due to either being out of date, invalid,
2063
// or redundant, then nil is returned. Otherwise, the set of announcements will
2064
// be returned which should be broadcasted to the rest of the network. The
2065
// boolean returned indicates whether any dependents of the announcement should
2066
// attempt to be processed as well.
2067
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
2068
        nMsg *networkMsg) ([]networkMsg, bool) {
333✔
2069

333✔
2070
        // If this is a remote update, we set the scheduler option to lazily
333✔
2071
        // add it to the graph.
333✔
2072
        var schedulerOp []batch.SchedulerOption
333✔
2073
        if nMsg.isRemote {
619✔
2074
                schedulerOp = append(schedulerOp, batch.LazyAdd())
286✔
2075
        }
286✔
2076

2077
        switch msg := nMsg.msg.(type) {
333✔
2078
        // A new node announcement has arrived which either presents new
2079
        // information about a node in one of the channels we know about, or a
2080
        // updating previously advertised information.
2081
        case *lnwire.NodeAnnouncement:
27✔
2082
                return d.handleNodeAnnouncement(nMsg, msg, schedulerOp)
27✔
2083

2084
        // A new channel announcement has arrived, this indicates the
2085
        // *creation* of a new channel within the network. This only advertises
2086
        // the existence of a channel and not yet the routing policies in
2087
        // either direction of the channel.
2088
        case *lnwire.ChannelAnnouncement1:
233✔
2089
                return d.handleChanAnnouncement(nMsg, msg, schedulerOp)
233✔
2090

2091
        // A new authenticated channel edge update has arrived. This indicates
2092
        // that the directional information for an already known channel has
2093
        // been updated.
2094
        case *lnwire.ChannelUpdate1:
58✔
2095
                return d.handleChanUpdate(nMsg, msg, schedulerOp)
58✔
2096

2097
        // A new signature announcement has been received. This indicates
2098
        // willingness of nodes involved in the funding of a channel to
2099
        // announce this new channel to the rest of the world.
2100
        case *lnwire.AnnounceSignatures1:
24✔
2101
                return d.handleAnnSig(nMsg, msg)
24✔
2102

2103
        default:
×
2104
                err := errors.New("wrong type of the announcement")
×
2105
                nMsg.err <- err
×
2106
                return nil, false
×
2107
        }
2108
}
2109

2110
// processZombieUpdate determines whether the provided channel update should
2111
// resurrect a given zombie edge.
2112
//
2113
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
2114
// should be inspected.
2115
func (d *AuthenticatedGossiper) processZombieUpdate(
2116
        chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
2117
        msg *lnwire.ChannelUpdate1) error {
3✔
2118

3✔
2119
        // The least-significant bit in the flag on the channel update tells us
3✔
2120
        // which edge is being updated.
3✔
2121
        isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0
3✔
2122

3✔
2123
        // Since we've deemed the update as not stale above, before marking it
3✔
2124
        // live, we'll make sure it has been signed by the correct party. If we
3✔
2125
        // have both pubkeys, either party can resurrect the channel. If we've
3✔
2126
        // already marked this with the stricter, single-sided resurrection we
3✔
2127
        // will only have the pubkey of the node with the oldest timestamp.
3✔
2128
        var pubKey *btcec.PublicKey
3✔
2129
        switch {
3✔
2130
        case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
×
2131
                pubKey, _ = chanInfo.NodeKey1()
×
2132
        case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
2✔
2133
                pubKey, _ = chanInfo.NodeKey2()
2✔
2134
        }
2135
        if pubKey == nil {
4✔
2136
                return fmt.Errorf("incorrect pubkey to resurrect zombie "+
1✔
2137
                        "with chan_id=%v", msg.ShortChannelID)
1✔
2138
        }
1✔
2139

2140
        err := netann.VerifyChannelUpdateSignature(msg, pubKey)
2✔
2141
        if err != nil {
3✔
2142
                return fmt.Errorf("unable to verify channel "+
1✔
2143
                        "update signature: %v", err)
1✔
2144
        }
1✔
2145

2146
        // With the signature valid, we'll proceed to mark the
2147
        // edge as live and wait for the channel announcement to
2148
        // come through again.
2149
        err = d.cfg.Graph.MarkEdgeLive(scid)
1✔
2150
        switch {
1✔
2151
        case errors.Is(err, graphdb.ErrZombieEdgeNotFound):
×
2152
                log.Errorf("edge with chan_id=%v was not found in the "+
×
2153
                        "zombie index: %v", err)
×
2154

×
2155
                return nil
×
2156

2157
        case err != nil:
×
2158
                return fmt.Errorf("unable to remove edge with "+
×
2159
                        "chan_id=%v from zombie index: %v",
×
2160
                        msg.ShortChannelID, err)
×
2161

2162
        default:
1✔
2163
        }
2164

2165
        log.Debugf("Removed edge with chan_id=%v from zombie "+
1✔
2166
                "index", msg.ShortChannelID)
1✔
2167

1✔
2168
        return nil
1✔
2169
}
2170

2171
// fetchNodeAnn fetches the latest signed node announcement from our point of
2172
// view for the node with the given public key.
2173
func (d *AuthenticatedGossiper) fetchNodeAnn(
2174
        pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {
23✔
2175

23✔
2176
        node, err := d.cfg.Graph.FetchLightningNode(pubKey)
23✔
2177
        if err != nil {
29✔
2178
                return nil, err
6✔
2179
        }
6✔
2180

2181
        return node.NodeAnnouncement(true)
17✔
2182
}
2183

2184
// isMsgStale determines whether a message retrieved from the backing
2185
// MessageStore is seen as stale by the current graph.
2186
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
15✔
2187
        switch msg := msg.(type) {
15✔
2188
        case *lnwire.AnnounceSignatures1:
5✔
2189
                chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
5✔
2190
                        msg.ShortChannelID,
5✔
2191
                )
5✔
2192

5✔
2193
                // If the channel cannot be found, it is most likely a leftover
5✔
2194
                // message for a channel that was closed, so we can consider it
5✔
2195
                // stale.
5✔
2196
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
8✔
2197
                        return true
3✔
2198
                }
3✔
2199
                if err != nil {
5✔
2200
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2201
                                "%v", chanInfo.ChannelID, err)
×
2202
                        return false
×
2203
                }
×
2204

2205
                // If the proof exists in the graph, then we have successfully
2206
                // received the remote proof and assembled the full proof, so we
2207
                // can safely delete the local proof from the database.
2208
                return chanInfo.AuthProof != nil
5✔
2209

2210
        case *lnwire.ChannelUpdate1:
13✔
2211
                _, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)
13✔
2212

13✔
2213
                // If the channel cannot be found, it is most likely a leftover
13✔
2214
                // message for a channel that was closed, so we can consider it
13✔
2215
                // stale.
13✔
2216
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
16✔
2217
                        return true
3✔
2218
                }
3✔
2219
                if err != nil {
13✔
2220
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2221
                                "%v", msg.ShortChannelID, err)
×
2222
                        return false
×
2223
                }
×
2224

2225
                // Otherwise, we'll retrieve the correct policy that we
2226
                // currently have stored within our graph to check if this
2227
                // message is stale by comparing its timestamp.
2228
                var p *models.ChannelEdgePolicy
13✔
2229
                if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
26✔
2230
                        p = p1
13✔
2231
                } else {
16✔
2232
                        p = p2
3✔
2233
                }
3✔
2234

2235
                // If the policy is still unknown, then we can consider this
2236
                // policy fresh.
2237
                if p == nil {
13✔
2238
                        return false
×
2239
                }
×
2240

2241
                timestamp := time.Unix(int64(msg.Timestamp), 0)
13✔
2242
                return p.LastUpdate.After(timestamp)
13✔
2243

2244
        default:
×
2245
                // We'll make sure to not mark any unsupported messages as stale
×
2246
                // to ensure they are not removed.
×
2247
                return false
×
2248
        }
2249
}
2250

2251
// updateChannel creates a new fully signed update for the channel, and updates
2252
// the underlying graph with the new state.
2253
func (d *AuthenticatedGossiper) updateChannel(info *models.ChannelEdgeInfo,
2254
        edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
2255
        *lnwire.ChannelUpdate1, error) {
7✔
2256

7✔
2257
        // Parse the unsigned edge into a channel update.
7✔
2258
        chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)
7✔
2259

7✔
2260
        // We'll generate a new signature over a digest of the channel
7✔
2261
        // announcement itself and update the timestamp to ensure it propagate.
7✔
2262
        err := netann.SignChannelUpdate(
7✔
2263
                d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
7✔
2264
                netann.ChanUpdSetTimestamp,
7✔
2265
        )
7✔
2266
        if err != nil {
7✔
2267
                return nil, nil, err
×
2268
        }
×
2269

2270
        // Next, we'll set the new signature in place, and update the reference
2271
        // in the backing slice.
2272
        edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
7✔
2273
        edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()
7✔
2274

7✔
2275
        // To ensure that our signature is valid, we'll verify it ourself
7✔
2276
        // before committing it to the slice returned.
7✔
2277
        err = netann.ValidateChannelUpdateAnn(
7✔
2278
                d.selfKey, info.Capacity, chanUpdate,
7✔
2279
        )
7✔
2280
        if err != nil {
7✔
2281
                return nil, nil, fmt.Errorf("generated invalid channel "+
×
2282
                        "update sig: %v", err)
×
2283
        }
×
2284

2285
        // Finally, we'll write the new edge policy to disk.
2286
        if err := d.cfg.Graph.UpdateEdge(edge); err != nil {
7✔
2287
                return nil, nil, err
×
2288
        }
×
2289

2290
        // We'll also create the original channel announcement so the two can
2291
        // be broadcast along side each other (if necessary), but only if we
2292
        // have a full channel announcement for this channel.
2293
        var chanAnn *lnwire.ChannelAnnouncement1
7✔
2294
        if info.AuthProof != nil {
13✔
2295
                chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
6✔
2296
                chanAnn = &lnwire.ChannelAnnouncement1{
6✔
2297
                        ShortChannelID:  chanID,
6✔
2298
                        NodeID1:         info.NodeKey1Bytes,
6✔
2299
                        NodeID2:         info.NodeKey2Bytes,
6✔
2300
                        ChainHash:       info.ChainHash,
6✔
2301
                        BitcoinKey1:     info.BitcoinKey1Bytes,
6✔
2302
                        Features:        lnwire.NewRawFeatureVector(),
6✔
2303
                        BitcoinKey2:     info.BitcoinKey2Bytes,
6✔
2304
                        ExtraOpaqueData: info.ExtraOpaqueData,
6✔
2305
                }
6✔
2306
                chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
6✔
2307
                        info.AuthProof.NodeSig1Bytes,
6✔
2308
                )
6✔
2309
                if err != nil {
6✔
2310
                        return nil, nil, err
×
2311
                }
×
2312
                chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
6✔
2313
                        info.AuthProof.NodeSig2Bytes,
6✔
2314
                )
6✔
2315
                if err != nil {
6✔
2316
                        return nil, nil, err
×
2317
                }
×
2318
                chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
6✔
2319
                        info.AuthProof.BitcoinSig1Bytes,
6✔
2320
                )
6✔
2321
                if err != nil {
6✔
2322
                        return nil, nil, err
×
2323
                }
×
2324
                chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
6✔
2325
                        info.AuthProof.BitcoinSig2Bytes,
6✔
2326
                )
6✔
2327
                if err != nil {
6✔
2328
                        return nil, nil, err
×
2329
                }
×
2330
        }
2331

2332
        return chanAnn, chanUpdate, err
7✔
2333
}
2334

2335
// SyncManager returns the gossiper's SyncManager instance.
2336
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
3✔
2337
        return d.syncMgr
3✔
2338
}
3✔
2339

2340
// IsKeepAliveUpdate determines whether this channel update is considered a
2341
// keep-alive update based on the previous channel update processed for the same
2342
// direction.
2343
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
2344
        prev *models.ChannelEdgePolicy) bool {
17✔
2345

17✔
2346
        // Both updates should be from the same direction.
17✔
2347
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
17✔
2348
                prev.ChannelFlags&lnwire.ChanUpdateDirection {
17✔
2349

×
2350
                return false
×
2351
        }
×
2352

2353
        // The timestamp should always increase for a keep-alive update.
2354
        timestamp := time.Unix(int64(update.Timestamp), 0)
17✔
2355
        if !timestamp.After(prev.LastUpdate) {
20✔
2356
                return false
3✔
2357
        }
3✔
2358

2359
        // None of the remaining fields should change for a keep-alive update.
2360
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
20✔
2361
                return false
3✔
2362
        }
3✔
2363
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
32✔
2364
                return false
15✔
2365
        }
15✔
2366
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
8✔
2367
                return false
3✔
2368
        }
3✔
2369
        if update.TimeLockDelta != prev.TimeLockDelta {
5✔
2370
                return false
×
2371
        }
×
2372
        if update.HtlcMinimumMsat != prev.MinHTLC {
5✔
2373
                return false
×
2374
        }
×
2375
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
5✔
2376
                return false
×
2377
        }
×
2378
        if update.HtlcMaximumMsat != prev.MaxHTLC {
5✔
2379
                return false
×
2380
        }
×
2381
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
8✔
2382
                return false
3✔
2383
        }
3✔
2384
        return true
5✔
2385
}
2386

2387
// latestHeight returns the gossiper's latest height known of the chain.
2388
func (d *AuthenticatedGossiper) latestHeight() uint32 {
3✔
2389
        d.Lock()
3✔
2390
        defer d.Unlock()
3✔
2391
        return d.bestHeight
3✔
2392
}
3✔
2393

2394
// handleNodeAnnouncement processes a new node announcement.
2395
func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
2396
        nodeAnn *lnwire.NodeAnnouncement,
2397
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
27✔
2398

27✔
2399
        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
27✔
2400

27✔
2401
        log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
27✔
2402
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
27✔
2403
                nMsg.source.SerializeCompressed())
27✔
2404

27✔
2405
        // We'll quickly ask the router if it already has a newer update for
27✔
2406
        // this node so we can skip validating signatures if not required.
27✔
2407
        if d.cfg.Graph.IsStaleNode(nodeAnn.NodeID, timestamp) {
38✔
2408
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
11✔
2409
                nMsg.err <- nil
11✔
2410
                return nil, true
11✔
2411
        }
11✔
2412

2413
        if err := d.addNode(nodeAnn, ops...); err != nil {
22✔
2414
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
3✔
2415
                        err)
3✔
2416

3✔
2417
                if !graph.IsError(
3✔
2418
                        err,
3✔
2419
                        graph.ErrOutdated,
3✔
2420
                        graph.ErrIgnored,
3✔
2421
                ) {
3✔
2422

×
2423
                        log.Error(err)
×
2424
                }
×
2425

2426
                nMsg.err <- err
3✔
2427
                return nil, false
3✔
2428
        }
2429

2430
        // In order to ensure we don't leak unadvertised nodes, we'll make a
2431
        // quick check to ensure this node intends to publicly advertise itself
2432
        // to the network.
2433
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
19✔
2434
        if err != nil {
19✔
2435
                log.Errorf("Unable to determine if node %x is advertised: %v",
×
2436
                        nodeAnn.NodeID, err)
×
2437
                nMsg.err <- err
×
2438
                return nil, false
×
2439
        }
×
2440

2441
        var announcements []networkMsg
19✔
2442

19✔
2443
        // If it does, we'll add their announcement to our batch so that it can
19✔
2444
        // be broadcast to the rest of our peers.
19✔
2445
        if isPublic {
25✔
2446
                announcements = append(announcements, networkMsg{
6✔
2447
                        peer:     nMsg.peer,
6✔
2448
                        isRemote: nMsg.isRemote,
6✔
2449
                        source:   nMsg.source,
6✔
2450
                        msg:      nodeAnn,
6✔
2451
                })
6✔
2452
        } else {
22✔
2453
                log.Tracef("Skipping broadcasting node announcement for %x "+
16✔
2454
                        "due to being unadvertised", nodeAnn.NodeID)
16✔
2455
        }
16✔
2456

2457
        nMsg.err <- nil
19✔
2458
        // TODO(roasbeef): get rid of the above
19✔
2459

19✔
2460
        log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
19✔
2461
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
19✔
2462
                nMsg.source.SerializeCompressed())
19✔
2463

19✔
2464
        return announcements, true
19✔
2465
}
2466

2467
// handleChanAnnouncement processes a new channel announcement.
2468
func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
2469
        ann *lnwire.ChannelAnnouncement1,
2470
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
233✔
2471

233✔
2472
        scid := ann.ShortChannelID
233✔
2473

233✔
2474
        log.Debugf("Processing ChannelAnnouncement1: peer=%v, short_chan_id=%v",
233✔
2475
                nMsg.peer, scid.ToUint64())
233✔
2476

233✔
2477
        // We'll ignore any channel announcements that target any chain other
233✔
2478
        // than the set of chains we know of.
233✔
2479
        if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) {
233✔
2480
                err := fmt.Errorf("ignoring ChannelAnnouncement1 from chain=%v"+
×
2481
                        ", gossiper on chain=%v", ann.ChainHash,
×
2482
                        d.cfg.ChainHash)
×
2483
                log.Errorf(err.Error())
×
2484

×
2485
                key := newRejectCacheKey(
×
2486
                        scid.ToUint64(),
×
2487
                        sourceToPub(nMsg.source),
×
2488
                )
×
2489
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2490

×
2491
                nMsg.err <- err
×
2492
                return nil, false
×
2493
        }
×
2494

2495
        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
2496
        // reject the announcement. Since the router accepts alias SCIDs,
2497
        // not erroring out would be a DoS vector.
2498
        if nMsg.isRemote && d.cfg.IsAlias(scid) {
233✔
2499
                err := fmt.Errorf("ignoring remote alias channel=%v", scid)
×
2500
                log.Errorf(err.Error())
×
2501

×
2502
                key := newRejectCacheKey(
×
2503
                        scid.ToUint64(),
×
2504
                        sourceToPub(nMsg.source),
×
2505
                )
×
2506
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2507

×
2508
                nMsg.err <- err
×
2509
                return nil, false
×
2510
        }
×
2511

2512
        // If the advertised inclusionary block is beyond our knowledge of the
2513
        // chain tip, then we'll ignore it for now.
2514
        d.Lock()
233✔
2515
        if nMsg.isRemote && d.isPremature(scid, 0, nMsg) {
234✔
2516
                log.Warnf("Announcement for chan_id=(%v), is premature: "+
1✔
2517
                        "advertises height %v, only height %v is known",
1✔
2518
                        scid.ToUint64(), scid.BlockHeight, d.bestHeight)
1✔
2519
                d.Unlock()
1✔
2520
                nMsg.err <- nil
1✔
2521
                return nil, false
1✔
2522
        }
1✔
2523
        d.Unlock()
232✔
2524

232✔
2525
        // At this point, we'll now ask the router if this is a zombie/known
232✔
2526
        // edge. If so we can skip all the processing below.
232✔
2527
        if d.cfg.Graph.IsKnownEdge(scid) {
236✔
2528
                nMsg.err <- nil
4✔
2529
                return nil, true
4✔
2530
        }
4✔
2531

2532
        // Check if the channel is already closed in which case we can ignore
2533
        // it.
2534
        closed, err := d.cfg.ScidCloser.IsClosedScid(scid)
231✔
2535
        if err != nil {
231✔
2536
                log.Errorf("failed to check if scid %v is closed: %v", scid,
×
2537
                        err)
×
2538
                nMsg.err <- err
×
2539

×
2540
                return nil, false
×
2541
        }
×
2542

2543
        if closed {
232✔
2544
                err = fmt.Errorf("ignoring closed channel %v", scid)
1✔
2545
                log.Error(err)
1✔
2546

1✔
2547
                // If this is an announcement from us, we'll just ignore it.
1✔
2548
                if !nMsg.isRemote {
1✔
2549
                        nMsg.err <- err
×
2550
                        return nil, false
×
2551
                }
×
2552

2553
                // Increment the peer's ban score if they are sending closed
2554
                // channel announcements.
2555
                d.banman.incrementBanScore(nMsg.peer.PubKey())
1✔
2556

1✔
2557
                // If the peer is banned and not a channel peer, we'll
1✔
2558
                // disconnect them.
1✔
2559
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
1✔
2560
                if dcErr != nil {
1✔
2561
                        log.Errorf("failed to check if we should disconnect "+
×
2562
                                "peer: %v", dcErr)
×
2563
                        nMsg.err <- dcErr
×
2564

×
2565
                        return nil, false
×
2566
                }
×
2567

2568
                if shouldDc {
1✔
2569
                        nMsg.peer.Disconnect(ErrPeerBanned)
×
2570
                }
×
2571

2572
                nMsg.err <- err
1✔
2573

1✔
2574
                return nil, false
1✔
2575
        }
2576

2577
        // If this is a remote channel announcement, then we'll validate all
2578
        // the signatures within the proof as it should be well formed.
2579
        var proof *models.ChannelAuthProof
230✔
2580
        if nMsg.isRemote {
446✔
2581
                err := netann.ValidateChannelAnn(ann, d.fetchPKScript)
216✔
2582
                if err != nil {
216✔
2583
                        err := fmt.Errorf("unable to validate announcement: "+
×
2584
                                "%v", err)
×
2585

×
2586
                        key := newRejectCacheKey(
×
2587
                                scid.ToUint64(),
×
2588
                                sourceToPub(nMsg.source),
×
2589
                        )
×
2590
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2591

×
2592
                        log.Error(err)
×
2593
                        nMsg.err <- err
×
2594
                        return nil, false
×
2595
                }
×
2596

2597
                // If the proof checks out, then we'll save the proof itself to
2598
                // the database so we can fetch it later when gossiping with
2599
                // other nodes.
2600
                proof = &models.ChannelAuthProof{
216✔
2601
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
216✔
2602
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
216✔
2603
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
216✔
2604
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
216✔
2605
                }
216✔
2606
        }
2607

2608
        // With the proof validated (if necessary), we can now store it within
2609
        // the database for our path finding and syncing needs.
2610
        var featureBuf bytes.Buffer
230✔
2611
        if err := ann.Features.Encode(&featureBuf); err != nil {
230✔
2612
                log.Errorf("unable to encode features: %v", err)
×
2613
                nMsg.err <- err
×
2614
                return nil, false
×
2615
        }
×
2616

2617
        edge := &models.ChannelEdgeInfo{
230✔
2618
                ChannelID:        scid.ToUint64(),
230✔
2619
                ChainHash:        ann.ChainHash,
230✔
2620
                NodeKey1Bytes:    ann.NodeID1,
230✔
2621
                NodeKey2Bytes:    ann.NodeID2,
230✔
2622
                BitcoinKey1Bytes: ann.BitcoinKey1,
230✔
2623
                BitcoinKey2Bytes: ann.BitcoinKey2,
230✔
2624
                AuthProof:        proof,
230✔
2625
                Features:         featureBuf.Bytes(),
230✔
2626
                ExtraOpaqueData:  ann.ExtraOpaqueData,
230✔
2627
        }
230✔
2628

230✔
2629
        // If there were any optional message fields provided, we'll include
230✔
2630
        // them in its serialized disk representation now.
230✔
2631
        if nMsg.optionalMsgFields != nil {
247✔
2632
                if nMsg.optionalMsgFields.capacity != nil {
21✔
2633
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
4✔
2634
                }
4✔
2635
                if nMsg.optionalMsgFields.channelPoint != nil {
24✔
2636
                        cp := *nMsg.optionalMsgFields.channelPoint
7✔
2637
                        edge.ChannelPoint = cp
7✔
2638
                }
7✔
2639

2640
                // Optional tapscript root for custom channels.
2641
                edge.TapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
17✔
2642
        }
2643

2644
        log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64())
230✔
2645

230✔
2646
        // We will add the edge to the channel router. If the nodes present in
230✔
2647
        // this channel are not present in the database, a partial node will be
230✔
2648
        // added to represent each node while we wait for a node announcement.
230✔
2649
        //
230✔
2650
        // Before we add the edge to the database, we obtain the mutex for this
230✔
2651
        // channel ID. We do this to ensure no other goroutine has read the
230✔
2652
        // database and is now making decisions based on this DB state, before
230✔
2653
        // it writes to the DB.
230✔
2654
        d.channelMtx.Lock(scid.ToUint64())
230✔
2655
        err = d.cfg.Graph.AddEdge(edge, ops...)
230✔
2656
        if err != nil {
435✔
2657
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
205✔
2658
                        scid.ToUint64(), err)
205✔
2659

205✔
2660
                defer d.channelMtx.Unlock(scid.ToUint64())
205✔
2661

205✔
2662
                // If the edge was rejected due to already being known, then it
205✔
2663
                // may be the case that this new message has a fresh channel
205✔
2664
                // proof, so we'll check.
205✔
2665
                switch {
205✔
2666
                case graph.IsError(err, graph.ErrIgnored):
3✔
2667
                        // Attempt to process the rejected message to see if we
3✔
2668
                        // get any new announcements.
3✔
2669
                        anns, rErr := d.processRejectedEdge(ann, proof)
3✔
2670
                        if rErr != nil {
3✔
2671
                                key := newRejectCacheKey(
×
2672
                                        scid.ToUint64(),
×
2673
                                        sourceToPub(nMsg.source),
×
2674
                                )
×
2675
                                cr := &cachedReject{}
×
2676
                                _, _ = d.recentRejects.Put(key, cr)
×
2677

×
2678
                                nMsg.err <- rErr
×
2679
                                return nil, false
×
2680
                        }
×
2681

2682
                        log.Debugf("Extracted %v announcements from rejected "+
3✔
2683
                                "msgs", len(anns))
3✔
2684

3✔
2685
                        // If while processing this rejected edge, we realized
3✔
2686
                        // there's a set of announcements we could extract,
3✔
2687
                        // then we'll return those directly.
3✔
2688
                        //
3✔
2689
                        // NOTE: since this is an ErrIgnored, we can return
3✔
2690
                        // true here to signal "allow" to its dependants.
3✔
2691
                        nMsg.err <- nil
3✔
2692

3✔
2693
                        return anns, true
3✔
2694

2695
                case graph.IsError(
2696
                        err, graph.ErrNoFundingTransaction,
2697
                        graph.ErrInvalidFundingOutput,
2698
                ):
200✔
2699
                        key := newRejectCacheKey(
200✔
2700
                                scid.ToUint64(),
200✔
2701
                                sourceToPub(nMsg.source),
200✔
2702
                        )
200✔
2703
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
200✔
2704

200✔
2705
                        // Increment the peer's ban score. We check isRemote
200✔
2706
                        // so we don't actually ban the peer in case of a local
200✔
2707
                        // bug.
200✔
2708
                        if nMsg.isRemote {
400✔
2709
                                d.banman.incrementBanScore(nMsg.peer.PubKey())
200✔
2710
                        }
200✔
2711

2712
                case graph.IsError(err, graph.ErrChannelSpent):
1✔
2713
                        key := newRejectCacheKey(
1✔
2714
                                scid.ToUint64(),
1✔
2715
                                sourceToPub(nMsg.source),
1✔
2716
                        )
1✔
2717
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
1✔
2718

1✔
2719
                        // Since this channel has already been closed, we'll
1✔
2720
                        // add it to the graph's closed channel index such that
1✔
2721
                        // we won't attempt to do expensive validation checks
1✔
2722
                        // on it again.
1✔
2723
                        // TODO: Populate the ScidCloser by using closed
1✔
2724
                        // channel notifications.
1✔
2725
                        dbErr := d.cfg.ScidCloser.PutClosedScid(scid)
1✔
2726
                        if dbErr != nil {
1✔
2727
                                log.Errorf("failed to mark scid(%v) as "+
×
2728
                                        "closed: %v", scid, dbErr)
×
2729

×
2730
                                nMsg.err <- dbErr
×
2731

×
2732
                                return nil, false
×
2733
                        }
×
2734

2735
                        // Increment the peer's ban score. We check isRemote
2736
                        // so we don't accidentally ban ourselves in case of a
2737
                        // bug.
2738
                        if nMsg.isRemote {
2✔
2739
                                d.banman.incrementBanScore(nMsg.peer.PubKey())
1✔
2740
                        }
1✔
2741

2742
                default:
1✔
2743
                        // Otherwise, this is just a regular rejected edge.
1✔
2744
                        key := newRejectCacheKey(
1✔
2745
                                scid.ToUint64(),
1✔
2746
                                sourceToPub(nMsg.source),
1✔
2747
                        )
1✔
2748
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
1✔
2749
                }
2750

2751
                if !nMsg.isRemote {
202✔
2752
                        log.Errorf("failed to add edge for local channel: %v",
×
2753
                                err)
×
2754
                        nMsg.err <- err
×
2755

×
2756
                        return nil, false
×
2757
                }
×
2758

2759
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
202✔
2760
                if dcErr != nil {
202✔
2761
                        log.Errorf("failed to check if we should disconnect "+
×
2762
                                "peer: %v", dcErr)
×
2763
                        nMsg.err <- dcErr
×
2764

×
2765
                        return nil, false
×
2766
                }
×
2767

2768
                if shouldDc {
203✔
2769
                        nMsg.peer.Disconnect(ErrPeerBanned)
1✔
2770
                }
1✔
2771

2772
                nMsg.err <- err
202✔
2773

202✔
2774
                return nil, false
202✔
2775
        }
2776

2777
        // If err is nil, release the lock immediately.
2778
        d.channelMtx.Unlock(scid.ToUint64())
28✔
2779

28✔
2780
        log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64())
28✔
2781

28✔
2782
        // If we earlier received any ChannelUpdates for this channel, we can
28✔
2783
        // now process them, as the channel is added to the graph.
28✔
2784
        var channelUpdates []*processedNetworkMsg
28✔
2785

28✔
2786
        earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64())
28✔
2787
        if err == nil {
33✔
2788
                // There was actually an entry in the map, so we'll accumulate
5✔
2789
                // it. We don't worry about deletion, since it'll eventually
5✔
2790
                // fall out anyway.
5✔
2791
                chanMsgs := earlyChanUpdates
5✔
2792
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
5✔
2793
        }
5✔
2794

2795
        // Launch a new goroutine to handle each ChannelUpdate, this is to
2796
        // ensure we don't block here, as we can handle only one announcement
2797
        // at a time.
2798
        for _, cu := range channelUpdates {
33✔
2799
                // Skip if already processed.
5✔
2800
                if cu.processed {
5✔
2801
                        continue
×
2802
                }
2803

2804
                // Mark the ChannelUpdate as processed. This ensures that a
2805
                // subsequent announcement in the option-scid-alias case does
2806
                // not re-use an old ChannelUpdate.
2807
                cu.processed = true
5✔
2808

5✔
2809
                d.wg.Add(1)
5✔
2810
                go func(updMsg *networkMsg) {
10✔
2811
                        defer d.wg.Done()
5✔
2812

5✔
2813
                        switch msg := updMsg.msg.(type) {
5✔
2814
                        // Reprocess the message, making sure we return an
2815
                        // error to the original caller in case the gossiper
2816
                        // shuts down.
2817
                        case *lnwire.ChannelUpdate1:
5✔
2818
                                log.Debugf("Reprocessing ChannelUpdate for "+
5✔
2819
                                        "shortChanID=%v", scid.ToUint64())
5✔
2820

5✔
2821
                                select {
5✔
2822
                                case d.networkMsgs <- updMsg:
5✔
2823
                                case <-d.quit:
×
2824
                                        updMsg.err <- ErrGossiperShuttingDown
×
2825
                                }
2826

2827
                        // We don't expect any other message type than
2828
                        // ChannelUpdate to be in this cache.
2829
                        default:
×
2830
                                log.Errorf("Unsupported message type found "+
×
2831
                                        "among ChannelUpdates: %T", msg)
×
2832
                        }
2833
                }(cu.msg)
2834
        }
2835

2836
        // Channel announcement was successfully processed and now it might be
2837
        // broadcast to other connected nodes if it was an announcement with
2838
        // proof (remote).
2839
        var announcements []networkMsg
28✔
2840

28✔
2841
        if proof != nil {
42✔
2842
                announcements = append(announcements, networkMsg{
14✔
2843
                        peer:     nMsg.peer,
14✔
2844
                        isRemote: nMsg.isRemote,
14✔
2845
                        source:   nMsg.source,
14✔
2846
                        msg:      ann,
14✔
2847
                })
14✔
2848
        }
14✔
2849

2850
        nMsg.err <- nil
28✔
2851

28✔
2852
        log.Debugf("Processed ChannelAnnouncement1: peer=%v, short_chan_id=%v",
28✔
2853
                nMsg.peer, scid.ToUint64())
28✔
2854

28✔
2855
        return announcements, true
28✔
2856
}
2857

2858
// handleChanUpdate processes a new channel update.
2859
func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
2860
        upd *lnwire.ChannelUpdate1,
2861
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
58✔
2862

58✔
2863
        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
58✔
2864
                nMsg.peer, upd.ShortChannelID.ToUint64())
58✔
2865

58✔
2866
        // We'll ignore any channel updates that target any chain other than
58✔
2867
        // the set of chains we know of.
58✔
2868
        if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
58✔
2869
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
×
2870
                        "gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash)
×
2871
                log.Errorf(err.Error())
×
2872

×
2873
                key := newRejectCacheKey(
×
2874
                        upd.ShortChannelID.ToUint64(),
×
2875
                        sourceToPub(nMsg.source),
×
2876
                )
×
2877
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2878

×
2879
                nMsg.err <- err
×
2880
                return nil, false
×
2881
        }
×
2882

2883
        blockHeight := upd.ShortChannelID.BlockHeight
58✔
2884
        shortChanID := upd.ShortChannelID.ToUint64()
58✔
2885

58✔
2886
        // If the advertised inclusionary block is beyond our knowledge of the
58✔
2887
        // chain tip, then we'll put the announcement in limbo to be fully
58✔
2888
        // verified once we advance forward in the chain. If the update has an
58✔
2889
        // alias SCID, we'll skip the isPremature check. This is necessary
58✔
2890
        // since aliases start at block height 16_000_000.
58✔
2891
        d.Lock()
58✔
2892
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
58✔
2893
                d.isPremature(upd.ShortChannelID, 0, nMsg) {
58✔
2894

×
2895
                log.Warnf("Update announcement for short_chan_id(%v), is "+
×
2896
                        "premature: advertises height %v, only height %v is "+
×
2897
                        "known", shortChanID, blockHeight, d.bestHeight)
×
2898
                d.Unlock()
×
2899
                nMsg.err <- nil
×
2900
                return nil, false
×
2901
        }
×
2902
        d.Unlock()
58✔
2903

58✔
2904
        // Before we perform any of the expensive checks below, we'll check
58✔
2905
        // whether this update is stale or is for a zombie channel in order to
58✔
2906
        // quickly reject it.
58✔
2907
        timestamp := time.Unix(int64(upd.Timestamp), 0)
58✔
2908

58✔
2909
        // Fetch the SCID we should be using to lock the channelMtx and make
58✔
2910
        // graph queries with.
58✔
2911
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
58✔
2912
        if err != nil {
116✔
2913
                // Fallback and set the graphScid to the peer-provided SCID.
58✔
2914
                // This will occur for non-option-scid-alias channels and for
58✔
2915
                // public option-scid-alias channels after 6 confirmations.
58✔
2916
                // Once public option-scid-alias channels have 6 confs, we'll
58✔
2917
                // ignore ChannelUpdates with one of their aliases.
58✔
2918
                graphScid = upd.ShortChannelID
58✔
2919
        }
58✔
2920

2921
        if d.cfg.Graph.IsStaleEdgePolicy(
58✔
2922
                graphScid, timestamp, upd.ChannelFlags,
58✔
2923
        ) {
63✔
2924

5✔
2925
                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
5✔
2926
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
5✔
2927
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
5✔
2928
                )
5✔
2929

5✔
2930
                nMsg.err <- nil
5✔
2931
                return nil, true
5✔
2932
        }
5✔
2933

2934
        // Check that the ChanUpdate is not too far into the future, this could
2935
        // reveal some faulty implementation therefore we log an error.
2936
        if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
56✔
2937
                log.Errorf("Skewed timestamp (%v) for edge policy of "+
×
2938
                        "short_chan_id(%v), timestamp too far in the future: "+
×
2939
                        "peer=%v, msg=%s, is_remote=%v", timestamp.Unix(),
×
2940
                        shortChanID, nMsg.peer, nMsg.msg.MsgType(),
×
2941
                        nMsg.isRemote,
×
2942
                )
×
2943

×
2944
                nMsg.err <- fmt.Errorf("skewed timestamp of edge policy, "+
×
2945
                        "timestamp too far in the future: %v", timestamp.Unix())
×
2946

×
2947
                return nil, false
×
2948
        }
×
2949

2950
        // Get the node pub key as far since we don't have it in the channel
2951
        // update announcement message. We'll need this to properly verify the
2952
        // message's signature.
2953
        //
2954
        // We make sure to obtain the mutex for this channel ID before we
2955
        // access the database. This ensures the state we read from the
2956
        // database has not changed between this point and when we call
2957
        // UpdateEdge() later.
2958
        d.channelMtx.Lock(graphScid.ToUint64())
56✔
2959
        defer d.channelMtx.Unlock(graphScid.ToUint64())
56✔
2960

56✔
2961
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
56✔
2962
        switch {
56✔
2963
        // No error, break.
2964
        case err == nil:
52✔
2965
                break
52✔
2966

2967
        case errors.Is(err, graphdb.ErrZombieEdge):
3✔
2968
                err = d.processZombieUpdate(chanInfo, graphScid, upd)
3✔
2969
                if err != nil {
5✔
2970
                        log.Debug(err)
2✔
2971
                        nMsg.err <- err
2✔
2972
                        return nil, false
2✔
2973
                }
2✔
2974

2975
                // We'll fallthrough to ensure we stash the update until we
2976
                // receive its corresponding ChannelAnnouncement. This is
2977
                // needed to ensure the edge exists in the graph before
2978
                // applying the update.
2979
                fallthrough
1✔
2980
        case errors.Is(err, graphdb.ErrGraphNotFound):
1✔
2981
                fallthrough
1✔
2982
        case errors.Is(err, graphdb.ErrGraphNoEdgesFound):
1✔
2983
                fallthrough
1✔
2984
        case errors.Is(err, graphdb.ErrEdgeNotFound):
5✔
2985
                // If the edge corresponding to this ChannelUpdate was not
5✔
2986
                // found in the graph, this might be a channel in the process
5✔
2987
                // of being opened, and we haven't processed our own
5✔
2988
                // ChannelAnnouncement yet, hence it is not not found in the
5✔
2989
                // graph. This usually gets resolved after the channel proofs
5✔
2990
                // are exchanged and the channel is broadcasted to the rest of
5✔
2991
                // the network, but in case this is a private channel this
5✔
2992
                // won't ever happen. This can also happen in the case of a
5✔
2993
                // zombie channel with a fresh update for which we don't have a
5✔
2994
                // ChannelAnnouncement for since we reject them. Because of
5✔
2995
                // this, we temporarily add it to a map, and reprocess it after
5✔
2996
                // our own ChannelAnnouncement has been processed.
5✔
2997
                //
5✔
2998
                // The shortChanID may be an alias, but it is fine to use here
5✔
2999
                // since we don't have an edge in the graph and if the peer is
5✔
3000
                // not buggy, we should be able to use it once the gossiper
5✔
3001
                // receives the local announcement.
5✔
3002
                pMsg := &processedNetworkMsg{msg: nMsg}
5✔
3003

5✔
3004
                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
5✔
3005
                switch {
5✔
3006
                // Nothing in the cache yet, we can just directly insert this
3007
                // element.
3008
                case err == cache.ErrElementNotFound:
5✔
3009
                        _, _ = d.prematureChannelUpdates.Put(
5✔
3010
                                shortChanID, &cachedNetworkMsg{
5✔
3011
                                        msgs: []*processedNetworkMsg{pMsg},
5✔
3012
                                })
5✔
3013

3014
                // There's already something in the cache, so we'll combine the
3015
                // set of messages into a single value.
3016
                default:
3✔
3017
                        msgs := earlyMsgs.msgs
3✔
3018
                        msgs = append(msgs, pMsg)
3✔
3019
                        _, _ = d.prematureChannelUpdates.Put(
3✔
3020
                                shortChanID, &cachedNetworkMsg{
3✔
3021
                                        msgs: msgs,
3✔
3022
                                })
3✔
3023
                }
3024

3025
                log.Debugf("Got ChannelUpdate for edge not found in graph"+
5✔
3026
                        "(shortChanID=%v), saving for reprocessing later",
5✔
3027
                        shortChanID)
5✔
3028

5✔
3029
                // NOTE: We don't return anything on the error channel for this
5✔
3030
                // message, as we expect that will be done when this
5✔
3031
                // ChannelUpdate is later reprocessed.
5✔
3032
                return nil, false
5✔
3033

3034
        default:
×
3035
                err := fmt.Errorf("unable to validate channel update "+
×
3036
                        "short_chan_id=%v: %v", shortChanID, err)
×
3037
                log.Error(err)
×
3038
                nMsg.err <- err
×
3039

×
3040
                key := newRejectCacheKey(
×
3041
                        upd.ShortChannelID.ToUint64(),
×
3042
                        sourceToPub(nMsg.source),
×
3043
                )
×
3044
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3045

×
3046
                return nil, false
×
3047
        }
3048

3049
        // The least-significant bit in the flag on the channel update
3050
        // announcement tells us "which" side of the channels directed edge is
3051
        // being updated.
3052
        var (
52✔
3053
                pubKey       *btcec.PublicKey
52✔
3054
                edgeToUpdate *models.ChannelEdgePolicy
52✔
3055
        )
52✔
3056
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
52✔
3057
        switch direction {
52✔
3058
        case 0:
37✔
3059
                pubKey, _ = chanInfo.NodeKey1()
37✔
3060
                edgeToUpdate = e1
37✔
3061
        case 1:
18✔
3062
                pubKey, _ = chanInfo.NodeKey2()
18✔
3063
                edgeToUpdate = e2
18✔
3064
        }
3065

3066
        log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
52✔
3067
                "edge policy=%v", chanInfo.ChannelID,
52✔
3068
                pubKey.SerializeCompressed(), edgeToUpdate != nil)
52✔
3069

52✔
3070
        // Validate the channel announcement with the expected public key and
52✔
3071
        // channel capacity. In the case of an invalid channel update, we'll
52✔
3072
        // return an error to the caller and exit early.
52✔
3073
        err = netann.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
52✔
3074
        if err != nil {
56✔
3075
                rErr := fmt.Errorf("unable to validate channel update "+
4✔
3076
                        "announcement for short_chan_id=%v: %v",
4✔
3077
                        spew.Sdump(upd.ShortChannelID), err)
4✔
3078

4✔
3079
                log.Error(rErr)
4✔
3080
                nMsg.err <- rErr
4✔
3081
                return nil, false
4✔
3082
        }
4✔
3083

3084
        // If we have a previous version of the edge being updated, we'll want
3085
        // to rate limit its updates to prevent spam throughout the network.
3086
        if nMsg.isRemote && edgeToUpdate != nil {
65✔
3087
                // If it's a keep-alive update, we'll only propagate one if
17✔
3088
                // it's been a day since the previous. This follows our own
17✔
3089
                // heuristic of sending keep-alive updates after the same
17✔
3090
                // duration (see retransmitStaleAnns).
17✔
3091
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
17✔
3092
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
22✔
3093
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
9✔
3094
                                log.Debugf("Ignoring keep alive update not "+
4✔
3095
                                        "within %v period for channel %v",
4✔
3096
                                        d.cfg.RebroadcastInterval, shortChanID)
4✔
3097
                                nMsg.err <- nil
4✔
3098
                                return nil, false
4✔
3099
                        }
4✔
3100
                } else {
15✔
3101
                        // If it's not, we'll allow an update per minute with a
15✔
3102
                        // maximum burst of 10. If we haven't seen an update
15✔
3103
                        // for this channel before, we'll need to initialize a
15✔
3104
                        // rate limiter for each direction.
15✔
3105
                        //
15✔
3106
                        // Since the edge exists in the graph, we'll create a
15✔
3107
                        // rate limiter for chanInfo.ChannelID rather then the
15✔
3108
                        // SCID the peer sent. This is because there may be
15✔
3109
                        // multiple aliases for a channel and we may otherwise
15✔
3110
                        // rate-limit only a single alias of the channel,
15✔
3111
                        // instead of the whole channel.
15✔
3112
                        baseScid := chanInfo.ChannelID
15✔
3113
                        d.Lock()
15✔
3114
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
15✔
3115
                        if !ok {
19✔
3116
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
4✔
3117
                                b := d.cfg.MaxChannelUpdateBurst
4✔
3118
                                rls = [2]*rate.Limiter{
4✔
3119
                                        rate.NewLimiter(r, b),
4✔
3120
                                        rate.NewLimiter(r, b),
4✔
3121
                                }
4✔
3122
                                d.chanUpdateRateLimiter[baseScid] = rls
4✔
3123
                        }
4✔
3124
                        d.Unlock()
15✔
3125

15✔
3126
                        if !rls[direction].Allow() {
23✔
3127
                                log.Debugf("Rate limiting update for channel "+
8✔
3128
                                        "%v from direction %x", shortChanID,
8✔
3129
                                        pubKey.SerializeCompressed())
8✔
3130
                                nMsg.err <- nil
8✔
3131
                                return nil, false
8✔
3132
                        }
8✔
3133
                }
3134
        }
3135

3136
        // We'll use chanInfo.ChannelID rather than the peer-supplied
3137
        // ShortChannelID in the ChannelUpdate to avoid the router having to
3138
        // lookup the stored SCID. If we're sending the update, we'll always
3139
        // use the SCID stored in the database rather than a potentially
3140
        // different alias. This might mean that SigBytes is incorrect as it
3141
        // signs a different SCID than the database SCID, but since there will
3142
        // only be a difference if AuthProof == nil, this is fine.
3143
        update := &models.ChannelEdgePolicy{
42✔
3144
                SigBytes:                  upd.Signature.ToSignatureBytes(),
42✔
3145
                ChannelID:                 chanInfo.ChannelID,
42✔
3146
                LastUpdate:                timestamp,
42✔
3147
                MessageFlags:              upd.MessageFlags,
42✔
3148
                ChannelFlags:              upd.ChannelFlags,
42✔
3149
                TimeLockDelta:             upd.TimeLockDelta,
42✔
3150
                MinHTLC:                   upd.HtlcMinimumMsat,
42✔
3151
                MaxHTLC:                   upd.HtlcMaximumMsat,
42✔
3152
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
42✔
3153
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
42✔
3154
                ExtraOpaqueData:           upd.ExtraOpaqueData,
42✔
3155
        }
42✔
3156

42✔
3157
        if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil {
45✔
3158
                if graph.IsError(
3✔
3159
                        err, graph.ErrOutdated,
3✔
3160
                        graph.ErrIgnored,
3✔
3161
                ) {
6✔
3162

3✔
3163
                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
3✔
3164
                                shortChanID, err)
3✔
3165
                } else {
3✔
3166
                        // Since we know the stored SCID in the graph, we'll
×
3167
                        // cache that SCID.
×
3168
                        key := newRejectCacheKey(
×
3169
                                chanInfo.ChannelID,
×
3170
                                sourceToPub(nMsg.source),
×
3171
                        )
×
3172
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3173

×
3174
                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
×
3175
                                shortChanID, err)
×
3176
                }
×
3177

3178
                nMsg.err <- err
3✔
3179
                return nil, false
3✔
3180
        }
3181

3182
        // If this is a local ChannelUpdate without an AuthProof, it means it
3183
        // is an update to a channel that is not (yet) supposed to be announced
3184
        // to the greater network. However, our channel counter party will need
3185
        // to be given the update, so we'll try sending the update directly to
3186
        // the remote peer.
3187
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
56✔
3188
                if nMsg.optionalMsgFields != nil {
28✔
3189
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
14✔
3190
                        if remoteAlias != nil {
17✔
3191
                                // The remoteAlias field was specified, meaning
3✔
3192
                                // that we should replace the SCID in the
3✔
3193
                                // update with the remote's alias. We'll also
3✔
3194
                                // need to re-sign the channel update. This is
3✔
3195
                                // required for option-scid-alias feature-bit
3✔
3196
                                // negotiated channels.
3✔
3197
                                upd.ShortChannelID = *remoteAlias
3✔
3198

3✔
3199
                                sig, err := d.cfg.SignAliasUpdate(upd)
3✔
3200
                                if err != nil {
3✔
3201
                                        log.Error(err)
×
3202
                                        nMsg.err <- err
×
3203
                                        return nil, false
×
3204
                                }
×
3205

3206
                                lnSig, err := lnwire.NewSigFromSignature(sig)
3✔
3207
                                if err != nil {
3✔
3208
                                        log.Error(err)
×
3209
                                        nMsg.err <- err
×
3210
                                        return nil, false
×
3211
                                }
×
3212

3213
                                upd.Signature = lnSig
3✔
3214
                        }
3215
                }
3216

3217
                // Get our peer's public key.
3218
                remotePubKey := remotePubFromChanInfo(
14✔
3219
                        chanInfo, upd.ChannelFlags,
14✔
3220
                )
14✔
3221

14✔
3222
                log.Debugf("The message %v has no AuthProof, sending the "+
14✔
3223
                        "update to remote peer %x", upd.MsgType(), remotePubKey)
14✔
3224

14✔
3225
                // Now we'll attempt to send the channel update message
14✔
3226
                // reliably to the remote peer in the background, so that we
14✔
3227
                // don't block if the peer happens to be offline at the moment.
14✔
3228
                err := d.reliableSender.sendMessage(upd, remotePubKey)
14✔
3229
                if err != nil {
14✔
3230
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3231
                                "channel=%v to peer=%x: %v", upd.MsgType(),
×
3232
                                upd.ShortChannelID, remotePubKey, err)
×
3233
                        nMsg.err <- err
×
3234
                        return nil, false
×
3235
                }
×
3236
        }
3237

3238
        // Channel update announcement was successfully processed and now it
3239
        // can be broadcast to the rest of the network. However, we'll only
3240
        // broadcast the channel update announcement if it has an attached
3241
        // authentication proof. We also won't broadcast the update if it
3242
        // contains an alias because the network would reject this.
3243
        var announcements []networkMsg
42✔
3244
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
64✔
3245
                announcements = append(announcements, networkMsg{
22✔
3246
                        peer:     nMsg.peer,
22✔
3247
                        source:   nMsg.source,
22✔
3248
                        isRemote: nMsg.isRemote,
22✔
3249
                        msg:      upd,
22✔
3250
                })
22✔
3251
        }
22✔
3252

3253
        nMsg.err <- nil
42✔
3254

42✔
3255
        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
42✔
3256
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
42✔
3257
                timestamp)
42✔
3258
        return announcements, true
42✔
3259
}
3260

3261
// handleAnnSig processes a new announcement signatures message.
3262
func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
3263
        ann *lnwire.AnnounceSignatures1) ([]networkMsg, bool) {
24✔
3264

24✔
3265
        needBlockHeight := ann.ShortChannelID.BlockHeight +
24✔
3266
                d.cfg.ProofMatureDelta
24✔
3267
        shortChanID := ann.ShortChannelID.ToUint64()
24✔
3268

24✔
3269
        prefix := "local"
24✔
3270
        if nMsg.isRemote {
38✔
3271
                prefix = "remote"
14✔
3272
        }
14✔
3273

3274
        log.Infof("Received new %v announcement signature for %v", prefix,
24✔
3275
                ann.ShortChannelID)
24✔
3276

24✔
3277
        // By the specification, channel announcement proofs should be sent
24✔
3278
        // after some number of confirmations after channel was registered in
24✔
3279
        // bitcoin blockchain. Therefore, we check if the proof is mature.
24✔
3280
        d.Lock()
24✔
3281
        premature := d.isPremature(
24✔
3282
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
24✔
3283
        )
24✔
3284
        if premature {
27✔
3285
                log.Warnf("Premature proof announcement, current block height"+
3✔
3286
                        "lower than needed: %v < %v", d.bestHeight,
3✔
3287
                        needBlockHeight)
3✔
3288
                d.Unlock()
3✔
3289
                nMsg.err <- nil
3✔
3290
                return nil, false
3✔
3291
        }
3✔
3292
        d.Unlock()
24✔
3293

24✔
3294
        // Ensure that we know of a channel with the target channel ID before
24✔
3295
        // proceeding further.
24✔
3296
        //
24✔
3297
        // We must acquire the mutex for this channel ID before getting the
24✔
3298
        // channel from the database, to ensure what we read does not change
24✔
3299
        // before we call AddProof() later.
24✔
3300
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
24✔
3301
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
24✔
3302

24✔
3303
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
24✔
3304
                ann.ShortChannelID,
24✔
3305
        )
24✔
3306
        if err != nil {
28✔
3307
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
4✔
3308
                if err != nil {
7✔
3309
                        err := fmt.Errorf("unable to store the proof for "+
3✔
3310
                                "short_chan_id=%v: %v", shortChanID, err)
3✔
3311
                        log.Error(err)
3✔
3312
                        nMsg.err <- err
3✔
3313

3✔
3314
                        return nil, false
3✔
3315
                }
3✔
3316

3317
                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
4✔
3318
                err := d.cfg.WaitingProofStore.Add(proof)
4✔
3319
                if err != nil {
4✔
3320
                        err := fmt.Errorf("unable to store the proof for "+
×
3321
                                "short_chan_id=%v: %v", shortChanID, err)
×
3322
                        log.Error(err)
×
3323
                        nMsg.err <- err
×
3324
                        return nil, false
×
3325
                }
×
3326

3327
                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
4✔
3328
                        ", adding to waiting batch", prefix, shortChanID)
4✔
3329
                nMsg.err <- nil
4✔
3330
                return nil, false
4✔
3331
        }
3332

3333
        nodeID := nMsg.source.SerializeCompressed()
23✔
3334
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
23✔
3335
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
23✔
3336

23✔
3337
        // Ensure that channel that was retrieved belongs to the peer which
23✔
3338
        // sent the proof announcement.
23✔
3339
        if !(isFirstNode || isSecondNode) {
23✔
3340
                err := fmt.Errorf("channel that was received doesn't belong "+
×
3341
                        "to the peer which sent the proof, short_chan_id=%v",
×
3342
                        shortChanID)
×
3343
                log.Error(err)
×
3344
                nMsg.err <- err
×
3345
                return nil, false
×
3346
        }
×
3347

3348
        // If proof was sent by a local sub-system, then we'll send the
3349
        // announcement signature to the remote node so they can also
3350
        // reconstruct the full channel announcement.
3351
        if !nMsg.isRemote {
36✔
3352
                var remotePubKey [33]byte
13✔
3353
                if isFirstNode {
26✔
3354
                        remotePubKey = chanInfo.NodeKey2Bytes
13✔
3355
                } else {
16✔
3356
                        remotePubKey = chanInfo.NodeKey1Bytes
3✔
3357
                }
3✔
3358

3359
                // Since the remote peer might not be online we'll call a
3360
                // method that will attempt to deliver the proof when it comes
3361
                // online.
3362
                err := d.reliableSender.sendMessage(ann, remotePubKey)
13✔
3363
                if err != nil {
13✔
3364
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3365
                                "channel=%v to peer=%x: %v", ann.MsgType(),
×
3366
                                ann.ShortChannelID, remotePubKey, err)
×
3367
                        nMsg.err <- err
×
3368
                        return nil, false
×
3369
                }
×
3370
        }
3371

3372
        // Check if we already have the full proof for this channel.
3373
        if chanInfo.AuthProof != nil {
27✔
3374
                // If we already have the fully assembled proof, then the peer
4✔
3375
                // sending us their proof has probably not received our local
4✔
3376
                // proof yet. So be kind and send them the full proof.
4✔
3377
                if nMsg.isRemote {
8✔
3378
                        peerID := nMsg.source.SerializeCompressed()
4✔
3379
                        log.Debugf("Got AnnounceSignatures for channel with " +
4✔
3380
                                "full proof.")
4✔
3381

4✔
3382
                        d.wg.Add(1)
4✔
3383
                        go func() {
8✔
3384
                                defer d.wg.Done()
4✔
3385

4✔
3386
                                log.Debugf("Received half proof for channel "+
4✔
3387
                                        "%v with existing full proof. Sending"+
4✔
3388
                                        " full proof to peer=%x",
4✔
3389
                                        ann.ChannelID, peerID)
4✔
3390

4✔
3391
                                ca, _, _, err := netann.CreateChanAnnouncement(
4✔
3392
                                        chanInfo.AuthProof, chanInfo, e1, e2,
4✔
3393
                                )
4✔
3394
                                if err != nil {
4✔
3395
                                        log.Errorf("unable to gen ann: %v",
×
3396
                                                err)
×
3397
                                        return
×
3398
                                }
×
3399

3400
                                err = nMsg.peer.SendMessage(false, ca)
4✔
3401
                                if err != nil {
4✔
3402
                                        log.Errorf("Failed sending full proof"+
×
3403
                                                " to peer=%x: %v", peerID, err)
×
3404
                                        return
×
3405
                                }
×
3406

3407
                                log.Debugf("Full proof sent to peer=%x for "+
4✔
3408
                                        "chanID=%v", peerID, ann.ChannelID)
4✔
3409
                        }()
3410
                }
3411

3412
                log.Debugf("Already have proof for channel with chanID=%v",
4✔
3413
                        ann.ChannelID)
4✔
3414
                nMsg.err <- nil
4✔
3415
                return nil, true
4✔
3416
        }
3417

3418
        // Check that we received the opposite proof. If so, then we're now
3419
        // able to construct the full proof, and create the channel
3420
        // announcement. If we didn't receive the opposite half of the proof
3421
        // then we should store this one, and wait for the opposite to be
3422
        // received.
3423
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
22✔
3424
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
22✔
3425
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
22✔
3426
                err := fmt.Errorf("unable to get the opposite proof for "+
×
3427
                        "short_chan_id=%v: %v", shortChanID, err)
×
3428
                log.Error(err)
×
3429
                nMsg.err <- err
×
3430
                return nil, false
×
3431
        }
×
3432

3433
        if err == channeldb.ErrWaitingProofNotFound {
34✔
3434
                err := d.cfg.WaitingProofStore.Add(proof)
12✔
3435
                if err != nil {
12✔
3436
                        err := fmt.Errorf("unable to store the proof for "+
×
3437
                                "short_chan_id=%v: %v", shortChanID, err)
×
3438
                        log.Error(err)
×
3439
                        nMsg.err <- err
×
3440
                        return nil, false
×
3441
                }
×
3442

3443
                log.Infof("1/2 of channel ann proof received for "+
12✔
3444
                        "short_chan_id=%v, waiting for other half",
12✔
3445
                        shortChanID)
12✔
3446

12✔
3447
                nMsg.err <- nil
12✔
3448
                return nil, false
12✔
3449
        }
3450

3451
        // We now have both halves of the channel announcement proof, then
3452
        // we'll reconstruct the initial announcement so we can validate it
3453
        // shortly below.
3454
        var dbProof models.ChannelAuthProof
13✔
3455
        if isFirstNode {
17✔
3456
                dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
4✔
3457
                dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
4✔
3458
                dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
4✔
3459
                dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
4✔
3460
        } else {
16✔
3461
                dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
12✔
3462
                dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
12✔
3463
                dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
12✔
3464
                dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
12✔
3465
        }
12✔
3466

3467
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
13✔
3468
                &dbProof, chanInfo, e1, e2,
13✔
3469
        )
13✔
3470
        if err != nil {
13✔
3471
                log.Error(err)
×
3472
                nMsg.err <- err
×
3473
                return nil, false
×
3474
        }
×
3475

3476
        // With all the necessary components assembled validate the full
3477
        // channel announcement proof.
3478
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
13✔
3479
        if err != nil {
13✔
3480
                err := fmt.Errorf("channel announcement proof for "+
×
3481
                        "short_chan_id=%v isn't valid: %v", shortChanID, err)
×
3482

×
3483
                log.Error(err)
×
3484
                nMsg.err <- err
×
3485
                return nil, false
×
3486
        }
×
3487

3488
        // If the channel was returned by the router it means that existence of
3489
        // funding point and inclusion of nodes bitcoin keys in it already
3490
        // checked by the router. In this stage we should check that node keys
3491
        // attest to the bitcoin keys by validating the signatures of
3492
        // announcement. If proof is valid then we'll populate the channel edge
3493
        // with it, so we can announce it on peer connect.
3494
        err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
13✔
3495
        if err != nil {
13✔
3496
                err := fmt.Errorf("unable add proof to the channel chanID=%v:"+
×
3497
                        " %v", ann.ChannelID, err)
×
3498
                log.Error(err)
×
3499
                nMsg.err <- err
×
3500
                return nil, false
×
3501
        }
×
3502

3503
        err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
13✔
3504
        if err != nil {
13✔
3505
                err := fmt.Errorf("unable to remove opposite proof for the "+
×
3506
                        "channel with chanID=%v: %v", ann.ChannelID, err)
×
3507
                log.Error(err)
×
3508
                nMsg.err <- err
×
3509
                return nil, false
×
3510
        }
×
3511

3512
        // Proof was successfully created and now can announce the channel to
3513
        // the remain network.
3514
        log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
13✔
3515
                ", adding to next ann batch", shortChanID)
13✔
3516

13✔
3517
        // Assemble the necessary announcements to add to the next broadcasting
13✔
3518
        // batch.
13✔
3519
        var announcements []networkMsg
13✔
3520
        announcements = append(announcements, networkMsg{
13✔
3521
                peer:   nMsg.peer,
13✔
3522
                source: nMsg.source,
13✔
3523
                msg:    chanAnn,
13✔
3524
        })
13✔
3525
        if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
25✔
3526
                announcements = append(announcements, networkMsg{
12✔
3527
                        peer:   nMsg.peer,
12✔
3528
                        source: src,
12✔
3529
                        msg:    e1Ann,
12✔
3530
                })
12✔
3531
        }
12✔
3532
        if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
24✔
3533
                announcements = append(announcements, networkMsg{
11✔
3534
                        peer:   nMsg.peer,
11✔
3535
                        source: src,
11✔
3536
                        msg:    e2Ann,
11✔
3537
                })
11✔
3538
        }
11✔
3539

3540
        // We'll also send along the node announcements for each channel
3541
        // participant if we know of them. To ensure our node announcement
3542
        // propagates to our channel counterparty, we'll set the source for
3543
        // each announcement to the node it belongs to, otherwise we won't send
3544
        // it since the source gets skipped. This isn't necessary for channel
3545
        // updates and announcement signatures since we send those directly to
3546
        // our channel counterparty through the gossiper's reliable sender.
3547
        node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
13✔
3548
        if err != nil {
18✔
3549
                log.Debugf("Unable to fetch node announcement for %x: %v",
5✔
3550
                        chanInfo.NodeKey1Bytes, err)
5✔
3551
        } else {
16✔
3552
                if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
22✔
3553
                        announcements = append(announcements, networkMsg{
11✔
3554
                                peer:   nMsg.peer,
11✔
3555
                                source: nodeKey1,
11✔
3556
                                msg:    node1Ann,
11✔
3557
                        })
11✔
3558
                }
11✔
3559
        }
3560

3561
        node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
13✔
3562
        if err != nil {
20✔
3563
                log.Debugf("Unable to fetch node announcement for %x: %v",
7✔
3564
                        chanInfo.NodeKey2Bytes, err)
7✔
3565
        } else {
16✔
3566
                if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
18✔
3567
                        announcements = append(announcements, networkMsg{
9✔
3568
                                peer:   nMsg.peer,
9✔
3569
                                source: nodeKey2,
9✔
3570
                                msg:    node2Ann,
9✔
3571
                        })
9✔
3572
                }
9✔
3573
        }
3574

3575
        nMsg.err <- nil
13✔
3576
        return announcements, true
13✔
3577
}
3578

3579
// isBanned returns true if the peer identified by pubkey is banned for sending
3580
// invalid channel announcements.
3581
func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
208✔
3582
        return d.banman.isBanned(pubkey)
208✔
3583
}
208✔
3584

3585
// ShouldDisconnect returns true if we should disconnect the peer identified by
3586
// pubkey.
3587
func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
3588
        bool, error) {
206✔
3589

206✔
3590
        pubkeySer := pubkey.SerializeCompressed()
206✔
3591

206✔
3592
        var pubkeyBytes [33]byte
206✔
3593
        copy(pubkeyBytes[:], pubkeySer)
206✔
3594

206✔
3595
        // If the public key is banned, check whether or not this is a channel
206✔
3596
        // peer.
206✔
3597
        if d.isBanned(pubkeyBytes) {
208✔
3598
                isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
2✔
3599
                if err != nil {
2✔
3600
                        return false, err
×
3601
                }
×
3602

3603
                // We should only disconnect non-channel peers.
3604
                if !isChanPeer {
3✔
3605
                        return true, nil
1✔
3606
                }
1✔
3607
        }
3608

3609
        return false, nil
205✔
3610
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc