• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13536249039

26 Feb 2025 03:42AM UTC coverage: 57.462% (-1.4%) from 58.835%
13536249039

Pull #8453

github

Roasbeef
peer: update chooseDeliveryScript to gen script if needed

In this commit, we update `chooseDeliveryScript` to generate a new
script if needed. This allows us to fold in a few other lines that
always followed this function into this expanded function.

The tests have been updated accordingly.
Pull Request #8453: [4/4] - multi: integrate new rbf coop close FSM into the existing peer flow

275 of 1318 new or added lines in 22 files covered. (20.86%)

19521 existing lines in 257 files now uncovered.

103858 of 180741 relevant lines covered (57.46%)

24750.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.6
/discovery/gossiper.go
1
package discovery
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "strings"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        "github.com/btcsuite/btcd/btcec/v2"
13
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
14
        "github.com/btcsuite/btcd/btcutil"
15
        "github.com/btcsuite/btcd/chaincfg/chainhash"
16
        "github.com/btcsuite/btcd/wire"
17
        "github.com/davecgh/go-spew/spew"
18
        "github.com/lightninglabs/neutrino/cache"
19
        "github.com/lightninglabs/neutrino/cache/lru"
20
        "github.com/lightningnetwork/lnd/batch"
21
        "github.com/lightningnetwork/lnd/chainntnfs"
22
        "github.com/lightningnetwork/lnd/channeldb"
23
        "github.com/lightningnetwork/lnd/fn/v2"
24
        "github.com/lightningnetwork/lnd/graph"
25
        graphdb "github.com/lightningnetwork/lnd/graph/db"
26
        "github.com/lightningnetwork/lnd/graph/db/models"
27
        "github.com/lightningnetwork/lnd/input"
28
        "github.com/lightningnetwork/lnd/keychain"
29
        "github.com/lightningnetwork/lnd/lnpeer"
30
        "github.com/lightningnetwork/lnd/lnutils"
31
        "github.com/lightningnetwork/lnd/lnwallet"
32
        "github.com/lightningnetwork/lnd/lnwallet/btcwallet"
33
        "github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
34
        "github.com/lightningnetwork/lnd/lnwire"
35
        "github.com/lightningnetwork/lnd/multimutex"
36
        "github.com/lightningnetwork/lnd/netann"
37
        "github.com/lightningnetwork/lnd/routing/route"
38
        "github.com/lightningnetwork/lnd/ticker"
39
        "golang.org/x/time/rate"
40
)
41

42
const (
43
        // DefaultMaxChannelUpdateBurst is the default maximum number of updates
44
        // for a specific channel and direction that we'll accept over an
45
        // interval.
46
        DefaultMaxChannelUpdateBurst = 10
47

48
        // DefaultChannelUpdateInterval is the default interval we'll use to
49
        // determine how often we should allow a new update for a specific
50
        // channel and direction.
51
        DefaultChannelUpdateInterval = time.Minute
52

53
        // maxPrematureUpdates is the maximum number of premature channel
54
        // updates that we'll hold onto.
55
        maxPrematureUpdates = 100
56

57
        // maxFutureMessages is the maximum number of future messages that
58
        // we'll hold onto.
59
        maxFutureMessages = 1000
60

61
        // DefaultSubBatchDelay is the default delay we'll use when
62
        // broadcasting the next announcement batch.
63
        DefaultSubBatchDelay = 5 * time.Second
64

65
        // maxRejectedUpdates is the maximum number of rejected channel updates
66
        // we'll maintain. This is the global size across all peers. We'll
67
        // allocate ~3 MB max to the cache.
68
        maxRejectedUpdates = 10_000
69

70
        // DefaultProofMatureDelta specifies the default value used for
71
        // ProofMatureDelta, which is the number of confirmations needed before
72
        // processing the announcement signatures.
73
        DefaultProofMatureDelta = 6
74
)
75

76
var (
77
        // ErrGossiperShuttingDown is the error returned when the gossiper is
78
        // in the process of being shut down.
79
        ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
80

81
        // ErrGossipSyncerNotFound signals that we were unable to find an active
82
        // gossip syncer corresponding to a gossip query message received from
83
        // the remote peer.
84
        ErrGossipSyncerNotFound = errors.New("gossip syncer not found")
85

86
        // ErrNoFundingTransaction is returned when we are unable to find the
87
        // funding transaction described by the short channel ID on chain.
88
        ErrNoFundingTransaction = errors.New(
89
                "unable to find the funding transaction",
90
        )
91

92
        // ErrInvalidFundingOutput is returned if the channel funding output
93
        // fails validation.
94
        ErrInvalidFundingOutput = errors.New(
95
                "channel funding output validation failed",
96
        )
97

98
        // ErrChannelSpent is returned when we go to validate a channel, but
99
        // the purported funding output has actually already been spent on
100
        // chain.
101
        ErrChannelSpent = errors.New("channel output has been spent")
102

103
        // emptyPubkey is a zero-valued 33-byte array used to compare
104
        // compressed pubkeys against an empty byte array.
105
        emptyPubkey [33]byte
106
)
107

108
// optionalMsgFields is a set of optional message fields that external callers
109
// can provide and that prove useful when processing a specific network
110
// announcement.
111
type optionalMsgFields struct {
112
        // capacity is populated by the ChannelCapacity option.
        capacity      *btcutil.Amount
113
        // channelPoint is populated by the ChannelPoint option.
        channelPoint  *wire.OutPoint
114
        // remoteAlias is populated by the RemoteAlias option.
        remoteAlias   *lnwire.ShortChannelID
115
        // tapscriptRoot is populated by the TapscriptRoot option.
        tapscriptRoot fn.Option[chainhash.Hash]
116
}
117

118
// apply applies the optional fields within the functional options.
119
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
47✔
120
        for _, optionalMsgField := range optionalMsgFields {
52✔
121
                optionalMsgField(f)
5✔
122
        }
5✔
123
}
124

125
// OptionalMsgField is a functional option that can be used to provide external
126
// information that is not included within a network message but proves
127
// useful when processing it.
128
type OptionalMsgField func(*optionalMsgFields)
129

130
// ChannelCapacity is an optional field that lets the gossiper know of the
131
// capacity of a channel.
132
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
27✔
133
        return func(f *optionalMsgFields) {
28✔
134
                f.capacity = &capacity
1✔
135
        }
1✔
136
}
137

138
// ChannelPoint is an optional field that lets the gossiper know of the outpoint
139
// of a channel.
140
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
30✔
141
        return func(f *optionalMsgFields) {
34✔
142
                f.channelPoint = &op
4✔
143
        }
4✔
144
}
145

146
// TapscriptRoot is an optional field that lets the gossiper know of the root of
147
// the tapscript tree for a custom channel.
148
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
26✔
149
        return func(f *optionalMsgFields) {
26✔
UNCOV
150
                f.tapscriptRoot = root
×
UNCOV
151
        }
×
152
}
153

154
// RemoteAlias is an optional field that lets the gossiper know that a locally
155
// sent channel update is actually an update for the peer that should replace
156
// the ShortChannelID field with the remote's alias. This is only used for
157
// channels with peers where the option-scid-alias feature bit was negotiated.
158
// The channel update will be added to the graph under the original SCID, but
159
// will be modified and re-signed with this alias.
160
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
26✔
161
        return func(f *optionalMsgFields) {
26✔
UNCOV
162
                f.remoteAlias = alias
×
UNCOV
163
        }
×
164
}
165

166
// networkMsg couples a routing-related wire message with the peer that
167
// originally sent it, along with any optional fields supplied by the caller.
168
type networkMsg struct {
169
        peer              lnpeer.Peer
170
        source            *btcec.PublicKey
171
        msg               lnwire.Message
172
        optionalMsgFields *optionalMsgFields
173

174
        isRemote bool
175

176
        err chan error
177
}
178

179
// chanPolicyUpdateRequest is a request that is sent to the server when a caller
180
// wishes to update a particular set of channels. New ChannelUpdate messages
181
// will be crafted to be sent out during the next broadcast epoch and the fee
182
// updates committed to the lower layer.
183
type chanPolicyUpdateRequest struct {
184
        // edgesToUpdate is the set of edges the caller wants updated.
        edgesToUpdate []EdgeWithInfo
185
        // errChan carries the outcome of the request back to the caller.
        errChan       chan error
186
}
187

188
// PinnedSyncers is a set, keyed by node pubkey, of the peers for which we will
189
// maintain an active syncer at all times.
190
type PinnedSyncers map[route.Vertex]struct{}
191

192
// Config defines the configuration for the service. ALL elements within the
193
// configuration MUST be non-nil for the service to carry out its duties.
194
type Config struct {
195
        // ChainHash is the hash identifying the chain that this
196
        // AuthenticatedGossiper serves. Any announcements that don't match this
197
        // chain hash will be ignored.
198
        //
199
        // TODO(roasbeef): eventually make into map so can de-multiplex
200
        // incoming announcements
201
        //   * also need to do same for Notifier
202
        ChainHash chainhash.Hash
203

204
        // Graph is the subsystem which is responsible for managing the
205
        // topology of the lightning network. After incoming channel, node and
206
        // channel update announcements are validated they are sent to the
207
        // router in order to be included in the LN graph.
208
        Graph graph.ChannelGraphSource
209

210
        // ChainIO represents an abstraction over a source that can query the
211
        // blockchain.
212
        ChainIO lnwallet.BlockChainIO
213

214
        // ChanSeries is an interface that provides access to a time series
215
        // view of the current known channel graph. Each GossipSyncer enabled
216
        // peer will utilize this in order to create and respond to channel
217
        // graph time series queries.
218
        ChanSeries ChannelGraphTimeSeries
219

220
        // Notifier is used for receiving notifications of incoming blocks.
221
        // With each new incoming block found we process previously premature
222
        // announcements.
223
        //
224
        // TODO(roasbeef): could possibly just replace this with an epoch
225
        // channel.
226
        Notifier chainntnfs.ChainNotifier
227

228
        // Broadcast broadcasts a particular set of announcements to all peers
229
        // that the daemon is connected to. If supplied, the skips parameter
230
        // indicates the target peers that should be excluded from the
231
        // broadcast.
232
        Broadcast func(skips map[route.Vertex]struct{},
233
                msg ...lnwire.Message) error
234

235
        // NotifyWhenOnline is a function that allows the gossiper to be
236
        // notified when a certain peer comes online, allowing it to
237
        // retry sending a peer message.
238
        //
239
        // NOTE: The peerChan channel must be buffered.
240
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
241

242
        // NotifyWhenOffline is a function that allows the gossiper to be
243
        // notified when a certain peer disconnects, allowing it to request a
244
        // notification for when it reconnects.
245
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
246

247
        // FetchSelfAnnouncement retrieves our current node announcement, for
248
        // use when determining whether we should update our peers about our
249
        // presence in the network.
250
        FetchSelfAnnouncement func() lnwire.NodeAnnouncement
251

252
        // UpdateSelfAnnouncement produces a new announcement for our node with
253
        // an updated timestamp which can be broadcast to our peers.
254
        UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement, error)
255

256
        // ProofMatureDelta is the number of confirmations needed before
257
        // exchanging the channel announcement proofs.
258
        ProofMatureDelta uint32
259

260
        // TrickleDelay is the period of the trickle timer which flushes to the
261
        // network the pending batch of new announcements we've received since
262
        // the last trickle tick.
263
        TrickleDelay time.Duration
264

265
        // RetransmitTicker is a ticker that ticks with a period which
266
        // indicates that we should check if we need to re-broadcast any of our
267
        // personal channels.
268
        RetransmitTicker ticker.Ticker
269

270
        // RebroadcastInterval is the maximum time we wait between sending out
271
        // channel updates for our active channels and our own node
272
        // announcement. We do this to ensure our active presence on the
273
        // network is known, and we are not being considered a zombie node or
274
        // having zombie channels.
275
        RebroadcastInterval time.Duration
276

277
        // WaitingProofStore is a persistent storage of partial channel proof
278
        // announcement messages. We use it to buffer half of the material
279
        // needed to reconstruct a full authenticated channel announcement.
280
        // Once we receive the other half of the channel proof, we'll be able
281
        // to properly validate it and re-broadcast it out to the network.
282
        //
283
        // TODO(wilmer): make interface to prevent channeldb dependency.
284
        WaitingProofStore *channeldb.WaitingProofStore
285

286
        // MessageStore is a persistent storage of gossip messages which we will
287
        // use to determine which messages need to be resent for a given peer.
288
        MessageStore GossipMessageStore
289

290
        // AnnSigner is an instance of the MessageSigner interface which will
291
        // be used to manually sign any outgoing channel updates. The signer
292
        // implementation should be backed by the public key of the backing
293
        // Lightning node.
294
        //
295
        // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
296
        // here?
297
        AnnSigner lnwallet.MessageSigner
298

299
        // ScidCloser is an instance of ClosedChannelTracker that helps the
300
        // gossiper cut down on spam channel announcements for already closed
301
        // channels.
302
        ScidCloser ClosedChannelTracker
303

304
        // NumActiveSyncers is the number of peers with which we should have
305
        // active syncers. After reaching NumActiveSyncers, any future
306
        // gossip syncers will be passive.
307
        NumActiveSyncers int
308

309
        // NoTimestampQueries will prevent the GossipSyncer from querying
310
        // timestamps of announcement messages from the peer and from replying
311
        // to timestamp queries.
312
        NoTimestampQueries bool
313

314
        // RotateTicker is a ticker responsible for notifying the SyncManager
315
        // when it should rotate its active syncers. A single active syncer with
316
        // a chansSynced state will be exchanged for a passive syncer in order
317
        // to ensure we don't keep syncing with the same peers.
318
        RotateTicker ticker.Ticker
319

320
        // HistoricalSyncTicker is a ticker responsible for notifying the
321
        // syncManager when it should attempt a historical sync with a gossip
322
        // sync peer.
323
        HistoricalSyncTicker ticker.Ticker
324

325
        // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
326
        // syncManager when it should attempt to start the next pending
327
        // activeSyncer due to the current one not completing its state machine
328
        // within the timeout.
329
        ActiveSyncerTimeoutTicker ticker.Ticker
330

331
        // MinimumBatchSize is the minimum size of a sub batch of announcement
332
        // messages.
333
        MinimumBatchSize int
334

335
        // SubBatchDelay is the delay between sending sub batches of
336
        // gossip messages.
337
        SubBatchDelay time.Duration
338

339
        // IgnoreHistoricalFilters will prevent syncers from replying with
340
        // historical data when the remote peer sets a gossip_timestamp_range.
341
        // This prevents ranges with old start times from causing us to dump the
342
        // graph on connect.
343
        IgnoreHistoricalFilters bool
344

345
        // PinnedSyncers is a set of peers that will always transition to
346
        // ActiveSync upon connection. These peers will never transition to
347
        // PassiveSync.
348
        PinnedSyncers PinnedSyncers
349

350
        // MaxChannelUpdateBurst specifies the maximum number of updates for a
351
        // specific channel and direction that we'll accept over an interval.
352
        MaxChannelUpdateBurst int
353

354
        // ChannelUpdateInterval specifies the interval we'll use to determine
355
        // how often we should allow a new update for a specific channel and
356
        // direction.
357
        ChannelUpdateInterval time.Duration
358

359
        // IsAlias returns true if a given ShortChannelID is an alias for
360
        // option_scid_alias channels.
361
        IsAlias func(scid lnwire.ShortChannelID) bool
362

363
        // SignAliasUpdate is used to re-sign a channel update using the
364
        // remote's alias if the option-scid-alias feature bit was negotiated.
365
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
366
                error)
367

368
        // FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
369
        // This is used for channels that have negotiated the option-scid-alias
370
        // feature bit.
371
        FindBaseByAlias func(alias lnwire.ShortChannelID) (
372
                lnwire.ShortChannelID, error)
373

374
        // GetAlias allows the gossiper to look up the peer's alias for a given
375
        // ChannelID. This is used to sign updates for them if the channel has
376
        // no AuthProof and the option-scid-alias feature bit was negotiated.
377
        GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)
378

379
        // FindChannel allows the gossiper to find a channel that we're party
380
        // to without iterating over the entire set of open channels.
381
        FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
382
                *channeldb.OpenChannel, error)
383

384
        // IsStillZombieChannel takes the timestamps of the latest channel
385
        // updates for a channel and returns true if the channel should be
386
        // considered a zombie based on these timestamps.
387
        IsStillZombieChannel func(time.Time, time.Time) bool
388

389
        // AssumeChannelValid toggles whether the gossiper will check for
390
        // spent-ness of channel outpoints. For neutrino, this saves long
391
        // rescans from blocking initial usage of the daemon.
392
        AssumeChannelValid bool
393
}
394

395
// processedNetworkMsg pairs a networkMsg with a boolean. It is used to let
396
// the caller of the lru.Cache know whether a message has already been
397
// processed or not.
398
type processedNetworkMsg struct {
399
        processed bool
400
        msg       *networkMsg
401
}
402

403
// cachedNetworkMsg wraps a list of network messages so that they can be stored
404
// as a single entry in an *lru.Cache.
405
type cachedNetworkMsg struct {
406
        msgs []*processedNetworkMsg
407
}
408

409
// Size returns the "size" of an entry. We return the number of items as we
410
// just want to limit the total amount of entries rather than do accurate size
411
// accounting.
412
func (c *cachedNetworkMsg) Size() (uint64, error) {
2✔
413
        return uint64(len(c.msgs)), nil
2✔
414
}
2✔
415

416
// rejectCacheKey is the cache key that we'll use to track announcements we've
417
// recently rejected. It combines a 33-byte pubkey with a channel ID.
418
type rejectCacheKey struct {
419
        pubkey [33]byte
420
        chanID uint64
421
}
422

423
// newRejectCacheKey returns a new cache key for the reject cache.
424
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
465✔
425
        k := rejectCacheKey{
465✔
426
                chanID: cid,
465✔
427
                pubkey: pub,
465✔
428
        }
465✔
429

465✔
430
        return k
465✔
431
}
465✔
432

433
// sourceToPub returns a serialized-compressed public key for use in the reject
434
// cache.
435
func sourceToPub(pk *btcec.PublicKey) [33]byte {
479✔
436
        var pub [33]byte
479✔
437
        copy(pub[:], pk.SerializeCompressed())
479✔
438
        return pub
479✔
439
}
479✔
440

441
// cachedReject is the empty value stored in the reject cache; only the
// presence of its key carries information.
442
type cachedReject struct {
443
}
444

445
// Size returns the "size" of an entry. We return 1 as we just want to limit
446
// the total size.
447
func (c *cachedReject) Size() (uint64, error) {
206✔
448
        return 1, nil
206✔
449
}
206✔
450

451
// AuthenticatedGossiper is a subsystem which is responsible for receiving
452
// announcements, validating them and applying the changes to router, syncing
453
// lightning network with newly connected nodes, broadcasting announcements
454
// after validation, negotiating the channel announcement proofs exchange and
455
// handling the premature announcements. All outgoing announcements are
456
// expected to be properly signed as dictated in BOLT#7. Additionally, all
457
// incoming messages are expected to be well formed and signed. Invalid messages
458
// will be rejected by this struct.
459
type AuthenticatedGossiper struct {
460
        // Parameters which are needed to properly handle the start and stop of
461
        // the service.
462
        started sync.Once
463
        stopped sync.Once
464

465
        // bestHeight is the height of the block at the tip of the main chain
466
        // as we know it. Accesses *MUST* be done with the gossiper's lock
467
        // held.
468
        bestHeight uint32
469

470
        quit chan struct{}
471
        wg   sync.WaitGroup
472

473
        // cfg is a copy of the configuration struct that the gossiper service
474
        // was initialized with.
475
        cfg *Config
476

477
        // blockEpochs encapsulates a stream of block epochs that are sent at
478
        // every new block height.
479
        blockEpochs *chainntnfs.BlockEpochEvent
480

481
        // prematureChannelUpdates is a cache of ChannelUpdates we have received
482
        // that weren't associated with any channel we know about. We store
483
        // them temporarily, such that we can reprocess them when a
484
        // ChannelAnnouncement for the channel is received.
485
        prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]
486

487
        // banman tracks our peer's ban status.
488
        banman *banman
489

490
        // networkMsgs is a channel that carries new network broadcasted
491
        // messages from outside the gossiper service to be processed by the
492
        // networkHandler.
493
        networkMsgs chan *networkMsg
494

495
        // futureMsgs is a list of premature network messages that have a block
496
        // height specified in the future. We will save them and resend them to
