lightningnetwork / lnd, build 13566028875

27 Feb 2025 12:09PM UTC coverage: 49.396% (-9.4%) from 58.748%

Pull Request #9555: graph: extract cache from CRUD [6]

ellemouton, graph/db: populate the graph cache in Start instead of during
construction:

In this commit, we move the graph cache population logic out of the
ChannelGraph constructor and into its Start method instead.

34 of 54 new or added lines in 4 files covered (62.96%).
27464 existing lines in 436 files now uncovered.
101095 of 204664 relevant lines covered (49.4%).
1.54 hits per line.

Source file: /discovery/gossiper.go (68.98% of lines covered)

package discovery

import (
        "bytes"
        "errors"
        "fmt"
        "strings"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcec/v2"
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/btcsuite/btcd/wire"
        "github.com/davecgh/go-spew/spew"
        "github.com/lightninglabs/neutrino/cache"
        "github.com/lightninglabs/neutrino/cache/lru"
        "github.com/lightningnetwork/lnd/batch"
        "github.com/lightningnetwork/lnd/chainntnfs"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph"
        graphdb "github.com/lightningnetwork/lnd/graph/db"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/keychain"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/btcwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/multimutex"
        "github.com/lightningnetwork/lnd/netann"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/ticker"
        "golang.org/x/time/rate"
)

const (
        // DefaultMaxChannelUpdateBurst is the default maximum number of updates
        // for a specific channel and direction that we'll accept over an
        // interval.
        DefaultMaxChannelUpdateBurst = 10

        // DefaultChannelUpdateInterval is the default interval we'll use to
        // determine how often we should allow a new update for a specific
        // channel and direction.
        DefaultChannelUpdateInterval = time.Minute

        // maxPrematureUpdates tracks the max amount of premature channel
        // updates that we'll hold onto.
        maxPrematureUpdates = 100

        // maxFutureMessages tracks the max amount of future messages that
        // we'll hold onto.
        maxFutureMessages = 1000

        // DefaultSubBatchDelay is the default delay we'll use when
        // broadcasting the next announcement batch.
        DefaultSubBatchDelay = 5 * time.Second

        // maxRejectedUpdates tracks the max amount of rejected channel updates
        // we'll maintain. This is the global size across all peers. We'll
        // allocate ~3 MB max to the cache.
        maxRejectedUpdates = 10_000

        // DefaultProofMatureDelta specifies the default value used for
        // ProofMatureDelta, which is the number of confirmations needed before
        // processing the announcement signatures.
        DefaultProofMatureDelta = 6
)

var (
        // ErrGossiperShuttingDown is an error that is returned if the gossiper
        // is in the process of being shut down.
        ErrGossiperShuttingDown = errors.New("gossiper is shutting down")

        // ErrGossipSyncerNotFound signals that we were unable to find an active
        // gossip syncer corresponding to a gossip query message received from
        // the remote peer.
        ErrGossipSyncerNotFound = errors.New("gossip syncer not found")

        // ErrNoFundingTransaction is returned when we are unable to find the
        // funding transaction described by the short channel ID on chain.
        ErrNoFundingTransaction = errors.New(
                "unable to find the funding transaction",
        )

        // ErrInvalidFundingOutput is returned if the channel funding output
        // fails validation.
        ErrInvalidFundingOutput = errors.New(
                "channel funding output validation failed",
        )

        // ErrChannelSpent is returned when we go to validate a channel, but
        // the purported funding output has actually already been spent on
        // chain.
        ErrChannelSpent = errors.New("channel output has been spent")

        // emptyPubkey is used to compare compressed pubkeys against an empty
        // byte array.
        emptyPubkey [33]byte
)

// optionalMsgFields is a set of optional message fields that external callers
// can provide that are useful when processing a specific network
// announcement.
type optionalMsgFields struct {
        capacity      *btcutil.Amount
        channelPoint  *wire.OutPoint
        remoteAlias   *lnwire.ShortChannelID
        tapscriptRoot fn.Option[chainhash.Hash]
}

// apply applies the optional fields within the functional options.
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
        for _, optionalMsgField := range optionalMsgFields {
                optionalMsgField(f)
        }
}

// OptionalMsgField is a functional option parameter that can be used to
// provide external information that is not included within a network message
// but is useful when processing it.
type OptionalMsgField func(*optionalMsgFields)

// ChannelCapacity is an optional field that lets the gossiper know of the
// capacity of a channel.
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
        return func(f *optionalMsgFields) {
                f.capacity = &capacity
        }
}

// ChannelPoint is an optional field that lets the gossiper know of the
// outpoint of a channel.
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
        return func(f *optionalMsgFields) {
                f.channelPoint = &op
        }
}

// TapscriptRoot is an optional field that lets the gossiper know of the root
// of the tapscript tree for a custom channel.
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
        return func(f *optionalMsgFields) {
                f.tapscriptRoot = root
        }
}

// RemoteAlias is an optional field that lets the gossiper know that a locally
// sent channel update is actually an update for the peer that should replace
// the ShortChannelID field with the remote's alias. This is only used for
// channels with peers where the option-scid-alias feature bit was negotiated.
// The channel update will be added to the graph under the original SCID, but
// will be modified and re-signed with this alias.
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
        return func(f *optionalMsgFields) {
                f.remoteAlias = alias
        }
}
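
// Example (editor's illustrative sketch, not part of the original file): a
// caller such as the funding manager can attach these optional fields when
// handing a local announcement to the gossiper via ProcessLocalAnnouncement,
// defined further below. The gossiper value, chanUpdate message and
// fundingOutpoint here are hypothetical and assumed to be built elsewhere.
//
//      errChan := gossiper.ProcessLocalAnnouncement(
//              chanUpdate,
//              ChannelCapacity(btcutil.Amount(1_000_000)),
//              ChannelPoint(fundingOutpoint),
//      )
//      if err := <-errChan; err != nil {
//              log.Errorf("unable to process announcement: %v", err)
//      }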

// networkMsg couples a routing related wire message with the peer that
// originally sent it.
type networkMsg struct {
        peer              lnpeer.Peer
        source            *btcec.PublicKey
        msg               lnwire.Message
        optionalMsgFields *optionalMsgFields

        isRemote bool

        err chan error
}

// chanPolicyUpdateRequest is a request that is sent to the server when a caller
// wishes to update a particular set of channels. New ChannelUpdate messages
// will be crafted to be sent out during the next broadcast epoch and the fee
// updates committed to the lower layer.
type chanPolicyUpdateRequest struct {
        edgesToUpdate []EdgeWithInfo
        errChan       chan error
}

// PinnedSyncers is a set of node pubkeys for which we will maintain an active
// syncer at all times.
type PinnedSyncers map[route.Vertex]struct{}

// Config defines the configuration for the service. ALL elements within the
// configuration MUST be non-nil for the service to carry out its duties.
type Config struct {
        // ChainHash is a hash that indicates the resident chain of the
        // AuthenticatedGossiper. Any announcements that don't match this
        // chain hash will be ignored.
        //
        // TODO(roasbeef): eventually make into map so can de-multiplex
        // incoming announcements
        //   * also need to do same for Notifier
        ChainHash chainhash.Hash

        // Graph is the subsystem which is responsible for managing the
        // topology of the lightning network. After incoming channel, node,
        // and channel update announcements are validated, they are sent to
        // the router in order to be included in the LN graph.
        Graph graph.ChannelGraphSource

        // ChainIO represents an abstraction over a source that can query the
        // blockchain.
        ChainIO lnwallet.BlockChainIO

        // ChanSeries is an interface that provides access to a time series
        // view of the current known channel graph. Each GossipSyncer enabled
        // peer will utilize this in order to create and respond to channel
        // graph time series queries.
        ChanSeries ChannelGraphTimeSeries

        // Notifier is used for receiving notifications of incoming blocks.
        // With each new incoming block found we process previously premature
        // announcements.
        //
        // TODO(roasbeef): could possibly just replace this with an epoch
        // channel.
        Notifier chainntnfs.ChainNotifier

        // Broadcast broadcasts a particular set of announcements to all peers
        // that the daemon is connected to. If supplied, the exclude parameter
        // indicates that the target peer should be excluded from the
        // broadcast.
        Broadcast func(skips map[route.Vertex]struct{},
                msg ...lnwire.Message) error

        // NotifyWhenOnline is a function that allows the gossiper to be
        // notified when a certain peer comes online, allowing it to
        // retry sending a peer message.
        //
        // NOTE: The peerChan channel must be buffered.
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)

        // NotifyWhenOffline is a function that allows the gossiper to be
        // notified when a certain peer disconnects, allowing it to request a
        // notification for when it reconnects.
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}

        // FetchSelfAnnouncement retrieves our current node announcement, for
        // use when determining whether we should update our peers about our
        // presence in the network.
        FetchSelfAnnouncement func() lnwire.NodeAnnouncement

        // UpdateSelfAnnouncement produces a new announcement for our node with
        // an updated timestamp which can be broadcast to our peers.
        UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement, error)

        // ProofMatureDelta is the number of confirmations needed before
        // exchanging the channel announcement proofs.
        ProofMatureDelta uint32

        // TrickleDelay is the period of the trickle timer which flushes the
        // pending batch of new announcements we've received since the last
        // trickle tick to the network.
        TrickleDelay time.Duration

        // RetransmitTicker is a ticker that ticks with a period which
        // indicates that we should check if we need to re-broadcast any of
        // our personal channels.
        RetransmitTicker ticker.Ticker

        // RebroadcastInterval is the maximum time we wait between sending out
        // channel updates for our active channels and our own node
        // announcement. We do this to ensure our active presence on the
        // network is known, and we are not being considered a zombie node or
        // having zombie channels.
        RebroadcastInterval time.Duration

        // WaitingProofStore is a persistent storage of partial channel proof
        // announcement messages. We use it to buffer half of the material
        // needed to reconstruct a full authenticated channel announcement.
        // Once we receive the other half of the channel proof, we'll be able
        // to properly validate it and re-broadcast it out to the network.
        //
        // TODO(wilmer): make interface to prevent channeldb dependency.
        WaitingProofStore *channeldb.WaitingProofStore

        // MessageStore is a persistent storage of gossip messages which we will
        // use to determine which messages need to be resent for a given peer.
        MessageStore GossipMessageStore

        // AnnSigner is an instance of the MessageSigner interface which will
        // be used to manually sign any outgoing channel updates. The signer
        // implementation should be backed by the public key of the backing
        // Lightning node.
        //
        // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
        // here?
        AnnSigner lnwallet.MessageSigner

        // ScidCloser is an instance of ClosedChannelTracker that helps the
        // gossiper cut down on spam channel announcements for already closed
        // channels.
        ScidCloser ClosedChannelTracker

        // NumActiveSyncers is the number of peers we should have active
        // syncers with. After reaching NumActiveSyncers, any future gossip
        // syncers will be passive.
        NumActiveSyncers int

        // NoTimestampQueries will prevent the GossipSyncer from querying
        // timestamps of announcement messages from the peer and from replying
        // to timestamp queries.
        NoTimestampQueries bool

        // RotateTicker is a ticker responsible for notifying the SyncManager
        // when it should rotate its active syncers. A single active syncer with
        // a chansSynced state will be exchanged for a passive syncer in order
        // to ensure we don't keep syncing with the same peers.
        RotateTicker ticker.Ticker

        // HistoricalSyncTicker is a ticker responsible for notifying the
        // syncManager when it should attempt a historical sync with a gossip
        // sync peer.
        HistoricalSyncTicker ticker.Ticker

        // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
        // syncManager when it should attempt to start the next pending
        // activeSyncer due to the current one not completing its state machine
        // within the timeout.
        ActiveSyncerTimeoutTicker ticker.Ticker

        // MinimumBatchSize is the minimum size of a sub batch of announcement
        // messages.
        MinimumBatchSize int

        // SubBatchDelay is the delay between sending sub batches of
        // gossip messages.
        SubBatchDelay time.Duration

        // IgnoreHistoricalFilters will prevent syncers from replying with
        // historical data when the remote peer sets a gossip_timestamp_range.
        // This prevents ranges with old start times from causing us to dump the
        // graph on connect.
        IgnoreHistoricalFilters bool

        // PinnedSyncers is a set of peers that will always transition to
        // ActiveSync upon connection. These peers will never transition to
        // PassiveSync.
        PinnedSyncers PinnedSyncers

        // MaxChannelUpdateBurst specifies the maximum number of updates for a
        // specific channel and direction that we'll accept over an interval.
        MaxChannelUpdateBurst int

        // ChannelUpdateInterval specifies the interval we'll use to determine
        // how often we should allow a new update for a specific channel and
        // direction.
        ChannelUpdateInterval time.Duration

        // IsAlias returns true if a given ShortChannelID is an alias for
        // option_scid_alias channels.
        IsAlias func(scid lnwire.ShortChannelID) bool

        // SignAliasUpdate is used to re-sign a channel update using the
        // remote's alias if the option-scid-alias feature bit was negotiated.
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
                error)

        // FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
        // This is used for channels that have negotiated the option-scid-alias
        // feature bit.
        FindBaseByAlias func(alias lnwire.ShortChannelID) (
                lnwire.ShortChannelID, error)

        // GetAlias allows the gossiper to look up the peer's alias for a given
        // ChannelID. This is used to sign updates for them if the channel has
        // no AuthProof and the option-scid-alias feature bit was negotiated.
        GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)

        // FindChannel allows the gossiper to find a channel that we're party
        // to without iterating over the entire set of open channels.
        FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
                *channeldb.OpenChannel, error)

        // IsStillZombieChannel takes the timestamps of the latest channel
        // updates for a channel and returns true if the channel should be
        // considered a zombie based on these timestamps.
        IsStillZombieChannel func(time.Time, time.Time) bool

        // AssumeChannelValid toggles whether the gossiper will check for
        // spent-ness of channel outpoints. For neutrino, this saves long
        // rescans from blocking initial usage of the daemon.
        AssumeChannelValid bool
}

// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
// used to let the caller of the lru.Cache know if a message has already been
// processed or not.
type processedNetworkMsg struct {
        processed bool
        msg       *networkMsg
}

// cachedNetworkMsg is a wrapper around a network message that can be used with
// *lru.Cache.
type cachedNetworkMsg struct {
        msgs []*processedNetworkMsg
}

// Size returns the "size" of an entry. We return the number of items as we
// just want to limit the total amount of entries rather than do accurate size
// accounting.
func (c *cachedNetworkMsg) Size() (uint64, error) {
        return uint64(len(c.msgs)), nil
}

// rejectCacheKey is the cache key that we'll use to track announcements we've
// recently rejected.
type rejectCacheKey struct {
        pubkey [33]byte
        chanID uint64
}

// newRejectCacheKey returns a new cache key for the reject cache.
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
        k := rejectCacheKey{
                chanID: cid,
                pubkey: pub,
        }

        return k
}

// sourceToPub returns a serialized-compressed public key for use in the reject
// cache.
func sourceToPub(pk *btcec.PublicKey) [33]byte {
        var pub [33]byte
        copy(pub[:], pk.SerializeCompressed())
        return pub
}
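
// Illustrative usage (editor's sketch, not part of the original file): when a
// rejected announcement is seen again, the gossiper can short-circuit by
// probing the reject cache with a key built from the SCID and the sender.
// The scid and peerPub values here are hypothetical; the neutrino lru.Cache
// returns an error from Get when the key is absent.
//
//      key := newRejectCacheKey(scid.ToUint64(), sourceToPub(peerPub))
//      if _, err := d.recentRejects.Get(key); err == nil {
//              // Already rejected recently; skip re-validation.
//      }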

// cachedReject is an empty struct used as the cache value for rejects.
type cachedReject struct{}

// Size returns the "size" of an entry. We return 1 as we just want to limit
// the total size.
func (c *cachedReject) Size() (uint64, error) {
        return 1, nil
}

// AuthenticatedGossiper is a subsystem which is responsible for receiving
// announcements, validating them and applying the changes to the router,
// syncing the lightning network with newly connected nodes, broadcasting
// announcements after validation, negotiating the channel announcement proofs
// exchange and handling premature announcements. All outgoing announcements
// are expected to be properly signed as dictated in BOLT#7. Additionally, all
// incoming messages are expected to be well formed and signed. Invalid
// messages will be rejected by this struct.
type AuthenticatedGossiper struct {
        // Parameters which are needed to properly handle the start and stop of
        // the service.
        started sync.Once
        stopped sync.Once

        // bestHeight is the height of the block at the tip of the main chain
        // as we know it. Accesses *MUST* be done with the gossiper's lock
        // held.
        bestHeight uint32

        quit chan struct{}
        wg   sync.WaitGroup

        // cfg is a copy of the configuration struct that the gossiper service
        // was initialized with.
        cfg *Config

        // blockEpochs encapsulates a stream of block epochs that are sent at
        // every new block height.
        blockEpochs *chainntnfs.BlockEpochEvent

        // prematureChannelUpdates is a map of ChannelUpdates we have received
        // that weren't associated with any channel we know about. We store
        // them temporarily, such that we can reprocess them when a
        // ChannelAnnouncement for the channel is received.
        prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]

        // banman tracks our peers' ban status.
        banman *banman

        // networkMsgs is a channel that carries new network broadcasted
        // message from outside the gossiper service to be processed by the
        // networkHandler.
        networkMsgs chan *networkMsg

        // futureMsgs is a list of premature network messages that have a block
        // height specified in the future. We will save them and resend them to
        // the networkMsgs channel once the block height has been reached. The
        // cached map format is,
        //   {msgID1: msg1, msgID2: msg2, ...}
        futureMsgs *futureMsgCache

        // chanPolicyUpdates is a channel over which requests to update the
        // forwarding policy of a set of channels are sent.
        chanPolicyUpdates chan *chanPolicyUpdateRequest

        // selfKey is the identity public key of the backing Lightning node.
        selfKey *btcec.PublicKey

        // selfKeyLoc is the locator for the identity public key of the backing
        // Lightning node.
        selfKeyLoc keychain.KeyLocator

        // channelMtx is used to restrict the database access to one
        // goroutine per channel ID. This is done to ensure that when
        // the gossiper is handling an announcement, the db state stays
        // consistent between when the DB is first read until it's written.
        channelMtx *multimutex.Mutex[uint64]

        recentRejects *lru.Cache[rejectCacheKey, *cachedReject]

        // syncMgr is a subsystem responsible for managing the gossip syncers
        // for peers currently connected. When a new peer is connected, the
        // manager will create its accompanying gossip syncer and determine
        // whether it should have an activeSync or passiveSync sync type based
        // on how many other gossip syncers are currently active. Any activeSync
        // gossip syncers are started in a round-robin manner to ensure we're
        // not syncing with multiple peers at the same time.
        syncMgr *SyncManager

        // reliableSender is a subsystem responsible for handling reliable
        // message send requests to peers. This should only be used for channels
        // that are unadvertised at the time of handling the message since if it
        // is advertised, then peers should be able to get the message from the
        // network.
        reliableSender *reliableSender

        // chanUpdateRateLimiter contains rate limiters for each direction of
        // a channel update we've processed. We'll use these to determine
        // whether we should accept a new update for a specific channel and
        // direction.
        //
        // NOTE: This map must be synchronized with the main
        // AuthenticatedGossiper lock.
        chanUpdateRateLimiter map[uint64][2]*rate.Limiter

        // vb is used to enforce job dependency ordering of gossip messages.
        vb *ValidationBarrier

        sync.Mutex
}

// New creates a new AuthenticatedGossiper instance, initialized with the
// passed configuration parameters.
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
        gossiper := &AuthenticatedGossiper{
                selfKey:           selfKeyDesc.PubKey,
                selfKeyLoc:        selfKeyDesc.KeyLocator,
                cfg:               &cfg,
                networkMsgs:       make(chan *networkMsg),
                futureMsgs:        newFutureMsgCache(maxFutureMessages),
                quit:              make(chan struct{}),
                chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
                prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
                        maxPrematureUpdates,
                ),
                channelMtx: multimutex.NewMutex[uint64](),
                recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
                        maxRejectedUpdates,
                ),
                chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
                banman:                newBanman(),
        }

        gossiper.vb = NewValidationBarrier(1000, gossiper.quit)

        gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
                ChainHash:               cfg.ChainHash,
                ChanSeries:              cfg.ChanSeries,
                RotateTicker:            cfg.RotateTicker,
                HistoricalSyncTicker:    cfg.HistoricalSyncTicker,
                NumActiveSyncers:        cfg.NumActiveSyncers,
                NoTimestampQueries:      cfg.NoTimestampQueries,
                IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
                BestHeight:              gossiper.latestHeight,
                PinnedSyncers:           cfg.PinnedSyncers,
                IsStillZombieChannel:    cfg.IsStillZombieChannel,
        })

        gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
                NotifyWhenOnline:  cfg.NotifyWhenOnline,
                NotifyWhenOffline: cfg.NotifyWhenOffline,
                MessageStore:      cfg.MessageStore,
                IsMsgStale:        gossiper.isMsgStale,
        })

        return gossiper
}

// EdgeWithInfo contains the information that is required to update an edge.
type EdgeWithInfo struct {
        // Info describes the channel.
        Info *models.ChannelEdgeInfo

        // Edge describes the policy in one direction of the channel.
        Edge *models.ChannelEdgePolicy
}

// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
// specified edge updates. Updates are done in two stages: first, the
// AuthenticatedGossiper ensures the update has been committed by dependent
// sub-systems, then it signs and broadcasts new updates to the network. A
// mapping between outpoints and updated channel policies is returned, which is
// used to update the forwarding policies of the underlying links.
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
        edgesToUpdate []EdgeWithInfo) error {

        errChan := make(chan error, 1)
        policyUpdate := &chanPolicyUpdateRequest{
                edgesToUpdate: edgesToUpdate,
                errChan:       errChan,
        }

        select {
        case d.chanPolicyUpdates <- policyUpdate:
                err := <-errChan
                return err
        case <-d.quit:
                return fmt.Errorf("AuthenticatedGossiper shutting down")
        }
}

// Start spawns the network message handler goroutine and registers for new
// block notifications in order to properly handle premature announcements.
func (d *AuthenticatedGossiper) Start() error {
        var err error
        d.started.Do(func() {
                log.Info("Authenticated Gossiper starting")
                err = d.start()
        })
        return err
}

func (d *AuthenticatedGossiper) start() error {
        // First we register for new notifications of newly discovered blocks.
        // We do this immediately so we'll later be able to consume any/all
        // blocks which were discovered.
        blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
        if err != nil {
                return err
        }
        d.blockEpochs = blockEpochs

        height, err := d.cfg.Graph.CurrentBlockHeight()
        if err != nil {
                return err
        }
        d.bestHeight = height

        // Start the reliable sender. In case we had any pending messages ready
        // to be sent when the gossiper was last shut down, we must continue on
        // our quest to deliver them to their respective peers.
        if err := d.reliableSender.Start(); err != nil {
                return err
        }

        d.syncMgr.Start()

        d.banman.start()

        // Start receiving blocks in its dedicated goroutine.
        d.wg.Add(2)
        go d.syncBlockHeight()
        go d.networkHandler()

        return nil
}
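
// Typical lifecycle (editor's illustrative sketch, not part of the original
// file): the caller constructs the gossiper with New, starts it once, and
// stops it on shutdown. The cfg and keyDesc values are hypothetical and
// assumed to be fully populated elsewhere.
//
//      g := New(cfg, keyDesc)
//      if err := g.Start(); err != nil {
//              log.Errorf("unable to start gossiper: %v", err)
//              return
//      }
//      defer func() { _ = g.Stop() }()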

// syncBlockHeight syncs the best block height for the gossiper by reading
// blockEpochs.
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) syncBlockHeight() {
        defer d.wg.Done()

        for {
                select {
                // A new block has arrived, so we can re-process the previously
                // premature announcements.
                case newBlock, ok := <-d.blockEpochs.Epochs:
                        // If the channel has been closed, then this indicates
                        // the daemon is shutting down, so we exit ourselves.
                        if !ok {
                                return
                        }

                        // Once a new block arrives, we update our running
                        // track of the height of the chain tip.
                        d.Lock()
                        blockHeight := uint32(newBlock.Height)
                        d.bestHeight = blockHeight
                        d.Unlock()

                        log.Debugf("New block: height=%d, hash=%s", blockHeight,
                                newBlock.Hash)

                        // Resend future messages, if any.
                        d.resendFutureMessages(blockHeight)

                case <-d.quit:
                        return
                }
        }
}

// futureMsgCache embeds a `lru.Cache` with a message counter that serves as
// the unique ID when saving the message.
type futureMsgCache struct {
        *lru.Cache[uint64, *cachedFutureMsg]

        // msgID is a monotonically increasing integer.
        msgID atomic.Uint64
}

// nextMsgID returns a unique message ID.
func (f *futureMsgCache) nextMsgID() uint64 {
        return f.msgID.Add(1)
}

// newFutureMsgCache creates a new future message cache with the underlying lru
// cache being initialized with the specified capacity.
func newFutureMsgCache(capacity uint64) *futureMsgCache {
        // Create a new cache.
        cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)

        return &futureMsgCache{
                Cache: cache,
        }
}

// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
type cachedFutureMsg struct {
        // msg is the network message.
        msg *networkMsg

        // height is the block height.
        height uint32
}

// Size returns the size of the message.
func (c *cachedFutureMsg) Size() (uint64, error) {
        // Return a constant 1.
        return 1, nil
}

// resendFutureMessages takes a block height, resends all the future messages
// found at or below that height, and deletes those messages from the
// gossiper's futureMsgs.
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
        var (
                // msgs are the target messages.
                msgs []*networkMsg

                // keys are the target messages' caching keys.
                keys []uint64
        )

        // filterMsgs is the visitor used when iterating the future cache.
        filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
                if cmsg.height <= height {
                        msgs = append(msgs, cmsg.msg)
                        keys = append(keys, k)
                }

                return true
        }

        // Filter out the target messages.
        d.futureMsgs.Range(filterMsgs)

        // Return early if no messages found.
        if len(msgs) == 0 {
                return
        }

        // Remove the filtered messages.
        for _, key := range keys {
                d.futureMsgs.Delete(key)
        }

        log.Debugf("Resending %d network messages at height %d",
                len(msgs), height)

        for _, msg := range msgs {
                select {
                case d.networkMsgs <- msg:
                case <-d.quit:
                        msg.err <- ErrGossiperShuttingDown
                }
        }
}

// Stop signals any active goroutines for a graceful closure.
func (d *AuthenticatedGossiper) Stop() error {
        d.stopped.Do(func() {
                log.Info("Authenticated gossiper shutting down...")
                defer log.Debug("Authenticated gossiper shutdown complete")

                d.stop()
        })
        return nil
}

func (d *AuthenticatedGossiper) stop() {
        log.Debug("Authenticated Gossiper is stopping")
        defer log.Debug("Authenticated Gossiper stopped")

        // `blockEpochs` is only initialized in the start routine so we make
        // sure we don't panic here in the case where the `Stop` method is
        // called when the `Start` method does not complete.
        if d.blockEpochs != nil {
                d.blockEpochs.Cancel()
        }

        d.syncMgr.Stop()

        d.banman.stop()

        close(d.quit)
        d.wg.Wait()

        // We'll stop our reliable sender after all of the gossiper's goroutines
        // have exited to ensure nothing can cause it to continue executing.
        d.reliableSender.Stop()
}

// TODO(roasbeef): need method to get current gossip timestamp?
//  * using mtx, check time rotate forward is needed?

// ProcessRemoteAnnouncement sends a new remote announcement message along with
// the peer that sent the routing message. The announcement will be processed
// then added to a queue for batched trickled announcement to all connected
// peers. Remote channel announcements should contain the announcement proof
// and be fully validated.
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
        peer lnpeer.Peer) chan error {

        log.Debugf("Processing remote msg %T from peer=%x", msg, peer.PubKey())

        errChan := make(chan error, 1)

        // For messages in the known set of channel series queries, we'll
        // dispatch the message directly to the GossipSyncer, and skip the main
        // processing loop.
        switch m := msg.(type) {
        case *lnwire.QueryShortChanIDs,
                *lnwire.QueryChannelRange,
                *lnwire.ReplyChannelRange,
                *lnwire.ReplyShortChanIDsEnd:

                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
                if !ok {
                        log.Warnf("Gossip syncer for peer=%x not found",
                                peer.PubKey())

                        errChan <- ErrGossipSyncerNotFound
                        return errChan
                }

                // If we've found the message target, then we'll dispatch the
                // message directly to it.
                err := syncer.ProcessQueryMsg(m, peer.QuitSignal())
                if err != nil {
                        log.Errorf("Process query msg from peer %x got %v",
                                peer.PubKey(), err)
                }

                errChan <- err
                return errChan

        // If a peer is updating its current update horizon, then we'll dispatch
        // that directly to the proper GossipSyncer.
        case *lnwire.GossipTimestampRange:
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
                if !ok {
                        log.Warnf("Gossip syncer for peer=%x not found",
                                peer.PubKey())

                        errChan <- ErrGossipSyncerNotFound
                        return errChan
                }

                // If we've found the message target, then we'll dispatch the
                // message directly to it.
                if err := syncer.ApplyGossipFilter(m); err != nil {
                        log.Warnf("Unable to apply gossip filter for peer=%x: "+
                                "%v", peer.PubKey(), err)

                        errChan <- err
                        return errChan
                }

                errChan <- nil
                return errChan

        // To avoid inserting edges in the graph for our own channels that we
        // have already closed, we ignore such channel announcements coming
        // from the remote.
        case *lnwire.ChannelAnnouncement1:
                ownKey := d.selfKey.SerializeCompressed()
                ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement1 " +
                        "for own channel")

                if bytes.Equal(m.NodeID1[:], ownKey) ||
                        bytes.Equal(m.NodeID2[:], ownKey) {

                        log.Warn(ownErr)
                        errChan <- ownErr
                        return errChan
                }
        }

        nMsg := &networkMsg{
                msg:      msg,
                isRemote: true,
                peer:     peer,
                source:   peer.IdentityKey(),
                err:      errChan,
        }

        select {
        case d.networkMsgs <- nMsg:

        // If the peer that sent us this message is quitting, then we don't
        // need to send back an error and can return immediately.
        case <-peer.QuitSignal():
                return nil
        case <-d.quit:
                nMsg.err <- ErrGossiperShuttingDown
        }

        return nMsg.err
}
943

944
// ProcessLocalAnnouncement sends a new remote announcement message along with
945
// the peer that sent the routing message. The announcement will be processed
946
// then added to a queue for batched trickled announcement to all connected
947
// peers.  Local channel announcements don't contain the announcement proof and
948
// will not be fully validated. Once the channel proofs are received, the
949
// entire channel announcement and update messages will be re-constructed and
950
// broadcast to the rest of the network.
951
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
952
        optionalFields ...OptionalMsgField) chan error {
3✔
953

3✔
954
        optionalMsgFields := &optionalMsgFields{}
3✔
955
        optionalMsgFields.apply(optionalFields...)
3✔
956

3✔
957
        nMsg := &networkMsg{
3✔
958
                msg:               msg,
3✔
959
                optionalMsgFields: optionalMsgFields,
3✔
960
                isRemote:          false,
3✔
961
                source:            d.selfKey,
3✔
962
                err:               make(chan error, 1),
3✔
963
        }
3✔
964

3✔
965
        select {
3✔
966
        case d.networkMsgs <- nMsg:
3✔
967
        case <-d.quit:
×
968
                nMsg.err <- ErrGossiperShuttingDown
×
969
        }
970

971
        return nMsg.err
3✔
972
}
973

974
// channelUpdateID is a unique identifier for ChannelUpdate messages, as
975
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
976
// tuple.
977
type channelUpdateID struct {
978
        // channelID represents the set of data which is needed to
979
        // retrieve all necessary data to validate the channel existence.
980
        channelID lnwire.ShortChannelID
981

982
        // Flags least-significant bit must be set to 0 if the creating node
983
        // corresponds to the first node in the previously sent channel
984
        // announcement and 1 otherwise.
985
        flags lnwire.ChanUpdateChanFlags
986
}
987

988
// msgWithSenders is a wrapper struct around a message, and the set of peers
989
// that originally sent us this message. Using this struct, we can ensure that
990
// we don't re-send a message to the peer that sent it to us in the first
991
// place.
992
type msgWithSenders struct {
993
        // msg is the wire message itself.
994
        msg lnwire.Message
995

996
        // isLocal is true if this was a message that originated locally. We'll
997
        // use this to bypass our normal checks to ensure we prioritize sending
998
        // out our own updates.
999
        isLocal bool
1000

1001
        // sender is the set of peers that sent us this message.
1002
        senders map[route.Vertex]struct{}
1003
}
1004

1005
// mergeSyncerMap is used to merge the set of senders of a particular message
1006
// with peers that we have an active GossipSyncer with. We do this to ensure
1007
// that we don't broadcast messages to any peers that we have active gossip
1008
// syncers for.
1009
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
3✔
1010
        for peerPub := range syncers {
6✔
1011
                m.senders[peerPub] = struct{}{}
3✔
1012
        }
3✔
1013
}
1014

1015
// deDupedAnnouncements de-duplicates announcements that have been added to the
1016
// batch. Internally, announcements are stored in three maps
1017
// (one each for channel announcements, channel updates, and node
1018
// announcements). These maps keep track of unique announcements and ensure no
1019
// announcements are duplicated. We keep the three message types separate, such
1020
// that we can send channel announcements first, then channel updates, and
1021
// finally node announcements when it's time to broadcast them.
1022
type deDupedAnnouncements struct {
1023
        // channelAnnouncements are identified by the short channel id field.
1024
        channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
1025

1026
        // channelUpdates are identified by the channel update id field.
1027
        channelUpdates map[channelUpdateID]msgWithSenders
1028

1029
        // nodeAnnouncements are identified by the Vertex field.
1030
        nodeAnnouncements map[route.Vertex]msgWithSenders
1031

1032
        sync.Mutex
1033
}
1034

1035
// Reset operates on deDupedAnnouncements to reset the storage of
1036
// announcements.
1037
func (d *deDupedAnnouncements) Reset() {
3✔
1038
        d.Lock()
3✔
1039
        defer d.Unlock()
3✔
1040

3✔
1041
        d.reset()
3✔
1042
}
3✔
1043

1044
// reset is the private version of the Reset method. We have this so we can
1045
// call this method within method that are already holding the lock.
1046
func (d *deDupedAnnouncements) reset() {
3✔
1047
        // Storage of each type of announcement (channel announcements, channel
3✔
1048
        // updates, node announcements) is set to an empty map where the
3✔
1049
        // appropriate key points to the corresponding lnwire.Message.
3✔
1050
        d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
3✔
1051
        d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
3✔
1052
        d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
3✔
1053
}
3✔
1054

1055
// addMsg adds a new message to the current batch. If the message is already
1056
// present in the current batch, then this new instance replaces the latter,
1057
// and the set of senders is updated to reflect which node sent us this
1058
// message.
1059
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
3✔
1060
        log.Tracef("Adding network message: %v to batch", message.msg.MsgType())
3✔
1061

3✔
1062
        // Depending on the message type (channel announcement, channel update,
3✔
1063
        // or node announcement), the message is added to the corresponding map
3✔
1064
        // in deDupedAnnouncements. Because each identifying key can have at
3✔
1065
        // most one value, the announcements are de-duplicated, with newer ones
3✔
1066
        // replacing older ones.
3✔
1067
        switch msg := message.msg.(type) {
3✔
1068

1069
        // Channel announcements are identified by the short channel id field.
1070
        case *lnwire.ChannelAnnouncement1:
3✔
1071
                deDupKey := msg.ShortChannelID
3✔
1072
                sender := route.NewVertex(message.source)
3✔
1073

3✔
1074
                mws, ok := d.channelAnnouncements[deDupKey]
3✔
1075
                if !ok {
6✔
1076
                        mws = msgWithSenders{
3✔
1077
                                msg:     msg,
3✔
1078
                                isLocal: !message.isRemote,
3✔
1079
                                senders: make(map[route.Vertex]struct{}),
3✔
1080
                        }
3✔
1081
                        mws.senders[sender] = struct{}{}
3✔
1082

3✔
1083
                        d.channelAnnouncements[deDupKey] = mws
3✔
1084

3✔
1085
                        return
3✔
1086
                }
3✔
1087

UNCOV
1088
                mws.msg = msg
×
UNCOV
1089
                mws.senders[sender] = struct{}{}
×
UNCOV
1090
                d.channelAnnouncements[deDupKey] = mws
×
1091

1092
        // Channel updates are identified by the (short channel id,
1093
        // channelflags) tuple.
1094
        case *lnwire.ChannelUpdate1:
3✔
1095
                sender := route.NewVertex(message.source)
3✔
1096
                deDupKey := channelUpdateID{
3✔
1097
                        msg.ShortChannelID,
3✔
1098
                        msg.ChannelFlags,
3✔
1099
                }
3✔
1100

3✔
1101
                oldTimestamp := uint32(0)
3✔
1102
                mws, ok := d.channelUpdates[deDupKey]
3✔
1103
                if ok {
3✔
UNCOV
1104
                        // If we already have seen this message, record its
×
UNCOV
1105
                        // timestamp.
×
UNCOV
1106
                        update, ok := mws.msg.(*lnwire.ChannelUpdate1)
×
UNCOV
1107
                        if !ok {
×
1108
                                log.Errorf("Expected *lnwire.ChannelUpdate1, "+
×
1109
                                        "got: %T", mws.msg)
×
1110

×
1111
                                return
×
1112
                        }
×
1113

UNCOV
1114
                        oldTimestamp = update.Timestamp
×
1115
                }
1116

1117
                // If we already had this message with a strictly newer
1118
                // timestamp, then we'll just discard the message we got.
1119
                if oldTimestamp > msg.Timestamp {
3✔
UNCOV
1120
                        log.Debugf("Ignored outdated network message: "+
×
UNCOV
1121
                                "peer=%v, msg=%s", message.peer, msg.MsgType())
×
UNCOV
1122
                        return
×
UNCOV
1123
                }
×
1124

1125
                // If the message we just got is newer than what we previously
1126
                // have seen, or this is the first time we see it, then we'll
1127
                // add it to our map of announcements.
1128
                if oldTimestamp < msg.Timestamp {
6✔
1129
                        mws = msgWithSenders{
3✔
1130
                                msg:     msg,
3✔
1131
                                isLocal: !message.isRemote,
3✔
1132
                                senders: make(map[route.Vertex]struct{}),
3✔
1133
                        }
3✔
1134

3✔
1135
                        // We'll mark the sender of the message in the
3✔
1136
                        // senders map.
3✔
1137
                        mws.senders[sender] = struct{}{}
3✔
1138

3✔
1139
                        d.channelUpdates[deDupKey] = mws
3✔
1140

3✔
1141
                        return
3✔
1142
                }
3✔
1143

1144
                // Lastly, if we had seen this exact message from before, with
1145
                // the same timestamp, we'll add the sender to the map of
1146
                // senders, such that we can skip sending this message back in
1147
                // the next batch.
UNCOV
1148
                mws.msg = msg
×
UNCOV
1149
                mws.senders[sender] = struct{}{}
×
UNCOV
1150
                d.channelUpdates[deDupKey] = mws
×
1151

1152
        // Node announcements are identified by the Vertex field.  Use the
1153
        // NodeID to create the corresponding Vertex.
1154
        case *lnwire.NodeAnnouncement:
3✔
1155
                sender := route.NewVertex(message.source)
3✔
1156
                deDupKey := route.Vertex(msg.NodeID)
3✔
1157

3✔
1158
                // We do the same for node announcements as we did for channel
3✔
1159
                // updates, as they also carry a timestamp.
3✔
1160
                oldTimestamp := uint32(0)
3✔
1161
                mws, ok := d.nodeAnnouncements[deDupKey]
3✔
1162
                if ok {
6✔
1163
                        oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
3✔
1164
                }
3✔
1165

1166
                // Discard the message if it's old.
1167
                if oldTimestamp > msg.Timestamp {
6✔
1168
                        return
3✔
1169
                }
3✔
1170

1171
                // Replace if it's newer.
1172
                if oldTimestamp < msg.Timestamp {
6✔
1173
                        mws = msgWithSenders{
3✔
1174
                                msg:     msg,
3✔
1175
                                isLocal: !message.isRemote,
3✔
1176
                                senders: make(map[route.Vertex]struct{}),
3✔
1177
                        }
3✔
1178

3✔
1179
                        mws.senders[sender] = struct{}{}
3✔
1180

3✔
1181
                        d.nodeAnnouncements[deDupKey] = mws
3✔
1182

3✔
1183
                        return
3✔
1184
                }
3✔
1185

1186
                // Add to senders map if it's the same as we had.
1187
                mws.msg = msg
3✔
1188
                mws.senders[sender] = struct{}{}
3✔
1189
                d.nodeAnnouncements[deDupKey] = mws
3✔
1190
        }
1191
}
1192

1193
// AddMsgs is a helper method to add multiple messages to the announcement
1194
// batch.
1195
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
3✔
1196
        d.Lock()
3✔
1197
        defer d.Unlock()
3✔
1198

3✔
1199
        for _, msg := range msgs {
6✔
1200
                d.addMsg(msg)
3✔
1201
        }
3✔
1202
}
1203

1204
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
1205
// to broadcast next into messages that are locally sourced and those that are
1206
// sourced remotely.
1207
type msgsToBroadcast struct {
1208
        // localMsgs is the set of messages we created locally.
1209
        localMsgs []msgWithSenders
1210

1211
        // remoteMsgs is the set of messages that we received from a remote
1212
        // party.
1213
        remoteMsgs []msgWithSenders
1214
}
1215

1216
// addMsg adds a new message to the appropriate sub-slice.
1217
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
3✔
1218
        if msg.isLocal {
6✔
1219
                m.localMsgs = append(m.localMsgs, msg)
3✔
1220
        } else {
6✔
1221
                m.remoteMsgs = append(m.remoteMsgs, msg)
3✔
1222
        }
3✔
1223
}
1224

1225
// isEmpty returns true if the batch is empty.
1226
func (m *msgsToBroadcast) isEmpty() bool {
3✔
1227
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
3✔
1228
}
3✔
1229

1230
// length returns the length of the combined message set.
UNCOV
1231
func (m *msgsToBroadcast) length() int {
×
UNCOV
1232
        return len(m.localMsgs) + len(m.remoteMsgs)
×
UNCOV
1233
}
×
1234

1235
// Emit returns the set of de-duplicated announcements to be sent out during
1236
// the next announcement epoch, in the order of channel announcements, channel
1237
// updates, and node announcements. Each message emitted, contains the set of
1238
// peers that sent us the message. This way, we can ensure that we don't waste
1239
// bandwidth by re-sending a message to the peer that sent it to us in the
1240
// first place. Additionally, the set of stored messages is reset.
1241
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
3✔
1242
        d.Lock()
3✔
1243
        defer d.Unlock()
3✔
1244

3✔
1245
        // Get the total number of announcements.
3✔
1246
        numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
3✔
1247
                len(d.nodeAnnouncements)
3✔
1248

3✔
1249
        // Create empty message slices with a capacity equal to
3✔
1250
        // the total number of announcements.
3✔
1251
        msgs := msgsToBroadcast{
3✔
1252
                localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
3✔
1253
                remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
3✔
1254
        }
3✔
1255

3✔
1256
        // Add the channel announcements to the array first.
3✔
1257
        for _, message := range d.channelAnnouncements {
6✔
1258
                msgs.addMsg(message)
3✔
1259
        }
3✔
1260

1261
        // Then add the channel updates.
1262
        for _, message := range d.channelUpdates {
6✔
1263
                msgs.addMsg(message)
3✔
1264
        }
3✔
1265

1266
        // Finally add the node announcements.
1267
        for _, message := range d.nodeAnnouncements {
6✔
1268
                msgs.addMsg(message)
3✔
1269
        }
3✔
1270

1271
        d.reset()
3✔
1272

3✔
1273
        // Return the batch of messages to broadcast.
3✔
1274
        return msgs
3✔
1275
}
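
// A minimal usage sketch (hypothetical caller, mirroring the trickle loop in
// networkHandler below): Emit drains and resets the store, so a second call
// in a row always yields an empty batch.
func exampleEmitDrains(d *deDupedAnnouncements) {
        batch := d.Emit()   // channel anns, then updates, then node anns
        _ = batch.length()  // total messages for this epoch
        again := d.Emit()   // the store was reset above
        _ = again.isEmpty() // true
}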
1276

1277
// calculateSubBatchSize is a helper function that calculates the sub batch
1278
// size to break the batchSize down into.
1279
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
1280
        minimumBatchSize, batchSize int) int {
3✔
1281
        if subBatchDelay > totalDelay {
3✔
UNCOV
1282
                return batchSize
×
UNCOV
1283
        }
×
1284

1285
        subBatchSize := (batchSize*int(subBatchDelay) +
3✔
1286
                int(totalDelay) - 1) / int(totalDelay)
3✔
1287

3✔
1288
        if subBatchSize < minimumBatchSize {
6✔
1289
                return minimumBatchSize
3✔
1290
        }
3✔
1291

UNCOV
1292
        return subBatchSize
×
1293
}
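
// A hedged worked example of the ceiling division above, using hypothetical
// values rather than lnd defaults: 300 announcements spread over a 90s
// trickle delay in 5s sub batch steps (minimum 10 per step) gives
// ceil(300*5/90) = 17 messages per sub batch.
func exampleSubBatchSize() int {
        return calculateSubBatchSize(
                90*time.Second, 5*time.Second, 10, 300,
        ) // == 17
}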
1294

1295
// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
1296
// this variable so the function can be mocked in our tests.
1297
var batchSizeCalculator = calculateSubBatchSize
1298

1299
// splitAnnouncementBatches takes an existing list of announcements and
1300
// decomposes it into sub batches controlled by the `subBatchSize`.
1301
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
1302
        announcementBatch []msgWithSenders) [][]msgWithSenders {
3✔
1303

3✔
1304
        subBatchSize := batchSizeCalculator(
3✔
1305
                d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
3✔
1306
                d.cfg.MinimumBatchSize, len(announcementBatch),
3✔
1307
        )
3✔
1308

3✔
1309
        var splitAnnouncementBatch [][]msgWithSenders
3✔
1310

3✔
1311
        for subBatchSize < len(announcementBatch) {
3✔
UNCOV
1312
                // For slicing with minimal allocation
×
UNCOV
1313
                // https://github.com/golang/go/wiki/SliceTricks
×
UNCOV
1314
                announcementBatch, splitAnnouncementBatch =
×
UNCOV
1315
                        announcementBatch[subBatchSize:],
×
UNCOV
1316
                        append(splitAnnouncementBatch,
×
UNCOV
1317
                                announcementBatch[0:subBatchSize:subBatchSize])
×
UNCOV
1318
        }
×
1319
        splitAnnouncementBatch = append(
3✔
1320
                splitAnnouncementBatch, announcementBatch,
3✔
1321
        )
3✔
1322

3✔
1323
        return splitAnnouncementBatch
3✔
1324
}
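
// A hedged sketch with hypothetical sizes: given a computed sub batch size of
// 2, five announcements split into batches of 2, 2 and 1. The three-index
// slice expression above pins each sub batch's capacity, so appending to one
// sub batch can never overwrite the start of the next.
func exampleSplitBatches(d *AuthenticatedGossiper,
        anns []msgWithSenders) [][]msgWithSenders {

        return d.splitAnnouncementBatches(anns) // e.g. 5 msgs -> [2 2 1]
}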
1325

1326
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
1327
// split size, and then sends out all items to the set of target peers. Locally
1328
// generated announcements are always sent before remotely generated
1329
// announcements.
1330
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
1331
        annBatch msgsToBroadcast) {
3✔
1332

3✔
1333
        // delayNextBatch is a helper closure that blocks for `SubBatchDelay`
3✔
1334
        // duration to delay the sending of the next announcement batch.
3✔
1335
        delayNextBatch := func() {
6✔
1336
                select {
3✔
1337
                case <-time.After(d.cfg.SubBatchDelay):
3✔
UNCOV
1338
                case <-d.quit:
×
UNCOV
1339
                        return
×
1340
                }
1341
        }
1342

1343
        // Fetch the local and remote announcements.
1344
        localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
3✔
1345
        remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)
3✔
1346

3✔
1347
        d.wg.Add(1)
3✔
1348
        go func() {
6✔
1349
                defer d.wg.Done()
3✔
1350

3✔
1351
                log.Debugf("Broadcasting %v new local announcements in %d "+
3✔
1352
                        "sub batches", len(annBatch.localMsgs),
3✔
1353
                        len(localBatches))
3✔
1354

3✔
1355
                // Send out the local announcements first.
3✔
1356
                for _, annBatch := range localBatches {
6✔
1357
                        d.sendLocalBatch(annBatch)
3✔
1358
                        delayNextBatch()
3✔
1359
                }
3✔
1360

1361
                log.Debugf("Broadcasting %v new remote announcements in %d "+
3✔
1362
                        "sub batches", len(annBatch.remoteMsgs),
3✔
1363
                        len(remoteBatches))
3✔
1364

3✔
1365
                // Now send the remote announcements.
3✔
1366
                for _, annBatch := range remoteBatches {
6✔
1367
                        d.sendRemoteBatch(annBatch)
3✔
1368
                        delayNextBatch()
3✔
1369
                }
3✔
1370
        }()
1371
}
1372

1373
// sendLocalBatch broadcasts a list of locally generated announcements to our
1374
// peers. For local announcements, we skip the filter and dedup logic and just
1375
// send the announcements out to all our connected peers.
1376
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
3✔
1377
        msgsToSend := lnutils.Map(
3✔
1378
                annBatch, func(m msgWithSenders) lnwire.Message {
6✔
1379
                        return m.msg
3✔
1380
                },
3✔
1381
        )
1382

1383
        err := d.cfg.Broadcast(nil, msgsToSend...)
3✔
1384
        if err != nil {
3✔
1385
                log.Errorf("Unable to send local batch announcements: %v", err)
×
1386
        }
×
1387
}
1388

1389
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
1390
// peers.
1391
func (d *AuthenticatedGossiper) sendRemoteBatch(annBatch []msgWithSenders) {
3✔
1392
        syncerPeers := d.syncMgr.GossipSyncers()
3✔
1393

3✔
1394
        // We'll first attempt to filter out this new message for all peers
3✔
1395
        // that have active gossip syncers.
3✔
1396
        for pub, syncer := range syncerPeers {
6✔
1397
                log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
3✔
1398
                syncer.FilterGossipMsgs(annBatch...)
3✔
1399
        }
3✔
1400

1401
        for _, msgChunk := range annBatch {
6✔
1402
                msgChunk := msgChunk
3✔
1403

3✔
1404
                // With the syncers taken care of, we'll merge the sender map
3✔
1405
                // with the set of syncers, so we don't send out duplicate
3✔
1406
                // messages.
3✔
1407
                msgChunk.mergeSyncerMap(syncerPeers)
3✔
1408

3✔
1409
                err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
3✔
1410
                if err != nil {
3✔
1411
                        log.Errorf("Unable to send batch "+
×
1412
                                "announcements: %v", err)
×
1413
                        continue
×
1414
                }
1415
        }
1416
}
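
// The de-duplication above is a plain set union. A hedged sketch of what
// merging the syncer map into a message's sender set amounts to (this
// approximates mergeSyncerMap's presumed behaviour; it is not its source):
//
//	for pub := range syncerPeers {
//		msgChunk.senders[pub] = struct{}{}
//	}
//
// Broadcast then skips every peer present in senders, so a peer that just got
// the message through its gossip syncer is never sent a duplicate.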
1417

1418
// networkHandler is the primary goroutine that drives this service. The roles
1419
// of this goroutine include answering queries related to the state of the
1420
// network, syncing up newly connected peers, and also periodically
1421
// broadcasting our latest topology state to all connected peers.
1422
//
1423
// NOTE: This MUST be run as a goroutine.
1424
func (d *AuthenticatedGossiper) networkHandler() {
3✔
1425
        defer d.wg.Done()
3✔
1426

3✔
1427
        // Initialize empty deDupedAnnouncements to store announcement batch.
3✔
1428
        announcements := deDupedAnnouncements{}
3✔
1429
        announcements.Reset()
3✔
1430

3✔
1431
        d.cfg.RetransmitTicker.Resume()
3✔
1432
        defer d.cfg.RetransmitTicker.Stop()
3✔
1433

3✔
1434
        trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
3✔
1435
        defer trickleTimer.Stop()
3✔
1436

3✔
1437
        // To start, we'll first check to see if there are any stale channel or
3✔
1438
        // node announcements that we need to re-transmit.
3✔
1439
        if err := d.retransmitStaleAnns(time.Now()); err != nil {
3✔
1440
                log.Errorf("Unable to rebroadcast stale announcements: %v", err)
×
1441
        }
×
1442

1443
        for {
6✔
1444
                select {
3✔
1445
                // A new policy update has arrived. We'll commit it to the
1446
                // sub-systems below us, then craft, sign, and broadcast a new
1447
                // ChannelUpdate for the set of affected clients.
1448
                case policyUpdate := <-d.chanPolicyUpdates:
3✔
1449
                        log.Tracef("Received channel %d policy update requests",
3✔
1450
                                len(policyUpdate.edgesToUpdate))
3✔
1451

3✔
1452
                        // First, we'll now create new fully signed updates for
3✔
1453
                        // the affected channels and also update the underlying
3✔
1454
                        // graph with the new state.
3✔
1455
                        newChanUpdates, err := d.processChanPolicyUpdate(
3✔
1456
                                policyUpdate.edgesToUpdate,
3✔
1457
                        )
3✔
1458
                        policyUpdate.errChan <- err
3✔
1459
                        if err != nil {
3✔
1460
                                log.Errorf("Unable to craft policy updates: %v",
×
1461
                                        err)
×
1462
                                continue
×
1463
                        }
1464

1465
                        // Finally, with the updates committed, we'll now add
1466
                        // them to the announcement batch to be flushed at the
1467
                        // start of the next epoch.
1468
                        announcements.AddMsgs(newChanUpdates...)
3✔
1469

1470
                case announcement := <-d.networkMsgs:
3✔
1471
                        log.Tracef("Received network message: "+
3✔
1472
                                "peer=%v, msg=%s, is_remote=%v",
3✔
1473
                                announcement.peer, announcement.msg.MsgType(),
3✔
1474
                                announcement.isRemote)
3✔
1475

3✔
1476
                        switch announcement.msg.(type) {
3✔
1477
                        // Channel announcement signatures are amongst the only
1478
                        // messages that we'll process serially.
1479
                        case *lnwire.AnnounceSignatures1:
3✔
1480
                                emittedAnnouncements, _ := d.processNetworkAnnouncement(
3✔
1481
                                        announcement,
3✔
1482
                                )
3✔
1483
                                log.Debugf("Processed network message %s, "+
3✔
1484
                                        "returned len(announcements)=%v",
3✔
1485
                                        announcement.msg.MsgType(),
3✔
1486
                                        len(emittedAnnouncements))
3✔
1487

3✔
1488
                                if emittedAnnouncements != nil {
6✔
1489
                                        announcements.AddMsgs(
3✔
1490
                                                emittedAnnouncements...,
3✔
1491
                                        )
3✔
1492
                                }
3✔
1493
                                continue
3✔
1494
                        }
1495

1496
                        // If this message was recently rejected, then we won't
1497
                        // attempt to re-process it.
1498
                        if announcement.isRemote && d.isRecentlyRejectedMsg(
3✔
1499
                                announcement.msg,
3✔
1500
                                sourceToPub(announcement.source),
3✔
1501
                        ) {
3✔
UNCOV
1502

×
UNCOV
1503
                                announcement.err <- fmt.Errorf("recently " +
×
UNCOV
1504
                                        "rejected")
×
UNCOV
1505
                                continue
×
1506
                        }
1507

1508
                        // We'll set up any dependencies and wait until a free
1509
                        // slot for this job opens up; this allows us to not
1510
                        // have thousands of goroutines active.
1511
                        annJobID, err := d.vb.InitJobDependencies(
3✔
1512
                                announcement.msg,
3✔
1513
                        )
3✔
1514
                        if err != nil {
3✔
1515
                                announcement.err <- err
×
1516
                                continue
×
1517
                        }
1518

1519
                        d.wg.Add(1)
3✔
1520
                        go d.handleNetworkMessages(
3✔
1521
                                announcement, &announcements, annJobID,
3✔
1522
                        )
3✔
1523

1524
                // The trickle timer has ticked, which indicates we should
1525
                // flush to the network the pending batch of new announcements
1526
                // we've received since the last trickle tick.
1527
                case <-trickleTimer.C:
3✔
1528
                        // Emit the current batch of announcements from
3✔
1529
                        // deDupedAnnouncements.
3✔
1530
                        announcementBatch := announcements.Emit()
3✔
1531

3✔
1532
                        // If the current announcement batch is empty, then we
3✔
1533
                        // have no further work here.
3✔
1534
                        if announcementBatch.isEmpty() {
6✔
1535
                                continue
3✔
1536
                        }
1537

1538
                        // At this point, we have the set of local and remote
1539
                        // announcements we want to send out. We'll do the
1540
                        // batching as normal for both, but for local
1541
                        // announcements, we'll blast them out w/o regard for
1542
                        // our peer's policies so we ensure they propagate
1543
                        // properly.
1544
                        d.splitAndSendAnnBatch(announcementBatch)
3✔
1545

1546
                // The retransmission timer has ticked which indicates that we
1547
                // should check if we need to prune or re-broadcast any of our
1548
                // personal channels or node announcement. This addresses the
1549
                // case of "zombie" channels and channel advertisements that
1550
                // have been dropped, or not properly propagated through the
1551
                // network.
UNCOV
1552
                case tick := <-d.cfg.RetransmitTicker.Ticks():
×
UNCOV
1553
                        if err := d.retransmitStaleAnns(tick); err != nil {
×
1554
                                log.Errorf("unable to rebroadcast stale "+
×
1555
                                        "announcements: %v", err)
×
1556
                        }
×
1557

1558
                // The gossiper has been signalled to exit, so we exit our
1559
                // main loop so the wait group can be decremented.
1560
                case <-d.quit:
3✔
1561
                        return
3✔
1562
                }
1563
        }
1564
}
1565

1566
// handleNetworkMessages is responsible for waiting for dependencies for a
1567
// given network message and processing the message. Once processed, it will
1568
// signal its dependents and add the new announcements to the announce batch.
1569
//
1570
// NOTE: must be run as a goroutine.
1571
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
1572
        deDuped *deDupedAnnouncements, jobID JobID) {
3✔
1573

3✔
1574
        defer d.wg.Done()
3✔
1575
        defer d.vb.CompleteJob()
3✔
1576

3✔
1577
        // We should only broadcast this message forward if it originated from
3✔
1578
        // us or it wasn't received as part of our initial historical sync.
3✔
1579
        shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
3✔
1580

3✔
1581
        // If this message has an existing dependency, then we'll wait until
3✔
1582
        // that has been fully validated before we proceed.
3✔
1583
        err := d.vb.WaitForParents(jobID, nMsg.msg)
3✔
1584
        if err != nil {
3✔
1585
                log.Debugf("Validating network message %s got err: %v",
×
1586
                        nMsg.msg.MsgType(), err)
×
1587

×
1588
                if errors.Is(err, ErrVBarrierShuttingDown) {
×
1589
                        log.Warnf("unexpected error during validation "+
×
1590
                                "barrier shutdown: %v", err)
×
1591
                }
×
1592
                nMsg.err <- err
×
1593

×
1594
                return
×
1595
        }
1596

1597
        // Process the network announcement to determine if this is either a
1598
        // new announcement from our PoV or an update to a prior vertex/edge we
1599
        // previously processed.
1600
        newAnns, allow := d.processNetworkAnnouncement(nMsg)
3✔
1601

3✔
1602
        log.Tracef("Processed network message %s, returned "+
3✔
1603
                "len(announcements)=%v, allowDependents=%v",
3✔
1604
                nMsg.msg.MsgType(), len(newAnns), allow)
3✔
1605

3✔
1606
        // If this message had any dependencies, then we can now signal them to
3✔
1607
        // continue.
3✔
1608
        err = d.vb.SignalDependents(nMsg.msg, jobID)
3✔
1609
        if err != nil {
3✔
1610
                // Something is wrong if SignalDependents returns an error.
×
1611
                log.Errorf("SignalDependents returned error for msg=%v with "+
×
1612
                        "JobID=%v", spew.Sdump(nMsg.msg), jobID)
×
1613

×
1614
                nMsg.err <- err
×
1615

×
1616
                return
×
1617
        }
×
1618

1619
        // If the announcement was accepted, then add the emitted announcements
1620
        // to our announce batch to be broadcast once the trickle timer ticks
1621
        // again.
1622
        if newAnns != nil && shouldBroadcast {
6✔
1623
                // TODO(roasbeef): exclude peer that sent.
3✔
1624
                deDuped.AddMsgs(newAnns...)
3✔
1625
        } else if newAnns != nil {
9✔
1626
                log.Trace("Skipping broadcast of announcements received " +
3✔
1627
                        "during initial graph sync")
3✔
1628
        }
3✔
1629
}
1630

1631
// TODO(roasbeef): d/c peers that send updates not on our chain
1632

1633
// InitSyncState is called by outside sub-systems when a connection is
1634
// established to a new peer that understands how to perform channel range
1635
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
1636
// needed to handle new queries.
1637
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
3✔
1638
        d.syncMgr.InitSyncState(syncPeer)
3✔
1639
}
3✔
1640

1641
// PruneSyncState is called by outside sub-systems once a peer that we were
1642
// previously connected to has been disconnected. In this case we can stop the
1643
// existing GossipSyncer assigned to the peer and free up resources.
1644
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
3✔
1645
        d.syncMgr.PruneSyncState(peer)
3✔
1646
}
3✔
1647

1648
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
1649
// false otherwise. This avoids expensive reprocessing of the message.
1650
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
1651
        peerPub [33]byte) bool {
3✔
1652

3✔
1653
        var scid uint64
3✔
1654
        switch m := msg.(type) {
3✔
1655
        case *lnwire.ChannelUpdate1:
3✔
1656
                scid = m.ShortChannelID.ToUint64()
3✔
1657

1658
        case *lnwire.ChannelAnnouncement1:
3✔
1659
                scid = m.ShortChannelID.ToUint64()
3✔
1660

1661
        default:
3✔
1662
                return false
3✔
1663
        }
1664

1665
        _, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
3✔
1666
        return err != cache.ErrElementNotFound
3✔
1667
}
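
// A hedged usage sketch mirroring the networkHandler check above: a remote
// ChannelUpdate1 or ChannelAnnouncement1 whose SCID this peer already had
// rejected is dropped before any expensive signature validation runs.
func exampleRecentlyRejected(d *AuthenticatedGossiper, msg *networkMsg) bool {
        return msg.isRemote &&
                d.isRecentlyRejectedMsg(msg.msg, sourceToPub(msg.source))
}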
1668

1669
// retransmitStaleAnns examines all outgoing channels that the source node is
1670
// known to maintain to check to see if any of them are "stale". A channel is
1671
// stale iff the last timestamp of its rebroadcast is older than the
1672
// RebroadcastInterval. We also check if a refreshed node announcement should
1673
// be resent.
1674
func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
3✔
1675
        // Iterate over all of our channels and check if any of them fall
3✔
1676
        // within the prune interval or re-broadcast interval.
3✔
1677
        type updateTuple struct {
3✔
1678
                info *models.ChannelEdgeInfo
3✔
1679
                edge *models.ChannelEdgePolicy
3✔
1680
        }
3✔
1681

3✔
1682
        var (
3✔
1683
                havePublicChannels bool
3✔
1684
                edgesToUpdate      []updateTuple
3✔
1685
        )
3✔
1686
        err := d.cfg.Graph.ForAllOutgoingChannels(func(
3✔
1687
                info *models.ChannelEdgeInfo,
3✔
1688
                edge *models.ChannelEdgePolicy) error {
6✔
1689

3✔
1690
                // If there's no auth proof attached to this edge, it means
3✔
1691
                // that it is a private channel not meant to be announced to
3✔
1692
                // the greater network, so avoid sending channel updates for
3✔
1693
                // this channel to not leak its
3✔
1694
                // existence.
3✔
1695
                if info.AuthProof == nil {
6✔
1696
                        log.Debugf("Skipping retransmission of channel "+
3✔
1697
                                "without AuthProof: %v", info.ChannelID)
3✔
1698
                        return nil
3✔
1699
                }
3✔
1700

1701
                // We make a note that we have at least one public channel. We
1702
                // use this to determine whether we should send a node
1703
                // announcement below.
1704
                havePublicChannels = true
3✔
1705

3✔
1706
                // If this edge has a ChannelUpdate that was created before the
3✔
1707
                // introduction of the MaxHTLC field, then we'll update this
3✔
1708
                // edge to propagate this information in the network.
3✔
1709
                if !edge.MessageFlags.HasMaxHtlc() {
3✔
1710
                        // We'll make sure we support the new max_htlc field if
×
1711
                        // not already present.
×
1712
                        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
×
1713
                        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
×
1714

×
1715
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
1716
                                info: info,
×
1717
                                edge: edge,
×
1718
                        })
×
1719
                        return nil
×
1720
                }
×
1721

1722
                timeElapsed := now.Sub(edge.LastUpdate)
3✔
1723

3✔
1724
                // If it's been longer than RebroadcastInterval since we've
3✔
1725
                // re-broadcasted the channel, add the channel to the set of
3✔
1726
                // edges we need to update.
3✔
1727
                if timeElapsed >= d.cfg.RebroadcastInterval {
3✔
UNCOV
1728
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
UNCOV
1729
                                info: info,
×
UNCOV
1730
                                edge: edge,
×
UNCOV
1731
                        })
×
UNCOV
1732
                }
×
1733

1734
                return nil
3✔
1735
        })
1736
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
3✔
1737
                return fmt.Errorf("unable to retrieve outgoing channels: %w",
×
1738
                        err)
×
1739
        }
×
1740

1741
        var signedUpdates []lnwire.Message
3✔
1742
        for _, chanToUpdate := range edgesToUpdate {
3✔
UNCOV
1743
                // Re-sign and update the channel on disk and retrieve our
×
UNCOV
1744
                // ChannelUpdate to broadcast.
×
UNCOV
1745
                chanAnn, chanUpdate, err := d.updateChannel(
×
UNCOV
1746
                        chanToUpdate.info, chanToUpdate.edge,
×
UNCOV
1747
                )
×
UNCOV
1748
                if err != nil {
×
1749
                        return fmt.Errorf("unable to update channel: %w", err)
×
1750
                }
×
1751

1752
                // If we have a valid announcement to transmit, then we'll send
1753
                // that along with the update.
UNCOV
1754
                if chanAnn != nil {
×
UNCOV
1755
                        signedUpdates = append(signedUpdates, chanAnn)
×
UNCOV
1756
                }
×
1757

UNCOV
1758
                signedUpdates = append(signedUpdates, chanUpdate)
×
1759
        }
1760

1761
        // If we don't have any public channels, we return as we don't want to
1762
        // broadcast anything that would reveal our existence.
1763
        if !havePublicChannels {
6✔
1764
                return nil
3✔
1765
        }
3✔
1766

1767
        // We'll also check that our NodeAnnouncement is not too old.
1768
        currentNodeAnn := d.cfg.FetchSelfAnnouncement()
3✔
1769
        timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
3✔
1770
        timeElapsed := now.Sub(timestamp)
3✔
1771

3✔
1772
        // If it's been longer than RebroadcastInterval since we last sent the
3✔
1773
        // node announcement, refresh it and resend it.
3✔
1774
        nodeAnnStr := ""
3✔
1775
        if timeElapsed >= d.cfg.RebroadcastInterval {
3✔
UNCOV
1776
                newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
×
UNCOV
1777
                if err != nil {
×
1778
                        return fmt.Errorf("unable to get refreshed node "+
×
1779
                                "announcement: %v", err)
×
1780
                }
×
1781

UNCOV
1782
                signedUpdates = append(signedUpdates, &newNodeAnn)
×
UNCOV
1783
                nodeAnnStr = " and our refreshed node announcement"
×
UNCOV
1784

×
UNCOV
1785
                // Before broadcasting the refreshed node announcement, add it
×
UNCOV
1786
                // to our own graph.
×
UNCOV
1787
                if err := d.addNode(&newNodeAnn); err != nil {
×
UNCOV
1788
                        log.Errorf("Unable to add refreshed node announcement "+
×
UNCOV
1789
                                "to graph: %v", err)
×
UNCOV
1790
                }
×
1791
        }
1792

1793
        // If we don't have any updates to re-broadcast, then we'll exit
1794
        // early.
1795
        if len(signedUpdates) == 0 {
6✔
1796
                return nil
3✔
1797
        }
3✔
1798

UNCOV
1799
        log.Infof("Retransmitting %v outgoing channels%v",
×
UNCOV
1800
                len(edgesToUpdate), nodeAnnStr)
×
UNCOV
1801

×
UNCOV
1802
        // With all the wire announcements properly crafted, we'll broadcast
×
UNCOV
1803
        // our known outgoing channels to all our immediate peers.
×
UNCOV
1804
        if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
×
1805
                return fmt.Errorf("unable to re-broadcast channels: %w", err)
×
1806
        }
×
1807

UNCOV
1808
        return nil
×
1809
}
1810

1811
// processChanPolicyUpdate generates a new set of channel updates for the
1812
// provided list of edges and updates the backing ChannelGraphSource.
1813
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
1814
        edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
3✔
1815

3✔
1816
        var chanUpdates []networkMsg
3✔
1817
        for _, edgeInfo := range edgesToUpdate {
6✔
1818
                // Now that we've collected all the channels we need to update,
3✔
1819
                // we'll re-sign and update the backing ChannelGraphSource, and
3✔
1820
                // retrieve our ChannelUpdate to broadcast.
3✔
1821
                _, chanUpdate, err := d.updateChannel(
3✔
1822
                        edgeInfo.Info, edgeInfo.Edge,
3✔
1823
                )
3✔
1824
                if err != nil {
3✔
1825
                        return nil, err
×
1826
                }
×
1827

1828
                // We'll avoid broadcasting any updates for private channels to
1829
                // avoid directly giving away their existence. Instead, we'll
1830
                // send the update directly to the remote party.
1831
                if edgeInfo.Info.AuthProof == nil {
6✔
1832
                        // If AuthProof is nil and an alias was found for this
3✔
1833
                        // ChannelID (meaning the option-scid-alias feature was
3✔
1834
                        // negotiated), we'll replace the ShortChannelID in the
3✔
1835
                        // update with the peer's alias. We do this after
3✔
1836
                        // updateChannel so that the alias isn't persisted to
3✔
1837
                        // the database.
3✔
1838
                        chanID := lnwire.NewChanIDFromOutPoint(
3✔
1839
                                edgeInfo.Info.ChannelPoint,
3✔
1840
                        )
3✔
1841

3✔
1842
                        var defaultAlias lnwire.ShortChannelID
3✔
1843
                        foundAlias, _ := d.cfg.GetAlias(chanID)
3✔
1844
                        if foundAlias != defaultAlias {
6✔
1845
                                chanUpdate.ShortChannelID = foundAlias
3✔
1846

3✔
1847
                                sig, err := d.cfg.SignAliasUpdate(chanUpdate)
3✔
1848
                                if err != nil {
3✔
1849
                                        log.Errorf("Unable to sign alias "+
×
1850
                                                "update: %v", err)
×
1851
                                        continue
×
1852
                                }
1853

1854
                                lnSig, err := lnwire.NewSigFromSignature(sig)
3✔
1855
                                if err != nil {
3✔
1856
                                        log.Errorf("Unable to create sig: %v",
×
1857
                                                err)
×
1858
                                        continue
×
1859
                                }
1860

1861
                                chanUpdate.Signature = lnSig
3✔
1862
                        }
1863

1864
                        remotePubKey := remotePubFromChanInfo(
3✔
1865
                                edgeInfo.Info, chanUpdate.ChannelFlags,
3✔
1866
                        )
3✔
1867
                        err := d.reliableSender.sendMessage(
3✔
1868
                                chanUpdate, remotePubKey,
3✔
1869
                        )
3✔
1870
                        if err != nil {
3✔
1871
                                log.Errorf("Unable to reliably send %v for "+
×
1872
                                        "channel=%v to peer=%x: %v",
×
1873
                                        chanUpdate.MsgType(),
×
1874
                                        chanUpdate.ShortChannelID,
×
1875
                                        remotePubKey, err)
×
1876
                        }
×
1877
                        continue
3✔
1878
                }
1879

1880
                // We set ourselves as the source of this message to indicate
1881
                // that we shouldn't skip any peers when sending this message.
1882
                chanUpdates = append(chanUpdates, networkMsg{
3✔
1883
                        source:   d.selfKey,
3✔
1884
                        isRemote: false,
3✔
1885
                        msg:      chanUpdate,
3✔
1886
                })
3✔
1887
        }
1888

1889
        return chanUpdates, nil
3✔
1890
}
1891

1892
// remotePubFromChanInfo returns the public key of the remote peer given a
1893
// ChannelEdgeInfo that describes a channel we have with them.
1894
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
1895
        chanFlags lnwire.ChanUpdateChanFlags) [33]byte {
3✔
1896

3✔
1897
        var remotePubKey [33]byte
3✔
1898
        switch {
3✔
1899
        case chanFlags&lnwire.ChanUpdateDirection == 0:
3✔
1900
                remotePubKey = chanInfo.NodeKey2Bytes
3✔
1901
        case chanFlags&lnwire.ChanUpdateDirection == 1:
3✔
1902
                remotePubKey = chanInfo.NodeKey1Bytes
3✔
1903
        }
1904

1905
        return remotePubKey
3✔
1906
}
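
// To make the direction bit concrete: an update whose
// ChannelFlags&lnwire.ChanUpdateDirection equals 0 was signed by node_1, so
// the remote party we deliver it to is node_2, and vice versa. A hedged
// sketch:
func exampleRemotePub(chanInfo *models.ChannelEdgeInfo,
        upd *lnwire.ChannelUpdate1) [33]byte {

        // For upd.ChannelFlags&1 == 0 this is chanInfo.NodeKey2Bytes.
        return remotePubFromChanInfo(chanInfo, upd.ChannelFlags)
}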
1907

1908
// processRejectedEdge examines a rejected edge to see if we can extract any
1909
// new announcements from it.  An edge will get rejected if we already added
1910
// the same edge without AuthProof to the graph. If the received announcement
1911
// contains a proof, we can add this proof to our edge.  We can end up in this
1912
// situation in the case where we create a channel, but for some reason fail
1913
// to receive the remote peer's proof, while the remote peer is able to fully
1914
// assemble the proof and craft the ChannelAnnouncement.
1915
func (d *AuthenticatedGossiper) processRejectedEdge(
1916
        chanAnnMsg *lnwire.ChannelAnnouncement1,
1917
        proof *models.ChannelAuthProof) ([]networkMsg, error) {
3✔
1918

3✔
1919
        // First, we'll fetch the state of the channel as we know it from the
3✔
1920
        // database.
3✔
1921
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
3✔
1922
                chanAnnMsg.ShortChannelID,
3✔
1923
        )
3✔
1924
        if err != nil {
3✔
1925
                return nil, err
×
1926
        }
×
1927

1928
        // If the edge is in the graph and already has a proof attached, then
1929
        // we'll just reject it as normal.
1930
        if chanInfo.AuthProof != nil {
6✔
1931
                return nil, nil
3✔
1932
        }
3✔
1933

1934
        // Otherwise, this means that the edge is within the graph, but it
1935
        // doesn't yet have a proper proof attached. If we did not receive
1936
        // the proof such that we now can add it, there's nothing more we
1937
        // can do.
1938
        if proof == nil {
×
1939
                return nil, nil
×
1940
        }
×
1941

1942
        // We'll then create and validate the new fully assembled
1943
        // announcement.
1944
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
×
1945
                proof, chanInfo, e1, e2,
×
1946
        )
×
1947
        if err != nil {
×
1948
                return nil, err
×
1949
        }
×
1950
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
×
1951
        if err != nil {
×
1952
                err := fmt.Errorf("assembled channel announcement proof "+
×
1953
                        "for shortChanID=%v isn't valid: %v",
×
1954
                        chanAnnMsg.ShortChannelID, err)
×
1955
                log.Error(err)
×
1956
                return nil, err
×
1957
        }
×
1958

1959
        // If everything checks out, then we'll add the fully assembled proof
1960
        // to the database.
1961
        err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
×
1962
        if err != nil {
×
1963
                err := fmt.Errorf("unable add proof to shortChanID=%v: %w",
×
1964
                        chanAnnMsg.ShortChannelID, err)
×
1965
                log.Error(err)
×
1966
                return nil, err
×
1967
        }
×
1968

1969
        // As we now have a complete channel announcement for this channel,
1970
        // we'll construct the announcements so they can be broadcast out to all
1971
        // our peers.
1972
        announcements := make([]networkMsg, 0, 3)
×
1973
        announcements = append(announcements, networkMsg{
×
1974
                source: d.selfKey,
×
1975
                msg:    chanAnn,
×
1976
        })
×
1977
        if e1Ann != nil {
×
1978
                announcements = append(announcements, networkMsg{
×
1979
                        source: d.selfKey,
×
1980
                        msg:    e1Ann,
×
1981
                })
×
1982
        }
×
1983
        if e2Ann != nil {
×
1984
                announcements = append(announcements, networkMsg{
×
1985
                        source: d.selfKey,
×
1986
                        msg:    e2Ann,
×
1987
                })
×
1988

×
1989
        }
×
1990

1991
        return announcements, nil
×
1992
}
1993

1994
// fetchPKScript fetches the output script for the given SCID.
1995
func (d *AuthenticatedGossiper) fetchPKScript(chanID *lnwire.ShortChannelID) (
1996
        []byte, error) {
×
1997

×
1998
        return lnwallet.FetchPKScriptWithQuit(d.cfg.ChainIO, chanID, d.quit)
×
1999
}
×
2000

2001
// addNode processes the given node announcement, and adds it to our channel
2002
// graph.
2003
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
2004
        op ...batch.SchedulerOption) error {
3✔
2005

3✔
2006
        if err := netann.ValidateNodeAnn(msg); err != nil {
3✔
UNCOV
2007
                return fmt.Errorf("unable to validate node announcement: %w",
×
UNCOV
2008
                        err)
×
UNCOV
2009
        }
×
2010

2011
        return d.cfg.Graph.AddNode(models.NodeFromWireAnnouncement(msg), op...)
3✔
2012
}
2013

2014
// isPremature decides whether a given network message has a block height+delta
2015
// value specified in the future. If so, the message will be added to the
2016
// future message map and be processed when the block height has been reached.
2017
//
2018
// NOTE: must be used inside a lock.
2019
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
2020
        delta uint32, msg *networkMsg) bool {
3✔
2021

3✔
2022
        // The channel is already confirmed at chanID.BlockHeight, so we subtract
3✔
2023
        // one block. For instance, if the required confirmation for this
3✔
2024
        // channel announcement is 6, we then only need to wait for 5 more
3✔
2025
        // blocks once the funding tx is confirmed.
3✔
2026
        if delta > 0 {
6✔
2027
                delta--
3✔
2028
        }
3✔
2029

2030
        msgHeight := chanID.BlockHeight + delta
3✔
2031

3✔
2032
        // The message height is smaller than or equal to our best known height,
3✔
2033
        // thus the message is mature.
3✔
2034
        if msgHeight <= d.bestHeight {
6✔
2035
                return false
3✔
2036
        }
3✔
2037

2038
        // Add the premature message to our future messages which will be
2039
        // resent once the block height has been reached.
2040
        //
2041
        // Copy the networkMsg since the old message's err chan will be
2042
        // consumed.
2043
        copied := &networkMsg{
3✔
2044
                peer:              msg.peer,
3✔
2045
                source:            msg.source,
3✔
2046
                msg:               msg.msg,
3✔
2047
                optionalMsgFields: msg.optionalMsgFields,
3✔
2048
                isRemote:          msg.isRemote,
3✔
2049
                err:               make(chan error, 1),
3✔
2050
        }
3✔
2051

3✔
2052
        // Create the cached message.
3✔
2053
        cachedMsg := &cachedFutureMsg{
3✔
2054
                msg:    copied,
3✔
2055
                height: msgHeight,
3✔
2056
        }
3✔
2057

3✔
2058
        // Increment the msg ID and add it to the cache.
3✔
2059
        nextMsgID := d.futureMsgs.nextMsgID()
3✔
2060
        _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
3✔
2061
        if err != nil {
3✔
2062
                log.Errorf("Adding future message got error: %v", err)
×
2063
        }
×
2064

2065
        log.Debugf("Network message: %v added to future messages for "+
3✔
2066
                "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
3✔
2067
                msgHeight, d.bestHeight)
3✔
2068

3✔
2069
        return true
3✔
2070
}
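
// A hedged worked example of the height math above (hypothetical numbers): a
// message whose SCID confirms at block 800_000 with a required delta of 6
// yields msgHeight = 800_000 + (6 - 1) = 800_005, so it is cached as a future
// message until our best height reaches 800_005:
//
//	scid := lnwire.ShortChannelID{BlockHeight: 800_000}
//	premature := d.isPremature(scid, 6, msg) // true while bestHeight < 800_005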
2071

2072
// processNetworkAnnouncement processes a new network-related authenticated
2073
// channel or node announcement, or announcement proofs. If the announcement
2074
// didn't affect the internal state due to either being out of date, invalid,
2075
// or redundant, then nil is returned. Otherwise, the set of announcements will
2076
// be returned which should be broadcasted to the rest of the network. The
2077
// boolean returned indicates whether any dependents of the announcement should
2078
// attempt to be processed as well.
2079
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
2080
        nMsg *networkMsg) ([]networkMsg, bool) {
3✔
2081

3✔
2082
        // If this is a remote update, we set the scheduler option to lazily
3✔
2083
        // add it to the graph.
3✔
2084
        var schedulerOp []batch.SchedulerOption
3✔
2085
        if nMsg.isRemote {
6✔
2086
                schedulerOp = append(schedulerOp, batch.LazyAdd())
3✔
2087
        }
3✔
2088

2089
        switch msg := nMsg.msg.(type) {
3✔
2090
        // A new node announcement has arrived which either presents new
2091
        // information about a node in one of the channels we know about, or
2092
        // updates previously advertised information.
2093
        case *lnwire.NodeAnnouncement:
3✔
2094
                return d.handleNodeAnnouncement(nMsg, msg, schedulerOp)
3✔
2095

2096
        // A new channel announcement has arrived, this indicates the
2097
        // *creation* of a new channel within the network. This only advertises
2098
        // the existence of a channel and not yet the routing policies in
2099
        // either direction of the channel.
2100
        case *lnwire.ChannelAnnouncement1:
3✔
2101
                return d.handleChanAnnouncement(nMsg, msg, schedulerOp...)
3✔
2102

2103
        // A new authenticated channel edge update has arrived. This indicates
2104
        // that the directional information for an already known channel has
2105
        // been updated.
2106
        case *lnwire.ChannelUpdate1:
3✔
2107
                return d.handleChanUpdate(nMsg, msg, schedulerOp)
3✔
2108

2109
        // A new signature announcement has been received. This indicates
2110
        // willingness of nodes involved in the funding of a channel to
2111
        // announce this new channel to the rest of the world.
2112
        case *lnwire.AnnounceSignatures1:
3✔
2113
                return d.handleAnnSig(nMsg, msg)
3✔
2114

2115
        default:
×
2116
                err := errors.New("wrong type of the announcement")
×
2117
                nMsg.err <- err
×
2118
                return nil, false
×
2119
        }
2120
}
2121

2122
// processZombieUpdate determines whether the provided channel update should
2123
// resurrect a given zombie edge.
2124
//
2125
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
2126
// should be inspected.
2127
func (d *AuthenticatedGossiper) processZombieUpdate(
2128
        chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
UNCOV
2129
        msg *lnwire.ChannelUpdate1) error {
×
UNCOV
2130

×
UNCOV
2131
        // The least-significant bit in the flag on the channel update tells us
×
UNCOV
2132
        // which edge is being updated.
×
UNCOV
2133
        isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0
×
UNCOV
2134

×
UNCOV
2135
        // Since we've deemed the update as not stale above, before marking it
×
UNCOV
2136
        // live, we'll make sure it has been signed by the correct party. If we
×
UNCOV
2137
        // have both pubkeys, either party can resurrect the channel. If we've
×
UNCOV
2138
        // already marked this with the stricter, single-sided resurrection we
×
UNCOV
2139
        // will only have the pubkey of the node with the oldest timestamp.
×
UNCOV
2140
        var pubKey *btcec.PublicKey
×
UNCOV
2141
        switch {
×
2142
        case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
×
2143
                pubKey, _ = chanInfo.NodeKey1()
×
UNCOV
2144
        case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
×
UNCOV
2145
                pubKey, _ = chanInfo.NodeKey2()
×
2146
        }
UNCOV
2147
        if pubKey == nil {
×
UNCOV
2148
                return fmt.Errorf("incorrect pubkey to resurrect zombie "+
×
UNCOV
2149
                        "with chan_id=%v", msg.ShortChannelID)
×
UNCOV
2150
        }
×
2151

UNCOV
2152
        err := netann.VerifyChannelUpdateSignature(msg, pubKey)
×
UNCOV
2153
        if err != nil {
×
UNCOV
2154
                return fmt.Errorf("unable to verify channel "+
×
UNCOV
2155
                        "update signature: %v", err)
×
UNCOV
2156
        }
×
2157

2158
        // With the signature valid, we'll proceed to mark the
2159
        // edge as live and wait for the channel announcement to
2160
        // come through again.
UNCOV
2161
        err = d.cfg.Graph.MarkEdgeLive(scid)
×
UNCOV
2162
        switch {
×
2163
        case errors.Is(err, graphdb.ErrZombieEdgeNotFound):
×
2164
                log.Errorf("edge with chan_id=%v was not found in the "+
×
2165
                        "zombie index: %v", err)
×
2166

×
2167
                return nil
×
2168

2169
        case err != nil:
×
2170
                return fmt.Errorf("unable to remove edge with "+
×
2171
                        "chan_id=%v from zombie index: %v",
×
2172
                        msg.ShortChannelID, err)
×
2173

UNCOV
2174
        default:
×
2175
        }
2176

UNCOV
2177
        log.Debugf("Removed edge with chan_id=%v from zombie "+
×
UNCOV
2178
                "index", msg.ShortChannelID)
×
UNCOV
2179

×
UNCOV
2180
        return nil
×
2181
}
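
// A hedged summary of the resurrection rule above: only a channel update
// signed by a pubkey we still hold for the zombie edge can revive it.
//
//	err := d.processZombieUpdate(chanInfo, scid, upd)
//	// err == nil: the edge was marked live again, and we now wait for a
//	// fresh ChannelAnnouncement to fully restore it.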
2182

2183
// fetchNodeAnn fetches the latest signed node announcement from our point of
2184
// view for the node with the given public key.
2185
func (d *AuthenticatedGossiper) fetchNodeAnn(
2186
        pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {
3✔
2187

3✔
2188
        node, err := d.cfg.Graph.FetchLightningNode(pubKey)
3✔
2189
        if err != nil {
3✔
UNCOV
2190
                return nil, err
×
UNCOV
2191
        }
×
2192

2193
        return node.NodeAnnouncement(true)
3✔
2194
}
2195

2196
// isMsgStale determines whether a message retrieved from the backing
2197
// MessageStore is seen as stale by the current graph.
2198
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
3✔
2199
        switch msg := msg.(type) {
3✔
2200
        case *lnwire.AnnounceSignatures1:
3✔
2201
                chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
3✔
2202
                        msg.ShortChannelID,
3✔
2203
                )
3✔
2204

3✔
2205
                // If the channel cannot be found, it is most likely a leftover
3✔
2206
                // message for a channel that was closed, so we can consider it
3✔
2207
                // stale.
3✔
2208
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
6✔
2209
                        return true
3✔
2210
                }
3✔
2211
                if err != nil {
3✔
2212
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2213
                                "%v", chanInfo.ChannelID, err)
×
2214
                        return false
×
2215
                }
×
2216

2217
                // If the proof exists in the graph, then we have successfully
2218
                // received the remote proof and assembled the full proof, so we
2219
                // can safely delete the local proof from the database.
2220
                return chanInfo.AuthProof != nil
3✔
2221

2222
        case *lnwire.ChannelUpdate1:
3✔
2223
                _, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)
3✔
2224

3✔
2225
                // If the channel cannot be found, it is most likely a leftover
3✔
2226
                // message for a channel that was closed, so we can consider it
3✔
2227
                // stale.
3✔
2228
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
6✔
2229
                        return true
3✔
2230
                }
3✔
2231
                if err != nil {
3✔
2232
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2233
                                "%v", msg.ShortChannelID, err)
×
2234
                        return false
×
2235
                }
×
2236

2237
                // Otherwise, we'll retrieve the correct policy that we
2238
                // currently have stored within our graph to check if this
2239
                // message is stale by comparing its timestamp.
2240
                var p *models.ChannelEdgePolicy
3✔
2241
                if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
6✔
2242
                        p = p1
3✔
2243
                } else {
6✔
2244
                        p = p2
3✔
2245
                }
3✔
2246

2247
                // If the policy is still unknown, then we can consider this
2248
                // policy fresh.
2249
                if p == nil {
3✔
2250
                        return false
×
2251
                }
×
2252

2253
                timestamp := time.Unix(int64(msg.Timestamp), 0)
3✔
2254
                return p.LastUpdate.After(timestamp)
3✔
2255

2256
        default:
×
2257
                // We'll make sure to not mark any unsupported messages as stale
×
2258
                // to ensure they are not removed.
×
2259
                return false
×
2260
        }
2261
}
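
// A hedged illustration of the ChannelUpdate1 staleness rule above, with a
// hypothetical timestamp: an update stamped 1_700_000_000 is stale iff the
// stored policy for the same direction was updated strictly after it.
//
//	stale := p.LastUpdate.After(time.Unix(1_700_000_000, 0))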
2262

2263
// updateChannel creates a new fully signed update for the channel, and updates
2264
// the underlying graph with the new state.
2265
func (d *AuthenticatedGossiper) updateChannel(info *models.ChannelEdgeInfo,
2266
        edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
2267
        *lnwire.ChannelUpdate1, error) {
3✔
2268

3✔
2269
        // Parse the unsigned edge into a channel update.
3✔
2270
        chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)
3✔
2271

3✔
2272
        // We'll generate a new signature over a digest of the channel
3✔
2273
        // update itself and bump the timestamp to ensure it propagates.
3✔
2274
        err := netann.SignChannelUpdate(
3✔
2275
                d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
3✔
2276
                netann.ChanUpdSetTimestamp,
3✔
2277
        )
3✔
2278
        if err != nil {
3✔
2279
                return nil, nil, err
×
2280
        }
×
2281

2282
        // Next, we'll set the new signature in place, and update the reference
2283
        // in the backing slice.
2284
        edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
3✔
2285
        edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()
3✔
2286

3✔
2287
        // To ensure that our signature is valid, we'll verify it ourselves
3✔
2288
        // before committing it to the slice returned.
3✔
2289
        err = netann.ValidateChannelUpdateAnn(
3✔
2290
                d.selfKey, info.Capacity, chanUpdate,
3✔
2291
        )
3✔
2292
        if err != nil {
3✔
2293
                return nil, nil, fmt.Errorf("generated invalid channel "+
×
2294
                        "update sig: %v", err)
×
2295
        }
×
2296

2297
        // Finally, we'll write the new edge policy to disk.
2298
        if err := d.cfg.Graph.UpdateEdge(edge); err != nil {
3✔
2299
                return nil, nil, err
×
2300
        }
×
2301

2302
        // We'll also create the original channel announcement so the two can
2303
        // be broadcast alongside each other (if necessary), but only if we
2304
        // have a full channel announcement for this channel.
2305
        var chanAnn *lnwire.ChannelAnnouncement1
3✔
2306
        if info.AuthProof != nil {
6✔
2307
                chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
3✔
2308
                chanAnn = &lnwire.ChannelAnnouncement1{
3✔
2309
                        ShortChannelID:  chanID,
3✔
2310
                        NodeID1:         info.NodeKey1Bytes,
3✔
2311
                        NodeID2:         info.NodeKey2Bytes,
3✔
2312
                        ChainHash:       info.ChainHash,
3✔
2313
                        BitcoinKey1:     info.BitcoinKey1Bytes,
3✔
2314
                        Features:        lnwire.NewRawFeatureVector(),
3✔
2315
                        BitcoinKey2:     info.BitcoinKey2Bytes,
3✔
2316
                        ExtraOpaqueData: info.ExtraOpaqueData,
3✔
2317
                }
3✔
2318
                chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
3✔
2319
                        info.AuthProof.NodeSig1Bytes,
3✔
2320
                )
3✔
2321
                if err != nil {
3✔
2322
                        return nil, nil, err
×
2323
                }
×
2324
                chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
3✔
2325
                        info.AuthProof.NodeSig2Bytes,
3✔
2326
                )
3✔
2327
                if err != nil {
3✔
2328
                        return nil, nil, err
×
2329
                }
×
2330
                chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
3✔
2331
                        info.AuthProof.BitcoinSig1Bytes,
3✔
2332
                )
3✔
2333
                if err != nil {
3✔
2334
                        return nil, nil, err
×
2335
                }
×
2336
                chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
3✔
2337
                        info.AuthProof.BitcoinSig2Bytes,
3✔
2338
                )
3✔
2339
                if err != nil {
3✔
2340
                        return nil, nil, err
×
2341
                }
×
2342
        }
2343

2344
        return chanAnn, chanUpdate, err
3✔
2345
}
2346

2347
// SyncManager returns the gossiper's SyncManager instance.
2348
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
3✔
2349
        return d.syncMgr
3✔
2350
}
3✔
2351

2352
// IsKeepAliveUpdate determines whether this channel update is considered a
2353
// keep-alive update based on the previous channel update processed for the same
2354
// direction.
2355
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
2356
        prev *models.ChannelEdgePolicy) bool {
3✔
2357

3✔
2358
        // Both updates should be from the same direction.
3✔
2359
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
3✔
2360
                prev.ChannelFlags&lnwire.ChanUpdateDirection {
3✔
2361

×
2362
                return false
×
2363
        }
×
2364

2365
        // The timestamp should always increase for a keep-alive update.
2366
        timestamp := time.Unix(int64(update.Timestamp), 0)
3✔
2367
        if !timestamp.After(prev.LastUpdate) {
6✔
2368
                return false
3✔
2369
        }
3✔
2370

2371
        // None of the remaining fields should change for a keep-alive update.
2372
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
6✔
2373
                return false
3✔
2374
        }
3✔
2375
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
6✔
2376
                return false
3✔
2377
        }
3✔
2378
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
6✔
2379
                return false
3✔
2380
        }
3✔
2381
        if update.TimeLockDelta != prev.TimeLockDelta {
3✔
2382
                return false
×
2383
        }
×
2384
        if update.HtlcMinimumMsat != prev.MinHTLC {
3✔
2385
                return false
×
2386
        }
×
2387
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
3✔
2388
                return false
×
2389
        }
×
2390
        if update.HtlcMaximumMsat != prev.MaxHTLC {
3✔
2391
                return false
×
2392
        }
×
2393
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
6✔
2394
                return false
3✔
2395
        }
3✔
2396
        return true
3✔
2397
}
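
// A hedged example of the predicate above: an update that only bumps the
// timestamp, leaving flags, fees, HTLC limits and extra data untouched, is a
// keep-alive; changing any fee field makes it a real policy update.
func exampleKeepAlive(upd *lnwire.ChannelUpdate1,
        prev *models.ChannelEdgePolicy) bool {

        // True when upd merely refreshes prev's timestamp.
        return IsKeepAliveUpdate(upd, prev)
}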
2398

2399
// latestHeight returns the gossiper's latest known height of the chain.
2400
func (d *AuthenticatedGossiper) latestHeight() uint32 {
3✔
2401
        d.Lock()
3✔
2402
        defer d.Unlock()
3✔
2403
        return d.bestHeight
3✔
2404
}
3✔
2405

2406
// handleNodeAnnouncement processes a new node announcement.
func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
        nodeAnn *lnwire.NodeAnnouncement,
        ops []batch.SchedulerOption) ([]networkMsg, bool) {

        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)

        log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
                nMsg.source.SerializeCompressed())

        // We'll quickly ask the router if it already has a newer update for
        // this node so we can skip validating signatures if not required.
        if d.cfg.Graph.IsStaleNode(nodeAnn.NodeID, timestamp) {
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
                nMsg.err <- nil
                return nil, true
        }

        if err := d.addNode(nodeAnn, ops...); err != nil {
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
                        err)

                if !graph.IsError(
                        err,
                        graph.ErrOutdated,
                        graph.ErrIgnored,
                ) {

                        log.Error(err)
                }

                nMsg.err <- err
                return nil, false
        }

        // In order to ensure we don't leak unadvertised nodes, we'll make a
        // quick check to verify that this node intends to publicly advertise
        // itself to the network.
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
        if err != nil {
                log.Errorf("Unable to determine if node %x is advertised: %v",
                        nodeAnn.NodeID, err)
                nMsg.err <- err
                return nil, false
        }

        var announcements []networkMsg

        // If it does, we'll add its announcement to our batch so that it can
        // be broadcast to the rest of our peers.
        if isPublic {
                announcements = append(announcements, networkMsg{
                        peer:     nMsg.peer,
                        isRemote: nMsg.isRemote,
                        source:   nMsg.source,
                        msg:      nodeAnn,
                })
        } else {
                log.Tracef("Skipping broadcasting node announcement for %x "+
                        "due to being unadvertised", nodeAnn.NodeID)
        }

        nMsg.err <- nil
        // TODO(roasbeef): get rid of the above

        log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
                nMsg.source.SerializeCompressed())

        return announcements, true
}

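// Example: every handler in this file consumes a networkMsg envelope and
// reports its result on the embedded err channel. A rough sketch with
// hypothetical values:
//
//	nMsg := &networkMsg{
//	        msg:      nodeAnn,             // wire message to process
//	        isRemote: true,                // received from a peer
//	        source:   peerPubKey,          // hypothetical sender pubkey
//	        err:      make(chan error, 1), // processing result
//	}
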
// handleChanAnnouncement processes a new channel announcement.
func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
        ann *lnwire.ChannelAnnouncement1,
        ops ...batch.SchedulerOption) ([]networkMsg, bool) {

        scid := ann.ShortChannelID

        log.Debugf("Processing ChannelAnnouncement1: peer=%v, short_chan_id=%v",
                nMsg.peer, scid.ToUint64())

        // We'll ignore any channel announcements that target any chain other
        // than the set of chains we know of.
        if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) {
                err := fmt.Errorf("ignoring ChannelAnnouncement1 from chain=%v"+
                        ", gossiper on chain=%v", ann.ChainHash,
                        d.cfg.ChainHash)
                log.Errorf(err.Error())

                key := newRejectCacheKey(
                        scid.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                nMsg.err <- err
                return nil, false
        }

        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
        // reject the announcement. Since the router accepts alias SCIDs,
        // not erroring out would be a DoS vector.
        if nMsg.isRemote && d.cfg.IsAlias(scid) {
                err := fmt.Errorf("ignoring remote alias channel=%v", scid)
                log.Errorf(err.Error())

                key := newRejectCacheKey(
                        scid.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                nMsg.err <- err
                return nil, false
        }

        // If the advertised inclusionary block is beyond our knowledge of the
        // chain tip, then we'll ignore it for now.
        d.Lock()
        if nMsg.isRemote && d.isPremature(scid, 0, nMsg) {
                log.Warnf("Announcement for chan_id=(%v) is premature: "+
                        "advertises height %v, only height %v is known",
                        scid.ToUint64(), scid.BlockHeight, d.bestHeight)
                d.Unlock()
                nMsg.err <- nil
                return nil, false
        }
        d.Unlock()

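        // Example: a ShortChannelID encodes block:tx_index:output, so a
        // hypothetical SCID of 800000:42:1 advertises inclusion in block
        // 800000. If our bestHeight were only 799990, the check above would
        // treat the announcement as premature.
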
        // At this point, we'll ask the router if this is a zombie/known edge.
        // If so, we can skip all the processing below.
        if d.cfg.Graph.IsKnownEdge(scid) {
                nMsg.err <- nil
                return nil, true
        }

        // Check if the channel is already closed, in which case we can ignore
        // it.
        closed, err := d.cfg.ScidCloser.IsClosedScid(scid)
        if err != nil {
                log.Errorf("failed to check if scid %v is closed: %v", scid,
                        err)
                nMsg.err <- err

                return nil, false
        }

        if closed {
                err = fmt.Errorf("ignoring closed channel %v", scid)
                log.Error(err)

                // If this is an announcement from us, we'll just ignore it.
                if !nMsg.isRemote {
                        nMsg.err <- err
                        return nil, false
                }

                // Increment the peer's ban score if they are sending closed
                // channel announcements.
                d.banman.incrementBanScore(nMsg.peer.PubKey())

                // If the peer is banned and not a channel peer, we'll
                // disconnect them.
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
                if dcErr != nil {
                        log.Errorf("failed to check if we should disconnect "+
                                "peer: %v", dcErr)
                        nMsg.err <- dcErr

                        return nil, false
                }

                if shouldDc {
                        nMsg.peer.Disconnect(ErrPeerBanned)
                }

                nMsg.err <- err

                return nil, false
        }

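        // Example: the ban logic above is cumulative. Every closed-channel
        // announcement a peer sends bumps its score via incrementBanScore;
        // once the score crosses the ban manager's threshold,
        // ShouldDisconnect reports true for peers we have no channels with,
        // and they are disconnected with ErrPeerBanned.
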
        // If this is a remote channel announcement, then we'll validate all
        // the signatures within the proof as it should be well formed.
        var proof *models.ChannelAuthProof
        if nMsg.isRemote {
                err := netann.ValidateChannelAnn(ann, d.fetchPKScript)
                if err != nil {
                        err := fmt.Errorf("unable to validate announcement: "+
                                "%v", err)

                        key := newRejectCacheKey(
                                scid.ToUint64(),
                                sourceToPub(nMsg.source),
                        )
                        _, _ = d.recentRejects.Put(key, &cachedReject{})

                        log.Error(err)
                        nMsg.err <- err
                        return nil, false
                }

                // If the proof checks out, then we'll save the proof itself to
                // the database so we can fetch it later when gossiping with
                // other nodes.
                proof = &models.ChannelAuthProof{
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
                }
        }

        // With the proof validated (if necessary), we can now store it within
        // the database for our path finding and syncing needs.
        var featureBuf bytes.Buffer
        if err := ann.Features.Encode(&featureBuf); err != nil {
                log.Errorf("unable to encode features: %v", err)
                nMsg.err <- err
                return nil, false
        }

        edge := &models.ChannelEdgeInfo{
                ChannelID:        scid.ToUint64(),
                ChainHash:        ann.ChainHash,
                NodeKey1Bytes:    ann.NodeID1,
                NodeKey2Bytes:    ann.NodeID2,
                BitcoinKey1Bytes: ann.BitcoinKey1,
                BitcoinKey2Bytes: ann.BitcoinKey2,
                AuthProof:        proof,
                Features:         featureBuf.Bytes(),
                ExtraOpaqueData:  ann.ExtraOpaqueData,
        }

        // If there were any optional message fields provided, we'll include
        // them in its serialized disk representation now.
        var tapscriptRoot fn.Option[chainhash.Hash]
        if nMsg.optionalMsgFields != nil {
                if nMsg.optionalMsgFields.capacity != nil {
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
                }
                if nMsg.optionalMsgFields.channelPoint != nil {
                        cp := *nMsg.optionalMsgFields.channelPoint
                        edge.ChannelPoint = cp
                }

                // Optional tapscript root for custom channels.
                tapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
        }

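        // Example: fn.Option models an optional value without nil-pointer
        // checks. The zero value of tapscriptRoot above behaves as "none";
        // it only carries a hash when a local caller attached one via
        // optionalMsgFields, e.g. for taproot/custom channels.
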
        // Before we start validation or add the edge to the database, we
        // obtain the mutex for this channel ID. We do this to ensure no other
        // goroutine has read the database and is now making decisions based on
        // this DB state, before it writes to the DB. It also ensures that we
        // don't perform the expensive validation check on the same channel
        // announcement at the same time.
        d.channelMtx.Lock(scid.ToUint64())

        // If AssumeChannelValid is present, then we are unable to perform any
        // of the expensive checks below, so we'll short-circuit our path
        // straight to adding the edge to our graph. If the passed
        // ShortChannelID is an alias, then we'll skip validation as it will
        // not map to a legitimate tx. This is not a DoS vector as only we can
        // add an alias ChannelAnnouncement from the gossiper.
        if !(d.cfg.AssumeChannelValid || d.cfg.IsAlias(scid)) { //nolint:nestif
                op, capacity, script, err := d.validateFundingTransaction(
                        ann, tapscriptRoot,
                )
                if err != nil {
                        defer d.channelMtx.Unlock(scid.ToUint64())

                        switch {
                        case errors.Is(err, ErrNoFundingTransaction),
                                errors.Is(err, ErrInvalidFundingOutput):

                                key := newRejectCacheKey(
                                        scid.ToUint64(),
                                        sourceToPub(nMsg.source),
                                )
                                _, _ = d.recentRejects.Put(
                                        key, &cachedReject{},
                                )

                                // Increment the peer's ban score. We check
                                // isRemote so we don't actually ban the peer in
                                // case of a local bug.
                                if nMsg.isRemote {
                                        d.banman.incrementBanScore(
                                                nMsg.peer.PubKey(),
                                        )
                                }

                        case errors.Is(err, ErrChannelSpent):
                                key := newRejectCacheKey(
                                        scid.ToUint64(),
                                        sourceToPub(nMsg.source),
                                )
                                _, _ = d.recentRejects.Put(key, &cachedReject{})

                                // Since this channel has already been closed,
                                // we'll add it to the graph's closed channel
                                // index such that we won't attempt to do
                                // expensive validation checks on it again.
                                // TODO: Populate the ScidCloser by using closed
                                // channel notifications.
                                dbErr := d.cfg.ScidCloser.PutClosedScid(scid)
                                if dbErr != nil {
                                        log.Errorf("failed to mark scid(%v) "+
                                                "as closed: %v", scid, dbErr)

                                        nMsg.err <- dbErr

                                        return nil, false
                                }

                                // Increment the peer's ban score. We check
                                // isRemote so we don't accidentally ban
                                // ourselves in case of a bug.
                                if nMsg.isRemote {
                                        d.banman.incrementBanScore(
                                                nMsg.peer.PubKey(),
                                        )
                                }

                        default:
                                // Otherwise, this is just a regular rejected
                                // edge.
                                key := newRejectCacheKey(
                                        scid.ToUint64(),
                                        sourceToPub(nMsg.source),
                                )
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
                        }

                        if !nMsg.isRemote {
                                log.Errorf("failed to add edge for local "+
                                        "channel: %v", err)
                                nMsg.err <- err

                                return nil, false
                        }

                        shouldDc, dcErr := d.ShouldDisconnect(
                                nMsg.peer.IdentityKey(),
                        )
                        if dcErr != nil {
                                log.Errorf("failed to check if we should "+
                                        "disconnect peer: %v", dcErr)
                                nMsg.err <- dcErr

                                return nil, false
                        }

                        if shouldDc {
                                nMsg.peer.Disconnect(ErrPeerBanned)
                        }

                        nMsg.err <- err

                        return nil, false
                }

                edge.FundingScript = fn.Some(script)

                // TODO(roasbeef): this is a hack, needs to be removed after
                //  commitment fees are dynamic.
                edge.Capacity = capacity
                edge.ChannelPoint = op
        }

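        // Example: channelMtx is a mutex set keyed by channel ID, so the
        // critical section above serializes work per channel while leaving
        // unrelated channels untouched. The usage pattern, with a
        // hypothetical id:
        //
        //	d.channelMtx.Lock(id)
        //	defer d.channelMtx.Unlock(id)
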
        log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64())
3✔
2778

3✔
2779
        // We will add the edge to the channel router. If the nodes present in
3✔
2780
        // this channel are not present in the database, a partial node will be
3✔
2781
        // added to represent each node while we wait for a node announcement.
3✔
2782
        err = d.cfg.Graph.AddEdge(edge, ops...)
3✔
2783
        if err != nil {
6✔
2784
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
3✔
2785
                        scid.ToUint64(), err)
3✔
2786

3✔
2787
                defer d.channelMtx.Unlock(scid.ToUint64())
3✔
2788

3✔
2789
                // If the edge was rejected due to already being known, then it
3✔
2790
                // may be the case that this new message has a fresh channel
3✔
2791
                // proof, so we'll check.
3✔
2792
                if graph.IsError(err, graph.ErrIgnored) {
6✔
2793
                        // Attempt to process the rejected message to see if we
3✔
2794
                        // get any new announcements.
3✔
2795
                        anns, rErr := d.processRejectedEdge(ann, proof)
3✔
2796
                        if rErr != nil {
3✔
2797
                                key := newRejectCacheKey(
×
2798
                                        scid.ToUint64(),
×
2799
                                        sourceToPub(nMsg.source),
×
2800
                                )
×
2801
                                cr := &cachedReject{}
×
2802
                                _, _ = d.recentRejects.Put(key, cr)
×
2803

×
2804
                                nMsg.err <- rErr
×
2805

×
2806
                                return nil, false
×
2807
                        }
×
2808

2809
                        log.Debugf("Extracted %v announcements from rejected "+
3✔
2810
                                "msgs", len(anns))
3✔
2811

3✔
2812
                        // If while processing this rejected edge, we realized
3✔
2813
                        // there's a set of announcements we could extract,
3✔
2814
                        // then we'll return those directly.
3✔
2815
                        //
3✔
2816
                        // NOTE: since this is an ErrIgnored, we can return
3✔
2817
                        // true here to signal "allow" to its dependants.
3✔
2818
                        nMsg.err <- nil
3✔
2819

3✔
2820
                        return anns, true
3✔
2821
                }
2822

2823
                // Otherwise, this is just a regular rejected edge.
UNCOV
2824
                key := newRejectCacheKey(
×
UNCOV
2825
                        scid.ToUint64(),
×
UNCOV
2826
                        sourceToPub(nMsg.source),
×
UNCOV
2827
                )
×
UNCOV
2828
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
2829

×
UNCOV
2830
                if !nMsg.isRemote {
×
2831
                        log.Errorf("failed to add edge for local channel: %v",
×
2832
                                err)
×
2833
                        nMsg.err <- err
×
2834

×
2835
                        return nil, false
×
2836
                }
×
2837

UNCOV
2838
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
×
UNCOV
2839
                if dcErr != nil {
×
2840
                        log.Errorf("failed to check if we should disconnect "+
×
2841
                                "peer: %v", dcErr)
×
2842
                        nMsg.err <- dcErr
×
2843

×
2844
                        return nil, false
×
2845
                }
×
2846

UNCOV
2847
                if shouldDc {
×
2848
                        nMsg.peer.Disconnect(ErrPeerBanned)
×
2849
                }
×
2850

UNCOV
2851
                nMsg.err <- err
×
UNCOV
2852

×
UNCOV
2853
                return nil, false
×
2854
        }
2855

2856
        // If err is nil, release the lock immediately.
        d.channelMtx.Unlock(scid.ToUint64())

        log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64())

        // If we earlier received any ChannelUpdates for this channel, we can
        // now process them, as the channel is added to the graph.
        var channelUpdates []*processedNetworkMsg

        earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64())
        if err == nil {
                // There was actually an entry in the map, so we'll accumulate
                // it. We don't worry about deletion, since it'll eventually
                // fall out anyway.
                chanMsgs := earlyChanUpdates
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
        }

        // Launch a new goroutine to handle each ChannelUpdate; this ensures
        // we don't block here, as we can handle only one announcement at a
        // time.
        for _, cu := range channelUpdates {
                // Skip if already processed.
                if cu.processed {
                        continue
                }

                // Mark the ChannelUpdate as processed. This ensures that a
                // subsequent announcement in the option-scid-alias case does
                // not re-use an old ChannelUpdate.
                cu.processed = true

                d.wg.Add(1)
                go func(updMsg *networkMsg) {
                        defer d.wg.Done()

                        switch msg := updMsg.msg.(type) {
                        // Reprocess the message, making sure we return an
                        // error to the original caller in case the gossiper
                        // shuts down.
                        case *lnwire.ChannelUpdate1:
                                log.Debugf("Reprocessing ChannelUpdate for "+
                                        "shortChanID=%v", scid.ToUint64())

                                select {
                                case d.networkMsgs <- updMsg:
                                case <-d.quit:
                                        updMsg.err <- ErrGossiperShuttingDown
                                }

                        // We don't expect any other message type than
                        // ChannelUpdate to be in this cache.
                        default:
                                log.Errorf("Unsupported message type found "+
                                        "among ChannelUpdates: %T", msg)
                        }
                }(cu.msg)
        }

        // Channel announcement was successfully processed and now it might be
        // broadcast to other connected nodes if it was an announcement with
        // proof (remote).
        var announcements []networkMsg

        if proof != nil {
                announcements = append(announcements, networkMsg{
                        peer:     nMsg.peer,
                        isRemote: nMsg.isRemote,
                        source:   nMsg.source,
                        msg:      ann,
                })
        }

        nMsg.err <- nil

        log.Debugf("Processed ChannelAnnouncement1: peer=%v, short_chan_id=%v",
                nMsg.peer, scid.ToUint64())

        return announcements, true
}

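// Example: the replay above is how out-of-order gossip converges. A
// ChannelUpdate that arrived before its ChannelAnnouncement was parked in
// prematureChannelUpdates (a bounded cache keyed by SCID); once the
// announcement lands in the graph, each parked update is resubmitted through
// d.networkMsgs as if it had just arrived.
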
// handleChanUpdate processes a new channel update.
func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
        upd *lnwire.ChannelUpdate1,
        ops []batch.SchedulerOption) ([]networkMsg, bool) {

        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v",
                nMsg.peer, upd.ShortChannelID.ToUint64())

        // We'll ignore any channel updates that target any chain other than
        // the set of chains we know of.
        if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
                        "gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash)
                log.Errorf(err.Error())

                key := newRejectCacheKey(
                        upd.ShortChannelID.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                nMsg.err <- err
                return nil, false
        }

        blockHeight := upd.ShortChannelID.BlockHeight
        shortChanID := upd.ShortChannelID.ToUint64()

        // If the advertised inclusionary block is beyond our knowledge of the
        // chain tip, then we'll put the announcement in limbo to be fully
        // verified once we advance forward in the chain. If the update has an
        // alias SCID, we'll skip the isPremature check. This is necessary
        // since aliases start at block height 16_000_000.
        d.Lock()
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
                d.isPremature(upd.ShortChannelID, 0, nMsg) {

                log.Warnf("Update announcement for short_chan_id(%v) is "+
                        "premature: advertises height %v, only height %v is "+
                        "known", shortChanID, blockHeight, d.bestHeight)
                d.Unlock()
                nMsg.err <- nil
                return nil, false
        }
        d.Unlock()

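        // Example: alias SCIDs are drawn from a reserved range whose block
        // height starts at 16_000_000, far above any real chain tip. A
        // hypothetical alias of 16000000:0:0 would therefore always look
        // premature, which is exactly why the check above is skipped for
        // aliases.
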
        // Before we perform any of the expensive checks below, we'll check
        // whether this update is stale or is for a zombie channel in order to
        // quickly reject it.
        timestamp := time.Unix(int64(upd.Timestamp), 0)

        // Fetch the SCID we should be using to lock the channelMtx and make
        // graph queries with.
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
        if err != nil {
                // Fall back and set the graphScid to the peer-provided SCID.
                // This will occur for non-option-scid-alias channels and for
                // public option-scid-alias channels after 6 confirmations.
                // Once public option-scid-alias channels have 6 confs, we'll
                // ignore ChannelUpdates with one of their aliases.
                graphScid = upd.ShortChannelID
        }

        if d.cfg.Graph.IsStaleEdgePolicy(
                graphScid, timestamp, upd.ChannelFlags,
        ) {

                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
                )

                nMsg.err <- nil
                return nil, true
        }

        // Check that the ChanUpdate is not too far in the future; this could
        // reveal a faulty implementation, so we log an error.
        if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
                log.Errorf("Skewed timestamp (%v) for edge policy of "+
                        "short_chan_id(%v), timestamp too far in the future: "+
                        "peer=%v, msg=%s, is_remote=%v", timestamp.Unix(),
                        shortChanID, nMsg.peer, nMsg.msg.MsgType(),
                        nMsg.isRemote,
                )

                nMsg.err <- fmt.Errorf("skewed timestamp of edge policy, "+
                        "timestamp too far in the future: %v", timestamp.Unix())

                return nil, false
        }

        // Get the node pub key, since we don't have it in the channel update
        // announcement message. We'll need this to properly verify the
        // message's signature.
        //
        // We make sure to obtain the mutex for this channel ID before we
        // access the database. This ensures the state we read from the
        // database has not changed between this point and when we call
        // UpdateEdge() later.
        d.channelMtx.Lock(graphScid.ToUint64())
        defer d.channelMtx.Unlock(graphScid.ToUint64())

        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
        switch {
        // No error, break.
        case err == nil:
                break

        case errors.Is(err, graphdb.ErrZombieEdge):
                err = d.processZombieUpdate(chanInfo, graphScid, upd)
                if err != nil {
                        log.Debug(err)
                        nMsg.err <- err
                        return nil, false
                }

                // We'll fall through to ensure we stash the update until we
                // receive its corresponding ChannelAnnouncement. This is
                // needed to ensure the edge exists in the graph before
                // applying the update.
                fallthrough
        case errors.Is(err, graphdb.ErrGraphNotFound):
                fallthrough
        case errors.Is(err, graphdb.ErrGraphNoEdgesFound):
                fallthrough
        case errors.Is(err, graphdb.ErrEdgeNotFound):
                // If the edge corresponding to this ChannelUpdate was not
                // found in the graph, this might be a channel in the process
                // of being opened, and we haven't processed our own
                // ChannelAnnouncement yet, hence it is not yet found in the
                // graph. This usually gets resolved after the channel proofs
                // are exchanged and the channel is broadcast to the rest of
                // the network, but in case this is a private channel this
                // won't ever happen. This can also happen in the case of a
                // zombie channel with a fresh update for which we don't have
                // a ChannelAnnouncement, since we reject them. Because of
                // this, we temporarily add it to a map, and reprocess it
                // after our own ChannelAnnouncement has been processed.
                //
                // The shortChanID may be an alias, but it is fine to use here
                // since we don't have an edge in the graph and if the peer is
                // not buggy, we should be able to use it once the gossiper
                // receives the local announcement.
                pMsg := &processedNetworkMsg{msg: nMsg}

                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
                switch {
                // Nothing in the cache yet, we can just directly insert this
                // element.
                case err == cache.ErrElementNotFound:
                        _, _ = d.prematureChannelUpdates.Put(
                                shortChanID, &cachedNetworkMsg{
                                        msgs: []*processedNetworkMsg{pMsg},
                                })

                // There's already something in the cache, so we'll combine the
                // set of messages into a single value.
                default:
                        msgs := earlyMsgs.msgs
                        msgs = append(msgs, pMsg)
                        _, _ = d.prematureChannelUpdates.Put(
                                shortChanID, &cachedNetworkMsg{
                                        msgs: msgs,
                                })
                }

                log.Debugf("Got ChannelUpdate for edge not found in graph "+
                        "(shortChanID=%v), saving for reprocessing later",
                        shortChanID)

                // NOTE: We don't return anything on the error channel for this
                // message, as we expect that will be done when this
                // ChannelUpdate is later reprocessed.
                return nil, false

        default:
                err := fmt.Errorf("unable to validate channel update "+
                        "short_chan_id=%v: %v", shortChanID, err)
                log.Error(err)
                nMsg.err <- err

                key := newRejectCacheKey(
                        upd.ShortChannelID.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                return nil, false
        }

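        // Example: a zombie edge is one the graph pruned for going too long
        // without fresh updates. The processZombieUpdate call above only
        // resurrects it for a fresh, correctly signed update, and even then
        // the update is stashed until a new ChannelAnnouncement re-adds the
        // edge to the graph.
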
        // The least-significant bit in the flag on the channel update
        // announcement tells us "which" side of the channel's directed edge
        // is being updated.
        var (
                pubKey       *btcec.PublicKey
                edgeToUpdate *models.ChannelEdgePolicy
        )
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
        switch direction {
        case 0:
                pubKey, _ = chanInfo.NodeKey1()
                edgeToUpdate = e1
        case 1:
                pubKey, _ = chanInfo.NodeKey2()
                edgeToUpdate = e2
        }

        log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
                "edge policy=%v", chanInfo.ChannelID,
                pubKey.SerializeCompressed(), edgeToUpdate != nil)

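        // Example: with hypothetical ChannelFlags of 0b00000001, the masked
        // direction bit is 1, so the update applies to node_2's outgoing
        // policy (e2) and is verified against node_2's key; a masked bit of
        // 0 selects node_1 and e1.
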
        // Validate the channel update against the expected public key and
        // channel capacity. In the case of an invalid channel update, we'll
        // return an error to the caller and exit early.
        err = netann.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
        if err != nil {
                rErr := fmt.Errorf("unable to validate channel update "+
                        "announcement for short_chan_id=%v: %v",
                        spew.Sdump(upd.ShortChannelID), err)

                log.Error(rErr)
                nMsg.err <- rErr
                return nil, false
        }

        // If we have a previous version of the edge being updated, we'll want
        // to rate limit its updates to prevent spam throughout the network.
        if nMsg.isRemote && edgeToUpdate != nil {
                // If it's a keep-alive update, we'll only propagate one if
                // it's been a day since the previous. This follows our own
                // heuristic of sending keep-alive updates after the same
                // duration (see retransmitStaleAnns).
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
                                log.Debugf("Ignoring keep alive update not "+
                                        "within %v period for channel %v",
                                        d.cfg.RebroadcastInterval, shortChanID)
                                nMsg.err <- nil
                                return nil, false
                        }
                } else {
                        // If it's not, we'll allow an update per minute with a
                        // maximum burst of 10. If we haven't seen an update
                        // for this channel before, we'll need to initialize a
                        // rate limiter for each direction.
                        //
                        // Since the edge exists in the graph, we'll create a
                        // rate limiter for chanInfo.ChannelID rather than the
                        // SCID the peer sent. This is because there may be
                        // multiple aliases for a channel and we may otherwise
                        // rate-limit only a single alias of the channel,
                        // instead of the whole channel.
                        baseScid := chanInfo.ChannelID
                        d.Lock()
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
                        if !ok {
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
                                b := d.cfg.MaxChannelUpdateBurst
                                rls = [2]*rate.Limiter{
                                        rate.NewLimiter(r, b),
                                        rate.NewLimiter(r, b),
                                }
                                d.chanUpdateRateLimiter[baseScid] = rls
                        }
                        d.Unlock()

                        if !rls[direction].Allow() {
                                log.Debugf("Rate limiting update for channel "+
                                        "%v from direction %x", shortChanID,
                                        pubKey.SerializeCompressed())
                                nMsg.err <- nil
                                return nil, false
                        }
                }
        }

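        // Example: each direction gets its own token-bucket limiter. With the
        // defaults described above (one update per minute, burst of 10), a
        // peer can send ten rapid non-keep-alive updates before Allow()
        // starts returning false, after which tokens refill at one per
        // minute:
        //
        //	rl := rate.NewLimiter(rate.Every(time.Minute), 10)
        //	ok := rl.Allow() // consumes a token if one is available
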
        // We'll use chanInfo.ChannelID rather than the peer-supplied
        // ShortChannelID in the ChannelUpdate to avoid the router having to
        // look up the stored SCID. If we're sending the update, we'll always
        // use the SCID stored in the database rather than a potentially
        // different alias. This might mean that SigBytes is incorrect as it
        // signs a different SCID than the database SCID, but since there will
        // only be a difference if AuthProof == nil, this is fine.
        update := &models.ChannelEdgePolicy{
                SigBytes:                  upd.Signature.ToSignatureBytes(),
                ChannelID:                 chanInfo.ChannelID,
                LastUpdate:                timestamp,
                MessageFlags:              upd.MessageFlags,
                ChannelFlags:              upd.ChannelFlags,
                TimeLockDelta:             upd.TimeLockDelta,
                MinHTLC:                   upd.HtlcMinimumMsat,
                MaxHTLC:                   upd.HtlcMaximumMsat,
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
                ExtraOpaqueData:           upd.ExtraOpaqueData,
        }

        if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil {
                if graph.IsError(
                        err, graph.ErrOutdated,
                        graph.ErrIgnored,
                ) {

                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
                                shortChanID, err)
                } else {
                        // Since we know the stored SCID in the graph, we'll
                        // cache that SCID.
                        key := newRejectCacheKey(
                                chanInfo.ChannelID,
                                sourceToPub(nMsg.source),
                        )
                        _, _ = d.recentRejects.Put(key, &cachedReject{})

                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
                                shortChanID, err)
                }

                nMsg.err <- err
                return nil, false
        }

        // If this is a local ChannelUpdate without an AuthProof, it means it
        // is an update to a channel that is not (yet) supposed to be announced
        // to the greater network. However, our channel counterparty will need
        // to be given the update, so we'll try sending the update directly to
        // the remote peer.
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
                if nMsg.optionalMsgFields != nil {
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
                        if remoteAlias != nil {
                                // The remoteAlias field was specified, meaning
                                // that we should replace the SCID in the
                                // update with the remote's alias. We'll also
                                // need to re-sign the channel update. This is
                                // required for option-scid-alias feature-bit
                                // negotiated channels.
                                upd.ShortChannelID = *remoteAlias

                                sig, err := d.cfg.SignAliasUpdate(upd)
                                if err != nil {
                                        log.Error(err)
                                        nMsg.err <- err
                                        return nil, false
                                }

                                lnSig, err := lnwire.NewSigFromSignature(sig)
                                if err != nil {
                                        log.Error(err)
                                        nMsg.err <- err
                                        return nil, false
                                }

                                upd.Signature = lnSig
                        }
                }

                // Get our peer's public key.
                remotePubKey := remotePubFromChanInfo(
                        chanInfo, upd.ChannelFlags,
                )

                log.Debugf("The message %v has no AuthProof, sending the "+
                        "update to remote peer %x", upd.MsgType(), remotePubKey)

                // Now we'll attempt to send the channel update message
                // reliably to the remote peer in the background, so that we
                // don't block if the peer happens to be offline at the moment.
                err := d.reliableSender.sendMessage(upd, remotePubKey)
                if err != nil {
                        err := fmt.Errorf("unable to reliably send %v for "+
                                "channel=%v to peer=%x: %v", upd.MsgType(),
                                upd.ShortChannelID, remotePubKey, err)
                        nMsg.err <- err
                        return nil, false
                }
        }

        // Channel update announcement was successfully processed and now it
        // can be broadcast to the rest of the network. However, we'll only
        // broadcast the channel update announcement if it has an attached
        // authentication proof. We also won't broadcast the update if it
        // contains an alias because the network would reject this.
        var announcements []networkMsg
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
                announcements = append(announcements, networkMsg{
                        peer:     nMsg.peer,
                        source:   nMsg.source,
                        isRemote: nMsg.isRemote,
                        msg:      upd,
                })
        }

        nMsg.err <- nil

        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
                timestamp)
        return announcements, true
}

// handleAnnSig processes a new announcement signatures message.
func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
        ann *lnwire.AnnounceSignatures1) ([]networkMsg, bool) {

        needBlockHeight := ann.ShortChannelID.BlockHeight +
                d.cfg.ProofMatureDelta
        shortChanID := ann.ShortChannelID.ToUint64()

        prefix := "local"
        if nMsg.isRemote {
                prefix = "remote"
        }

        log.Infof("Received new %v announcement signature for %v", prefix,
                ann.ShortChannelID)

        // By the specification, channel announcement proofs should be sent
        // after some number of confirmations after the channel was registered
        // in the bitcoin blockchain. Therefore, we check if the proof is
        // mature.
        d.Lock()
        premature := d.isPremature(
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
        )
        if premature {
                log.Warnf("Premature proof announcement, current block height "+
                        "lower than needed: %v < %v", d.bestHeight,
                        needBlockHeight)
                d.Unlock()
                nMsg.err <- nil
                return nil, false
        }
        d.Unlock()

        // Ensure that we know of a channel with the target channel ID before
        // proceeding further.
        //
        // We must acquire the mutex for this channel ID before getting the
        // channel from the database, to ensure what we read does not change
        // before we call AddProof() later.
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())

        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
                ann.ShortChannelID,
        )
        if err != nil {
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
                if err != nil {
                        err := fmt.Errorf("unable to find channel for "+
                                "short_chan_id=%v: %v", shortChanID, err)
                        log.Error(err)
                        nMsg.err <- err

                        return nil, false
                }

                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
                err := d.cfg.WaitingProofStore.Add(proof)
                if err != nil {
                        err := fmt.Errorf("unable to store the proof for "+
                                "short_chan_id=%v: %v", shortChanID, err)
                        log.Error(err)
                        nMsg.err <- err
                        return nil, false
                }

                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
                        ", adding to waiting batch", prefix, shortChanID)
                nMsg.err <- nil
                return nil, false
        }

        nodeID := nMsg.source.SerializeCompressed()
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])

        // Ensure that the channel that was retrieved belongs to the peer
        // which sent the proof announcement.
        if !(isFirstNode || isSecondNode) {
                err := fmt.Errorf("channel that was received doesn't belong "+
                        "to the peer which sent the proof, short_chan_id=%v",
                        shortChanID)
                log.Error(err)
                nMsg.err <- err
                return nil, false
        }

        // If the proof was sent by a local sub-system, then we'll send the
        // announcement signature to the remote node so they can also
        // reconstruct the full channel announcement.
        if !nMsg.isRemote {
                var remotePubKey [33]byte
                if isFirstNode {
                        remotePubKey = chanInfo.NodeKey2Bytes
                } else {
                        remotePubKey = chanInfo.NodeKey1Bytes
                }

                // Since the remote peer might not be online, we'll call a
                // method that will attempt to deliver the proof when it comes
                // online.
                err := d.reliableSender.sendMessage(ann, remotePubKey)
                if err != nil {
                        err := fmt.Errorf("unable to reliably send %v for "+
                                "channel=%v to peer=%x: %v", ann.MsgType(),
                                ann.ShortChannelID, remotePubKey, err)
                        nMsg.err <- err
                        return nil, false
                }
        }

        // Check if we already have the full proof for this channel.
        if chanInfo.AuthProof != nil {
                // If we already have the fully assembled proof, then the peer
                // sending us their proof has probably not received our local
                // proof yet. So be kind and send them the full proof.
                if nMsg.isRemote {
                        peerID := nMsg.source.SerializeCompressed()
                        log.Debugf("Got AnnounceSignatures for channel with " +
                                "full proof.")

                        d.wg.Add(1)
                        go func() {
                                defer d.wg.Done()

                                log.Debugf("Received half proof for channel "+
                                        "%v with existing full proof. Sending"+
                                        " full proof to peer=%x",
                                        ann.ChannelID, peerID)

                                ca, _, _, err := netann.CreateChanAnnouncement(
                                        chanInfo.AuthProof, chanInfo, e1, e2,
                                )
                                if err != nil {
                                        log.Errorf("unable to gen ann: %v",
                                                err)
                                        return
                                }

                                err = nMsg.peer.SendMessage(false, ca)
                                if err != nil {
                                        log.Errorf("Failed sending full proof"+
                                                " to peer=%x: %v", peerID, err)
                                        return
                                }

                                log.Debugf("Full proof sent to peer=%x for "+
                                        "chanID=%v", peerID, ann.ChannelID)
                        }()
                }

                log.Debugf("Already have proof for channel with chanID=%v",
                        ann.ChannelID)
                nMsg.err <- nil
                return nil, true
        }

        // Check that we received the opposite proof. If so, then we're now
3498
        // able to construct the full proof, and create the channel
3499
        // announcement. If we didn't receive the opposite half of the proof
3500
        // then we should store this one, and wait for the opposite to be
3501
        // received.
3502
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
3✔
3503
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
3✔
3504
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
3✔
3505
                err := fmt.Errorf("unable to get the opposite proof for "+
×
3506
                        "short_chan_id=%v: %v", shortChanID, err)
×
3507
                log.Error(err)
×
3508
                nMsg.err <- err
×
3509
                return nil, false
×
3510
        }
×
3511

3512
        if err == channeldb.ErrWaitingProofNotFound {
6✔
3513
                err := d.cfg.WaitingProofStore.Add(proof)
3✔
3514
                if err != nil {
3✔
3515
                        err := fmt.Errorf("unable to store the proof for "+
×
3516
                                "short_chan_id=%v: %v", shortChanID, err)
×
3517
                        log.Error(err)
×
3518
                        nMsg.err <- err
×
3519
                        return nil, false
×
3520
                }
×
3521

3522
                log.Infof("1/2 of channel ann proof received for "+
3✔
3523
                        "short_chan_id=%v, waiting for other half",
3✔
3524
                        shortChanID)
3✔
3525

3✔
3526
                nMsg.err <- nil
3✔
3527
                return nil, false
3✔
3528
        }

	// We now have both halves of the channel announcement proof, so we'll
	// reconstruct the initial announcement to validate it shortly below.
	var dbProof models.ChannelAuthProof
	if isFirstNode {
		dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
		dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
		dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
		dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
	} else {
		dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
		dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
		dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
		dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
	}

	chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
		&dbProof, chanInfo, e1, e2,
	)
	if err != nil {
		log.Error(err)
		nMsg.err <- err
		return nil, false
	}

	// With all the necessary components assembled, validate the full
	// channel announcement proof.
	err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
	if err != nil {
		err := fmt.Errorf("channel announcement proof for "+
			"short_chan_id=%v isn't valid: %v", shortChanID, err)

		log.Error(err)
		nMsg.err <- err
		return nil, false
	}

	// If the channel was returned by the router, it means that the
	// existence of the funding point and the inclusion of the nodes'
	// bitcoin keys in it have already been checked by the router. At this
	// stage we should check that the node keys attest to the bitcoin keys
	// by validating the signatures of the announcement. If the proof is
	// valid, then we'll populate the channel edge with it so that we can
	// announce it on peer connect.
	err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
	if err != nil {
		err := fmt.Errorf("unable to add proof to the channel "+
			"chanID=%v: %v", ann.ChannelID, err)
		log.Error(err)
		nMsg.err <- err
		return nil, false
	}

	err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
	if err != nil {
		err := fmt.Errorf("unable to remove opposite proof for the "+
			"channel with chanID=%v: %v", ann.ChannelID, err)
		log.Error(err)
		nMsg.err <- err
		return nil, false
	}

	// The proof was successfully constructed, so we can now announce the
	// channel to the rest of the network.
	log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
		", adding to next ann batch", shortChanID)

	// Assemble the necessary announcements to add to the next broadcasting
	// batch.
	var announcements []networkMsg
	announcements = append(announcements, networkMsg{
		peer:   nMsg.peer,
		source: nMsg.source,
		msg:    chanAnn,
	})
	if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
		announcements = append(announcements, networkMsg{
			peer:   nMsg.peer,
			source: src,
			msg:    e1Ann,
		})
	}
	if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
		announcements = append(announcements, networkMsg{
			peer:   nMsg.peer,
			source: src,
			msg:    e2Ann,
		})
	}

	// We'll also send along the node announcements for each channel
	// participant if we know of them. To ensure our node announcement
	// propagates to our channel counterparty, we'll set the source for
	// each announcement to the node it belongs to, otherwise we won't send
	// it since the source gets skipped. This isn't necessary for channel
	// updates and announcement signatures since we send those directly to
	// our channel counterparty through the gossiper's reliable sender.
	node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
	if err != nil {
		log.Debugf("Unable to fetch node announcement for %x: %v",
			chanInfo.NodeKey1Bytes, err)
	} else {
		if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
			announcements = append(announcements, networkMsg{
				peer:   nMsg.peer,
				source: nodeKey1,
				msg:    node1Ann,
			})
		}
	}

	node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
	if err != nil {
		log.Debugf("Unable to fetch node announcement for %x: %v",
			chanInfo.NodeKey2Bytes, err)
	} else {
		if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
			announcements = append(announcements, networkMsg{
				peer:   nMsg.peer,
				source: nodeKey2,
				msg:    node2Ann,
			})
		}
	}

	nMsg.err <- nil
	return announcements, true
}
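
The waiting-proof logic above is easiest to see in isolation: store the first half keyed by (scid, direction), and assemble the full proof only once the opposite half arrives. Below is a minimal, self-contained sketch of that flow; the halfProof, fullProof, and key types and the map-based store are hypothetical stand-ins for lnwire.AnnounceSignatures1 and lnd's WaitingProofStore, not part of the gossiper's API.

package main

import "fmt"

// halfProof is a hypothetical stand-in for one AnnounceSignatures half,
// tagged by whether it came from the first or second node of the channel.
type halfProof struct {
	scid        uint64
	isFirstNode bool
	nodeSig     []byte
	bitcoinSig  []byte
}

// fullProof mirrors the shape of the assembled models.ChannelAuthProof above.
type fullProof struct {
	nodeSig1, nodeSig2       []byte
	bitcoinSig1, bitcoinSig2 []byte
}

// key orders the store by (scid, direction); the "opposite" key simply flips
// the direction bit, analogous to WaitingProof.OppositeKey.
type key struct {
	scid        uint64
	isFirstNode bool
}

func main() {
	waiting := make(map[key]halfProof)

	// process stores the first half it sees and returns a full proof once
	// both halves are present.
	process := func(h halfProof) (*fullProof, bool) {
		opp, ok := waiting[key{h.scid, !h.isFirstNode}]
		if !ok {
			// First half seen: store it and wait, just as the
			// ErrWaitingProofNotFound branch above does.
			waiting[key{h.scid, h.isFirstNode}] = h
			return nil, false
		}

		// Both halves present: slot each signature into position 1 or
		// 2 based on which node produced it.
		first, second := h, opp
		if !h.isFirstNode {
			first, second = opp, h
		}
		fp := &fullProof{
			nodeSig1:    first.nodeSig,
			nodeSig2:    second.nodeSig,
			bitcoinSig1: first.bitcoinSig,
			bitcoinSig2: second.bitcoinSig,
		}

		delete(waiting, key{h.scid, !h.isFirstNode})
		return fp, true
	}

	if _, done := process(halfProof{scid: 1, isFirstNode: true}); !done {
		fmt.Println("1/2 of proof received, waiting for other half")
	}
	if fp, done := process(halfProof{scid: 1, isFirstNode: false}); done {
		fmt.Printf("full proof assembled: %+v\n", fp)
	}
}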

// isBanned returns true if the peer identified by pubkey is banned for sending
// invalid channel announcements.
func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
	return d.banman.isBanned(pubkey)
}

// ShouldDisconnect returns true if we should disconnect the peer identified by
// pubkey.
func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
	bool, error) {

	pubkeySer := pubkey.SerializeCompressed()

	var pubkeyBytes [33]byte
	copy(pubkeyBytes[:], pubkeySer)

	// If the public key is banned, check whether or not this is a channel
	// peer.
	if d.isBanned(pubkeyBytes) {
		isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
		if err != nil {
			return false, err
		}

		// We should only disconnect non-channel peers.
		if !isChanPeer {
			return true, nil
		}
	}

	return false, nil
}
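
ShouldDisconnect is the hook the rest of the daemon can use to act on a ban. The following is a minimal sketch of how a caller might gate an inbound connection on it; gateInboundPeer and disconnectPeer are hypothetical names, not lnd's actual server wiring.

// gateInboundPeer is a hypothetical caller that disconnects a banned,
// non-channel peer. Banned channel peers stay connected; only their gossip
// is ignored.
func gateInboundPeer(g *AuthenticatedGossiper, pub *btcec.PublicKey,
	disconnectPeer func(*btcec.PublicKey) error) error {

	shouldDc, err := g.ShouldDisconnect(pub)
	if err != nil {
		return fmt.Errorf("unable to check ban status: %w", err)
	}

	if shouldDc {
		return disconnectPeer(pub)
	}

	return nil
}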

// validateFundingTransaction fetches the channel announcement's claimed
// funding transaction from chain to ensure that it exists, is not spent and
// matches the channel announcement proof. The transaction's outpoint and
// value are returned if we can glean them from the work done in this method.
func (d *AuthenticatedGossiper) validateFundingTransaction(
	ann *lnwire.ChannelAnnouncement1,
	tapscriptRoot fn.Option[chainhash.Hash]) (wire.OutPoint, btcutil.Amount,
	[]byte, error) {

	scid := ann.ShortChannelID

	// Before we can add the channel to the channel graph, we need to
	// obtain the full funding outpoint that's encoded within the channel
	// ID.
	fundingTx, err := lnwallet.FetchFundingTxWrapper(
		d.cfg.ChainIO, &scid, d.quit,
	)
	if err != nil {
		//nolint:ll
		//
		// In order to ensure we don't erroneously mark a channel as a
		// zombie due to an RPC failure, we'll attempt to string match
		// for the relevant errors.
		//
		// * btcd:
		//    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316
		//    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086
		// * bitcoind:
		//    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770
		//    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954
		switch {
		case strings.Contains(err.Error(), "not found"):
			fallthrough

		case strings.Contains(err.Error(), "out of range"):
			// If the funding transaction isn't found at all, then
			// we'll mark the edge itself as a zombie so we don't
			// continue to request it. We use the "zero key" for
			// both node pubkeys so this edge can't be resurrected.
			zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
			if zErr != nil {
				return wire.OutPoint{}, 0, nil, zErr
			}

		default:
		}

		return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
			ErrNoFundingTransaction, err)
	}

	// Recreate the witness output to ensure that the bitcoin keys and
	// channel value declared in the channel edge correspond to reality.
	fundingPkScript, err := makeFundingScript(
		ann.BitcoinKey1[:], ann.BitcoinKey2[:], ann.Features,
		tapscriptRoot,
	)
	if err != nil {
		return wire.OutPoint{}, 0, nil, err
	}

	// Next we'll validate that this channel is actually well formed. If
	// this check fails, then this channel either doesn't exist, or isn't
	// the one that was meant to be created according to the passed channel
	// proofs.
	fundingPoint, err := chanvalidate.Validate(
		&chanvalidate.Context{
			Locator: &chanvalidate.ShortChanIDChanLocator{
				ID: scid,
			},
			MultiSigPkScript: fundingPkScript,
			FundingTx:        fundingTx,
		},
	)
	if err != nil {
		// Mark the edge as a zombie so we won't try to re-validate it
		// on start up.
		zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
		if zErr != nil {
			return wire.OutPoint{}, 0, nil, zErr
		}

		return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
			ErrInvalidFundingOutput, err)
	}

	// Now that we have the funding outpoint of the channel, ensure
	// that it hasn't yet been spent. If so, then this channel has
	// been closed so we'll ignore it.
	chanUtxo, err := d.cfg.ChainIO.GetUtxo(
		fundingPoint, fundingPkScript, scid.BlockHeight, d.quit,
	)
	if err != nil {
		if errors.Is(err, btcwallet.ErrOutputSpent) {
			zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
			if zErr != nil {
				return wire.OutPoint{}, 0, nil, zErr
			}
		}

		return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: unable to "+
			"fetch utxo for chan_id=%v, chan_point=%v: %w",
			ErrChannelSpent, scid.ToUint64(), fundingPoint, err)
	}

	return *fundingPoint, btcutil.Amount(chanUtxo.Value), fundingPkScript,
		nil
}
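
Because validateFundingTransaction wraps each failure with %w around one of the package's sentinel errors (ErrNoFundingTransaction, ErrInvalidFundingOutput, ErrChannelSpent), a caller can classify the outcome with errors.Is. A minimal sketch; classifyFundingErr is a hypothetical helper, not part of the gossiper.

// classifyFundingErr shows how the wrapped sentinel errors returned above can
// be told apart with errors.Is.
func classifyFundingErr(err error) string {
	switch {
	case errors.Is(err, ErrNoFundingTransaction):
		// The funding tx couldn't be fetched; the edge may have been
		// marked a zombie.
		return "no funding transaction"

	case errors.Is(err, ErrInvalidFundingOutput):
		// The tx exists but doesn't match the announced script.
		return "invalid funding output"

	case errors.Is(err, ErrChannelSpent):
		// The funding output was spent, i.e. the channel is closed.
		return "channel spent"

	default:
		return "unclassified: " + err.Error()
	}
}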

// makeFundingScript is used to make the funding script for both segwit v0 and
// segwit v1 (taproot) channels.
func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte,
	features *lnwire.RawFeatureVector,
	tapscriptRoot fn.Option[chainhash.Hash]) ([]byte, error) {

	legacyFundingScript := func() ([]byte, error) {
		witnessScript, err := input.GenMultiSigScript(
			bitcoinKey1, bitcoinKey2,
		)
		if err != nil {
			return nil, err
		}
		pkScript, err := input.WitnessScriptHash(witnessScript)
		if err != nil {
			return nil, err
		}

		return pkScript, nil
	}

	if features.IsEmpty() {
		return legacyFundingScript()
	}

	chanFeatureBits := lnwire.NewFeatureVector(features, lnwire.Features)
	if chanFeatureBits.HasFeature(
		lnwire.SimpleTaprootChannelsOptionalStaging,
	) {

		pubKey1, err := btcec.ParsePubKey(bitcoinKey1)
		if err != nil {
			return nil, err
		}
		pubKey2, err := btcec.ParsePubKey(bitcoinKey2)
		if err != nil {
			return nil, err
		}

		fundingScript, _, err := input.GenTaprootFundingScript(
			pubKey1, pubKey2, 0, tapscriptRoot,
		)
		if err != nil {
			return nil, err
		}

		// TODO(roasbeef): add tapscript root to gossip v1.5

		return fundingScript, nil
	}

	return legacyFundingScript()
}
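
To illustrate the branch selection above: an empty feature vector (or one without the taproot feature bit) takes the legacy path and yields the P2WSH 2-of-2 script. A minimal sketch assuming in-package access to makeFundingScript; exampleLegacyFundingScript is a hypothetical helper, and the freshly generated keys stand in for the announced bitcoin keys.

// exampleLegacyFundingScript demonstrates the legacy (segwit v0) branch of
// makeFundingScript: with an empty feature vector and no tapscript root, the
// result is the P2WSH script over the 2-of-2 multisig witness script.
func exampleLegacyFundingScript() ([]byte, error) {
	// Generate two throwaway keys standing in for the announced bitcoin
	// keys.
	priv1, err := btcec.NewPrivateKey()
	if err != nil {
		return nil, err
	}
	priv2, err := btcec.NewPrivateKey()
	if err != nil {
		return nil, err
	}

	return makeFundingScript(
		priv1.PubKey().SerializeCompressed(),
		priv2.PubKey().SerializeCompressed(),
		lnwire.NewRawFeatureVector(), // empty => legacy path
		fn.None[chainhash.Hash](),
	)
}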