497
        // the chan networkMsgs once that block height has been reached. The
498
        // cached map format is,
499
        //   {msgID1: msg1, msgID2: msg2, ...}
500
        futureMsgs *futureMsgCache
501

502
        // chanPolicyUpdates is the channel over which requests to update the
503
        // forwarding policy of a set of channels are sent.
504
        chanPolicyUpdates chan *chanPolicyUpdateRequest
505

506
        // selfKey is the identity public key of the backing Lightning node.
507
        selfKey *btcec.PublicKey
508

509
        // selfKeyLoc is the locator for the identity public key of the backing
510
        // Lightning node.
511
        selfKeyLoc keychain.KeyLocator
512

513
        // channelMtx is used to restrict the database access to one
514
        // goroutine per channel ID. This is done to ensure that when
515
        // the gossiper is handling an announcement, the db state stays
516
        // consistent between when the DB is first read until it's written.
517
        channelMtx *multimutex.Mutex[uint64]
518

519
        // recentRejects is an LRU cache, keyed by rejectCacheKey, of the
        // announcements we've recently rejected.
        recentRejects *lru.Cache[rejectCacheKey, *cachedReject]
520

521
        // syncMgr is a subsystem responsible for managing the gossip syncers
522
        // for peers currently connected. When a new peer is connected, the
523
        // manager will create its accompanying gossip syncer and determine
524
        // whether it should have an activeSync or passiveSync sync type based
525
        // on how many other gossip syncers are currently active. Any activeSync
526
        // gossip syncers are started in a round-robin manner to ensure we're
527
        // not syncing with multiple peers at the same time.
528
        syncMgr *SyncManager
529

530
        // reliableSender is a subsystem responsible for handling reliable
531
        // message send requests to peers. This should only be used for channels
532
        // that are unadvertised at the time of handling the message since if it
533
        // is advertised, then peers should be able to get the message from the
534
        // network.
535
        reliableSender *reliableSender
536

537
        // chanUpdateRateLimiter contains rate limiters for each direction of
538
        // a channel update we've processed. We'll use these to determine
539
        // whether we should accept a new update for a specific channel and
540
        // direction.
541
        //
542
        // NOTE: This map must be synchronized with the main
543
        // AuthenticatedGossiper lock.
544
        chanUpdateRateLimiter map[uint64][2]*rate.Limiter
545

546
        // vb is used to enforce job dependency ordering of gossip messages.
547
        vb *ValidationBarrier
548

549
        sync.Mutex
550
}
551

552
// New creates a new AuthenticatedGossiper instance, initialized with the
553
// passed configuration parameters.
554
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
29✔
555
        gossiper := &AuthenticatedGossiper{
29✔
556
                selfKey:           selfKeyDesc.PubKey,
29✔
557
                selfKeyLoc:        selfKeyDesc.KeyLocator,
29✔
558
                cfg:               &cfg,
29✔
559
                networkMsgs:       make(chan *networkMsg),
29✔
560
                futureMsgs:        newFutureMsgCache(maxFutureMessages),
29✔
561
                quit:              make(chan struct{}),
29✔
562
                chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
29✔
563
                prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
29✔
564
                        maxPrematureUpdates,
29✔
565
                ),
29✔
566
                channelMtx: multimutex.NewMutex[uint64](),
29✔
567
                recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
29✔
568
                        maxRejectedUpdates,
29✔
569
                ),
29✔
570
                chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
29✔
571
                banman:                newBanman(),
29✔
572
        }
29✔
573

29✔
574
        gossiper.vb = NewValidationBarrier(1000, gossiper.quit)
29✔
575

29✔
576
        gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
29✔
577
                ChainHash:               cfg.ChainHash,
29✔
578
                ChanSeries:              cfg.ChanSeries,
29✔
579
                RotateTicker:            cfg.RotateTicker,
29✔
580
                HistoricalSyncTicker:    cfg.HistoricalSyncTicker,
29✔
581
                NumActiveSyncers:        cfg.NumActiveSyncers,
29✔
582
                NoTimestampQueries:      cfg.NoTimestampQueries,
29✔
583
                IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
29✔
584
                BestHeight:              gossiper.latestHeight,
29✔
585
                PinnedSyncers:           cfg.PinnedSyncers,
29✔
586
                IsStillZombieChannel:    cfg.IsStillZombieChannel,
29✔
587
        })
29✔
588

29✔
589
        gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
29✔
590
                NotifyWhenOnline:  cfg.NotifyWhenOnline,
29✔
591
                NotifyWhenOffline: cfg.NotifyWhenOffline,
29✔
592
                MessageStore:      cfg.MessageStore,
29✔
593
                IsMsgStale:        gossiper.isMsgStale,
29✔
594
        })
29✔
595

29✔
596
        return gossiper
29✔
597
}
29✔
598

599
// EdgeWithInfo bundles the information that is required to update an edge.
600
type EdgeWithInfo struct {
601
        // Info describes the channel.
602
        Info *models.ChannelEdgeInfo
603

604
        // Edge describes the policy in one direction of the channel.
605
        Edge *models.ChannelEdgePolicy
606
}
607

608
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
609
// specified edge updates. Updates are done in two stages: first, the
610
// AuthenticatedGossiper ensures the update has been committed by dependent
611
// sub-systems, then it signs and broadcasts new updates to the network. A
612
// mapping between outpoints and updated channel policies is returned, which is
613
// used to update the forwarding policies of the underlying links.
614
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
615
        edgesToUpdate []EdgeWithInfo) error {
1✔
616

1✔
617
        errChan := make(chan error, 1)
1✔
618
        policyUpdate := &chanPolicyUpdateRequest{
1✔
619
                edgesToUpdate: edgesToUpdate,
1✔
620
                errChan:       errChan,
1✔
621
        }
1✔
622

1✔
623
        select {
1✔
624
        case d.chanPolicyUpdates <- policyUpdate:
1✔
625
                err := <-errChan
1✔
626
                return err
1✔
627
        case <-d.quit:
×
628
                return fmt.Errorf("AuthenticatedGossiper shutting down")
×
629
        }
630
}
631

632
// Start spawns network messages handler goroutine and registers on new block
633
// notifications in order to properly handle the premature announcements.
634
func (d *AuthenticatedGossiper) Start() error {
29✔
635
        var err error
29✔
636
        d.started.Do(func() {
58✔
637
                log.Info("Authenticated Gossiper starting")
29✔
638
                err = d.start()
29✔
639
        })
29✔
640
        return err
29✔
641
}
642

643
func (d *AuthenticatedGossiper) start() error {
29✔
644
        // First we register for new notifications of newly discovered blocks.
29✔
645
        // We do this immediately so we'll later be able to consume any/all
29✔
646
        // blocks which were discovered.
29✔
647
        blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
29✔
648
        if err != nil {
29✔
649
                return err
×
650
        }
×
651
        d.blockEpochs = blockEpochs
29✔
652

29✔
653
        height, err := d.cfg.Graph.CurrentBlockHeight()
29✔
654
        if err != nil {
29✔
655
                return err
×
656
        }
×
657
        d.bestHeight = height
29✔
658

29✔
659
        // Start the reliable sender. In case we had any pending messages ready
29✔
660
        // to be sent when the gossiper was last shut down, we must continue on
29✔
661
        // our quest to deliver them to their respective peers.
29✔
662
        if err := d.reliableSender.Start(); err != nil {
29✔
663
                return err
×
664
        }
×
665

666
        d.syncMgr.Start()
29✔
667

29✔
668
        d.banman.start()
29✔
669

29✔
670
        // Start receiving blocks in its dedicated goroutine.
29✔
671
        d.wg.Add(2)
29✔
672
        go d.syncBlockHeight()
29✔
673
        go d.networkHandler()
29✔
674

29✔
675
        return nil
29✔
676
}
677

678
// syncBlockHeight syncs the best block height for the gossiper by reading
679
// blockEpochs.
680
//
681
// NOTE: must be run as a goroutine.
682
func (d *AuthenticatedGossiper) syncBlockHeight() {
29✔
683
        defer d.wg.Done()
29✔
684

29✔
685
        for {
58✔
686
                select {
29✔
687
                // A new block has arrived, so we can re-process the previously
688
                // premature announcements.
UNCOV
689
                case newBlock, ok := <-d.blockEpochs.Epochs:
×
UNCOV
690
                        // If the channel has been closed, then this indicates
×
UNCOV
691
                        // the daemon is shutting down, so we exit ourselves.
×
UNCOV
692
                        if !ok {
×
UNCOV
693
                                return
×
UNCOV
694
                        }
×
695

696
                        // Once a new block arrives, we update our running
697
                        // track of the height of the chain tip.
UNCOV
698
                        d.Lock()
×
UNCOV
699
                        blockHeight := uint32(newBlock.Height)
×
UNCOV
700
                        d.bestHeight = blockHeight
×
UNCOV
701
                        d.Unlock()
×
UNCOV
702

×
UNCOV
703
                        log.Debugf("New block: height=%d, hash=%s", blockHeight,
×
UNCOV
704
                                newBlock.Hash)
×
UNCOV
705

×
UNCOV
706
                        // Resend future messages, if any.
×
UNCOV
707
                        d.resendFutureMessages(blockHeight)
×
708

709
                case <-d.quit:
29✔
710
                        return
29✔
711
                }
712
        }
713
}
714

715
// futureMsgCache embeds a `lru.Cache` with a message counter that's served as
716
// the unique ID when saving the message.
717
type futureMsgCache struct {
718
        *lru.Cache[uint64, *cachedFutureMsg]
719

720
        // msgID is a monotonically increased integer.
721
        msgID atomic.Uint64
722
}
723

724
// nextMsgID returns a unique message ID.
725
func (f *futureMsgCache) nextMsgID() uint64 {
3✔
726
        return f.msgID.Add(1)
3✔
727
}
3✔
728

729
// newFutureMsgCache creates a new future message cache with the underlying lru
730
// cache being initialized with the specified capacity.
731
func newFutureMsgCache(capacity uint64) *futureMsgCache {
30✔
732
        // Create a new cache.
30✔
733
        cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)
30✔
734

30✔
735
        return &futureMsgCache{
30✔
736
                Cache: cache,
30✔
737
        }
30✔
738
}
30✔
739

740
// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
741
type cachedFutureMsg struct {
742
        // msg is the network message.
743
        msg *networkMsg
744

745
        // height is the block height.
746
        height uint32
747
}
748

749
// Size returns the size of the message.
750
func (c *cachedFutureMsg) Size() (uint64, error) {
4✔
751
        // Return a constant 1.
4✔
752
        return 1, nil
4✔
753
}
4✔
754

755
// resendFutureMessages takes a block height, resends all the future messages
756
// found below and equal to that height and deletes those messages found in the
757
// gossiper's futureMsgs.
UNCOV
758
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
×
UNCOV
759
        var (
×
UNCOV
760
                // msgs are the target messages.
×
UNCOV
761
                msgs []*networkMsg
×
UNCOV
762

×
UNCOV
763
                // keys are the target messages' caching keys.
×
UNCOV
764
                keys []uint64
×
UNCOV
765
        )
×
UNCOV
766

×
UNCOV
767
        // filterMsgs is the visitor used when iterating the future cache.
×
UNCOV
768
        filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
×
UNCOV
769
                if cmsg.height <= height {
×
UNCOV
770
                        msgs = append(msgs, cmsg.msg)
×
UNCOV
771
                        keys = append(keys, k)
×
UNCOV
772
                }
×
773

UNCOV
774
                return true
×
775
        }
776

777
        // Filter out the target messages.
UNCOV
778
        d.futureMsgs.Range(filterMsgs)
×
UNCOV
779

×
UNCOV
780
        // Return early if no messages found.
×
UNCOV
781
        if len(msgs) == 0 {
×
UNCOV
782
                return
×
UNCOV
783
        }
×
784

785
        // Remove the filtered messages.
UNCOV
786
        for _, key := range keys {
×
UNCOV
787
                d.futureMsgs.Delete(key)
×
UNCOV
788
        }
×
789

UNCOV
790
        log.Debugf("Resending %d network messages at height %d",
×
UNCOV
791
                len(msgs), height)
×
UNCOV
792

×
UNCOV
793
        for _, msg := range msgs {
×
UNCOV
794
                select {
×
UNCOV
795
                case d.networkMsgs <- msg:
×
796
                case <-d.quit:
×
797
                        msg.err <- ErrGossiperShuttingDown
×
798
                }
799
        }
800
}
801

802
// Stop signals any active goroutines for a graceful closure.
803
func (d *AuthenticatedGossiper) Stop() error {
30✔
804
        d.stopped.Do(func() {
59✔
805
                log.Info("Authenticated gossiper shutting down...")
29✔
806
                defer log.Debug("Authenticated gossiper shutdown complete")
29✔
807

29✔
808
                d.stop()
29✔
809
        })
29✔
810
        return nil
30✔
811
}
812

813
func (d *AuthenticatedGossiper) stop() {
29✔
814
        log.Debug("Authenticated Gossiper is stopping")
29✔
815
        defer log.Debug("Authenticated Gossiper stopped")
29✔
816

29✔
817
        // `blockEpochs` is only initialized in the start routine so we make
29✔
818
        // sure we don't panic here in the case where the `Stop` method is
29✔
819
        // called when the `Start` method does not complete.
29✔
820
        if d.blockEpochs != nil {
58✔
821
                d.blockEpochs.Cancel()
29✔
822
        }
29✔
823

824
        d.syncMgr.Stop()
29✔
825

29✔
826
        d.banman.stop()
29✔
827

29✔
828
        close(d.quit)
29✔
829
        d.wg.Wait()
29✔
830

29✔
831
        // We'll stop our reliable sender after all of the gossiper's goroutines
29✔
832
        // have exited to ensure nothing can cause it to continue executing.
29✔
833
        d.reliableSender.Stop()
29✔
834
}
835

836
// TODO(roasbeef): need method to get current gossip timestamp?
837
//  * using mtx, check time rotate forward is needed?
838

839
// ProcessRemoteAnnouncement sends a new remote announcement message along with
840
// the peer that sent the routing message. The announcement will be processed
841
// then added to a queue for batched trickled announcement to all connected
842
// peers.  Remote channel announcements should contain the announcement proof
843
// and be fully validated.
844
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
845
        peer lnpeer.Peer) chan error {
284✔
846

284✔
847
        log.Debugf("Processing remote msg %T from peer=%x", msg, peer.PubKey())
284✔
848

284✔
849
        errChan := make(chan error, 1)
284✔
850

284✔
851
        // For messages in the known set of channel series queries, we'll
284✔
852
        // dispatch the message directly to the GossipSyncer, and skip the main
284✔
853
        // processing loop.
284✔
854
        switch m := msg.(type) {
284✔
855
        case *lnwire.QueryShortChanIDs,
856
                *lnwire.QueryChannelRange,
857
                *lnwire.ReplyChannelRange,
UNCOV
858
                *lnwire.ReplyShortChanIDsEnd:
×
UNCOV
859

×
UNCOV
860
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
×
UNCOV
861
                if !ok {
×
862
                        log.Warnf("Gossip syncer for peer=%x not found",
×
863
                                peer.PubKey())
×
864

×
865
                        errChan <- ErrGossipSyncerNotFound
×
866
                        return errChan
×
867
                }
×
868

869
                // If we've found the message target, then we'll dispatch the
870
                // message directly to it.
UNCOV
871
                err := syncer.ProcessQueryMsg(m, peer.QuitSignal())
×
UNCOV
872
                if err != nil {
×
873
                        log.Errorf("Process query msg from peer %x got %v",
×
874
                                peer.PubKey(), err)
×
875
                }
×
876

UNCOV
877
                errChan <- err
×
UNCOV
878
                return errChan
×
879

880
        // If a peer is updating its current update horizon, then we'll dispatch
881
        // that directly to the proper GossipSyncer.
UNCOV
882
        case *lnwire.GossipTimestampRange:
×
UNCOV
883
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
×
UNCOV
884
                if !ok {
×
885
                        log.Warnf("Gossip syncer for peer=%x not found",
×
886
                                peer.PubKey())
×
887

×
888
                        errChan <- ErrGossipSyncerNotFound
×
889
                        return errChan
×
890
                }
×
891

892
                // If we've found the message target, then we'll dispatch the
893
                // message directly to it.
UNCOV
894
                if err := syncer.ApplyGossipFilter(m); err != nil {
×
895
                        log.Warnf("Unable to apply gossip filter for peer=%x: "+
×
896
                                "%v", peer.PubKey(), err)
×
897

×
898
                        errChan <- err
×
899
                        return errChan
×
900
                }
×
901

UNCOV
902
                errChan <- nil
×
UNCOV
903
                return errChan
×
904

905
        // To avoid inserting edges in the graph for our own channels that we
906
        // have already closed, we ignore such channel announcements coming
907
        // from the remote.
908
        case *lnwire.ChannelAnnouncement1:
219✔
909
                ownKey := d.selfKey.SerializeCompressed()
219✔
910
                ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement1 " +
219✔
911
                        "for own channel")
219✔
912

219✔
913
                if bytes.Equal(m.NodeID1[:], ownKey) ||
219✔
914
                        bytes.Equal(m.NodeID2[:], ownKey) {
221✔
915

2✔
916
                        log.Warn(ownErr)
2✔
917
                        errChan <- ownErr
2✔
918
                        return errChan
2✔
919
                }
2✔
920
        }
921

922
        nMsg := &networkMsg{
282✔
923
                msg:      msg,
282✔
924
                isRemote: true,
282✔
925
                peer:     peer,
282✔
926
                source:   peer.IdentityKey(),
282✔
927
                err:      errChan,
282✔
928
        }
282✔
929

282✔
930
        select {
282✔
931
        case d.networkMsgs <- nMsg:
282✔
932

933
        // If the peer that sent us this error is quitting, then we don't need
934
        // to send back an error and can return immediately.
935
        case <-peer.QuitSignal():
×
936
                return nil
×
937
        case <-d.quit:
×
938
                nMsg.err <- ErrGossiperShuttingDown
×
939
        }
940

941
        return nMsg.err
282✔
942
}
943

944
// ProcessLocalAnnouncement sends a new remote announcement message along with
945
// the peer that sent the routing message. The announcement will be processed
946
// then added to a queue for batched trickled announcement to all connected
947
// peers.  Local channel announcements don't contain the announcement proof and
948
// will not be fully validated. Once the channel proofs are received, the
949
// entire channel announcement and update messages will be re-constructed and
950
// broadcast to the rest of the network.
951
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
952
        optionalFields ...OptionalMsgField) chan error {
47✔
953

47✔
954
        optionalMsgFields := &optionalMsgFields{}
47✔
955
        optionalMsgFields.apply(optionalFields...)
47✔
956

47✔
957
        nMsg := &networkMsg{
47✔
958
                msg:               msg,
47✔
959
                optionalMsgFields: optionalMsgFields,
47✔
960
                isRemote:          false,
47✔
961
                source:            d.selfKey,
47✔
962
                err:               make(chan error, 1),
47✔
963
        }
47✔
964

47✔
965
        select {
47✔
966
        case d.networkMsgs <- nMsg:
47✔
967
        case <-d.quit:
×
968
                nMsg.err <- ErrGossiperShuttingDown
×
969
        }
970

971
        return nMsg.err
47✔
972
}
973

974
// channelUpdateID is a unique identifier for ChannelUpdate messages, as
975
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
976
// tuple.
977
type channelUpdateID struct {
978
        // channelID represents the set of data which is needed to
979
        // retrieve all necessary data to validate the channel existence.
980
        channelID lnwire.ShortChannelID
981

982
        // Flags least-significant bit must be set to 0 if the creating node
983
        // corresponds to the first node in the previously sent channel
984
        // announcement and 1 otherwise.
985
        flags lnwire.ChanUpdateChanFlags
986
}
987

988
// msgWithSenders is a wrapper struct around a message, and the set of peers
989
// that originally sent us this message. Using this struct, we can ensure that
990
// we don't re-send a message to the peer that sent it to us in the first
991
// place.
992
type msgWithSenders struct {
993
        // msg is the wire message itself.
994
        msg lnwire.Message
995

996
        // isLocal is true if this was a message that originated locally. We'll
997
        // use this to bypass our normal checks to ensure we prioritize sending
998
        // out our own updates.
999
        isLocal bool
1000

1001
        // sender is the set of peers that sent us this message.
1002
        senders map[route.Vertex]struct{}
1003
}
1004

1005
// mergeSyncerMap is used to merge the set of senders of a particular message
1006
// with peers that we have an active GossipSyncer with. We do this to ensure
1007
// that we don't broadcast messages to any peers that we have active gossip
1008
// syncers for.
1009
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
24✔
1010
        for peerPub := range syncers {
24✔
UNCOV
1011
                m.senders[peerPub] = struct{}{}
×
UNCOV
1012
        }
×
1013
}
1014

1015
// deDupedAnnouncements de-duplicates announcements that have been added to the
1016
// batch. Internally, announcements are stored in three maps
1017
// (one each for channel announcements, channel updates, and node
1018
// announcements). These maps keep track of unique announcements and ensure no
1019
// announcements are duplicated. We keep the three message types separate, such
1020
// that we can send channel announcements first, then channel updates, and
1021
// finally node announcements when it's time to broadcast them.
1022
type deDupedAnnouncements struct {
1023
        // channelAnnouncements are identified by the short channel id field.
1024
        channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
1025

1026
        // channelUpdates are identified by the channel update id field.
1027
        channelUpdates map[channelUpdateID]msgWithSenders
1028

1029
        // nodeAnnouncements are identified by the Vertex field.
1030
        nodeAnnouncements map[route.Vertex]msgWithSenders
1031

1032
        sync.Mutex
1033
}
1034

1035
// Reset operates on deDupedAnnouncements to reset the storage of
1036
// announcements.
1037
func (d *deDupedAnnouncements) Reset() {
31✔
1038
        d.Lock()
31✔
1039
        defer d.Unlock()
31✔
1040

31✔
1041
        d.reset()
31✔
1042
}
31✔
1043

1044
// reset is the private version of the Reset method. We have this so we can
1045
// call this method within method that are already holding the lock.
1046
func (d *deDupedAnnouncements) reset() {
318✔
1047
        // Storage of each type of announcement (channel announcements, channel
318✔
1048
        // updates, node announcements) is set to an empty map where the
318✔
1049
        // appropriate key points to the corresponding lnwire.Message.
318✔
1050
        d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
318✔
1051
        d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
318✔
1052
        d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
318✔
1053
}
318✔
1054

1055
// addMsg adds a new message to the current batch. If the message is already
1056
// present in the current batch, then this new instance replaces the latter,
1057
// and the set of senders is updated to reflect which node sent us this
1058
// message.
1059
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
86✔
1060
        log.Tracef("Adding network message: %v to batch", message.msg.MsgType())
86✔
1061

86✔
1062
        // Depending on the message type (channel announcement, channel update,
86✔
1063
        // or node announcement), the message is added to the corresponding map
86✔
1064
        // in deDupedAnnouncements. Because each identifying key can have at
86✔
1065
        // most one value, the announcements are de-duplicated, with newer ones
86✔
1066
        // replacing older ones.
86✔
1067
        switch msg := message.msg.(type) {
86✔
1068

1069
        // Channel announcements are identified by the short channel id field.
1070
        case *lnwire.ChannelAnnouncement1:
22✔
1071
                deDupKey := msg.ShortChannelID
22✔
1072
                sender := route.NewVertex(message.source)
22✔
1073

22✔
1074
                mws, ok := d.channelAnnouncements[deDupKey]
22✔
1075
                if !ok {
43✔
1076
                        mws = msgWithSenders{
21✔
1077
                                msg:     msg,
21✔
1078
                                isLocal: !message.isRemote,
21✔
1079
                                senders: make(map[route.Vertex]struct{}),
21✔
1080
                        }
21✔
1081
                        mws.senders[sender] = struct{}{}
21✔
1082

21✔
1083
                        d.channelAnnouncements[deDupKey] = mws
21✔
1084

21✔
1085
                        return
21✔
1086
                }
21✔
1087

1088
                mws.msg = msg
1✔
1089
                mws.senders[sender] = struct{}{}
1✔
1090
                d.channelAnnouncements[deDupKey] = mws
1✔
1091

1092
        // Channel updates are identified by the (short channel id,
1093
        // channelflags) tuple.
1094
        case *lnwire.ChannelUpdate1:
42✔
1095
                sender := route.NewVertex(message.source)
42✔
1096
                deDupKey := channelUpdateID{
42✔
1097
                        msg.ShortChannelID,
42✔
1098
                        msg.ChannelFlags,
42✔
1099
                }
42✔
1100

42✔
1101
                oldTimestamp := uint32(0)
42✔
1102
                mws, ok := d.channelUpdates[deDupKey]
42✔
1103
                if ok {
45✔
1104
                        // If we already have seen this message, record its
3✔
1105
                        // timestamp.
3✔
1106
                        update, ok := mws.msg.(*lnwire.ChannelUpdate1)
3✔
1107
                        if !ok {
3✔
1108
                                log.Errorf("Expected *lnwire.ChannelUpdate1, "+
×
1109
                                        "got: %T", mws.msg)
×
1110

×
1111
                                return
×
1112
                        }
×
1113

1114
                        oldTimestamp = update.Timestamp
3✔
1115
                }
1116

1117
                // If we already had this message with a strictly newer
1118
                // timestamp, then we'll just discard the message we got.
1119
                if oldTimestamp > msg.Timestamp {
43✔
1120
                        log.Debugf("Ignored outdated network message: "+
1✔
1121
                                "peer=%v, msg=%s", message.peer, msg.MsgType())
1✔
1122
                        return
1✔
1123
                }
1✔
1124

1125
                // If the message we just got is newer than what we previously
1126
                // have seen, or this is the first time we see it, then we'll
1127
                // add it to our map of announcements.
1128
                if oldTimestamp < msg.Timestamp {
81✔
1129
                        mws = msgWithSenders{
40✔
1130
                                msg:     msg,
40✔
1131
                                isLocal: !message.isRemote,
40✔
1132
                                senders: make(map[route.Vertex]struct{}),
40✔
1133
                        }
40✔
1134

40✔
1135
                        // We'll mark the sender of the message in the
40✔
1136
                        // senders map.
40✔
1137
                        mws.senders[sender] = struct{}{}
40✔
1138

40✔
1139
                        d.channelUpdates[deDupKey] = mws
40✔
1140

40✔
1141
                        return
40✔
1142
                }
40✔
1143

1144
                // Lastly, if we had seen this exact message from before, with
1145
                // the same timestamp, we'll add the sender to the map of
1146
                // senders, such that we can skip sending this message back in
1147
                // the next batch.
1148
                mws.msg = msg
1✔
1149
                mws.senders[sender] = struct{}{}
1✔
1150
                d.channelUpdates[deDupKey] = mws
1✔
1151

1152
        // Node announcements are identified by the Vertex field.  Use the
1153
        // NodeID to create the corresponding Vertex.
1154
        case *lnwire.NodeAnnouncement:
22✔
1155
                sender := route.NewVertex(message.source)
22✔
1156
                deDupKey := route.Vertex(msg.NodeID)
22✔
1157

22✔
1158
                // We do the same for node announcements as we did for channel
22✔
1159
                // updates, as they also carry a timestamp.
22✔
1160
                oldTimestamp := uint32(0)
22✔
1161
                mws, ok := d.nodeAnnouncements[deDupKey]
22✔
1162
                if ok {
27✔
1163
                        oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
5✔
1164
                }
5✔
1165

1166
                // Discard the message if it's old.
1167
                if oldTimestamp > msg.Timestamp {
22✔
UNCOV
1168
                        return
×
UNCOV
1169
                }
×
1170

1171
                // Replace if it's newer.
1172
                if oldTimestamp < msg.Timestamp {
40✔
1173
                        mws = msgWithSenders{
18✔
1174
                                msg:     msg,
18✔
1175
                                isLocal: !message.isRemote,
18✔
1176
                                senders: make(map[route.Vertex]struct{}),
18✔
1177
                        }
18✔
1178

18✔
1179
                        mws.senders[sender] = struct{}{}
18✔
1180

18✔
1181
                        d.nodeAnnouncements[deDupKey] = mws
18✔
1182

18✔
1183
                        return
18✔
1184
                }
18✔
1185

1186
                // Add to senders map if it's the same as we had.
1187
                mws.msg = msg
4✔
1188
                mws.senders[sender] = struct{}{}
4✔
1189
                d.nodeAnnouncements[deDupKey] = mws
4✔
1190
        }
1191
}
1192

1193
// AddMsgs is a helper method to add multiple messages to the announcement
1194
// batch.
1195
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
54✔
1196
        d.Lock()
54✔
1197
        defer d.Unlock()
54✔
1198

54✔
1199
        for _, msg := range msgs {
140✔
1200
                d.addMsg(msg)
86✔
1201
        }
86✔
1202
}
1203

1204
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
1205
// to broadcast next into messages that are locally sourced and those that are
1206
// sourced remotely.
1207
type msgsToBroadcast struct {
1208
        // localMsgs is the set of messages we created locally.
1209
        localMsgs []msgWithSenders
1210

1211
        // remoteMsgs is the set of messages that we received from a remote
1212
        // party.
1213
        remoteMsgs []msgWithSenders
1214
}
1215

1216
// addMsg adds a new message to the appropriate sub-slice.
1217
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
71✔
1218
        if msg.isLocal {
118✔
1219
                m.localMsgs = append(m.localMsgs, msg)
47✔
1220
        } else {
71✔
1221
                m.remoteMsgs = append(m.remoteMsgs, msg)
24✔
1222
        }
24✔
1223
}
1224

1225
// isEmpty returns true if the batch is empty.
1226
func (m *msgsToBroadcast) isEmpty() bool {
286✔
1227
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
286✔
1228
}
286✔
1229

1230
// length returns the length of the combined message set.
1231
func (m *msgsToBroadcast) length() int {
1✔
1232
        return len(m.localMsgs) + len(m.remoteMsgs)
1✔
1233
}
1✔
1234

1235
// Emit returns the set of de-duplicated announcements to be sent out during
1236
// the next announcement epoch, in the order of channel announcements, channel
1237
// updates, and node announcements. Each message emitted, contains the set of
1238
// peers that sent us the message. This way, we can ensure that we don't waste
1239
// bandwidth by re-sending a message to the peer that sent it to us in the
1240
// first place. Additionally, the set of stored messages are reset.
1241
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
287✔
1242
        d.Lock()
287✔
1243
        defer d.Unlock()
287✔
1244

287✔
1245
        // Get the total number of announcements.
287✔
1246
        numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
287✔
1247
                len(d.nodeAnnouncements)
287✔
1248

287✔
1249
        // Create an empty array of lnwire.Messages with a length equal to
287✔
1250
        // the total number of announcements.
287✔
1251
        msgs := msgsToBroadcast{
287✔
1252
                localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
287✔
1253
                remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
287✔
1254
        }
287✔
1255

287✔
1256
        // Add the channel announcements to the array first.
287✔
1257
        for _, message := range d.channelAnnouncements {
305✔
1258
                msgs.addMsg(message)
18✔
1259
        }
18✔
1260

1261
        // Then add the channel updates.
1262
        for _, message := range d.channelUpdates {
323✔
1263
                msgs.addMsg(message)
36✔
1264
        }
36✔
1265

1266
        // Finally add the node announcements.
1267
        for _, message := range d.nodeAnnouncements {
304✔
1268
                msgs.addMsg(message)
17✔
1269
        }
17✔
1270

1271
        d.reset()
287✔
1272

287✔
1273
        // Return the array of lnwire.messages.
287✔
1274
        return msgs
287✔
1275
}
1276

1277
// calculateSubBatchSize is a helper function that calculates the size to break
// down the batchSize into.
//
// The result is batchSize scaled by subBatchDelay/totalDelay, rounded up, so
// that sending one sub batch every subBatchDelay gets through the whole batch
// in roughly totalDelay. The result is never smaller than minimumBatchSize.
//
// If totalDelay is not positive, or subBatchDelay exceeds totalDelay, there
// is no room to spread the batch out and batchSize is returned unchanged.
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
	minimumBatchSize, batchSize int) int {

	// A non-positive total delay can't be used as a divisor below, so
	// treat it the same as a sub batch delay that exceeds the total.
	if totalDelay <= 0 || subBatchDelay > totalDelay {
		return batchSize
	}

	// Ceiling division of batchSize*subBatchDelay by totalDelay.
	subBatchSize := (batchSize*int(subBatchDelay) +
		int(totalDelay) - 1) / int(totalDelay)

	if subBatchSize < minimumBatchSize {
		return minimumBatchSize
	}

	return subBatchSize
}
1294

1295
// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
1296
// this variable so the function can be mocked in our test.
1297
var batchSizeCalculator = calculateSubBatchSize
1298

1299
// splitAnnouncementBatches takes an exiting list of announcements and
1300
// decomposes it into sub batches controlled by the `subBatchSize`.
1301
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
1302
        announcementBatch []msgWithSenders) [][]msgWithSenders {
69✔
1303

69✔
1304
        subBatchSize := batchSizeCalculator(
69✔
1305
                d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
69✔
1306
                d.cfg.MinimumBatchSize, len(announcementBatch),
69✔
1307
        )
69✔
1308

69✔
1309
        var splitAnnouncementBatch [][]msgWithSenders
69✔
1310

69✔
1311
        for subBatchSize < len(announcementBatch) {
190✔
1312
                // For slicing with minimal allocation
121✔
1313
                // https://github.com/golang/go/wiki/SliceTricks
121✔
1314
                announcementBatch, splitAnnouncementBatch =
121✔
1315
                        announcementBatch[subBatchSize:],
121✔
1316
                        append(splitAnnouncementBatch,
121✔
1317
                                announcementBatch[0:subBatchSize:subBatchSize])
121✔
1318
        }
121✔
1319
        splitAnnouncementBatch = append(
69✔
1320
                splitAnnouncementBatch, announcementBatch,
69✔
1321
        )
69✔
1322

69✔
1323
        return splitAnnouncementBatch
69✔
1324
}
1325

1326
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
1327
// split size, and then sends out all items to the set of target peers. Locally
1328
// generated announcements are always sent before remotely generated
1329
// announcements.
1330
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
1331
        annBatch msgsToBroadcast) {
31✔
1332

31✔
1333
        // delayNextBatch is a helper closure that blocks for `SubBatchDelay`
31✔
1334
        // duration to delay the sending of next announcement batch.
31✔
1335
        delayNextBatch := func() {
93✔
1336
                select {
62✔
1337
                case <-time.After(d.cfg.SubBatchDelay):
45✔
1338
                case <-d.quit:
17✔
1339
                        return
17✔
1340
                }
1341
        }
1342

1343
        // Fetch the local and remote announcements.
1344
        localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
31✔
1345
        remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)
31✔
1346

31✔
1347
        d.wg.Add(1)
31✔
1348
        go func() {
62✔
1349
                defer d.wg.Done()
31✔
1350

31✔
1351
                log.Debugf("Broadcasting %v new local announcements in %d "+
31✔
1352
                        "sub batches", len(annBatch.localMsgs),
31✔
1353
                        len(localBatches))
31✔
1354

31✔
1355
                // Send out the local announcements first.
31✔
1356
                for _, annBatch := range localBatches {
62✔
1357
                        d.sendLocalBatch(annBatch)
31✔
1358
                        delayNextBatch()
31✔
1359
                }
31✔
1360

1361
                log.Debugf("Broadcasting %v new remote announcements in %d "+
31✔
1362
                        "sub batches", len(annBatch.remoteMsgs),
31✔
1363
                        len(remoteBatches))
31✔
1364

31✔
1365
                // Now send the remote announcements.
31✔
1366
                for _, annBatch := range remoteBatches {
62✔
1367
                        d.sendRemoteBatch(annBatch)
31✔
1368
                        delayNextBatch()
31✔
1369
                }
31✔
1370
        }()
1371
}
1372

1373
// sendLocalBatch broadcasts a list of locally generated announcements to our
1374
// peers. For local announcements, we skip the filter and dedup logic and just
1375
// send the announcements out to all our coonnected peers.
1376
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
31✔
1377
        msgsToSend := lnutils.Map(
31✔
1378
                annBatch, func(m msgWithSenders) lnwire.Message {
74✔
1379
                        return m.msg
43✔
1380
                },
43✔
1381
        )
1382

1383
        err := d.cfg.Broadcast(nil, msgsToSend...)
31✔
1384
        if err != nil {
31✔
1385
                log.Errorf("Unable to send local batch announcements: %v", err)
×
1386
        }
×
1387
}
1388

1389
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
1390
// peers.
1391
func (d *AuthenticatedGossiper) sendRemoteBatch(annBatch []msgWithSenders) {
31✔
1392
        syncerPeers := d.syncMgr.GossipSyncers()
31✔
1393

31✔
1394
        // We'll first attempt to filter out this new message for all peers
31✔
1395
        // that have active gossip syncers active.
31✔
1396
        for pub, syncer := range syncerPeers {
31✔
UNCOV
1397
                log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
×
UNCOV
1398
                syncer.FilterGossipMsgs(annBatch...)
×
UNCOV
1399
        }
×
1400

1401
        for _, msgChunk := range annBatch {
55✔
1402
                msgChunk := msgChunk
24✔
1403

24✔
1404
                // With the syncers taken care of, we'll merge the sender map
24✔
1405
                // with the set of syncers, so we don't send out duplicate
24✔
1406
                // messages.
24✔
1407
                msgChunk.mergeSyncerMap(syncerPeers)
24✔
1408

24✔
1409
                err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
24✔
1410
                if err != nil {
24✔
1411
                        log.Errorf("Unable to send batch "+
×
1412
                                "announcements: %v", err)
×
1413
                        continue
×
1414
                }
1415
        }
1416
}
1417

1418
// networkHandler is the primary goroutine that drives this service. The roles
1419
// of this goroutine includes answering queries related to the state of the
1420
// network, syncing up newly connected peers, and also periodically
1421
// broadcasting our latest topology state to all connected peers.
1422
//
1423
// NOTE: This MUST be run as a goroutine.
1424
func (d *AuthenticatedGossiper) networkHandler() {
29✔
1425
        defer d.wg.Done()
29✔
1426

29✔
1427
        // Initialize empty deDupedAnnouncements to store announcement batch.
29✔
1428
        announcements := deDupedAnnouncements{}
29✔
1429
        announcements.Reset()
29✔
1430

29✔
1431
        d.cfg.RetransmitTicker.Resume()
29✔
1432
        defer d.cfg.RetransmitTicker.Stop()
29✔
1433

29✔
1434
        trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
29✔
1435
        defer trickleTimer.Stop()
29✔
1436

29✔
1437
        // To start, we'll first check to see if there are any stale channel or
29✔
1438
        // node announcements that we need to re-transmit.
29✔
1439
        if err := d.retransmitStaleAnns(time.Now()); err != nil {
29✔
1440
                log.Errorf("Unable to rebroadcast stale announcements: %v", err)
×
1441
        }
×
1442

1443
        for {
677✔
1444
                select {
648✔
1445
                // A new policy update has arrived. We'll commit it to the
1446
                // sub-systems below us, then craft, sign, and broadcast a new
1447
                // ChannelUpdate for the set of affected clients.
1448
                case policyUpdate := <-d.chanPolicyUpdates:
1✔
1449
                        log.Tracef("Received channel %d policy update requests",
1✔
1450
                                len(policyUpdate.edgesToUpdate))
1✔
1451

1✔
1452
                        // First, we'll now create new fully signed updates for
1✔
1453
                        // the affected channels and also update the underlying
1✔
1454
                        // graph with the new state.
1✔
1455
                        newChanUpdates, err := d.processChanPolicyUpdate(
1✔
1456
                                policyUpdate.edgesToUpdate,
1✔
1457
                        )
1✔
1458
                        policyUpdate.errChan <- err
1✔
1459
                        if err != nil {
1✔
1460
                                log.Errorf("Unable to craft policy updates: %v",
×
1461
                                        err)
×
1462
                                continue
×
1463
                        }
1464

1465
                        // Finally, with the updates committed, we'll now add
1466
                        // them to the announcement batch to be flushed at the
1467
                        // start of the next epoch.
1468
                        announcements.AddMsgs(newChanUpdates...)
1✔
1469

1470
                case announcement := <-d.networkMsgs:
331✔
1471
                        log.Tracef("Received network message: "+
331✔
1472
                                "peer=%v, msg=%s, is_remote=%v",
331✔
1473
                                announcement.peer, announcement.msg.MsgType(),
331✔
1474
                                announcement.isRemote)
331✔
1475

331✔
1476
                        switch announcement.msg.(type) {
331✔
1477
                        // Channel announcement signatures are amongst the only
1478
                        // messages that we'll process serially.
1479
                        case *lnwire.AnnounceSignatures1:
21✔
1480
                                emittedAnnouncements, _ := d.processNetworkAnnouncement(
21✔
1481
                                        announcement,
21✔
1482
                                )
21✔
1483
                                log.Debugf("Processed network message %s, "+
21✔
1484
                                        "returned len(announcements)=%v",
21✔
1485
                                        announcement.msg.MsgType(),
21✔
1486
                                        len(emittedAnnouncements))
21✔
1487

21✔
1488
                                if emittedAnnouncements != nil {
31✔
1489
                                        announcements.AddMsgs(
10✔
1490
                                                emittedAnnouncements...,
10✔
1491
                                        )
10✔
1492
                                }
10✔
1493
                                continue
21✔
1494
                        }
1495

1496
                        // If this message was recently rejected, then we won't
1497
                        // attempt to re-process it.
1498
                        if announcement.isRemote && d.isRecentlyRejectedMsg(
310✔
1499
                                announcement.msg,
310✔
1500
                                sourceToPub(announcement.source),
310✔
1501
                        ) {
311✔
1502

1✔
1503
                                announcement.err <- fmt.Errorf("recently " +
1✔
1504
                                        "rejected")
1✔
1505
                                continue
1✔
1506
                        }
1507

1508
                        // We'll set up any dependent, and wait until a free
1509
                        // slot for this job opens up, this allow us to not
1510
                        // have thousands of goroutines active.
1511
                        annJobID, err := d.vb.InitJobDependencies(
309✔
1512
                                announcement.msg,
309✔
1513
                        )
309✔
1514
                        if err != nil {
309✔
1515
                                announcement.err <- err
×
1516
                                continue
×
1517
                        }
1518

1519
                        d.wg.Add(1)
309✔
1520
                        go d.handleNetworkMessages(
309✔
1521
                                announcement, &announcements, annJobID,
309✔
1522
                        )
309✔
1523

1524
                // The trickle timer has ticked, which indicates we should
1525
                // flush to the network the pending batch of new announcements
1526
                // we've received since the last trickle tick.
1527
                case <-trickleTimer.C:
286✔
1528
                        // Emit the current batch of announcements from
286✔
1529
                        // deDupedAnnouncements.
286✔
1530
                        announcementBatch := announcements.Emit()
286✔
1531

286✔
1532
                        // If the current announcements batch is nil, then we
286✔
1533
                        // have no further work here.
286✔
1534
                        if announcementBatch.isEmpty() {
541✔
1535
                                continue
255✔
1536
                        }
1537

1538
                        // At this point, we have the set of local and remote
1539
                        // announcements we want to send out. We'll do the
1540
                        // batching as normal for both, but for local
1541
                        // announcements, we'll blast them out w/o regard for
1542
                        // our peer's policies so we ensure they propagate
1543
                        // properly.
1544
                        d.splitAndSendAnnBatch(announcementBatch)
31✔
1545

1546
                // The retransmission timer has ticked which indicates that we
1547
                // should check if we need to prune or re-broadcast any of our
1548
                // personal channels or node announcement. This addresses the
1549
                // case of "zombie" channels and channel advertisements that
1550
                // have been dropped, or not properly propagated through the
1551
                // network.
1552
                case tick := <-d.cfg.RetransmitTicker.Ticks():
1✔
1553
                        if err := d.retransmitStaleAnns(tick); err != nil {
1✔
1554
                                log.Errorf("unable to rebroadcast stale "+
×
1555
                                        "announcements: %v", err)
×
1556
                        }
×
1557

1558
                // The gossiper has been signalled to exit, to we exit our
1559
                // main loop so the wait group can be decremented.
1560
                case <-d.quit:
29✔
1561
                        return
29✔
1562
                }
1563
        }
1564
}
1565

1566
// handleNetworkMessages is responsible for waiting for dependencies for a
1567
// given network message and processing the message. Once processed, it will
1568
// signal its dependants and add the new announcements to the announce batch.
1569
//
1570
// NOTE: must be run as a goroutine.
1571
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
1572
        deDuped *deDupedAnnouncements, jobID JobID) {
309✔
1573

309✔
1574
        defer d.wg.Done()
309✔
1575
        defer d.vb.CompleteJob()
309✔
1576

309✔
1577
        // We should only broadcast this message forward if it originated from
309✔
1578
        // us or it wasn't received as part of our initial historical sync.
309✔
1579
        shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
309✔
1580

309✔
1581
        // If this message has an existing dependency, then we'll wait until
309✔
1582
        // that has been fully validated before we proceed.
309✔
1583
        err := d.vb.WaitForParents(jobID, nMsg.msg)
309✔
1584
        if err != nil {
309✔
1585
                log.Debugf("Validating network message %s got err: %v",
×
1586
                        nMsg.msg.MsgType(), err)
×
1587

×
1588
                if errors.Is(err, ErrVBarrierShuttingDown) {
×
1589
                        log.Warnf("unexpected error during validation "+
×
1590
                                "barrier shutdown: %v", err)
×
1591
                }
×
1592
                nMsg.err <- err
×
1593

×
1594
                return
×
1595
        }
1596

1597
        // Process the network announcement to determine if this is either a
1598
        // new announcement from our PoV or an edges to a prior vertex/edge we
1599
        // previously proceeded.
1600
        newAnns, allow := d.processNetworkAnnouncement(nMsg)
309✔
1601

309✔
1602
        log.Tracef("Processed network message %s, returned "+
309✔
1603
                "len(announcements)=%v, allowDependents=%v",
309✔
1604
                nMsg.msg.MsgType(), len(newAnns), allow)
309✔
1605

309✔
1606
        // If this message had any dependencies, then we can now signal them to
309✔
1607
        // continue.
309✔
1608
        err = d.vb.SignalDependents(nMsg.msg, jobID)
309✔
1609
        if err != nil {
309✔
1610
                // Something is wrong if SignalDependents returns an error.
×
1611
                log.Errorf("SignalDependents returned error for msg=%v with "+
×
1612
                        "JobID=%v", spew.Sdump(nMsg.msg), jobID)
×
1613

×
1614
                nMsg.err <- err
×
1615

×
1616
                return
×
1617
        }
×
1618

1619
        // If the announcement was accepted, then add the emitted announcements
1620
        // to our announce batch to be broadcast once the trickle timer ticks
1621
        // gain.
1622
        if newAnns != nil && shouldBroadcast {
341✔
1623
                // TODO(roasbeef): exclude peer that sent.
32✔
1624
                deDuped.AddMsgs(newAnns...)
32✔
1625
        } else if newAnns != nil {
310✔
1626
                log.Trace("Skipping broadcast of announcements received " +
1✔
1627
                        "during initial graph sync")
1✔
1628
        }
1✔
1629
}
1630

1631
// TODO(roasbeef): d/c peers that send updates not on our chain
1632

1633
// InitSyncState is called by outside sub-systems when a connection is
1634
// established to a new peer that understands how to perform channel range
1635
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
1636
// needed to handle new queries.
UNCOV
1637
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
×
UNCOV
1638
        d.syncMgr.InitSyncState(syncPeer)
×
UNCOV
1639
}
×
1640

1641
// PruneSyncState is called by outside sub-systems once a peer that we were
1642
// previously connected to has been disconnected. In this case we can stop the
1643
// existing GossipSyncer assigned to the peer and free up resources.
UNCOV
1644
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
×
UNCOV
1645
        d.syncMgr.PruneSyncState(peer)
×
UNCOV
1646
}
×
1647

1648
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
1649
// false otherwise, This avoids expensive reprocessing of the message.
1650
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
1651
        peerPub [33]byte) bool {
273✔
1652

273✔
1653
        var scid uint64
273✔
1654
        switch m := msg.(type) {
273✔
1655
        case *lnwire.ChannelUpdate1:
42✔
1656
                scid = m.ShortChannelID.ToUint64()
42✔
1657

1658
        case *lnwire.ChannelAnnouncement1:
217✔
1659
                scid = m.ShortChannelID.ToUint64()
217✔
1660

1661
        default:
14✔
1662
                return false
14✔
1663
        }
1664

1665
        _, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
259✔
1666
        return err != cache.ErrElementNotFound
259✔
1667
}
1668

1669
// retransmitStaleAnns examines all outgoing channels that the source node is
1670
// known to maintain to check to see if any of them are "stale". A channel is
1671
// stale iff, the last timestamp of its rebroadcast is older than the
1672
// RebroadcastInterval. We also check if a refreshed node announcement should
1673
// be resent.
1674
func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
30✔
1675
        // Iterate over all of our channels and check if any of them fall
30✔
1676
        // within the prune interval or re-broadcast interval.
30✔
1677
        type updateTuple struct {
30✔
1678
                info *models.ChannelEdgeInfo
30✔
1679
                edge *models.ChannelEdgePolicy
30✔
1680
        }
30✔
1681

30✔
1682
        var (
30✔
1683
                havePublicChannels bool
30✔
1684
                edgesToUpdate      []updateTuple
30✔
1685
        )
30✔
1686
        err := d.cfg.Graph.ForAllOutgoingChannels(func(
30✔
1687
                info *models.ChannelEdgeInfo,
30✔
1688
                edge *models.ChannelEdgePolicy) error {
32✔
1689

2✔
1690
                // If there's no auth proof attached to this edge, it means
2✔
1691
                // that it is a private channel not meant to be announced to
2✔
1692
                // the greater network, so avoid sending channel updates for
2✔
1693
                // this channel to not leak its
2✔
1694
                // existence.
2✔
1695
                if info.AuthProof == nil {
3✔
1696
                        log.Debugf("Skipping retransmission of channel "+
1✔
1697
                                "without AuthProof: %v", info.ChannelID)
1✔
1698
                        return nil
1✔
1699
                }
1✔
1700

1701
                // We make a note that we have at least one public channel. We
1702
                // use this to determine whether we should send a node
1703
                // announcement below.
1704
                havePublicChannels = true
1✔
1705

1✔
1706
                // If this edge has a ChannelUpdate that was created before the
1✔
1707
                // introduction of the MaxHTLC field, then we'll update this
1✔
1708
                // edge to propagate this information in the network.
1✔
1709
                if !edge.MessageFlags.HasMaxHtlc() {
1✔
1710
                        // We'll make sure we support the new max_htlc field if
×
1711
                        // not already present.
×
1712
                        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
×
1713
                        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
×
1714

×
1715
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
1716
                                info: info,
×
1717
                                edge: edge,
×
1718
                        })
×
1719
                        return nil
×
1720
                }
×
1721

1722
                timeElapsed := now.Sub(edge.LastUpdate)
1✔
1723

1✔
1724
                // If it's been longer than RebroadcastInterval since we've
1✔
1725
                // re-broadcasted the channel, add the channel to the set of
1✔
1726
                // edges we need to update.
1✔
1727
                if timeElapsed >= d.cfg.RebroadcastInterval {
2✔
1728
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
1✔
1729
                                info: info,
1✔
1730
                                edge: edge,
1✔
1731
                        })
1✔
1732
                }
1✔
1733

1734
                return nil
1✔
1735
        })
1736
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
30✔
1737
                return fmt.Errorf("unable to retrieve outgoing channels: %w",
×
1738
                        err)
×
1739
        }
×
1740

1741
        var signedUpdates []lnwire.Message
30✔
1742
        for _, chanToUpdate := range edgesToUpdate {
31✔
1743
                // Re-sign and update the channel on disk and retrieve our
1✔
1744
                // ChannelUpdate to broadcast.
1✔
1745
                chanAnn, chanUpdate, err := d.updateChannel(
1✔
1746
                        chanToUpdate.info, chanToUpdate.edge,
1✔
1747
                )
1✔
1748
                if err != nil {
1✔
1749
                        return fmt.Errorf("unable to update channel: %w", err)
×
1750
                }
×
1751

1752
                // If we have a valid announcement to transmit, then we'll send
1753
                // that along with the update.
1754
                if chanAnn != nil {
2✔
1755
                        signedUpdates = append(signedUpdates, chanAnn)
1✔
1756
                }
1✔
1757

1758
                signedUpdates = append(signedUpdates, chanUpdate)
1✔
1759
        }
1760

1761
        // If we don't have any public channels, we return as we don't want to
1762
        // broadcast anything that would reveal our existence.
1763
        if !havePublicChannels {
59✔
1764
                return nil
29✔
1765
        }
29✔
1766

1767
        // We'll also check that our NodeAnnouncement is not too old.
1768
        currentNodeAnn := d.cfg.FetchSelfAnnouncement()
1✔
1769
        timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
1✔
1770
        timeElapsed := now.Sub(timestamp)
1✔
1771

1✔
1772
        // If it's been a full day since we've re-broadcasted the
1✔
1773
        // node announcement, refresh it and resend it.
1✔
1774
        nodeAnnStr := ""
1✔
1775
        if timeElapsed >= d.cfg.RebroadcastInterval {
2✔
1776
                newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
1✔
1777
                if err != nil {
1✔
1778
                        return fmt.Errorf("unable to get refreshed node "+
×
1779
                                "announcement: %v", err)
×
1780
                }
×
1781

1782
                signedUpdates = append(signedUpdates, &newNodeAnn)
1✔
1783
                nodeAnnStr = " and our refreshed node announcement"
1✔
1784

1✔
1785
                // Before broadcasting the refreshed node announcement, add it
1✔
1786
                // to our own graph.
1✔
1787
                if err := d.addNode(&newNodeAnn); err != nil {
2✔
1788
                        log.Errorf("Unable to add refreshed node announcement "+
1✔
1789
                                "to graph: %v", err)
1✔
1790
                }
1✔
1791
        }
1792

1793
        // If we don't have any updates to re-broadcast, then we'll exit
1794
        // early.
1795
        if len(signedUpdates) == 0 {
1✔
UNCOV
1796
                return nil
×
UNCOV
1797
        }
×
1798

1799
        log.Infof("Retransmitting %v outgoing channels%v",
1✔
1800
                len(edgesToUpdate), nodeAnnStr)
1✔
1801

1✔
1802
        // With all the wire announcements properly crafted, we'll broadcast
1✔
1803
        // our known outgoing channels to all our immediate peers.
1✔
1804
        if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
1✔
1805
                return fmt.Errorf("unable to re-broadcast channels: %w", err)
×
1806
        }
×
1807

1808
        return nil
1✔
1809
}
1810

1811
// processChanPolicyUpdate generates a new set of channel updates for the
1812
// provided list of edges and updates the backing ChannelGraphSource.
1813
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
1814
        edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
1✔
1815

1✔
1816
        var chanUpdates []networkMsg
1✔
1817
        for _, edgeInfo := range edgesToUpdate {
4✔
1818
                // Now that we've collected all the channels we need to update,
3✔
1819
                // we'll re-sign and update the backing ChannelGraphSource, and
3✔
1820
                // retrieve our ChannelUpdate to broadcast.
3✔
1821
                _, chanUpdate, err := d.updateChannel(
3✔
1822
                        edgeInfo.Info, edgeInfo.Edge,
3✔
1823
                )
3✔
1824
                if err != nil {
3✔
1825
                        return nil, err
×
1826
                }
×
1827

1828
                // We'll avoid broadcasting any updates for private channels to
1829
                // avoid directly giving away their existence. Instead, we'll
1830
                // send the update directly to the remote party.
1831
                if edgeInfo.Info.AuthProof == nil {
4✔
1832
                        // If AuthProof is nil and an alias was found for this
1✔
1833
                        // ChannelID (meaning the option-scid-alias feature was
1✔
1834
                        // negotiated), we'll replace the ShortChannelID in the
1✔
1835
                        // update with the peer's alias. We do this after
1✔
1836
                        // updateChannel so that the alias isn't persisted to
1✔
1837
                        // the database.
1✔
1838
                        chanID := lnwire.NewChanIDFromOutPoint(
1✔
1839
                                edgeInfo.Info.ChannelPoint,
1✔
1840
                        )
1✔
1841

1✔
1842
                        var defaultAlias lnwire.ShortChannelID
1✔
1843
                        foundAlias, _ := d.cfg.GetAlias(chanID)
1✔
1844
                        if foundAlias != defaultAlias {
1✔
UNCOV
1845
                                chanUpdate.ShortChannelID = foundAlias
×
UNCOV
1846

×
UNCOV
1847
                                sig, err := d.cfg.SignAliasUpdate(chanUpdate)
×
UNCOV
1848
                                if err != nil {
×
1849
                                        log.Errorf("Unable to sign alias "+
×
1850
                                                "update: %v", err)
×
1851
                                        continue
×
1852
                                }
1853

UNCOV
1854
                                lnSig, err := lnwire.NewSigFromSignature(sig)
×
UNCOV
1855
                                if err != nil {
×
1856
                                        log.Errorf("Unable to create sig: %v",
×
1857
                                                err)
×
1858
                                        continue
×
1859
                                }
1860

UNCOV
1861
                                chanUpdate.Signature = lnSig
×
1862
                        }
1863

1864
                        remotePubKey := remotePubFromChanInfo(
1✔
1865
                                edgeInfo.Info, chanUpdate.ChannelFlags,
1✔
1866
                        )
1✔
1867
                        err := d.reliableSender.sendMessage(
1✔
1868
                                chanUpdate, remotePubKey,
1✔
1869
                        )
1✔
1870
                        if err != nil {
1✔
1871
                                log.Errorf("Unable to reliably send %v for "+
×
1872
                                        "channel=%v to peer=%x: %v",
×
1873
                                        chanUpdate.MsgType(),
×
1874
                                        chanUpdate.ShortChannelID,
×
1875
                                        remotePubKey, err)
×
1876
                        }
×
1877
                        continue
1✔
1878
                }
1879

1880
                // We set ourselves as the source of this message to indicate
1881
                // that we shouldn't skip any peers when sending this message.
1882
                chanUpdates = append(chanUpdates, networkMsg{
2✔
1883
                        source:   d.selfKey,
2✔
1884
                        isRemote: false,
2✔
1885
                        msg:      chanUpdate,
2✔
1886
                })
2✔
1887
        }
1888

1889
        return chanUpdates, nil
1✔
1890
}
1891

1892
// remotePubFromChanInfo returns the public key of the remote peer given a
1893
// ChannelEdgeInfo that describe a channel we have with them.
1894
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
1895
        chanFlags lnwire.ChanUpdateChanFlags) [33]byte {
12✔
1896

12✔
1897
        var remotePubKey [33]byte
12✔
1898
        switch {
12✔
1899
        case chanFlags&lnwire.ChanUpdateDirection == 0:
12✔
1900
                remotePubKey = chanInfo.NodeKey2Bytes
12✔
UNCOV
1901
        case chanFlags&lnwire.ChanUpdateDirection == 1:
×
UNCOV
1902
                remotePubKey = chanInfo.NodeKey1Bytes
×
1903
        }
1904

1905
        return remotePubKey
12✔
1906
}
1907

1908
// processRejectedEdge examines a rejected edge to see if we can extract any
1909
// new announcements from it.  An edge will get rejected if we already added
1910
// the same edge without AuthProof to the graph. If the received announcement
1911
// contains a proof, we can add this proof to our edge.  We can end up in this
1912
// situation in the case where we create a channel, but for some reason fail
1913
// to receive the remote peer's proof, while the remote peer is able to fully
1914
// assemble the proof and craft the ChannelAnnouncement.
1915
func (d *AuthenticatedGossiper) processRejectedEdge(
1916
        chanAnnMsg *lnwire.ChannelAnnouncement1,
UNCOV
1917
        proof *models.ChannelAuthProof) ([]networkMsg, error) {
×
UNCOV
1918

×
UNCOV
1919
        // First, we'll fetch the state of the channel as we know if from the
×
UNCOV
1920
        // database.
×
UNCOV
1921
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
×
UNCOV
1922
                chanAnnMsg.ShortChannelID,
×
UNCOV
1923
        )
×
UNCOV
1924
        if err != nil {
×
1925
                return nil, err
×
1926
        }
×
1927

1928
        // The edge is in the graph, and has a proof attached, then we'll just
1929
        // reject it as normal.
UNCOV
1930
        if chanInfo.AuthProof != nil {
×
UNCOV
1931
                return nil, nil
×
UNCOV
1932
        }
×
1933

1934
        // Otherwise, this means that the edge is within the graph, but it
1935
        // doesn't yet have a proper proof attached. If we did not receive
1936
        // the proof such that we now can add it, there's nothing more we
1937
        // can do.
1938
        if proof == nil {
×
1939
                return nil, nil
×
1940
        }
×
1941

1942
        // We'll then create then validate the new fully assembled
1943
        // announcement.
1944
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
×
1945
                proof, chanInfo, e1, e2,
×
1946
        )
×
1947
        if err != nil {
×
1948
                return nil, err
×
1949
        }
×
1950
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
×
1951
        if err != nil {
×
1952
                err := fmt.Errorf("assembled channel announcement proof "+
×
1953
                        "for shortChanID=%v isn't valid: %v",
×
1954
                        chanAnnMsg.ShortChannelID, err)
×
1955
                log.Error(err)
×
1956
                return nil, err
×
1957
        }
×
1958

1959
        // If everything checks out, then we'll add the fully assembled proof
1960
        // to the database.
1961
        err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
×
1962
        if err != nil {
×
1963
                err := fmt.Errorf("unable add proof to shortChanID=%v: %w",
×
1964
                        chanAnnMsg.ShortChannelID, err)
×
1965
                log.Error(err)
×
1966
                return nil, err
×
1967
        }
×
1968

1969
        // As we now have a complete channel announcement for this channel,
1970
        // we'll construct the announcement so they can be broadcast out to all
1971
        // our peers.
1972
        announcements := make([]networkMsg, 0, 3)
×
1973
        announcements = append(announcements, networkMsg{
×
1974
                source: d.selfKey,
×
1975
                msg:    chanAnn,
×
1976
        })
×
1977
        if e1Ann != nil {
×
1978
                announcements = append(announcements, networkMsg{
×
1979
                        source: d.selfKey,
×
1980
                        msg:    e1Ann,
×
1981
                })
×
1982
        }
×
1983
        if e2Ann != nil {
×
1984
                announcements = append(announcements, networkMsg{
×
1985
                        source: d.selfKey,
×
1986
                        msg:    e2Ann,
×
1987
                })
×
1988

×
1989
        }
×
1990

1991
        return announcements, nil
×
1992
}
1993

1994
// fetchPKScript fetches the output script for the given SCID.
1995
func (d *AuthenticatedGossiper) fetchPKScript(chanID *lnwire.ShortChannelID) (
1996
        []byte, error) {
×
1997

×
1998
        return lnwallet.FetchPKScriptWithQuit(d.cfg.ChainIO, chanID, d.quit)
×
1999
}
×
2000

2001
// addNode processes the given node announcement, and adds it to our channel
2002
// graph.
2003
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
2004
        op ...batch.SchedulerOption) error {
17✔
2005

17✔
2006
        if err := netann.ValidateNodeAnn(msg); err != nil {
18✔
2007
                return fmt.Errorf("unable to validate node announcement: %w",
1✔
2008
                        err)
1✔
2009
        }
1✔
2010

2011
        return d.cfg.Graph.AddNode(models.NodeFromWireAnnouncement(msg), op...)
16✔
2012
}
2013

2014
// isPremature decides whether a given network message has a block height+delta
2015
// value specified in the future. If so, the message will be added to the
2016
// future message map and be processed when the block height as reached.
2017
//
2018
// NOTE: must be used inside a lock.
2019
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
2020
        delta uint32, msg *networkMsg) bool {
282✔
2021

282✔
2022
        // The channel is already confirmed at chanID.BlockHeight so we minus
282✔
2023
        // one block. For instance, if the required confirmation for this
282✔
2024
        // channel announcement is 6, we then only need to wait for 5 more
282✔
2025
        // blocks once the funding tx is confirmed.
282✔
2026
        if delta > 0 {
282✔
UNCOV
2027
                delta--
×
UNCOV
2028
        }
×
2029

2030
        msgHeight := chanID.BlockHeight + delta
282✔
2031

282✔
2032
        // The message height is smaller or equal to our best known height,
282✔
2033
        // thus the message is mature.
282✔
2034
        if msgHeight <= d.bestHeight {
563✔
2035
                return false
281✔
2036
        }
281✔
2037

2038
        // Add the premature message to our future messages which will be
2039
        // resent once the block height has reached.
2040
        //
2041
        // Copy the networkMsgs since the old message's err chan will be
2042
        // consumed.
2043
        copied := &networkMsg{
1✔
2044
                peer:              msg.peer,
1✔
2045
                source:            msg.source,
1✔
2046
                msg:               msg.msg,
1✔
2047
                optionalMsgFields: msg.optionalMsgFields,
1✔
2048
                isRemote:          msg.isRemote,
1✔
2049
                err:               make(chan error, 1),
1✔
2050
        }
1✔
2051

1✔
2052
        // Create the cached message.
1✔
2053
        cachedMsg := &cachedFutureMsg{
1✔
2054
                msg:    copied,
1✔
2055
                height: msgHeight,
1✔
2056
        }
1✔
2057

1✔
2058
        // Increment the msg ID and add it to the cache.
1✔
2059
        nextMsgID := d.futureMsgs.nextMsgID()
1✔
2060
        _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
1✔
2061
        if err != nil {
1✔
2062
                log.Errorf("Adding future message got error: %v", err)
×
2063
        }
×
2064

2065
        log.Debugf("Network message: %v added to future messages for "+
1✔
2066
                "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
1✔
2067
                msgHeight, d.bestHeight)
1✔
2068

1✔
2069
        return true
1✔
2070
}
2071

2072
// processNetworkAnnouncement processes a new network relate authenticated
2073
// channel or node announcement or announcements proofs. If the announcement
2074
// didn't affect the internal state due to either being out of date, invalid,
2075
// or redundant, then nil is returned. Otherwise, the set of announcements will
2076
// be returned which should be broadcasted to the rest of the network. The
2077
// boolean returned indicates whether any dependents of the announcement should
2078
// attempt to be processed as well.
2079
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
2080
        nMsg *networkMsg) ([]networkMsg, bool) {
330✔
2081

330✔
2082
        // If this is a remote update, we set the scheduler option to lazily
330✔
2083
        // add it to the graph.
330✔
2084
        var schedulerOp []batch.SchedulerOption
330✔
2085
        if nMsg.isRemote {
613✔
2086
                schedulerOp = append(schedulerOp, batch.LazyAdd())
283✔
2087
        }
283✔
2088

2089
        switch msg := nMsg.msg.(type) {
330✔
2090
        // A new node announcement has arrived which either presents new
2091
        // information about a node in one of the channels we know about, or a
2092
        // updating previously advertised information.
2093
        case *lnwire.NodeAnnouncement:
24✔
2094
                return d.handleNodeAnnouncement(nMsg, msg, schedulerOp)
24✔
2095

2096
        // A new channel announcement has arrived, this indicates the
2097
        // *creation* of a new channel within the network. This only advertises
2098
        // the existence of a channel and not yet the routing policies in
2099
        // either direction of the channel.
2100
        case *lnwire.ChannelAnnouncement1:
230✔
2101
                return d.handleChanAnnouncement(nMsg, msg, schedulerOp...)
230✔
2102

2103
        // A new authenticated channel edge update has arrived. This indicates
2104
        // that the directional information for an already known channel has
2105
        // been updated.
2106
        case *lnwire.ChannelUpdate1:
55✔
2107
                return d.handleChanUpdate(nMsg, msg, schedulerOp)
55✔
2108

2109
        // A new signature announcement has been received. This indicates
2110
        // willingness of nodes involved in the funding of a channel to
2111
        // announce this new channel to the rest of the world.
2112
        case *lnwire.AnnounceSignatures1:
21✔
2113
                return d.handleAnnSig(nMsg, msg)
21✔
2114

2115
        default:
×
2116
                err := errors.New("wrong type of the announcement")
×
2117
                nMsg.err <- err
×
2118
                return nil, false
×
2119
        }
2120
}
2121

2122
// processZombieUpdate determines whether the provided channel update should
2123
// resurrect a given zombie edge.
2124
//
2125
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
2126
// should be inspected.
2127
func (d *AuthenticatedGossiper) processZombieUpdate(
2128
        chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
2129
        msg *lnwire.ChannelUpdate1) error {
3✔
2130

3✔
2131
        // The least-significant bit in the flag on the channel update tells us
3✔
2132
        // which edge is being updated.
3✔
2133
        isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0
3✔
2134

3✔
2135
        // Since we've deemed the update as not stale above, before marking it
3✔
2136
        // live, we'll make sure it has been signed by the correct party. If we
3✔
2137
        // have both pubkeys, either party can resurrect the channel. If we've
3✔
2138
        // already marked this with the stricter, single-sided resurrection we
3✔
2139
        // will only have the pubkey of the node with the oldest timestamp.
3✔
2140
        var pubKey *btcec.PublicKey
3✔
2141
        switch {
3✔
2142
        case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
×
2143
                pubKey, _ = chanInfo.NodeKey1()
×
2144
        case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
2✔
2145
                pubKey, _ = chanInfo.NodeKey2()
2✔
2146
        }
2147
        if pubKey == nil {
4✔
2148
                return fmt.Errorf("incorrect pubkey to resurrect zombie "+
1✔
2149
                        "with chan_id=%v", msg.ShortChannelID)
1✔
2150
        }
1✔
2151

2152
        err := netann.VerifyChannelUpdateSignature(msg, pubKey)
2✔
2153
        if err != nil {
3✔
2154
                return fmt.Errorf("unable to verify channel "+
1✔
2155
                        "update signature: %v", err)
1✔
2156
        }
1✔
2157

2158
        // With the signature valid, we'll proceed to mark the
2159
        // edge as live and wait for the channel announcement to
2160
        // come through again.
2161
        err = d.cfg.Graph.MarkEdgeLive(scid)
1✔
2162
        switch {
1✔
2163
        case errors.Is(err, graphdb.ErrZombieEdgeNotFound):
×
2164
                log.Errorf("edge with chan_id=%v was not found in the "+
×
2165
                        "zombie index: %v", err)
×
2166

×
2167
                return nil
×
2168

2169
        case err != nil:
×
2170
                return fmt.Errorf("unable to remove edge with "+
×
2171
                        "chan_id=%v from zombie index: %v",
×
2172
                        msg.ShortChannelID, err)
×
2173

2174
        default:
1✔
2175
        }
2176

2177
        log.Debugf("Removed edge with chan_id=%v from zombie "+
1✔
2178
                "index", msg.ShortChannelID)
1✔
2179

1✔
2180
        return nil
1✔
2181
}
2182

2183
// fetchNodeAnn fetches the latest signed node announcement from our point of
2184
// view for the node with the given public key.
2185
func (d *AuthenticatedGossiper) fetchNodeAnn(
2186
        pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {
20✔
2187

20✔
2188
        node, err := d.cfg.Graph.FetchLightningNode(pubKey)
20✔
2189
        if err != nil {
26✔
2190
                return nil, err
6✔
2191
        }
6✔
2192

2193
        return node.NodeAnnouncement(true)
14✔
2194
}
2195

2196
// isMsgStale determines whether a message retrieved from the backing
2197
// MessageStore is seen as stale by the current graph.
2198
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
12✔
2199
        switch msg := msg.(type) {
12✔
2200
        case *lnwire.AnnounceSignatures1:
2✔
2201
                chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
2✔
2202
                        msg.ShortChannelID,
2✔
2203
                )
2✔
2204

2✔
2205
                // If the channel cannot be found, it is most likely a leftover
2✔
2206
                // message for a channel that was closed, so we can consider it
2✔
2207
                // stale.
2✔
2208
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
2✔
UNCOV
2209
                        return true
×
UNCOV
2210
                }
×
2211
                if err != nil {
2✔
2212
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2213
                                "%v", chanInfo.ChannelID, err)
×
2214
                        return false
×
2215
                }
×
2216

2217
                // If the proof exists in the graph, then we have successfully
2218
                // received the remote proof and assembled the full proof, so we
2219
                // can safely delete the local proof from the database.
2220
                return chanInfo.AuthProof != nil
2✔
2221

2222
        case *lnwire.ChannelUpdate1:
10✔
2223
                _, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)
10✔
2224

10✔
2225
                // If the channel cannot be found, it is most likely a leftover
10✔
2226
                // message for a channel that was closed, so we can consider it
10✔
2227
                // stale.
10✔
2228
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
10✔
UNCOV
2229
                        return true
×
UNCOV
2230
                }
×
2231
                if err != nil {
10✔
2232
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2233
                                "%v", msg.ShortChannelID, err)
×
2234
                        return false
×
2235
                }
×
2236

2237
                // Otherwise, we'll retrieve the correct policy that we
2238
                // currently have stored within our graph to check if this
2239
                // message is stale by comparing its timestamp.
2240
                var p *models.ChannelEdgePolicy
10✔
2241
                if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
20✔
2242
                        p = p1
10✔
2243
                } else {
10✔
UNCOV
2244
                        p = p2
×
UNCOV
2245
                }
×
2246

2247
                // If the policy is still unknown, then we can consider this
2248
                // policy fresh.
2249
                if p == nil {
10✔
2250
                        return false
×
2251
                }
×
2252

2253
                timestamp := time.Unix(int64(msg.Timestamp), 0)
10✔
2254
                return p.LastUpdate.After(timestamp)
10✔
2255

2256
        default:
×
2257
                // We'll make sure to not mark any unsupported messages as stale
×
2258
                // to ensure they are not removed.
×
2259
                return false
×
2260
        }
2261
}
2262

2263
// updateChannel creates a new fully signed update for the channel, and updates
2264
// the underlying graph with the new state.
2265
func (d *AuthenticatedGossiper) updateChannel(info *models.ChannelEdgeInfo,
2266
        edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
2267
        *lnwire.ChannelUpdate1, error) {
4✔
2268

4✔
2269
        // Parse the unsigned edge into a channel update.
4✔
2270
        chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)
4✔
2271

4✔
2272
        // We'll generate a new signature over a digest of the channel
4✔
2273
        // announcement itself and update the timestamp to ensure it propagate.
4✔
2274
        err := netann.SignChannelUpdate(
4✔
2275
                d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
4✔
2276
                netann.ChanUpdSetTimestamp,
4✔
2277
        )
4✔
2278
        if err != nil {
4✔
2279
                return nil, nil, err
×
2280
        }
×
2281

2282
        // Next, we'll set the new signature in place, and update the reference
2283
        // in the backing slice.
2284
        edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
4✔
2285
        edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()
4✔
2286

4✔
2287
        // To ensure that our signature is valid, we'll verify it ourself
4✔
2288
        // before committing it to the slice returned.
4✔
2289
        err = netann.ValidateChannelUpdateAnn(
4✔
2290
                d.selfKey, info.Capacity, chanUpdate,
4✔
2291
        )
4✔
2292
        if err != nil {
4✔
2293
                return nil, nil, fmt.Errorf("generated invalid channel "+
×
2294
                        "update sig: %v", err)
×
2295
        }
×
2296

2297
        // Finally, we'll write the new edge policy to disk.
2298
        if err := d.cfg.Graph.UpdateEdge(edge); err != nil {
4✔
2299
                return nil, nil, err
×
2300
        }
×
2301

2302
        // We'll also create the original channel announcement so the two can
2303
        // be broadcast along side each other (if necessary), but only if we
2304
        // have a full channel announcement for this channel.
2305
        var chanAnn *lnwire.ChannelAnnouncement1
4✔
2306
        if info.AuthProof != nil {
7✔
2307
                chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
3✔
2308
                chanAnn = &lnwire.ChannelAnnouncement1{
3✔
2309
                        ShortChannelID:  chanID,
3✔
2310
                        NodeID1:         info.NodeKey1Bytes,
3✔
2311
                        NodeID2:         info.NodeKey2Bytes,
3✔
2312
                        ChainHash:       info.ChainHash,
3✔
2313
                        BitcoinKey1:     info.BitcoinKey1Bytes,
3✔
2314
                        Features:        lnwire.NewRawFeatureVector(),
3✔
2315
                        BitcoinKey2:     info.BitcoinKey2Bytes,
3✔
2316
                        ExtraOpaqueData: info.ExtraOpaqueData,
3✔
2317
                }
3✔
2318
                chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
3✔
2319
                        info.AuthProof.NodeSig1Bytes,
3✔
2320
                )
3✔
2321
                if err != nil {
3✔
2322
                        return nil, nil, err
×
2323
                }
×
2324
                chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
3✔
2325
                        info.AuthProof.NodeSig2Bytes,
3✔
2326
                )
3✔
2327
                if err != nil {
3✔
2328
                        return nil, nil, err
×
2329
                }
×
2330
                chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
3✔
2331
                        info.AuthProof.BitcoinSig1Bytes,
3✔
2332
                )
3✔
2333
                if err != nil {
3✔
2334
                        return nil, nil, err
×
2335
                }
×
2336
                chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
3✔
2337
                        info.AuthProof.BitcoinSig2Bytes,
3✔
2338
                )
3✔
2339
                if err != nil {
3✔
2340
                        return nil, nil, err
×
2341
                }
×
2342
        }
2343

2344
        return chanAnn, chanUpdate, err
4✔
2345
}
2346

2347
// SyncManager returns the gossiper's SyncManager instance.
UNCOV
2348
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
×
UNCOV
2349
        return d.syncMgr
×
UNCOV
2350
}
×
2351

2352
// IsKeepAliveUpdate determines whether this channel update is considered a
2353
// keep-alive update based on the previous channel update processed for the same
2354
// direction.
2355
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
2356
        prev *models.ChannelEdgePolicy) bool {
14✔
2357

14✔
2358
        // Both updates should be from the same direction.
14✔
2359
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
14✔
2360
                prev.ChannelFlags&lnwire.ChanUpdateDirection {
14✔
2361

×
2362
                return false
×
2363
        }
×
2364

2365
        // The timestamp should always increase for a keep-alive update.
2366
        timestamp := time.Unix(int64(update.Timestamp), 0)
14✔
2367
        if !timestamp.After(prev.LastUpdate) {
14✔
UNCOV
2368
                return false
×
UNCOV
2369
        }
×
2370

2371
        // None of the remaining fields should change for a keep-alive update.
2372
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
14✔
UNCOV
2373
                return false
×
UNCOV
2374
        }
×
2375
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
26✔
2376
                return false
12✔
2377
        }
12✔
2378
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
2✔
UNCOV
2379
                return false
×
UNCOV
2380
        }
×
2381
        if update.TimeLockDelta != prev.TimeLockDelta {
2✔
2382
                return false
×
2383
        }
×
2384
        if update.HtlcMinimumMsat != prev.MinHTLC {
2✔
2385
                return false
×
2386
        }
×
2387
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
2✔
2388
                return false
×
2389
        }
×
2390
        if update.HtlcMaximumMsat != prev.MaxHTLC {
2✔
2391
                return false
×
2392
        }
×
2393
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
2✔
UNCOV
2394
                return false
×
UNCOV
2395
        }
×
2396
        return true
2✔
2397
}
2398

2399
// latestHeight returns the gossiper's latest height known of the chain.
UNCOV
2400
func (d *AuthenticatedGossiper) latestHeight() uint32 {
×
UNCOV
2401
        d.Lock()
×
UNCOV
2402
        defer d.Unlock()
×
UNCOV
2403
        return d.bestHeight
×
UNCOV
2404
}
×
2405

2406
// handleNodeAnnouncement processes a new node announcement.
2407
func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
2408
        nodeAnn *lnwire.NodeAnnouncement,
2409
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
24✔
2410

24✔
2411
        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
24✔
2412

24✔
2413
        log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
24✔
2414
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
24✔
2415
                nMsg.source.SerializeCompressed())
24✔
2416

24✔
2417
        // We'll quickly ask the router if it already has a newer update for
24✔
2418
        // this node so we can skip validating signatures if not required.
24✔
2419
        if d.cfg.Graph.IsStaleNode(nodeAnn.NodeID, timestamp) {
32✔
2420
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
8✔
2421
                nMsg.err <- nil
8✔
2422
                return nil, true
8✔
2423
        }
8✔
2424

2425
        if err := d.addNode(nodeAnn, ops...); err != nil {
16✔
UNCOV
2426
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
×
UNCOV
2427
                        err)
×
UNCOV
2428

×
UNCOV
2429
                if !graph.IsError(
×
UNCOV
2430
                        err,
×
UNCOV
2431
                        graph.ErrOutdated,
×
UNCOV
2432
                        graph.ErrIgnored,
×
UNCOV
2433
                ) {
×
2434

×
2435
                        log.Error(err)
×
2436
                }
×
2437

UNCOV
2438
                nMsg.err <- err
×
UNCOV
2439
                return nil, false
×
2440
        }
2441

2442
        // In order to ensure we don't leak unadvertised nodes, we'll make a
2443
        // quick check to ensure this node intends to publicly advertise itself
2444
        // to the network.
2445
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
16✔
2446
        if err != nil {
16✔
2447
                log.Errorf("Unable to determine if node %x is advertised: %v",
×
2448
                        nodeAnn.NodeID, err)
×
2449
                nMsg.err <- err
×
2450
                return nil, false
×
2451
        }
×
2452

2453
        var announcements []networkMsg
16✔
2454

16✔
2455
        // If it does, we'll add their announcement to our batch so that it can
16✔
2456
        // be broadcast to the rest of our peers.
16✔
2457
        if isPublic {
19✔
2458
                announcements = append(announcements, networkMsg{
3✔
2459
                        peer:     nMsg.peer,
3✔
2460
                        isRemote: nMsg.isRemote,
3✔
2461
                        source:   nMsg.source,
3✔
2462
                        msg:      nodeAnn,
3✔
2463
                })
3✔
2464
        } else {
16✔
2465
                log.Tracef("Skipping broadcasting node announcement for %x "+
13✔
2466
                        "due to being unadvertised", nodeAnn.NodeID)
13✔
2467
        }
13✔
2468

2469
        nMsg.err <- nil
16✔
2470
        // TODO(roasbeef): get rid of the above
16✔
2471

16✔
2472
        log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
16✔
2473
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
16✔
2474
                nMsg.source.SerializeCompressed())
16✔
2475

16✔
2476
        return announcements, true
16✔
2477
}
2478

2479
// handleChanAnnouncement processes a new channel announcement.
2480
func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
2481
        ann *lnwire.ChannelAnnouncement1,
2482
        ops ...batch.SchedulerOption) ([]networkMsg, bool) {
233✔
2483

233✔
2484
        scid := ann.ShortChannelID
233✔
2485

233✔
2486
        log.Debugf("Processing ChannelAnnouncement1: peer=%v, short_chan_id=%v",
233✔
2487
                nMsg.peer, scid.ToUint64())
233✔
2488

233✔
2489
        // We'll ignore any channel announcements that target any chain other
233✔
2490
        // than the set of chains we know of.
233✔
2491
        if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) {
233✔
2492
                err := fmt.Errorf("ignoring ChannelAnnouncement1 from chain=%v"+
×
2493
                        ", gossiper on chain=%v", ann.ChainHash,
×
2494
                        d.cfg.ChainHash)
×
2495
                log.Errorf(err.Error())
×
2496

×
2497
                key := newRejectCacheKey(
×
2498
                        scid.ToUint64(),
×
2499
                        sourceToPub(nMsg.source),
×
2500
                )
×
2501
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2502

×
2503
                nMsg.err <- err
×
2504
                return nil, false
×
2505
        }
×
2506

2507
        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
2508
        // reject the announcement. Since the router accepts alias SCIDs,
2509
        // not erroring out would be a DoS vector.
2510
        if nMsg.isRemote && d.cfg.IsAlias(scid) {
233✔
2511
                err := fmt.Errorf("ignoring remote alias channel=%v", scid)
×
2512
                log.Errorf(err.Error())
×
2513

×
2514
                key := newRejectCacheKey(
×
2515
                        scid.ToUint64(),
×
2516
                        sourceToPub(nMsg.source),
×
2517
                )
×
2518
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2519

×
2520
                nMsg.err <- err
×
2521
                return nil, false
×
2522
        }
×
2523

2524
        // If the advertised inclusionary block is beyond our knowledge of the
2525
        // chain tip, then we'll ignore it for now.
2526
        d.Lock()
233✔
2527
        if nMsg.isRemote && d.isPremature(scid, 0, nMsg) {
234✔
2528
                log.Warnf("Announcement for chan_id=(%v), is premature: "+
1✔
2529
                        "advertises height %v, only height %v is known",
1✔
2530
                        scid.ToUint64(), scid.BlockHeight, d.bestHeight)
1✔
2531
                d.Unlock()
1✔
2532
                nMsg.err <- nil
1✔
2533
                return nil, false
1✔
2534
        }
1✔
2535
        d.Unlock()
232✔
2536

232✔
2537
        // At this point, we'll now ask the router if this is a zombie/known
232✔
2538
        // edge. If so we can skip all the processing below.
232✔
2539
        if d.cfg.Graph.IsKnownEdge(scid) {
233✔
2540
                nMsg.err <- nil
1✔
2541
                return nil, true
1✔
2542
        }
1✔
2543

2544
        // Check if the channel is already closed in which case we can ignore
2545
        // it.
2546
        closed, err := d.cfg.ScidCloser.IsClosedScid(scid)
231✔
2547
        if err != nil {
231✔
2548
                log.Errorf("failed to check if scid %v is closed: %v", scid,
×
2549
                        err)
×
2550
                nMsg.err <- err
×
2551

×
2552
                return nil, false
×
2553
        }
×
2554

2555
        if closed {
232✔
2556
                err = fmt.Errorf("ignoring closed channel %v", scid)
1✔
2557
                log.Error(err)
1✔
2558

1✔
2559
                // If this is an announcement from us, we'll just ignore it.
1✔
2560
                if !nMsg.isRemote {
1✔
2561
                        nMsg.err <- err
×
2562
                        return nil, false
×
2563
                }
×
2564

2565
                // Increment the peer's ban score if they are sending closed
2566
                // channel announcements.
2567
                d.banman.incrementBanScore(nMsg.peer.PubKey())
1✔
2568

1✔
2569
                // If the peer is banned and not a channel peer, we'll
1✔
2570
                // disconnect them.
1✔
2571
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
1✔
2572
                if dcErr != nil {
1✔
2573
                        log.Errorf("failed to check if we should disconnect "+
×
2574
                                "peer: %v", dcErr)
×
2575
                        nMsg.err <- dcErr
×
2576

×
2577
                        return nil, false
×
2578
                }
×
2579

2580
                if shouldDc {
1✔
2581
                        nMsg.peer.Disconnect(ErrPeerBanned)
×
2582
                }
×
2583

2584
                nMsg.err <- err
1✔
2585

1✔
2586
                return nil, false
1✔
2587
        }
2588

2589
        // If this is a remote channel announcement, then we'll validate all
2590
        // the signatures within the proof as it should be well formed.
2591
        var proof *models.ChannelAuthProof
230✔
2592
        if nMsg.isRemote {
446✔
2593
                err := netann.ValidateChannelAnn(ann, d.fetchPKScript)
216✔
2594
                if err != nil {
216✔
2595
                        err := fmt.Errorf("unable to validate announcement: "+
×
2596
                                "%v", err)
×
2597

×
2598
                        key := newRejectCacheKey(
×
2599
                                scid.ToUint64(),
×
2600
                                sourceToPub(nMsg.source),
×
2601
                        )
×
2602
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2603

×
2604
                        log.Error(err)
×
2605
                        nMsg.err <- err
×
2606
                        return nil, false
×
2607
                }
×
2608

2609
                // If the proof checks out, then we'll save the proof itself to
2610
                // the database so we can fetch it later when gossiping with
2611
                // other nodes.
2612
                proof = &models.ChannelAuthProof{
216✔
2613
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
216✔
2614
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
216✔
2615
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
216✔
2616
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
216✔
2617
                }
216✔
2618
        }
2619

2620
        // With the proof validated (if necessary), we can now store it within
2621
        // the database for our path finding and syncing needs.
2622
        var featureBuf bytes.Buffer
230✔
2623
        if err := ann.Features.Encode(&featureBuf); err != nil {
230✔
2624
                log.Errorf("unable to encode features: %v", err)
×
2625
                nMsg.err <- err
×
2626
                return nil, false
×
2627
        }
×
2628

2629
        edge := &models.ChannelEdgeInfo{
230✔
2630
                ChannelID:        scid.ToUint64(),
230✔
2631
                ChainHash:        ann.ChainHash,
230✔
2632
                NodeKey1Bytes:    ann.NodeID1,
230✔
2633
                NodeKey2Bytes:    ann.NodeID2,
230✔
2634
                BitcoinKey1Bytes: ann.BitcoinKey1,
230✔
2635
                BitcoinKey2Bytes: ann.BitcoinKey2,
230✔
2636
                AuthProof:        proof,
230✔
2637
                Features:         featureBuf.Bytes(),
230✔
2638
                ExtraOpaqueData:  ann.ExtraOpaqueData,
230✔
2639
        }
230✔
2640

230✔
2641
        // If there were any optional message fields provided, we'll include
230✔
2642
        // them in its serialized disk representation now.
230✔
2643
        var tapscriptRoot fn.Option[chainhash.Hash]
230✔
2644
        if nMsg.optionalMsgFields != nil {
244✔
2645
                if nMsg.optionalMsgFields.capacity != nil {
15✔
2646
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
1✔
2647
                }
1✔
2648
                if nMsg.optionalMsgFields.channelPoint != nil {
18✔
2649
                        cp := *nMsg.optionalMsgFields.channelPoint
4✔
2650
                        edge.ChannelPoint = cp
4✔
2651
                }
4✔
2652

2653
                // Optional tapscript root for custom channels.
2654
                tapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
14✔
2655
        }
2656

2657
        // Before we start validation or add the edge to the database, we obtain
2658
        // the mutex for this channel ID. We do this to ensure no other
2659
        // goroutine has read the database and is now making decisions based on
2660
        // this DB state, before it writes to the DB. It also ensures that we
2661
        // don't perform the expensive validation check on the same channel
2662
        // announcement at the same time.
2663
        d.channelMtx.Lock(scid.ToUint64())
230✔
2664

230✔
2665
        // If AssumeChannelValid is present, then we are unable to perform any
230✔
2666
        // of the expensive checks below, so we'll short-circuit our path
230✔
2667
        // straight to adding the edge to our graph. If the passed
230✔
2668
        // ShortChannelID is an alias, then we'll skip validation as it will
230✔
2669
        // not map to a legitimate tx. This is not a DoS vector as only we can
230✔
2670
        // add an alias ChannelAnnouncement from the gossiper.
230✔
2671
        if !(d.cfg.AssumeChannelValid || d.cfg.IsAlias(scid)) { //nolint:nestif
458✔
2672
                op, capacity, script, err := d.validateFundingTransaction(
228✔
2673
                        ann, tapscriptRoot,
228✔
2674
                )
228✔
2675
                if err != nil {
432✔
2676
                        defer d.channelMtx.Unlock(scid.ToUint64())
204✔
2677

204✔
2678
                        switch {
204✔
2679
                        case errors.Is(err, ErrNoFundingTransaction),
2680
                                errors.Is(err, ErrInvalidFundingOutput):
202✔
2681

202✔
2682
                                key := newRejectCacheKey(
202✔
2683
                                        scid.ToUint64(),
202✔
2684
                                        sourceToPub(nMsg.source),
202✔
2685
                                )
202✔
2686
                                _, _ = d.recentRejects.Put(
202✔
2687
                                        key, &cachedReject{},
202✔
2688
                                )
202✔
2689

202✔
2690
                                // Increment the peer's ban score. We check
202✔
2691
                                // isRemote so we don't actually ban the peer in
202✔
2692
                                // case of a local bug.
202✔
2693
                                if nMsg.isRemote {
404✔
2694
                                        d.banman.incrementBanScore(
202✔
2695
                                                nMsg.peer.PubKey(),
202✔
2696
                                        )
202✔
2697
                                }
202✔
2698

2699
                        case errors.Is(err, ErrChannelSpent):
2✔
2700
                                key := newRejectCacheKey(
2✔
2701
                                        scid.ToUint64(),
2✔
2702
                                        sourceToPub(nMsg.source),
2✔
2703
                                )
2✔
2704
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
2✔
2705

2✔
2706
                                // Since this channel has already been closed,
2✔
2707
                                // we'll add it to the graph's closed channel
2✔
2708
                                // index such that we won't attempt to do
2✔
2709
                                // expensive validation checks on it again.
2✔
2710
                                // TODO: Populate the ScidCloser by using closed
2✔
2711
                                // channel notifications.
2✔
2712
                                dbErr := d.cfg.ScidCloser.PutClosedScid(scid)
2✔
2713
                                if dbErr != nil {
2✔
2714
                                        log.Errorf("failed to mark scid(%v) "+
×
2715
                                                "as closed: %v", scid, dbErr)
×
2716

×
2717
                                        nMsg.err <- dbErr
×
2718

×
2719
                                        return nil, false
×
2720
                                }
×
2721

2722
                                // Increment the peer's ban score. We check
2723
                                // isRemote so we don't accidentally ban
2724
                                // ourselves in case of a bug.
2725
                                if nMsg.isRemote {
4✔
2726
                                        d.banman.incrementBanScore(
2✔
2727
                                                nMsg.peer.PubKey(),
2✔
2728
                                        )
2✔
2729
                                }
2✔
2730

2731
                        default:
×
2732
                                // Otherwise, this is just a regular rejected
×
2733
                                // edge.
×
2734
                                key := newRejectCacheKey(
×
2735
                                        scid.ToUint64(),
×
2736
                                        sourceToPub(nMsg.source),
×
2737
                                )
×
2738
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2739
                        }
2740

2741
                        if !nMsg.isRemote {
204✔
2742
                                log.Errorf("failed to add edge for local "+
×
2743
                                        "channel: %v", err)
×
2744
                                nMsg.err <- err
×
2745

×
2746
                                return nil, false
×
2747
                        }
×
2748

2749
                        shouldDc, dcErr := d.ShouldDisconnect(
204✔
2750
                                nMsg.peer.IdentityKey(),
204✔
2751
                        )
204✔
2752
                        if dcErr != nil {
204✔
2753
                                log.Errorf("failed to check if we should "+
×
2754
                                        "disconnect peer: %v", dcErr)
×
2755
                                nMsg.err <- dcErr
×
2756

×
2757
                                return nil, false
×
2758
                        }
×
2759

2760
                        if shouldDc {
205✔
2761
                                nMsg.peer.Disconnect(ErrPeerBanned)
1✔
2762
                        }
1✔
2763

2764
                        nMsg.err <- err
204✔
2765

204✔
2766
                        return nil, false
204✔
2767
                }
2768

2769
                edge.FundingScript = fn.Some(script)
24✔
2770

24✔
2771
                // TODO(roasbeef): this is a hack, needs to be removed after
24✔
2772
                //  commitment fees are dynamic.
24✔
2773
                edge.Capacity = capacity
24✔
2774
                edge.ChannelPoint = op
24✔
2775
        }
2776

2777
        log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64())
26✔
2778

26✔
2779
        // We will add the edge to the channel router. If the nodes present in
26✔
2780
        // this channel are not present in the database, a partial node will be
26✔
2781
        // added to represent each node while we wait for a node announcement.
26✔
2782
        err = d.cfg.Graph.AddEdge(edge, ops...)
26✔
2783
        if err != nil {
27✔
2784
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
1✔
2785
                        scid.ToUint64(), err)
1✔
2786

1✔
2787
                defer d.channelMtx.Unlock(scid.ToUint64())
1✔
2788

1✔
2789
                // If the edge was rejected due to already being known, then it
1✔
2790
                // may be the case that this new message has a fresh channel
1✔
2791
                // proof, so we'll check.
1✔
2792
                if graph.IsError(err, graph.ErrIgnored) {
1✔
UNCOV
2793
                        // Attempt to process the rejected message to see if we
×
UNCOV
2794
                        // get any new announcements.
×
UNCOV
2795
                        anns, rErr := d.processRejectedEdge(ann, proof)
×
UNCOV
2796
                        if rErr != nil {
×
2797
                                key := newRejectCacheKey(
×
2798
                                        scid.ToUint64(),
×
2799
                                        sourceToPub(nMsg.source),
×
2800
                                )
×
2801
                                cr := &cachedReject{}
×
2802
                                _, _ = d.recentRejects.Put(key, cr)
×
2803

×
2804
                                nMsg.err <- rErr
×
2805

×
2806
                                return nil, false
×
2807
                        }
×
2808

UNCOV
2809
                        log.Debugf("Extracted %v announcements from rejected "+
×
UNCOV
2810
                                "msgs", len(anns))
×
UNCOV
2811

×
UNCOV
2812
                        // If while processing this rejected edge, we realized
×
UNCOV
2813
                        // there's a set of announcements we could extract,
×
UNCOV
2814
                        // then we'll return those directly.
×
UNCOV
2815
                        //
×
UNCOV
2816
                        // NOTE: since this is an ErrIgnored, we can return
×
UNCOV
2817
                        // true here to signal "allow" to its dependants.
×
UNCOV
2818
                        nMsg.err <- nil
×
UNCOV
2819

×
UNCOV
2820
                        return anns, true
×
2821
                }
2822

2823
                // Otherwise, this is just a regular rejected edge.
2824
                key := newRejectCacheKey(
1✔
2825
                        scid.ToUint64(),
1✔
2826
                        sourceToPub(nMsg.source),
1✔
2827
                )
1✔
2828
                _, _ = d.recentRejects.Put(key, &cachedReject{})
1✔
2829

1✔
2830
                if !nMsg.isRemote {
1✔
2831
                        log.Errorf("failed to add edge for local channel: %v",
×
2832
                                err)
×
2833
                        nMsg.err <- err
×
2834

×
2835
                        return nil, false
×
2836
                }
×
2837

2838
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
1✔
2839
                if dcErr != nil {
1✔
2840
                        log.Errorf("failed to check if we should disconnect "+
×
2841
                                "peer: %v", dcErr)
×
2842
                        nMsg.err <- dcErr
×
2843

×
2844
                        return nil, false
×
2845
                }
×
2846

2847
                if shouldDc {
1✔
2848
                        nMsg.peer.Disconnect(ErrPeerBanned)
×
2849
                }
×
2850

2851
                nMsg.err <- err
1✔
2852

1✔
2853
                return nil, false
1✔
2854
        }
2855

2856
        // If err is nil, release the lock immediately.
2857
        d.channelMtx.Unlock(scid.ToUint64())
25✔
2858

25✔
2859
        log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64())
25✔
2860

25✔
2861
        // If we earlier received any ChannelUpdates for this channel, we can
25✔
2862
        // now process them, as the channel is added to the graph.
25✔
2863
        var channelUpdates []*processedNetworkMsg
25✔
2864

25✔
2865
        earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64())
25✔
2866
        if err == nil {
27✔
2867
                // There was actually an entry in the map, so we'll accumulate
2✔
2868
                // it. We don't worry about deletion, since it'll eventually
2✔
2869
                // fall out anyway.
2✔
2870
                chanMsgs := earlyChanUpdates
2✔
2871
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
2✔
2872
        }
2✔
2873

2874
        // Launch a new goroutine to handle each ChannelUpdate, this is to
2875
        // ensure we don't block here, as we can handle only one announcement
2876
        // at a time.
2877
        for _, cu := range channelUpdates {
27✔
2878
                // Skip if already processed.
2✔
2879
                if cu.processed {
2✔
UNCOV
2880
                        continue
×
2881
                }
2882

2883
                // Mark the ChannelUpdate as processed. This ensures that a
2884
                // subsequent announcement in the option-scid-alias case does
2885
                // not re-use an old ChannelUpdate.
2886
                cu.processed = true
2✔
2887

2✔
2888
                d.wg.Add(1)
2✔
2889
                go func(updMsg *networkMsg) {
4✔
2890
                        defer d.wg.Done()
2✔
2891

2✔
2892
                        switch msg := updMsg.msg.(type) {
2✔
2893
                        // Reprocess the message, making sure we return an
2894
                        // error to the original caller in case the gossiper
2895
                        // shuts down.
2896
                        case *lnwire.ChannelUpdate1:
2✔
2897
                                log.Debugf("Reprocessing ChannelUpdate for "+
2✔
2898
                                        "shortChanID=%v", scid.ToUint64())
2✔
2899

2✔
2900
                                select {
2✔
2901
                                case d.networkMsgs <- updMsg:
2✔
2902
                                case <-d.quit:
×
2903
                                        updMsg.err <- ErrGossiperShuttingDown
×
2904
                                }
2905

2906
                        // We don't expect any other message type than
2907
                        // ChannelUpdate to be in this cache.
2908
                        default:
×
2909
                                log.Errorf("Unsupported message type found "+
×
2910
                                        "among ChannelUpdates: %T", msg)
×
2911
                        }
2912
                }(cu.msg)
2913
        }
2914

2915
        // Channel announcement was successfully processed and now it might be
2916
        // broadcast to other connected nodes if it was an announcement with
2917
        // proof (remote).
2918
        var announcements []networkMsg
25✔
2919

25✔
2920
        if proof != nil {
36✔
2921
                announcements = append(announcements, networkMsg{
11✔
2922
                        peer:     nMsg.peer,
11✔
2923
                        isRemote: nMsg.isRemote,
11✔
2924
                        source:   nMsg.source,
11✔
2925
                        msg:      ann,
11✔
2926
                })
11✔
2927
        }
11✔
2928

2929
        nMsg.err <- nil
25✔
2930

25✔
2931
        log.Debugf("Processed ChannelAnnouncement1: peer=%v, short_chan_id=%v",
25✔
2932
                nMsg.peer, scid.ToUint64())
25✔
2933

25✔
2934
        return announcements, true
25✔
2935
}
2936

2937
// handleChanUpdate processes a new channel update.
2938
func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
2939
        upd *lnwire.ChannelUpdate1,
2940
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
55✔
2941

55✔
2942
        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
55✔
2943
                nMsg.peer, upd.ShortChannelID.ToUint64())
55✔
2944

55✔
2945
        // We'll ignore any channel updates that target any chain other than
55✔
2946
        // the set of chains we know of.
55✔
2947
        if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
55✔
2948
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
×
2949
                        "gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash)
×
2950
                log.Errorf(err.Error())
×
2951

×
2952
                key := newRejectCacheKey(
×
2953
                        upd.ShortChannelID.ToUint64(),
×
2954
                        sourceToPub(nMsg.source),
×
2955
                )
×
2956
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2957

×
2958
                nMsg.err <- err
×
2959
                return nil, false
×
2960
        }
×
2961

2962
        blockHeight := upd.ShortChannelID.BlockHeight
55✔
2963
        shortChanID := upd.ShortChannelID.ToUint64()
55✔
2964

55✔
2965
        // If the advertised inclusionary block is beyond our knowledge of the
55✔
2966
        // chain tip, then we'll put the announcement in limbo to be fully
55✔
2967
        // verified once we advance forward in the chain. If the update has an
55✔
2968
        // alias SCID, we'll skip the isPremature check. This is necessary
55✔
2969
        // since aliases start at block height 16_000_000.
55✔
2970
        d.Lock()
55✔
2971
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
55✔
2972
                d.isPremature(upd.ShortChannelID, 0, nMsg) {
55✔
UNCOV
2973

×
UNCOV
2974
                log.Warnf("Update announcement for short_chan_id(%v), is "+
×
UNCOV
2975
                        "premature: advertises height %v, only height %v is "+
×
UNCOV
2976
                        "known", shortChanID, blockHeight, d.bestHeight)
×
UNCOV
2977
                d.Unlock()
×
UNCOV
2978
                nMsg.err <- nil
×
UNCOV
2979
                return nil, false
×
UNCOV
2980
        }
×
2981
        d.Unlock()
55✔
2982

55✔
2983
        // Before we perform any of the expensive checks below, we'll check
55✔
2984
        // whether this update is stale or is for a zombie channel in order to
55✔
2985
        // quickly reject it.
55✔
2986
        timestamp := time.Unix(int64(upd.Timestamp), 0)
55✔
2987

55✔
2988
        // Fetch the SCID we should be using to lock the channelMtx and make
55✔
2989
        // graph queries with.
55✔
2990
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
55✔
2991
        if err != nil {
110✔
2992
                // Fallback and set the graphScid to the peer-provided SCID.
55✔
2993
                // This will occur for non-option-scid-alias channels and for
55✔
2994
                // public option-scid-alias channels after 6 confirmations.
55✔
2995
                // Once public option-scid-alias channels have 6 confs, we'll
55✔
2996
                // ignore ChannelUpdates with one of their aliases.
55✔
2997
                graphScid = upd.ShortChannelID
55✔
2998
        }
55✔
2999

3000
        if d.cfg.Graph.IsStaleEdgePolicy(
55✔
3001
                graphScid, timestamp, upd.ChannelFlags,
55✔
3002
        ) {
57✔
3003

2✔
3004
                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
2✔
3005
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
2✔
3006
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
2✔
3007
                )
2✔
3008

2✔
3009
                nMsg.err <- nil
2✔
3010
                return nil, true
2✔
3011
        }
2✔
3012

3013
        // Check that the ChanUpdate is not too far into the future, this could
3014
        // reveal some faulty implementation therefore we log an error.
3015
        if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
53✔
3016
                log.Errorf("Skewed timestamp (%v) for edge policy of "+
×
3017
                        "short_chan_id(%v), timestamp too far in the future: "+
×
3018
                        "peer=%v, msg=%s, is_remote=%v", timestamp.Unix(),
×
3019
                        shortChanID, nMsg.peer, nMsg.msg.MsgType(),
×
3020
                        nMsg.isRemote,
×
3021
                )
×
3022

×
3023
                nMsg.err <- fmt.Errorf("skewed timestamp of edge policy, "+
×
3024
                        "timestamp too far in the future: %v", timestamp.Unix())
×
3025

×
3026
                return nil, false
×
3027
        }
×
3028

3029
        // Get the node pub key as far since we don't have it in the channel
3030
        // update announcement message. We'll need this to properly verify the
3031
        // message's signature.
3032
        //
3033
        // We make sure to obtain the mutex for this channel ID before we
3034
        // access the database. This ensures the state we read from the
3035
        // database has not changed between this point and when we call
3036
        // UpdateEdge() later.
3037
        d.channelMtx.Lock(graphScid.ToUint64())
53✔
3038
        defer d.channelMtx.Unlock(graphScid.ToUint64())
53✔
3039

53✔
3040
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
53✔
3041
        switch {
53✔
3042
        // No error, break.
3043
        case err == nil:
49✔
3044
                break
49✔
3045

3046
        case errors.Is(err, graphdb.ErrZombieEdge):
3✔
3047
                err = d.processZombieUpdate(chanInfo, graphScid, upd)
3✔
3048
                if err != nil {
5✔
3049
                        log.Debug(err)
2✔
3050
                        nMsg.err <- err
2✔
3051
                        return nil, false
2✔
3052
                }
2✔
3053

3054
                // We'll fallthrough to ensure we stash the update until we
3055
                // receive its corresponding ChannelAnnouncement. This is
3056
                // needed to ensure the edge exists in the graph before
3057
                // applying the update.
3058
                fallthrough
1✔
3059
        case errors.Is(err, graphdb.ErrGraphNotFound):
1✔
3060
                fallthrough
1✔
3061
        case errors.Is(err, graphdb.ErrGraphNoEdgesFound):
1✔
3062
                fallthrough
1✔
3063
        case errors.Is(err, graphdb.ErrEdgeNotFound):
2✔
3064
                // If the edge corresponding to this ChannelUpdate was not
2✔
3065
                // found in the graph, this might be a channel in the process
2✔
3066
                // of being opened, and we haven't processed our own
2✔
3067
                // ChannelAnnouncement yet, hence it is not not found in the
2✔
3068
                // graph. This usually gets resolved after the channel proofs
2✔
3069
                // are exchanged and the channel is broadcasted to the rest of
2✔
3070
                // the network, but in case this is a private channel this
2✔
3071
                // won't ever happen. This can also happen in the case of a
2✔
3072
                // zombie channel with a fresh update for which we don't have a
2✔
3073
                // ChannelAnnouncement for since we reject them. Because of
2✔
3074
                // this, we temporarily add it to a map, and reprocess it after
2✔
3075
                // our own ChannelAnnouncement has been processed.
2✔
3076
                //
2✔
3077
                // The shortChanID may be an alias, but it is fine to use here
2✔
3078
                // since we don't have an edge in the graph and if the peer is
2✔
3079
                // not buggy, we should be able to use it once the gossiper
2✔
3080
                // receives the local announcement.
2✔
3081
                pMsg := &processedNetworkMsg{msg: nMsg}
2✔
3082

2✔
3083
                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
2✔
3084
                switch {
2✔
3085
                // Nothing in the cache yet, we can just directly insert this
3086
                // element.
3087
                case err == cache.ErrElementNotFound:
2✔
3088
                        _, _ = d.prematureChannelUpdates.Put(
2✔
3089
                                shortChanID, &cachedNetworkMsg{
2✔
3090
                                        msgs: []*processedNetworkMsg{pMsg},
2✔
3091
                                })
2✔
3092

3093
                // There's already something in the cache, so we'll combine the
3094
                // set of messages into a single value.
UNCOV
3095
                default:
×
UNCOV
3096
                        msgs := earlyMsgs.msgs
×
UNCOV
3097
                        msgs = append(msgs, pMsg)
×
UNCOV
3098
                        _, _ = d.prematureChannelUpdates.Put(
×
UNCOV
3099
                                shortChanID, &cachedNetworkMsg{
×
UNCOV
3100
                                        msgs: msgs,
×
UNCOV
3101
                                })
×
3102
                }
3103

3104
                log.Debugf("Got ChannelUpdate for edge not found in graph"+
2✔
3105
                        "(shortChanID=%v), saving for reprocessing later",
2✔
3106
                        shortChanID)
2✔
3107

2✔
3108
                // NOTE: We don't return anything on the error channel for this
2✔
3109
                // message, as we expect that will be done when this
2✔
3110
                // ChannelUpdate is later reprocessed.
2✔
3111
                return nil, false
2✔
3112

3113
        default:
×
3114
                err := fmt.Errorf("unable to validate channel update "+
×
3115
                        "short_chan_id=%v: %v", shortChanID, err)
×
3116
                log.Error(err)
×
3117
                nMsg.err <- err
×
3118

×
3119
                key := newRejectCacheKey(
×
3120
                        upd.ShortChannelID.ToUint64(),
×
3121
                        sourceToPub(nMsg.source),
×
3122
                )
×
3123
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3124

×
3125
                return nil, false
×
3126
        }
3127

3128
        // The least-significant bit in the flag on the channel update
3129
        // announcement tells us "which" side of the channels directed edge is
3130
        // being updated.
3131
        var (
49✔
3132
                pubKey       *btcec.PublicKey
49✔
3133
                edgeToUpdate *models.ChannelEdgePolicy
49✔
3134
        )
49✔
3135
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
49✔
3136
        switch direction {
49✔
3137
        case 0:
34✔
3138
                pubKey, _ = chanInfo.NodeKey1()
34✔
3139
                edgeToUpdate = e1
34✔
3140
        case 1:
15✔
3141
                pubKey, _ = chanInfo.NodeKey2()
15✔
3142
                edgeToUpdate = e2
15✔
3143
        }
3144

3145
        log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
49✔
3146
                "edge policy=%v", chanInfo.ChannelID,
49✔
3147
                pubKey.SerializeCompressed(), edgeToUpdate != nil)
49✔
3148

49✔
3149
        // Validate the channel announcement with the expected public key and
49✔
3150
        // channel capacity. In the case of an invalid channel update, we'll
49✔
3151
        // return an error to the caller and exit early.
49✔
3152
        err = netann.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
49✔
3153
        if err != nil {
53✔
3154
                rErr := fmt.Errorf("unable to validate channel update "+
4✔
3155
                        "announcement for short_chan_id=%v: %v",
4✔
3156
                        spew.Sdump(upd.ShortChannelID), err)
4✔
3157

4✔
3158
                log.Error(rErr)
4✔
3159
                nMsg.err <- rErr
4✔
3160
                return nil, false
4✔
3161
        }
4✔
3162

3163
        // If we have a previous version of the edge being updated, we'll want
3164
        // to rate limit its updates to prevent spam throughout the network.
3165
        if nMsg.isRemote && edgeToUpdate != nil {
59✔
3166
                // If it's a keep-alive update, we'll only propagate one if
14✔
3167
                // it's been a day since the previous. This follows our own
14✔
3168
                // heuristic of sending keep-alive updates after the same
14✔
3169
                // duration (see retransmitStaleAnns).
14✔
3170
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
14✔
3171
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
16✔
3172
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
3✔
3173
                                log.Debugf("Ignoring keep alive update not "+
1✔
3174
                                        "within %v period for channel %v",
1✔
3175
                                        d.cfg.RebroadcastInterval, shortChanID)
1✔
3176
                                nMsg.err <- nil
1✔
3177
                                return nil, false
1✔
3178
                        }
1✔
3179
                } else {
12✔
3180
                        // If it's not, we'll allow an update per minute with a
12✔
3181
                        // maximum burst of 10. If we haven't seen an update
12✔
3182
                        // for this channel before, we'll need to initialize a
12✔
3183
                        // rate limiter for each direction.
12✔
3184
                        //
12✔
3185
                        // Since the edge exists in the graph, we'll create a
12✔
3186
                        // rate limiter for chanInfo.ChannelID rather then the
12✔
3187
                        // SCID the peer sent. This is because there may be
12✔
3188
                        // multiple aliases for a channel and we may otherwise
12✔
3189
                        // rate-limit only a single alias of the channel,
12✔
3190
                        // instead of the whole channel.
12✔
3191
                        baseScid := chanInfo.ChannelID
12✔
3192
                        d.Lock()
12✔
3193
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
12✔
3194
                        if !ok {
13✔
3195
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
1✔
3196
                                b := d.cfg.MaxChannelUpdateBurst
1✔
3197
                                rls = [2]*rate.Limiter{
1✔
3198
                                        rate.NewLimiter(r, b),
1✔
3199
                                        rate.NewLimiter(r, b),
1✔
3200
                                }
1✔
3201
                                d.chanUpdateRateLimiter[baseScid] = rls
1✔
3202
                        }
1✔
3203
                        d.Unlock()
12✔
3204

12✔
3205
                        if !rls[direction].Allow() {
17✔
3206
                                log.Debugf("Rate limiting update for channel "+
5✔
3207
                                        "%v from direction %x", shortChanID,
5✔
3208
                                        pubKey.SerializeCompressed())
5✔
3209
                                nMsg.err <- nil
5✔
3210
                                return nil, false
5✔
3211
                        }
5✔
3212
                }
3213
        }
3214

3215
        // We'll use chanInfo.ChannelID rather than the peer-supplied
3216
        // ShortChannelID in the ChannelUpdate to avoid the router having to
3217
        // lookup the stored SCID. If we're sending the update, we'll always
3218
        // use the SCID stored in the database rather than a potentially
3219
        // different alias. This might mean that SigBytes is incorrect as it
3220
        // signs a different SCID than the database SCID, but since there will
3221
        // only be a difference if AuthProof == nil, this is fine.
3222
        update := &models.ChannelEdgePolicy{
39✔
3223
                SigBytes:                  upd.Signature.ToSignatureBytes(),
39✔
3224
                ChannelID:                 chanInfo.ChannelID,
39✔
3225
                LastUpdate:                timestamp,
39✔
3226
                MessageFlags:              upd.MessageFlags,
39✔
3227
                ChannelFlags:              upd.ChannelFlags,
39✔
3228
                TimeLockDelta:             upd.TimeLockDelta,
39✔
3229
                MinHTLC:                   upd.HtlcMinimumMsat,
39✔
3230
                MaxHTLC:                   upd.HtlcMaximumMsat,
39✔
3231
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
39✔
3232
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
39✔
3233
                ExtraOpaqueData:           upd.ExtraOpaqueData,
39✔
3234
        }
39✔
3235

39✔
3236
        if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil {
39✔
UNCOV
3237
                if graph.IsError(
×
UNCOV
3238
                        err, graph.ErrOutdated,
×
UNCOV
3239
                        graph.ErrIgnored,
×
UNCOV
3240
                ) {
×
UNCOV
3241

×
UNCOV
3242
                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
×
UNCOV
3243
                                shortChanID, err)
×
UNCOV
3244
                } else {
×
3245
                        // Since we know the stored SCID in the graph, we'll
×
3246
                        // cache that SCID.
×
3247
                        key := newRejectCacheKey(
×
3248
                                chanInfo.ChannelID,
×
3249
                                sourceToPub(nMsg.source),
×
3250
                        )
×
3251
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3252

×
3253
                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
×
3254
                                shortChanID, err)
×
3255
                }
×
3256

UNCOV
3257
                nMsg.err <- err
×
UNCOV
3258
                return nil, false
×
3259
        }
3260

3261
        // If this is a local ChannelUpdate without an AuthProof, it means it
3262
        // is an update to a channel that is not (yet) supposed to be announced
3263
        // to the greater network. However, our channel counter party will need
3264
        // to be given the update, so we'll try sending the update directly to
3265
        // the remote peer.
3266
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
50✔
3267
                if nMsg.optionalMsgFields != nil {
22✔
3268
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
11✔
3269
                        if remoteAlias != nil {
11✔
UNCOV
3270
                                // The remoteAlias field was specified, meaning
×
UNCOV
3271
                                // that we should replace the SCID in the
×
UNCOV
3272
                                // update with the remote's alias. We'll also
×
UNCOV
3273
                                // need to re-sign the channel update. This is
×
UNCOV
3274
                                // required for option-scid-alias feature-bit
×
UNCOV
3275
                                // negotiated channels.
×
UNCOV
3276
                                upd.ShortChannelID = *remoteAlias
×
UNCOV
3277

×
UNCOV
3278
                                sig, err := d.cfg.SignAliasUpdate(upd)
×
UNCOV
3279
                                if err != nil {
×
3280
                                        log.Error(err)
×
3281
                                        nMsg.err <- err
×
3282
                                        return nil, false
×
3283
                                }
×
3284

UNCOV
3285
                                lnSig, err := lnwire.NewSigFromSignature(sig)
×
UNCOV
3286
                                if err != nil {
×
3287
                                        log.Error(err)
×
3288
                                        nMsg.err <- err
×
3289
                                        return nil, false
×
3290
                                }
×
3291

UNCOV
3292
                                upd.Signature = lnSig
×
3293
                        }
3294
                }
3295

3296
                // Get our peer's public key.
3297
                remotePubKey := remotePubFromChanInfo(
11✔
3298
                        chanInfo, upd.ChannelFlags,
11✔
3299
                )
11✔
3300

11✔
3301
                log.Debugf("The message %v has no AuthProof, sending the "+
11✔
3302
                        "update to remote peer %x", upd.MsgType(), remotePubKey)
11✔
3303

11✔
3304
                // Now we'll attempt to send the channel update message
11✔
3305
                // reliably to the remote peer in the background, so that we
11✔
3306
                // don't block if the peer happens to be offline at the moment.
11✔
3307
                err := d.reliableSender.sendMessage(upd, remotePubKey)
11✔
3308
                if err != nil {
11✔
3309
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3310
                                "channel=%v to peer=%x: %v", upd.MsgType(),
×
3311
                                upd.ShortChannelID, remotePubKey, err)
×
3312
                        nMsg.err <- err
×
3313
                        return nil, false
×
3314
                }
×
3315
        }
3316

3317
        // Channel update announcement was successfully processed and now it
3318
        // can be broadcast to the rest of the network. However, we'll only
3319
        // broadcast the channel update announcement if it has an attached
3320
        // authentication proof. We also won't broadcast the update if it
3321
        // contains an alias because the network would reject this.
3322
        var announcements []networkMsg
39✔
3323
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
58✔
3324
                announcements = append(announcements, networkMsg{
19✔
3325
                        peer:     nMsg.peer,
19✔
3326
                        source:   nMsg.source,
19✔
3327
                        isRemote: nMsg.isRemote,
19✔
3328
                        msg:      upd,
19✔
3329
                })
19✔
3330
        }
19✔
3331

3332
        nMsg.err <- nil
39✔
3333

39✔
3334
        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
39✔
3335
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
39✔
3336
                timestamp)
39✔
3337
        return announcements, true
39✔
3338
}
3339

3340
// handleAnnSig processes a new announcement signatures message.
3341
func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
3342
        ann *lnwire.AnnounceSignatures1) ([]networkMsg, bool) {
21✔
3343

21✔
3344
        needBlockHeight := ann.ShortChannelID.BlockHeight +
21✔
3345
                d.cfg.ProofMatureDelta
21✔
3346
        shortChanID := ann.ShortChannelID.ToUint64()
21✔
3347

21✔
3348
        prefix := "local"
21✔
3349
        if nMsg.isRemote {
32✔
3350
                prefix = "remote"
11✔
3351
        }
11✔
3352

3353
        log.Infof("Received new %v announcement signature for %v", prefix,
21✔
3354
                ann.ShortChannelID)
21✔
3355

21✔
3356
        // By the specification, channel announcement proofs should be sent
21✔
3357
        // after some number of confirmations after channel was registered in
21✔
3358
        // bitcoin blockchain. Therefore, we check if the proof is mature.
21✔
3359
        d.Lock()
21✔
3360
        premature := d.isPremature(
21✔
3361
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
21✔
3362
        )
21✔
3363
        if premature {
21✔
UNCOV
3364
                log.Warnf("Premature proof announcement, current block height"+
×
UNCOV
3365
                        "lower than needed: %v < %v", d.bestHeight,
×
UNCOV
3366
                        needBlockHeight)
×
UNCOV
3367
                d.Unlock()
×
UNCOV
3368
                nMsg.err <- nil
×
UNCOV
3369
                return nil, false
×
UNCOV
3370
        }
×
3371
        d.Unlock()
21✔
3372

21✔
3373
        // Ensure that we know of a channel with the target channel ID before
21✔
3374
        // proceeding further.
21✔
3375
        //
21✔
3376
        // We must acquire the mutex for this channel ID before getting the
21✔
3377
        // channel from the database, to ensure what we read does not change
21✔
3378
        // before we call AddProof() later.
21✔
3379
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
21✔
3380
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
21✔
3381

21✔
3382
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
21✔
3383
                ann.ShortChannelID,
21✔
3384
        )
21✔
3385
        if err != nil {
22✔
3386
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
1✔
3387
                if err != nil {
1✔
UNCOV
3388
                        err := fmt.Errorf("unable to store the proof for "+
×
UNCOV
3389
                                "short_chan_id=%v: %v", shortChanID, err)
×
UNCOV
3390
                        log.Error(err)
×
UNCOV
3391
                        nMsg.err <- err
×
UNCOV
3392

×
UNCOV
3393
                        return nil, false
×
UNCOV
3394
                }
×
3395

3396
                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
1✔
3397
                err := d.cfg.WaitingProofStore.Add(proof)
1✔
3398
                if err != nil {
1✔
3399
                        err := fmt.Errorf("unable to store the proof for "+
×
3400
                                "short_chan_id=%v: %v", shortChanID, err)
×
3401
                        log.Error(err)
×
3402
                        nMsg.err <- err
×
3403
                        return nil, false
×
3404
                }
×
3405

3406
                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
1✔
3407
                        ", adding to waiting batch", prefix, shortChanID)
1✔
3408
                nMsg.err <- nil
1✔
3409
                return nil, false
1✔
3410
        }
3411

3412
        nodeID := nMsg.source.SerializeCompressed()
20✔
3413
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
20✔
3414
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
20✔
3415

20✔
3416
        // Ensure that channel that was retrieved belongs to the peer which
20✔
3417
        // sent the proof announcement.
20✔
3418
        if !(isFirstNode || isSecondNode) {
20✔
3419
                err := fmt.Errorf("channel that was received doesn't belong "+
×
3420
                        "to the peer which sent the proof, short_chan_id=%v",
×
3421
                        shortChanID)
×
3422
                log.Error(err)
×
3423
                nMsg.err <- err
×
3424
                return nil, false
×
3425
        }
×
3426

3427
        // If proof was sent by a local sub-system, then we'll send the
3428
        // announcement signature to the remote node so they can also
3429
        // reconstruct the full channel announcement.
3430
        if !nMsg.isRemote {
30✔
3431
                var remotePubKey [33]byte
10✔
3432
                if isFirstNode {
20✔
3433
                        remotePubKey = chanInfo.NodeKey2Bytes
10✔
3434
                } else {
10✔
UNCOV
3435
                        remotePubKey = chanInfo.NodeKey1Bytes
×
UNCOV
3436
                }
×
3437

3438
                // Since the remote peer might not be online we'll call a
3439
                // method that will attempt to deliver the proof when it comes
3440
                // online.
3441
                err := d.reliableSender.sendMessage(ann, remotePubKey)
10✔
3442
                if err != nil {
10✔
3443
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3444
                                "channel=%v to peer=%x: %v", ann.MsgType(),
×
3445
                                ann.ShortChannelID, remotePubKey, err)
×
3446
                        nMsg.err <- err
×
3447
                        return nil, false
×
3448
                }
×
3449
        }
3450

3451
        // Check if we already have the full proof for this channel.
3452
        if chanInfo.AuthProof != nil {
21✔
3453
                // If we already have the fully assembled proof, then the peer
1✔
3454
                // sending us their proof has probably not received our local
1✔
3455
                // proof yet. So be kind and send them the full proof.
1✔
3456
                if nMsg.isRemote {
2✔
3457
                        peerID := nMsg.source.SerializeCompressed()
1✔
3458
                        log.Debugf("Got AnnounceSignatures for channel with " +
1✔
3459
                                "full proof.")
1✔
3460

1✔
3461
                        d.wg.Add(1)
1✔
3462
                        go func() {
2✔
3463
                                defer d.wg.Done()
1✔
3464

1✔
3465
                                log.Debugf("Received half proof for channel "+
1✔
3466
                                        "%v with existing full proof. Sending"+
1✔
3467
                                        " full proof to peer=%x",
1✔
3468
                                        ann.ChannelID, peerID)
1✔
3469

1✔
3470
                                ca, _, _, err := netann.CreateChanAnnouncement(
1✔
3471
                                        chanInfo.AuthProof, chanInfo, e1, e2,
1✔
3472
                                )
1✔
3473
                                if err != nil {
1✔
3474
                                        log.Errorf("unable to gen ann: %v",
×
3475
                                                err)
×
3476
                                        return
×
3477
                                }
×
3478

3479
                                err = nMsg.peer.SendMessage(false, ca)
1✔
3480
                                if err != nil {
1✔
3481
                                        log.Errorf("Failed sending full proof"+
×
3482
                                                " to peer=%x: %v", peerID, err)
×
3483
                                        return
×
3484
                                }
×
3485

3486
                                log.Debugf("Full proof sent to peer=%x for "+
1✔
3487
                                        "chanID=%v", peerID, ann.ChannelID)
1✔
3488
                        }()
3489
                }
3490

3491
                log.Debugf("Already have proof for channel with chanID=%v",
1✔
3492
                        ann.ChannelID)
1✔
3493
                nMsg.err <- nil
1✔
3494
                return nil, true
1✔
3495
        }
3496

3497
        // Check that we received the opposite proof. If so, then we're now
3498
        // able to construct the full proof, and create the channel
3499
        // announcement. If we didn't receive the opposite half of the proof
3500
        // then we should store this one, and wait for the opposite to be
3501
        // received.
3502
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
19✔
3503
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
19✔
3504
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
19✔
3505
                err := fmt.Errorf("unable to get the opposite proof for "+
×
3506
                        "short_chan_id=%v: %v", shortChanID, err)
×
3507
                log.Error(err)
×
3508
                nMsg.err <- err
×
3509
                return nil, false
×
3510
        }
×
3511

3512
        if err == channeldb.ErrWaitingProofNotFound {
28✔
3513
                err := d.cfg.WaitingProofStore.Add(proof)
9✔
3514
                if err != nil {
9✔
3515
                        err := fmt.Errorf("unable to store the proof for "+
×
3516
                                "short_chan_id=%v: %v", shortChanID, err)
×
3517
                        log.Error(err)
×
3518
                        nMsg.err <- err
×
3519
                        return nil, false
×
3520
                }
×
3521

3522
                log.Infof("1/2 of channel ann proof received for "+
9✔
3523
                        "short_chan_id=%v, waiting for other half",
9✔
3524
                        shortChanID)
9✔
3525

9✔
3526
                nMsg.err <- nil
9✔
3527
                return nil, false
9✔
3528
        }
3529

3530
        // We now have both halves of the channel announcement proof, then
3531
        // we'll reconstruct the initial announcement so we can validate it
3532
        // shortly below.
3533
        var dbProof models.ChannelAuthProof
10✔
3534
        if isFirstNode {
11✔
3535
                dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
1✔
3536
                dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
1✔
3537
                dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
1✔
3538
                dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
1✔
3539
        } else {
10✔
3540
                dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
9✔
3541
                dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
9✔
3542
                dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
9✔
3543
                dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
9✔
3544
        }
9✔
3545

3546
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
10✔
3547
                &dbProof, chanInfo, e1, e2,
10✔
3548
        )
10✔
3549
        if err != nil {
10✔
3550
                log.Error(err)
×
3551
                nMsg.err <- err
×
3552
                return nil, false
×
3553
        }
×
3554

3555
        // With all the necessary components assembled validate the full
3556
        // channel announcement proof.
3557
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
10✔
3558
        if err != nil {
10✔
3559
                err := fmt.Errorf("channel announcement proof for "+
×
3560
                        "short_chan_id=%v isn't valid: %v", shortChanID, err)
×
3561

×
3562
                log.Error(err)
×
3563
                nMsg.err <- err
×
3564
                return nil, false
×
3565
        }
×
3566

3567
        // If the channel was returned by the router it means that existence of
3568
        // funding point and inclusion of nodes bitcoin keys in it already
3569
        // checked by the router. In this stage we should check that node keys
3570
        // attest to the bitcoin keys by validating the signatures of
3571
        // announcement. If proof is valid then we'll populate the channel edge
3572
        // with it, so we can announce it on peer connect.
3573
        err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
10✔
3574
        if err != nil {
10✔
3575
                err := fmt.Errorf("unable add proof to the channel chanID=%v:"+
×
3576
                        " %v", ann.ChannelID, err)
×
3577
                log.Error(err)
×
3578
                nMsg.err <- err
×
3579
                return nil, false
×
3580
        }
×
3581

3582
        err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
10✔
3583
        if err != nil {
10✔
3584
                err := fmt.Errorf("unable to remove opposite proof for the "+
×
3585
                        "channel with chanID=%v: %v", ann.ChannelID, err)
×
3586
                log.Error(err)
×
3587
                nMsg.err <- err
×
3588
                return nil, false
×
3589
        }
×
3590

3591
        // Proof was successfully created and now can announce the channel to
3592
        // the remain network.
3593
        log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
10✔
3594
                ", adding to next ann batch", shortChanID)
10✔
3595

10✔
3596
        // Assemble the necessary announcements to add to the next broadcasting
10✔
3597
        // batch.
10✔
3598
        var announcements []networkMsg
10✔
3599
        announcements = append(announcements, networkMsg{
10✔
3600
                peer:   nMsg.peer,
10✔
3601
                source: nMsg.source,
10✔
3602
                msg:    chanAnn,
10✔
3603
        })
10✔
3604
        if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
19✔
3605
                announcements = append(announcements, networkMsg{
9✔
3606
                        peer:   nMsg.peer,
9✔
3607
                        source: src,
9✔
3608
                        msg:    e1Ann,
9✔
3609
                })
9✔
3610
        }
9✔
3611
        if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
18✔
3612
                announcements = append(announcements, networkMsg{
8✔
3613
                        peer:   nMsg.peer,
8✔
3614
                        source: src,
8✔
3615
                        msg:    e2Ann,
8✔
3616
                })
8✔
3617
        }
8✔
3618

3619
        // We'll also send along the node announcements for each channel
3620
        // participant if we know of them. To ensure our node announcement
3621
        // propagates to our channel counterparty, we'll set the source for
3622
        // each announcement to the node it belongs to, otherwise we won't send
3623
        // it since the source gets skipped. This isn't necessary for channel
3624
        // updates and announcement signatures since we send those directly to
3625
        // our channel counterparty through the gossiper's reliable sender.
3626
        node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
10✔
3627
        if err != nil {
12✔
3628
                log.Debugf("Unable to fetch node announcement for %x: %v",
2✔
3629
                        chanInfo.NodeKey1Bytes, err)
2✔
3630
        } else {
10✔
3631
                if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
16✔
3632
                        announcements = append(announcements, networkMsg{
8✔
3633
                                peer:   nMsg.peer,
8✔
3634
                                source: nodeKey1,
8✔
3635
                                msg:    node1Ann,
8✔
3636
                        })
8✔
3637
                }
8✔
3638
        }
3639

3640
        node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
10✔
3641
        if err != nil {
14✔
3642
                log.Debugf("Unable to fetch node announcement for %x: %v",
4✔
3643
                        chanInfo.NodeKey2Bytes, err)
4✔
3644
        } else {
10✔
3645
                if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
12✔
3646
                        announcements = append(announcements, networkMsg{
6✔
3647
                                peer:   nMsg.peer,
6✔
3648
                                source: nodeKey2,
6✔
3649
                                msg:    node2Ann,
6✔
3650
                        })
6✔
3651
                }
6✔
3652
        }
3653

3654
        nMsg.err <- nil
10✔
3655
        return announcements, true
10✔
3656
}
3657

3658
// isBanned returns true if the peer identified by pubkey is banned for sending
3659
// invalid channel announcements.
3660
func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
208✔
3661
        return d.banman.isBanned(pubkey)
208✔
3662
}
208✔
3663

3664
// ShouldDisconnect returns true if we should disconnect the peer identified by
3665
// pubkey.
3666
func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
3667
        bool, error) {
206✔
3668

206✔
3669
        pubkeySer := pubkey.SerializeCompressed()
206✔
3670

206✔
3671
        var pubkeyBytes [33]byte
206✔
3672
        copy(pubkeyBytes[:], pubkeySer)
206✔
3673

206✔
3674
        // If the public key is banned, check whether or not this is a channel
206✔
3675
        // peer.
206✔
3676
        if d.isBanned(pubkeyBytes) {
208✔
3677
                isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
2✔
3678
                if err != nil {
2✔
3679
                        return false, err
×
3680
                }
×
3681

3682
                // We should only disconnect non-channel peers.
3683
                if !isChanPeer {
3✔
3684
                        return true, nil
1✔
3685
                }
1✔
3686
        }
3687

3688
        return false, nil
205✔
3689
}
3690

3691
// validateFundingTransaction fetches the channel announcements claimed funding
3692
// transaction from chain to ensure that it exists, is not spent and matches
3693
// the channel announcement proof. The transaction's outpoint and value are
3694
// returned if we can glean them from the work done in this method.
3695
func (d *AuthenticatedGossiper) validateFundingTransaction(
3696
        ann *lnwire.ChannelAnnouncement1,
3697
        tapscriptRoot fn.Option[chainhash.Hash]) (wire.OutPoint, btcutil.Amount,
3698
        []byte, error) {
228✔
3699

228✔
3700
        scid := ann.ShortChannelID
228✔
3701

228✔
3702
        // Before we can add the channel to the channel graph, we need to obtain
228✔
3703
        // the full funding outpoint that's encoded within the channel ID.
228✔
3704
        fundingTx, err := lnwallet.FetchFundingTxWrapper(
228✔
3705
                d.cfg.ChainIO, &scid, d.quit,
228✔
3706
        )
228✔
3707
        if err != nil {
229✔
3708
                //nolint:ll
1✔
3709
                //
1✔
3710
                // In order to ensure we don't erroneously mark a channel as a
1✔
3711
                // zombie due to an RPC failure, we'll attempt to string match
1✔
3712
                // for the relevant errors.
1✔
3713
                //
1✔
3714
                // * btcd:
1✔
3715
                //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316
1✔
3716
                //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086
1✔
3717
                // * bitcoind:
1✔
3718
                //    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770
1✔
3719
                //     * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954
1✔
3720
                switch {
1✔
3721
                case strings.Contains(err.Error(), "not found"):
1✔
3722
                        fallthrough
1✔
3723

3724
                case strings.Contains(err.Error(), "out of range"):
1✔
3725
                        // If the funding transaction isn't found at all, then
1✔
3726
                        // we'll mark the edge itself as a zombie so we don't
1✔
3727
                        // continue to request it. We use the "zero key" for
1✔
3728
                        // both node pubkeys so this edge can't be resurrected.
1✔
3729
                        zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
1✔
3730
                        if zErr != nil {
1✔
3731
                                return wire.OutPoint{}, 0, nil, zErr
×
3732
                        }
×
3733

3734
                default:
×
3735
                }
3736

3737
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
1✔
3738
                        ErrNoFundingTransaction, err)
1✔
3739
        }
3740

3741
        // Recreate witness output to be sure that declared in channel edge
3742
        // bitcoin keys and channel value corresponds to the reality.
3743
        fundingPkScript, err := makeFundingScript(
227✔
3744
                ann.BitcoinKey1[:], ann.BitcoinKey2[:], ann.Features,
227✔
3745
                tapscriptRoot,
227✔
3746
        )
227✔
3747
        if err != nil {
227✔
3748
                return wire.OutPoint{}, 0, nil, err
×
3749
        }
×
3750

3751
        // Next we'll validate that this channel is actually well formed. If
3752
        // this check fails, then this channel either doesn't exist, or isn't
3753
        // the one that was meant to be created according to the passed channel
3754
        // proofs.
3755
        fundingPoint, err := chanvalidate.Validate(
227✔
3756
                &chanvalidate.Context{
227✔
3757
                        Locator: &chanvalidate.ShortChanIDChanLocator{
227✔
3758
                                ID: scid,
227✔
3759
                        },
227✔
3760
                        MultiSigPkScript: fundingPkScript,
227✔
3761
                        FundingTx:        fundingTx,
227✔
3762
                },
227✔
3763
        )
227✔
3764
        if err != nil {
428✔
3765
                // Mark the edge as a zombie so we won't try to re-validate it
201✔
3766
                // on start up.
201✔
3767
                zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
201✔
3768
                if zErr != nil {
201✔
3769
                        return wire.OutPoint{}, 0, nil, zErr
×
3770
                }
×
3771

3772
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
201✔
3773
                        ErrInvalidFundingOutput, err)
201✔
3774
        }
3775

3776
        // Now that we have the funding outpoint of the channel, ensure
3777
        // that it hasn't yet been spent. If so, then this channel has
3778
        // been closed so we'll ignore it.
3779
        chanUtxo, err := d.cfg.ChainIO.GetUtxo(
26✔
3780
                fundingPoint, fundingPkScript, scid.BlockHeight, d.quit,
26✔
3781
        )
26✔
3782
        if err != nil {
28✔
3783
                if errors.Is(err, btcwallet.ErrOutputSpent) {
4✔
3784
                        zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
2✔
3785
                        if zErr != nil {
2✔
3786
                                return wire.OutPoint{}, 0, nil, zErr
×
3787
                        }
×
3788
                }
3789

3790
                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: unable to "+
2✔
3791
                        "fetch utxo for chan_id=%v, chan_point=%v: %w",
2✔
3792
                        ErrChannelSpent, scid.ToUint64(), fundingPoint, err)
2✔
3793
        }
3794

3795
        return *fundingPoint, btcutil.Amount(chanUtxo.Value), fundingPkScript,
24✔
3796
                nil
24✔
3797
}
3798

3799
// makeFundingScript is used to make the funding script for both segwit v0 and
3800
// segwit v1 (taproot) channels.
3801
func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte,
3802
        features *lnwire.RawFeatureVector,
3803
        tapscriptRoot fn.Option[chainhash.Hash]) ([]byte, error) {
227✔
3804

227✔
3805
        legacyFundingScript := func() ([]byte, error) {
454✔
3806
                witnessScript, err := input.GenMultiSigScript(
227✔
3807
                        bitcoinKey1, bitcoinKey2,
227✔
3808
                )
227✔
3809
                if err != nil {
227✔
3810
                        return nil, err
×
3811
                }
×
3812
                pkScript, err := input.WitnessScriptHash(witnessScript)
227✔
3813
                if err != nil {
227✔
3814
                        return nil, err
×
3815
                }
×
3816

3817
                return pkScript, nil
227✔
3818
        }
3819

3820
        if features.IsEmpty() {
454✔
3821
                return legacyFundingScript()
227✔
3822
        }
227✔
3823

UNCOV
3824
        chanFeatureBits := lnwire.NewFeatureVector(features, lnwire.Features)
×
UNCOV
3825
        if chanFeatureBits.HasFeature(
×
UNCOV
3826
                lnwire.SimpleTaprootChannelsOptionalStaging,
×
UNCOV
3827
        ) {
×
UNCOV
3828

×
UNCOV
3829
                pubKey1, err := btcec.ParsePubKey(bitcoinKey1)
×
UNCOV
3830
                if err != nil {
×
3831
                        return nil, err
×
3832
                }
×
UNCOV
3833
                pubKey2, err := btcec.ParsePubKey(bitcoinKey2)
×
UNCOV
3834
                if err != nil {
×
3835
                        return nil, err
×
3836
                }
×
3837

UNCOV
3838
                fundingScript, _, err := input.GenTaprootFundingScript(
×
UNCOV
3839
                        pubKey1, pubKey2, 0, tapscriptRoot,
×
UNCOV
3840
                )
×
UNCOV
3841
                if err != nil {
×
3842
                        return nil, err
×
3843
                }
×
3844

3845
                // TODO(roasbeef): add tapscript root to gossip v1.5
3846

UNCOV
3847
                return fundingScript, nil
×
3848
        }
3849

3850
        return legacyFundingScript()
×
3851
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc