lightningnetwork / lnd / 13436790699

20 Feb 2025 01:40PM UTC coverage: 58.78% (-0.01%) from 58.794%

Pull #9534 (ellemouton): graph: refactor Builder network message handling

The exposed AddNode, AddEdge and UpdateEdge methods of the Builder are
currently synchronous: although they pass messages to the network
handler, which spins off the handling in a goroutine, the public
methods still wait for a response from that handling before returning.
The only part that is actually done asynchronously is the topology
notifications.

We previously tried to simplify things in [this
commit](https://github.com/lightningnetwork/lnd/pull/9476/commits/d757b3bcf),
but we soon realised that there was a reason for sending the messages
to the central/synchronous network handler first: it ensures
consistency for topology clients. That is, the ordering between
registering or cancelling a topology client and processing new network
updates needs to be consistent and handled synchronously. For example,
if a new update comes in right after a topology client cancels its
subscription, then it should _not_ be notified; similarly for new
subscriptions. So that commit was reverted soon after.

We can, however, still simplify things, as is done in this commit, by
noting that _only topology subscriptions and notifications_ need to be
handled separately; the actual network updates do not. That is what is
done here.

This refactor will make it much easier to later move the topology
subscription logic into a new subsystem.
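
To make the ordering argument concrete, here is a minimal, self-contained Go
sketch (not lnd's actual implementation) of a central handler that serializes
subscriptions, cancellations and updates: because a single goroutine owns the
client set, a client that cancels before an update arrives can never be
notified.

package main

import "fmt"

// topoEvent is either a subscription change or a network update.
type topoEvent struct {
        sub    chan string // non-nil: register this client channel
        cancel chan string // non-nil: remove this client channel
        update string      // otherwise: fan this update out
}

// handler owns the client set, so subscribes, cancels and updates are
// applied in exactly the order they arrive on the events channel.
func handler(events <-chan topoEvent, done chan<- struct{}) {
        defer close(done)

        clients := make(map[chan string]struct{})
        for e := range events {
                switch {
                case e.sub != nil:
                        clients[e.sub] = struct{}{}
                case e.cancel != nil:
                        delete(clients, e.cancel)
                default:
                        for c := range clients {
                                c <- e.update
                        }
                }
        }
}

func main() {
        events := make(chan topoEvent)
        done := make(chan struct{})
        go handler(events, done)

        client := make(chan string, 1)
        events <- topoEvent{sub: client}
        events <- topoEvent{cancel: client}
        // The cancel above is guaranteed to be applied before this update.
        events <- topoEvent{update: "chan_update"}

        close(events)
        <-done

        select {
        case u := <-client:
                fmt.Println("unexpected notification:", u)
        default:
                fmt.Println("cancelled client was not notified")
        }
}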

38 of 44 new or added lines in 1 file covered. (86.36%)

55 existing lines in 11 files now uncovered.

136048 of 231453 relevant lines covered (58.78%)

19264.6 hits per line

Source File: /discovery/gossiper.go (79.71% covered)
package discovery

import (
        "bytes"
        "errors"
        "fmt"
        "strings"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcec/v2"
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/btcsuite/btcd/wire"
        "github.com/davecgh/go-spew/spew"
        "github.com/lightninglabs/neutrino/cache"
        "github.com/lightninglabs/neutrino/cache/lru"
        "github.com/lightningnetwork/lnd/batch"
        "github.com/lightningnetwork/lnd/chainntnfs"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph"
        graphdb "github.com/lightningnetwork/lnd/graph/db"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/keychain"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/btcwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/multimutex"
        "github.com/lightningnetwork/lnd/netann"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/ticker"
        "golang.org/x/time/rate"
)

const (
        // DefaultMaxChannelUpdateBurst is the default maximum number of updates
        // for a specific channel and direction that we'll accept over an
        // interval.
        DefaultMaxChannelUpdateBurst = 10

        // DefaultChannelUpdateInterval is the default interval we'll use to
        // determine how often we should allow a new update for a specific
        // channel and direction.
        DefaultChannelUpdateInterval = time.Minute

        // maxPrematureUpdates tracks the max amount of premature channel
        // updates that we'll hold onto.
        maxPrematureUpdates = 100

        // maxFutureMessages tracks the max amount of future messages that
        // we'll hold onto.
        maxFutureMessages = 1000

        // DefaultSubBatchDelay is the default delay we'll use when
        // broadcasting the next announcement batch.
        DefaultSubBatchDelay = 5 * time.Second

        // maxRejectedUpdates tracks the max amount of rejected channel updates
        // we'll maintain. This is the global size across all peers. We'll
        // allocate ~3 MB max to the cache.
        maxRejectedUpdates = 10_000

        // DefaultProofMatureDelta specifies the default value used for
        // ProofMatureDelta, which is the number of confirmations needed before
        // processing the announcement signatures.
        DefaultProofMatureDelta = 6
)

var (
        // ErrGossiperShuttingDown is an error that is returned if the gossiper
        // is in the process of being shut down.
        ErrGossiperShuttingDown = errors.New("gossiper is shutting down")

        // ErrGossipSyncerNotFound signals that we were unable to find an active
        // gossip syncer corresponding to a gossip query message received from
        // the remote peer.
        ErrGossipSyncerNotFound = errors.New("gossip syncer not found")

        // ErrNoFundingTransaction is returned when we are unable to find the
        // funding transaction described by the short channel ID on chain.
        ErrNoFundingTransaction = errors.New(
                "unable to find the funding transaction",
        )

        // ErrInvalidFundingOutput is returned if the channel funding output
        // fails validation.
        ErrInvalidFundingOutput = errors.New(
                "channel funding output validation failed",
        )

        // ErrChannelSpent is returned when we go to validate a channel, but
        // the purported funding output has actually already been spent on
        // chain.
        ErrChannelSpent = errors.New("channel output has been spent")

        // emptyPubkey is used to compare compressed pubkeys against an empty
        // byte array.
        emptyPubkey [33]byte
)

// optionalMsgFields is a set of optional message fields that external callers
// can provide that are useful when processing a specific network
// announcement.
type optionalMsgFields struct {
        capacity      *btcutil.Amount
        channelPoint  *wire.OutPoint
        remoteAlias   *lnwire.ShortChannelID
        tapscriptRoot fn.Option[chainhash.Hash]
}

// apply applies the optional fields within the functional options.
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
        for _, optionalMsgField := range optionalMsgFields {
                optionalMsgField(f)
        }
}

// OptionalMsgField is a functional option parameter that can be used to
// provide external information that is not included within a network message
// but is useful when processing it.
type OptionalMsgField func(*optionalMsgFields)

// ChannelCapacity is an optional field that lets the gossiper know of the
// capacity of a channel.
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
        return func(f *optionalMsgFields) {
                f.capacity = &capacity
        }
}

// ChannelPoint is an optional field that lets the gossiper know of the
// outpoint of a channel.
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
        return func(f *optionalMsgFields) {
                f.channelPoint = &op
        }
}

// TapscriptRoot is an optional field that lets the gossiper know of the root
// of the tapscript tree for a custom channel.
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
        return func(f *optionalMsgFields) {
                f.tapscriptRoot = root
        }
}

// RemoteAlias is an optional field that lets the gossiper know that a locally
// sent channel update is actually an update for the peer that should replace
// the ShortChannelID field with the remote's alias. This is only used for
// channels with peers where the option-scid-alias feature bit was negotiated.
// The channel update will be added to the graph under the original SCID, but
// will be modified and re-signed with this alias.
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
        return func(f *optionalMsgFields) {
                f.remoteAlias = alias
        }
}

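For illustration, a hypothetical call site for these functional options. It
uses the gossiper's ProcessLocalAnnouncement method defined later in this
file; d (an *AuthenticatedGossiper), chanUpd (an *lnwire.ChannelUpdate1) and
fundingOutpoint (a wire.OutPoint) are assumed to exist in the caller.

// Hypothetical usage of the optional message fields.
errChan := d.ProcessLocalAnnouncement(
        chanUpd,
        ChannelCapacity(btcutil.Amount(1_000_000)),
        ChannelPoint(fundingOutpoint),
)
if err := <-errChan; err != nil {
        log.Errorf("local announcement rejected: %v", err)
}
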
// networkMsg couples a routing related wire message with the peer that
// originally sent it.
type networkMsg struct {
        peer              lnpeer.Peer
        source            *btcec.PublicKey
        msg               lnwire.Message
        optionalMsgFields *optionalMsgFields

        isRemote bool

        err chan error
}

// chanPolicyUpdateRequest is a request that is sent to the server when a caller
// wishes to update a particular set of channels. New ChannelUpdate messages
// will be crafted to be sent out during the next broadcast epoch and the fee
// updates committed to the lower layer.
type chanPolicyUpdateRequest struct {
        edgesToUpdate []EdgeWithInfo
        errChan       chan error
}

// PinnedSyncers is a set of node pubkeys for which we will maintain an active
// syncer at all times.
type PinnedSyncers map[route.Vertex]struct{}

// Config defines the configuration for the service. ALL elements within the
// configuration MUST be non-nil for the service to carry out its duties.
type Config struct {
        // ChainHash is a hash that indicates the resident chain of the
        // AuthenticatedGossiper. Any announcements that don't match this
        // chain hash will be ignored.
        //
        // TODO(roasbeef): eventually make into map so can de-multiplex
        // incoming announcements
        //   * also need to do same for Notifier
        ChainHash chainhash.Hash

        // Graph is the subsystem which is responsible for managing the
        // topology of the lightning network. After incoming channel, node and
        // channel update announcements are validated, they are sent to the
        // router in order to be included in the LN graph.
        Graph graph.ChannelGraphSource

        // ChainIO represents an abstraction over a source that can query the
        // blockchain.
        ChainIO lnwallet.BlockChainIO

        // ChanSeries is an interface that provides access to a time series
        // view of the current known channel graph. Each GossipSyncer enabled
        // peer will utilize this in order to create and respond to channel
        // graph time series queries.
        ChanSeries ChannelGraphTimeSeries

        // Notifier is used for receiving notifications of incoming blocks.
        // With each new incoming block found we process previously premature
        // announcements.
        //
        // TODO(roasbeef): could possibly just replace this with an epoch
        // channel.
        Notifier chainntnfs.ChainNotifier

        // Broadcast broadcasts a particular set of announcements to all peers
        // that the daemon is connected to. If supplied, the exclude parameter
        // indicates that the target peer should be excluded from the
        // broadcast.
        Broadcast func(skips map[route.Vertex]struct{},
                msg ...lnwire.Message) error

        // NotifyWhenOnline is a function that allows the gossiper to be
        // notified when a certain peer comes online, allowing it to
        // retry sending a peer message.
        //
        // NOTE: The peerChan channel must be buffered.
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)

        // NotifyWhenOffline is a function that allows the gossiper to be
        // notified when a certain peer disconnects, allowing it to request a
        // notification for when it reconnects.
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}

        // FetchSelfAnnouncement retrieves our current node announcement, for
        // use when determining whether we should update our peers about our
        // presence in the network.
        FetchSelfAnnouncement func() lnwire.NodeAnnouncement

        // UpdateSelfAnnouncement produces a new announcement for our node with
        // an updated timestamp which can be broadcast to our peers.
        UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement, error)

        // ProofMatureDelta is the number of confirmations needed before
        // exchanging the channel announcement proofs.
        ProofMatureDelta uint32

        // TrickleDelay is the period of the trickle timer which flushes to
        // the network the pending batch of new announcements we've received
        // since the last trickle tick.
        TrickleDelay time.Duration

        // RetransmitTicker is a ticker that ticks with a period which
        // indicates that we should check if we need to re-broadcast any of
        // our personal channels.
        RetransmitTicker ticker.Ticker

        // RebroadcastInterval is the maximum time we wait between sending out
        // channel updates for our active channels and our own node
        // announcement. We do this to ensure our active presence on the
        // network is known, and we are not being considered a zombie node or
        // having zombie channels.
        RebroadcastInterval time.Duration

        // WaitingProofStore is a persistent storage of partial channel proof
        // announcement messages. We use it to buffer half of the material
        // needed to reconstruct a full authenticated channel announcement.
        // Once we receive the other half of the channel proof, we'll be able
        // to properly validate it and re-broadcast it out to the network.
        //
        // TODO(wilmer): make interface to prevent channeldb dependency.
        WaitingProofStore *channeldb.WaitingProofStore

        // MessageStore is a persistent storage of gossip messages which we
        // will use to determine which messages need to be resent for a given
        // peer.
        MessageStore GossipMessageStore

        // AnnSigner is an instance of the MessageSigner interface which will
        // be used to manually sign any outgoing channel updates. The signer
        // implementation should be backed by the public key of the backing
        // Lightning node.
        //
        // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
        // here?
        AnnSigner lnwallet.MessageSigner

        // ScidCloser is an instance of ClosedChannelTracker that helps the
        // gossiper cut down on spam channel announcements for already closed
        // channels.
        ScidCloser ClosedChannelTracker

        // NumActiveSyncers is the number of peers for which we should have
        // active syncers. After reaching NumActiveSyncers, any future gossip
        // syncers will be passive.
        NumActiveSyncers int

        // NoTimestampQueries will prevent the GossipSyncer from querying
        // timestamps of announcement messages from the peer and from replying
        // to timestamp queries.
        NoTimestampQueries bool

        // RotateTicker is a ticker responsible for notifying the SyncManager
        // when it should rotate its active syncers. A single active syncer
        // with a chansSynced state will be exchanged for a passive syncer in
        // order to ensure we don't keep syncing with the same peers.
        RotateTicker ticker.Ticker

        // HistoricalSyncTicker is a ticker responsible for notifying the
        // syncManager when it should attempt a historical sync with a gossip
        // sync peer.
        HistoricalSyncTicker ticker.Ticker

        // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
        // syncManager when it should attempt to start the next pending
        // activeSyncer due to the current one not completing its state
        // machine within the timeout.
        ActiveSyncerTimeoutTicker ticker.Ticker

        // MinimumBatchSize is the minimum size of a sub batch of announcement
        // messages.
        MinimumBatchSize int

        // SubBatchDelay is the delay between sending sub batches of
        // gossip messages.
        SubBatchDelay time.Duration

        // IgnoreHistoricalFilters will prevent syncers from replying with
        // historical data when the remote peer sets a gossip_timestamp_range.
        // This prevents ranges with old start times from causing us to dump
        // the graph on connect.
        IgnoreHistoricalFilters bool

        // PinnedSyncers is a set of peers that will always transition to
        // ActiveSync upon connection. These peers will never transition to
        // PassiveSync.
        PinnedSyncers PinnedSyncers

        // MaxChannelUpdateBurst specifies the maximum number of updates for a
        // specific channel and direction that we'll accept over an interval.
        MaxChannelUpdateBurst int

        // ChannelUpdateInterval specifies the interval we'll use to determine
        // how often we should allow a new update for a specific channel and
        // direction.
        ChannelUpdateInterval time.Duration

        // IsAlias returns true if a given ShortChannelID is an alias for
        // option_scid_alias channels.
        IsAlias func(scid lnwire.ShortChannelID) bool

        // SignAliasUpdate is used to re-sign a channel update using the
        // remote's alias if the option-scid-alias feature bit was negotiated.
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
                error)

        // FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
        // This is used for channels that have negotiated the option-scid-alias
        // feature bit.
        FindBaseByAlias func(alias lnwire.ShortChannelID) (
                lnwire.ShortChannelID, error)

        // GetAlias allows the gossiper to look up the peer's alias for a given
        // ChannelID. This is used to sign updates for them if the channel has
        // no AuthProof and the option-scid-alias feature bit was negotiated.
        GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)

        // FindChannel allows the gossiper to find a channel that we're party
        // to without iterating over the entire set of open channels.
        FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
                *channeldb.OpenChannel, error)

        // IsStillZombieChannel takes the timestamps of the latest channel
        // updates for a channel and returns true if the channel should be
        // considered a zombie based on these timestamps.
        IsStillZombieChannel func(time.Time, time.Time) bool

        // AssumeChannelValid toggles whether the gossiper will check for
        // spent-ness of channel outpoints. For neutrino, this saves long
        // rescans from blocking initial usage of the daemon.
        AssumeChannelValid bool
}

// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
// used to let the caller of the lru.Cache know if a message has already been
// processed or not.
type processedNetworkMsg struct {
        processed bool
        msg       *networkMsg
}

// cachedNetworkMsg is a wrapper around a network message that can be used with
// *lru.Cache.
type cachedNetworkMsg struct {
        msgs []*processedNetworkMsg
}

// Size returns the "size" of an entry. We return the number of items as we
// just want to limit the total amount of entries rather than do accurate size
// accounting.
func (c *cachedNetworkMsg) Size() (uint64, error) {
        return uint64(len(c.msgs)), nil
}

// rejectCacheKey is the cache key that we'll use to track announcements we've
// recently rejected.
type rejectCacheKey struct {
        pubkey [33]byte
        chanID uint64
}

// newRejectCacheKey returns a new cache key for the reject cache.
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
        k := rejectCacheKey{
                chanID: cid,
                pubkey: pub,
        }

        return k
}

// sourceToPub returns a serialized-compressed public key for use in the reject
// cache.
func sourceToPub(pk *btcec.PublicKey) [33]byte {
        var pub [33]byte
        copy(pub[:], pk.SerializeCompressed())
        return pub
}

// cachedReject is the empty value used to track the value for rejects.
type cachedReject struct {
}

// Size returns the "size" of an entry. We return 1 as we just want to limit
// the total size.
func (c *cachedReject) Size() (uint64, error) {
        return 1, nil
}

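A sketch of how this reject cache is meant to be used (hypothetical call site
inside the gossiper; it assumes the Put and Get methods of the embedded
neutrino lru.Cache, plus scid and peerPub variables owned by the caller).

// Remember that an announcement for this (channel, peer) pair was rejected.
key := newRejectCacheKey(scid.ToUint64(), sourceToPub(peerPub))
_, _ = d.recentRejects.Put(key, &cachedReject{})

// Later: a cache hit means we recently rejected this pair and can skip
// revalidating the announcement.
if _, err := d.recentRejects.Get(key); err == nil {
        return
}
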
// AuthenticatedGossiper is a subsystem which is responsible for receiving
// announcements, validating them and applying the changes to the router,
// syncing the lightning network with newly connected nodes, broadcasting
// announcements after validation, negotiating the channel announcement proofs
// exchange and handling the premature announcements. All outgoing
// announcements are expected to be properly signed as dictated in BOLT#7;
// additionally, all incoming messages are expected to be well formed and
// signed. Invalid messages will be rejected by this struct.
type AuthenticatedGossiper struct {
        // Parameters which are needed to properly handle the start and stop of
        // the service.
        started sync.Once
        stopped sync.Once

        // bestHeight is the height of the block at the tip of the main chain
        // as we know it. Accesses *MUST* be done with the gossiper's lock
        // held.
        bestHeight uint32

        quit chan struct{}
        wg   sync.WaitGroup

        // cfg is a copy of the configuration struct that the gossiper service
        // was initialized with.
        cfg *Config

        // blockEpochs encapsulates a stream of block epochs that are sent at
        // every new block height.
        blockEpochs *chainntnfs.BlockEpochEvent

        // prematureChannelUpdates is a map of ChannelUpdates we have received
        // that weren't associated with any channel we know about. We store
        // them temporarily, such that we can reprocess them when a
        // ChannelAnnouncement for the channel is received.
        prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]

        // banman tracks our peer's ban status.
        banman *banman

        // networkMsgs is a channel that carries new network broadcasted
        // messages from outside the gossiper service to be processed by the
        // networkHandler.
        networkMsgs chan *networkMsg

        // futureMsgs is a list of premature network messages that have a
        // block height specified in the future. We will save them and resend
        // them to the chan networkMsgs once the block height has been
        // reached. The cached map format is,
        //   {msgID1: msg1, msgID2: msg2, ...}
        futureMsgs *futureMsgCache

        // chanPolicyUpdates is a channel that requests to update the
        // forwarding policy of a set of channels is sent over.
        chanPolicyUpdates chan *chanPolicyUpdateRequest

        // selfKey is the identity public key of the backing Lightning node.
        selfKey *btcec.PublicKey

        // selfKeyLoc is the locator for the identity public key of the backing
        // Lightning node.
        selfKeyLoc keychain.KeyLocator

        // channelMtx is used to restrict the database access to one
        // goroutine per channel ID. This is done to ensure that when
        // the gossiper is handling an announcement, the db state stays
        // consistent between when the DB is first read until it's written.
        channelMtx *multimutex.Mutex[uint64]

        recentRejects *lru.Cache[rejectCacheKey, *cachedReject]

        // syncMgr is a subsystem responsible for managing the gossip syncers
        // for peers currently connected. When a new peer is connected, the
        // manager will create its accompanying gossip syncer and determine
        // whether it should have an activeSync or passiveSync sync type based
        // on how many other gossip syncers are currently active. Any
        // activeSync gossip syncers are started in a round-robin manner to
        // ensure we're not syncing with multiple peers at the same time.
        syncMgr *SyncManager

        // reliableSender is a subsystem responsible for handling reliable
        // message send requests to peers. This should only be used for
        // channels that are unadvertised at the time of handling the message
        // since if it is advertised, then peers should be able to get the
        // message from the network.
        reliableSender *reliableSender

        // chanUpdateRateLimiter contains rate limiters for each direction of
        // a channel update we've processed. We'll use these to determine
        // whether we should accept a new update for a specific channel and
        // direction.
        //
        // NOTE: This map must be synchronized with the main
        // AuthenticatedGossiper lock.
        chanUpdateRateLimiter map[uint64][2]*rate.Limiter

        // vb is used to enforce job dependency ordering of gossip messages.
        vb *ValidationBarrier

        sync.Mutex
}

// New creates a new AuthenticatedGossiper instance, initialized with the
// passed configuration parameters.
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
        gossiper := &AuthenticatedGossiper{
                selfKey:           selfKeyDesc.PubKey,
                selfKeyLoc:        selfKeyDesc.KeyLocator,
                cfg:               &cfg,
                networkMsgs:       make(chan *networkMsg),
                futureMsgs:        newFutureMsgCache(maxFutureMessages),
                quit:              make(chan struct{}),
                chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
                prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
                        maxPrematureUpdates,
                ),
                channelMtx: multimutex.NewMutex[uint64](),
                recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
                        maxRejectedUpdates,
                ),
                chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
                banman:                newBanman(),
        }

        gossiper.vb = NewValidationBarrier(1000, gossiper.quit)

        gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
                ChainHash:               cfg.ChainHash,
                ChanSeries:              cfg.ChanSeries,
                RotateTicker:            cfg.RotateTicker,
                HistoricalSyncTicker:    cfg.HistoricalSyncTicker,
                NumActiveSyncers:        cfg.NumActiveSyncers,
                NoTimestampQueries:      cfg.NoTimestampQueries,
                IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
                BestHeight:              gossiper.latestHeight,
                PinnedSyncers:           cfg.PinnedSyncers,
                IsStillZombieChannel:    cfg.IsStillZombieChannel,
        })

        gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
                NotifyWhenOnline:  cfg.NotifyWhenOnline,
                NotifyWhenOffline: cfg.NotifyWhenOffline,
                MessageStore:      cfg.MessageStore,
                IsMsgStale:        gossiper.isMsgStale,
        })

        return gossiper
}

// EdgeWithInfo contains the information that is required to update an edge.
type EdgeWithInfo struct {
        // Info describes the channel.
        Info *models.ChannelEdgeInfo

        // Edge describes the policy in one direction of the channel.
        Edge *models.ChannelEdgePolicy
}

// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
// specified edge updates. Updates are done in two stages: first, the
// AuthenticatedGossiper ensures the update has been committed by dependent
// sub-systems, then it signs and broadcasts new updates to the network. A
// mapping between outpoints and updated channel policies is returned, which is
// used to update the forwarding policies of the underlying links.
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
        edgesToUpdate []EdgeWithInfo) error {

        errChan := make(chan error, 1)
        policyUpdate := &chanPolicyUpdateRequest{
                edgesToUpdate: edgesToUpdate,
                errChan:       errChan,
        }

        select {
        case d.chanPolicyUpdates <- policyUpdate:
                err := <-errChan
                return err
        case <-d.quit:
                return fmt.Errorf("AuthenticatedGossiper shutting down")
        }
}

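A hypothetical caller of PropagateChanPolicyUpdate, assuming info and policy
were loaded from the graph DB for a channel we are a party to, and that the
proportional fee is the field being changed.

// Bump the proportional fee, then hand the edge state to the gossiper so
// it can re-sign and broadcast the new update.
policy.FeeProportionalMillionths = 1500
err := d.PropagateChanPolicyUpdate([]EdgeWithInfo{{
        Info: info,   // *models.ChannelEdgeInfo describing the channel
        Edge: policy, // *models.ChannelEdgePolicy with the new values
}})
if err != nil {
        log.Errorf("policy update failed: %v", err)
}
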
// Start spawns the network message handler goroutine and registers for new
// block notifications in order to properly handle premature announcements.
func (d *AuthenticatedGossiper) Start() error {
        var err error
        d.started.Do(func() {
                log.Info("Authenticated Gossiper starting")
                err = d.start()
        })
        return err
}

func (d *AuthenticatedGossiper) start() error {
        // First we register for new notifications of newly discovered blocks.
        // We do this immediately so we'll later be able to consume any/all
        // blocks which were discovered.
        blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
        if err != nil {
                return err
        }
        d.blockEpochs = blockEpochs

        height, err := d.cfg.Graph.CurrentBlockHeight()
        if err != nil {
                return err
        }
        d.bestHeight = height

        // Start the reliable sender. In case we had any pending messages ready
        // to be sent when the gossiper was last shut down, we must continue on
        // our quest to deliver them to their respective peers.
        if err := d.reliableSender.Start(); err != nil {
                return err
        }

        d.syncMgr.Start()

        d.banman.start()

        // Start receiving blocks in its dedicated goroutine.
        d.wg.Add(2)
        go d.syncBlockHeight()
        go d.networkHandler()

        return nil
}

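A minimal lifecycle sketch, assuming cfg is a fully populated Config and
keyDesc is the node's identity key descriptor.

g := New(cfg, keyDesc)
if err := g.Start(); err != nil {
        return err
}
// Stop (defined further below) is idempotent thanks to the stopped sync.Once.
defer func() { _ = g.Stop() }()
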
// syncBlockHeight syncs the best block height for the gossiper by reading
// blockEpochs.
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) syncBlockHeight() {
        defer d.wg.Done()

        for {
                select {
                // A new block has arrived, so we can re-process the previously
                // premature announcements.
                case newBlock, ok := <-d.blockEpochs.Epochs:
                        // If the channel has been closed, then this indicates
                        // the daemon is shutting down, so we exit ourselves.
                        if !ok {
                                return
                        }

                        // Once a new block arrives, we update our running
                        // track of the height of the chain tip.
                        d.Lock()
                        blockHeight := uint32(newBlock.Height)
                        d.bestHeight = blockHeight
                        d.Unlock()

                        log.Debugf("New block: height=%d, hash=%s", blockHeight,
                                newBlock.Hash)

                        // Resend future messages, if any.
                        d.resendFutureMessages(blockHeight)

                case <-d.quit:
                        return
                }
        }
}

// futureMsgCache embeds a `lru.Cache` with a message counter that serves as
// the unique ID when saving the message.
type futureMsgCache struct {
        *lru.Cache[uint64, *cachedFutureMsg]

        // msgID is a monotonically increasing integer.
        msgID atomic.Uint64
}

// nextMsgID returns a unique message ID.
func (f *futureMsgCache) nextMsgID() uint64 {
        return f.msgID.Add(1)
}

// newFutureMsgCache creates a new future message cache with the underlying lru
// cache being initialized with the specified capacity.
func newFutureMsgCache(capacity uint64) *futureMsgCache {
        // Create a new cache.
        cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)

        return &futureMsgCache{
                Cache: cache,
        }
}

// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
type cachedFutureMsg struct {
        // msg is the network message.
        msg *networkMsg

        // height is the block height.
        height uint32
}

// Size returns the size of the message.
func (c *cachedFutureMsg) Size() (uint64, error) {
        // Return a constant 1.
        return 1, nil
}

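For context, a sketch of how a premature message could be cached until its
target height is reached; it assumes the Put method of the embedded neutrino
lru.Cache and hypothetical msg/msgHeight variables.

// Assign the message a unique, monotonically increasing cache key.
msgID := d.futureMsgs.nextMsgID()
cached := &cachedFutureMsg{
        msg:    msg,       // the premature *networkMsg
        height: msgHeight, // block height at which it becomes valid
}
// Older entries are evicted once maxFutureMessages is exceeded.
if _, err := d.futureMsgs.Put(msgID, cached); err != nil {
        log.Errorf("unable to cache future msg: %v", err)
}
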
// resendFutureMessages takes a block height, resends all the future messages
// found at or below that height and deletes those messages from the
// gossiper's futureMsgs.
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
        var (
                // msgs are the target messages.
                msgs []*networkMsg

                // keys are the target messages' caching keys.
                keys []uint64
        )

        // filterMsgs is the visitor used when iterating the future cache.
        filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
                if cmsg.height <= height {
                        msgs = append(msgs, cmsg.msg)
                        keys = append(keys, k)
                }

                return true
        }

        // Filter out the target messages.
        d.futureMsgs.Range(filterMsgs)

        // Return early if no messages found.
        if len(msgs) == 0 {
                return
        }

        // Remove the filtered messages.
        for _, key := range keys {
                d.futureMsgs.Delete(key)
        }

        log.Debugf("Resending %d network messages at height %d",
                len(msgs), height)

        for _, msg := range msgs {
                select {
                case d.networkMsgs <- msg:
                case <-d.quit:
                        msg.err <- ErrGossiperShuttingDown
                }
        }
}

// Stop signals any active goroutines for a graceful closure.
func (d *AuthenticatedGossiper) Stop() error {
        d.stopped.Do(func() {
                log.Info("Authenticated gossiper shutting down...")
                defer log.Debug("Authenticated gossiper shutdown complete")

                d.stop()
        })
        return nil
}

func (d *AuthenticatedGossiper) stop() {
        log.Debug("Authenticated Gossiper is stopping")
        defer log.Debug("Authenticated Gossiper stopped")

        // `blockEpochs` is only initialized in the start routine so we make
        // sure we don't panic here in the case where the `Stop` method is
        // called when the `Start` method does not complete.
        if d.blockEpochs != nil {
                d.blockEpochs.Cancel()
        }

        d.syncMgr.Stop()

        d.banman.stop()

        close(d.quit)
        d.wg.Wait()

        // We'll stop our reliable sender after all of the gossiper's
        // goroutines have exited to ensure nothing can cause it to continue
        // executing.
        d.reliableSender.Stop()
}

// TODO(roasbeef): need method to get current gossip timestamp?
//  * using mtx, check time rotate forward is needed?

// ProcessRemoteAnnouncement sends a new remote announcement message along with
// the peer that sent the routing message. The announcement will be processed
// then added to a queue for batched trickled announcement to all connected
// peers. Remote channel announcements should contain the announcement proof
// and be fully validated.
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
        peer lnpeer.Peer) chan error {

        log.Debugf("Processing remote msg %T from peer=%x", msg, peer.PubKey())

        errChan := make(chan error, 1)

        // For messages in the known set of channel series queries, we'll
        // dispatch the message directly to the GossipSyncer, and skip the main
        // processing loop.
        switch m := msg.(type) {
        case *lnwire.QueryShortChanIDs,
                *lnwire.QueryChannelRange,
                *lnwire.ReplyChannelRange,
                *lnwire.ReplyShortChanIDsEnd:

                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
                if !ok {
                        log.Warnf("Gossip syncer for peer=%x not found",
                                peer.PubKey())

                        errChan <- ErrGossipSyncerNotFound
                        return errChan
                }

                // If we've found the message target, then we'll dispatch the
                // message directly to it.
                err := syncer.ProcessQueryMsg(m, peer.QuitSignal())
                if err != nil {
                        log.Errorf("Process query msg from peer %x got %v",
                                peer.PubKey(), err)
                }

                errChan <- err
                return errChan

        // If a peer is updating its current update horizon, then we'll
        // dispatch that directly to the proper GossipSyncer.
        case *lnwire.GossipTimestampRange:
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
                if !ok {
                        log.Warnf("Gossip syncer for peer=%x not found",
                                peer.PubKey())

                        errChan <- ErrGossipSyncerNotFound
                        return errChan
                }

                // If we've found the message target, then we'll dispatch the
                // message directly to it.
                if err := syncer.ApplyGossipFilter(m); err != nil {
                        log.Warnf("Unable to apply gossip filter for peer=%x: "+
                                "%v", peer.PubKey(), err)

                        errChan <- err
                        return errChan
                }

                errChan <- nil
                return errChan

        // To avoid inserting edges in the graph for our own channels that we
        // have already closed, we ignore such channel announcements coming
        // from the remote.
        case *lnwire.ChannelAnnouncement1:
                ownKey := d.selfKey.SerializeCompressed()
                ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement1 " +
                        "for own channel")

                if bytes.Equal(m.NodeID1[:], ownKey) ||
                        bytes.Equal(m.NodeID2[:], ownKey) {

                        log.Warn(ownErr)
                        errChan <- ownErr
                        return errChan
                }
        }

        nMsg := &networkMsg{
                msg:      msg,
                isRemote: true,
                peer:     peer,
                source:   peer.IdentityKey(),
                err:      errChan,
        }

        select {
        case d.networkMsgs <- nMsg:

        // If the peer that sent us this message is quitting, then we don't
        // need to send back an error and can return immediately.
        case <-peer.QuitSignal():
                return nil
        case <-d.quit:
                nMsg.err <- ErrGossiperShuttingDown
        }

        return nMsg.err
}

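A hypothetical call site in a peer's read loop, showing how the returned
error channel might be drained. ProcessRemoteAnnouncement can return a nil
channel when the peer is quitting; the select below stays safe because a nil
channel simply never fires.

errChan := d.ProcessRemoteAnnouncement(msg, peer)
select {
case err := <-errChan:
        if err != nil {
                log.Warnf("announcement from %x rejected: %v",
                        peer.PubKey(), err)
        }
case <-peer.QuitSignal():
}
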
// ProcessLocalAnnouncement submits a new locally sourced announcement message
// to the gossiper. The announcement will be processed then added to a queue
// for batched trickled announcement to all connected peers. Local channel
// announcements don't contain the announcement proof and will not be fully
// validated. Once the channel proofs are received, the entire channel
// announcement and update messages will be re-constructed and broadcast to the
// rest of the network.
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
        optionalFields ...OptionalMsgField) chan error {

        optionalMsgFields := &optionalMsgFields{}
        optionalMsgFields.apply(optionalFields...)

        nMsg := &networkMsg{
                msg:               msg,
                optionalMsgFields: optionalMsgFields,
                isRemote:          false,
                source:            d.selfKey,
                err:               make(chan error, 1),
        }

        select {
        case d.networkMsgs <- nMsg:
        case <-d.quit:
                nMsg.err <- ErrGossiperShuttingDown
        }

        return nMsg.err
}

// channelUpdateID is a unique identifier for ChannelUpdate messages, as
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
// tuple.
type channelUpdateID struct {
        // channelID represents the set of data which is needed to
        // retrieve all necessary data to validate the channel existence.
        channelID lnwire.ShortChannelID

        // Flags least-significant bit must be set to 0 if the creating node
        // corresponds to the first node in the previously sent channel
        // announcement and 1 otherwise.
        flags lnwire.ChanUpdateChanFlags
}

// msgWithSenders is a wrapper struct around a message, and the set of peers
// that originally sent us this message. Using this struct, we can ensure that
// we don't re-send a message to the peer that sent it to us in the first
// place.
type msgWithSenders struct {
        // msg is the wire message itself.
        msg lnwire.Message

        // isLocal is true if this was a message that originated locally. We'll
        // use this to bypass our normal checks to ensure we prioritize sending
        // out our own updates.
        isLocal bool

        // senders is the set of peers that sent us this message.
        senders map[route.Vertex]struct{}
}

// mergeSyncerMap is used to merge the set of senders of a particular message
// with peers that we have an active GossipSyncer with. We do this to ensure
// that we don't broadcast messages to any peers that we have active gossip
// syncers for.
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
        for peerPub := range syncers {
                m.senders[peerPub] = struct{}{}
        }
}

// deDupedAnnouncements de-duplicates announcements that have been added to the
// batch. Internally, announcements are stored in three maps
// (one each for channel announcements, channel updates, and node
// announcements). These maps keep track of unique announcements and ensure no
// announcements are duplicated. We keep the three message types separate, such
// that we can send channel announcements first, then channel updates, and
// finally node announcements when it's time to broadcast them.
type deDupedAnnouncements struct {
        // channelAnnouncements are identified by the short channel id field.
        channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders

        // channelUpdates are identified by the channel update id field.
        channelUpdates map[channelUpdateID]msgWithSenders

        // nodeAnnouncements are identified by the Vertex field.
        nodeAnnouncements map[route.Vertex]msgWithSenders

        sync.Mutex
}

// Reset operates on deDupedAnnouncements to reset the storage of
// announcements.
func (d *deDupedAnnouncements) Reset() {
        d.Lock()
        defer d.Unlock()

        d.reset()
}

// reset is the private version of the Reset method. We have this so we can
// call this method within methods that are already holding the lock.
func (d *deDupedAnnouncements) reset() {
        // Storage of each type of announcement (channel announcements, channel
        // updates, node announcements) is set to an empty map where the
        // appropriate key points to the corresponding lnwire.Message.
        d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
        d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
        d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
}

// addMsg adds a new message to the current batch. If the message is already
// present in the current batch, then this new instance replaces the latter,
// and the set of senders is updated to reflect which node sent us this
// message.
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
        log.Tracef("Adding network message: %v to batch", message.msg.MsgType())

        // Depending on the message type (channel announcement, channel update,
        // or node announcement), the message is added to the corresponding map
        // in deDupedAnnouncements. Because each identifying key can have at
        // most one value, the announcements are de-duplicated, with newer ones
        // replacing older ones.
        switch msg := message.msg.(type) {

        // Channel announcements are identified by the short channel id field.
        case *lnwire.ChannelAnnouncement1:
                deDupKey := msg.ShortChannelID
                sender := route.NewVertex(message.source)

                mws, ok := d.channelAnnouncements[deDupKey]
                if !ok {
                        mws = msgWithSenders{
                                msg:     msg,
                                isLocal: !message.isRemote,
                                senders: make(map[route.Vertex]struct{}),
                        }
                        mws.senders[sender] = struct{}{}

                        d.channelAnnouncements[deDupKey] = mws

                        return
                }

                mws.msg = msg
                mws.senders[sender] = struct{}{}
                d.channelAnnouncements[deDupKey] = mws

        // Channel updates are identified by the (short channel id,
        // channelflags) tuple.
        case *lnwire.ChannelUpdate1:
                sender := route.NewVertex(message.source)
                deDupKey := channelUpdateID{
                        msg.ShortChannelID,
                        msg.ChannelFlags,
                }

                oldTimestamp := uint32(0)
                mws, ok := d.channelUpdates[deDupKey]
                if ok {
                        // If we already have seen this message, record its
                        // timestamp.
                        update, ok := mws.msg.(*lnwire.ChannelUpdate1)
                        if !ok {
                                log.Errorf("Expected *lnwire.ChannelUpdate1, "+
                                        "got: %T", mws.msg)

                                return
                        }

                        oldTimestamp = update.Timestamp
                }

                // If we already had this message with a strictly newer
                // timestamp, then we'll just discard the message we got.
                if oldTimestamp > msg.Timestamp {
                        log.Debugf("Ignored outdated network message: "+
                                "peer=%v, msg=%s", message.peer, msg.MsgType())
                        return
                }

                // If the message we just got is newer than what we previously
                // have seen, or this is the first time we see it, then we'll
                // add it to our map of announcements.
                if oldTimestamp < msg.Timestamp {
                        mws = msgWithSenders{
                                msg:     msg,
                                isLocal: !message.isRemote,
                                senders: make(map[route.Vertex]struct{}),
                        }

                        // We'll mark the sender of the message in the
                        // senders map.
                        mws.senders[sender] = struct{}{}

                        d.channelUpdates[deDupKey] = mws

                        return
                }

                // Lastly, if we had seen this exact message from before, with
                // the same timestamp, we'll add the sender to the map of
                // senders, such that we can skip sending this message back in
                // the next batch.
                mws.msg = msg
                mws.senders[sender] = struct{}{}
                d.channelUpdates[deDupKey] = mws

        // Node announcements are identified by the Vertex field. Use the
        // NodeID to create the corresponding Vertex.
        case *lnwire.NodeAnnouncement:
                sender := route.NewVertex(message.source)
                deDupKey := route.Vertex(msg.NodeID)

                // We do the same for node announcements as we did for channel
                // updates, as they also carry a timestamp.
                oldTimestamp := uint32(0)
                mws, ok := d.nodeAnnouncements[deDupKey]
                if ok {
                        oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
                }

                // Discard the message if it's old.
                if oldTimestamp > msg.Timestamp {
                        return
                }

                // Replace if it's newer.
                if oldTimestamp < msg.Timestamp {
                        mws = msgWithSenders{
                                msg:     msg,
                                isLocal: !message.isRemote,
                                senders: make(map[route.Vertex]struct{}),
                        }

                        mws.senders[sender] = struct{}{}

                        d.nodeAnnouncements[deDupKey] = mws

                        return
                }

                // Add to senders map if it's the same as we had.
                mws.msg = msg
                mws.senders[sender] = struct{}{}
                d.nodeAnnouncements[deDupKey] = mws
        }
}

// AddMsgs is a helper method to add multiple messages to the announcement
// batch.
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
        d.Lock()
        defer d.Unlock()

        for _, msg := range msgs {
                d.addMsg(msg)
        }
}

// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
// to broadcast next into messages that are locally sourced and those that are
// sourced remotely.
type msgsToBroadcast struct {
        // localMsgs is the set of messages we created locally.
        localMsgs []msgWithSenders

        // remoteMsgs is the set of messages that we received from a remote
        // party.
        remoteMsgs []msgWithSenders
}

// addMsg adds a new message to the appropriate sub-slice.
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
        if msg.isLocal {
                m.localMsgs = append(m.localMsgs, msg)
        } else {
                m.remoteMsgs = append(m.remoteMsgs, msg)
        }
}

// isEmpty returns true if the batch is empty.
func (m *msgsToBroadcast) isEmpty() bool {
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
}

// length returns the length of the combined message set.
func (m *msgsToBroadcast) length() int {
        return len(m.localMsgs) + len(m.remoteMsgs)
}

1235
// Emit returns the set of de-duplicated announcements to be sent out during
// the next announcement epoch, in the order of channel announcements, channel
// updates, and node announcements. Each message emitted contains the set of
// peers that sent us the message. This way, we can ensure that we don't waste
// bandwidth by re-sending a message to the peer that sent it to us in the
// first place. Additionally, the set of stored messages is reset.
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
        d.Lock()
        defer d.Unlock()

        // Get the total number of announcements.
        numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
                len(d.nodeAnnouncements)

        // Create empty slices of msgWithSenders with a capacity equal to the
        // total number of announcements.
        msgs := msgsToBroadcast{
                localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
                remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
        }

        // Add the channel announcements to the batch first.
        for _, message := range d.channelAnnouncements {
                msgs.addMsg(message)
        }

        // Then add the channel updates.
        for _, message := range d.channelUpdates {
                msgs.addMsg(message)
        }

        // Finally add the node announcements.
        for _, message := range d.nodeAnnouncements {
                msgs.addMsg(message)
        }

        d.reset()

        // Return the partitioned batch of messages.
        return msgs
}

// calculateSubBatchSize is a helper function that calculates the size to break
// down the batchSize into.
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
        minimumBatchSize, batchSize int) int {

        if subBatchDelay > totalDelay {
                return batchSize
        }

        subBatchSize := (batchSize*int(subBatchDelay) +
                int(totalDelay) - 1) / int(totalDelay)

        if subBatchSize < minimumBatchSize {
                return minimumBatchSize
        }

        return subBatchSize
}

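// exampleSubBatchSize is an illustrative sketch (not part of the original
// source) that walks the ceiling division above with hypothetical inputs:
// with totalDelay=90s, subBatchDelay=5s, a minimum batch size of 10 and a
// batch of 120 messages, ceil(120*5s/90s) = 7, which is below the minimum,
// so 10 is returned and the batch drains over 12 sub batches.
func exampleSubBatchSize() int {
        return calculateSubBatchSize(
                90*time.Second, 5*time.Second, 10, 120,
        )
}
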
// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
// this variable so the function can be mocked in our tests.
var batchSizeCalculator = calculateSubBatchSize

// splitAnnouncementBatches takes an existing list of announcements and
// decomposes it into sub batches controlled by the `subBatchSize`.
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
        announcementBatch []msgWithSenders) [][]msgWithSenders {

        subBatchSize := batchSizeCalculator(
                d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
                d.cfg.MinimumBatchSize, len(announcementBatch),
        )

        var splitAnnouncementBatch [][]msgWithSenders

        for subBatchSize < len(announcementBatch) {
                // For slicing with minimal allocation
                // https://github.com/golang/go/wiki/SliceTricks
                announcementBatch, splitAnnouncementBatch =
                        announcementBatch[subBatchSize:],
                        append(splitAnnouncementBatch,
                                announcementBatch[0:subBatchSize:subBatchSize])
        }
        splitAnnouncementBatch = append(
                splitAnnouncementBatch, announcementBatch,
        )

        return splitAnnouncementBatch
}

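// exampleSplitBatch is an illustrative sketch (not part of the original
// source) of the same full-slice-expression trick applied to a plain int
// slice: with subBatchSize=10, a batch of 25 items decomposes into chunks
// of 10, 10 and 5 while preserving order and avoiding extra allocations.
func exampleSplitBatch(batch []int, subBatchSize int) [][]int {
        var out [][]int
        for subBatchSize < len(batch) {
                batch, out = batch[subBatchSize:],
                        append(out, batch[0:subBatchSize:subBatchSize])
        }

        return append(out, batch)
}
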
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
// split size, and then sends out all items to the set of target peers. Locally
// generated announcements are always sent before remotely generated
// announcements.
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
        annBatch msgsToBroadcast) {

        // delayNextBatch is a helper closure that blocks for `SubBatchDelay`
        // duration to delay the sending of the next announcement batch.
        delayNextBatch := func() {
                select {
                case <-time.After(d.cfg.SubBatchDelay):
                case <-d.quit:
                        return
                }
        }

        // Fetch the local and remote announcements.
        localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
        remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)

        d.wg.Add(1)
        go func() {
                defer d.wg.Done()

                log.Debugf("Broadcasting %v new local announcements in %d "+
                        "sub batches", len(annBatch.localMsgs),
                        len(localBatches))

                // Send out the local announcements first.
                for _, annBatch := range localBatches {
                        d.sendLocalBatch(annBatch)
                        delayNextBatch()
                }

                log.Debugf("Broadcasting %v new remote announcements in %d "+
                        "sub batches", len(annBatch.remoteMsgs),
                        len(remoteBatches))

                // Now send the remote announcements.
                for _, annBatch := range remoteBatches {
                        d.sendRemoteBatch(annBatch)
                        delayNextBatch()
                }
        }()
}

// sendLocalBatch broadcasts a list of locally generated announcements to our
// peers. For local announcements, we skip the filter and dedup logic and just
// send the announcements out to all our connected peers.
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
        msgsToSend := lnutils.Map(
                annBatch, func(m msgWithSenders) lnwire.Message {
                        return m.msg
                },
        )

        err := d.cfg.Broadcast(nil, msgsToSend...)
        if err != nil {
                log.Errorf("Unable to send local batch announcements: %v", err)
        }
}

// sendRemoteBatch broadcasts a list of remotely generated announcements to our
// peers.
func (d *AuthenticatedGossiper) sendRemoteBatch(annBatch []msgWithSenders) {
        syncerPeers := d.syncMgr.GossipSyncers()

        // We'll first attempt to filter out this new message for all peers
        // that have active gossip syncers.
        for pub, syncer := range syncerPeers {
                log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
                syncer.FilterGossipMsgs(annBatch...)
        }

        for _, msgChunk := range annBatch {
                msgChunk := msgChunk

                // With the syncers taken care of, we'll merge the sender map
                // with the set of syncers, so we don't send out duplicate
                // messages.
                msgChunk.mergeSyncerMap(syncerPeers)

                err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
                if err != nil {
                        log.Errorf("Unable to send batch "+
                                "announcements: %v", err)
                        continue
                }
        }
}

// networkHandler is the primary goroutine that drives this service. The roles
// of this goroutine include answering queries related to the state of the
// network, syncing up newly connected peers, and also periodically
// broadcasting our latest topology state to all connected peers.
//
// NOTE: This MUST be run as a goroutine.
func (d *AuthenticatedGossiper) networkHandler() {
        defer d.wg.Done()

        // Initialize empty deDupedAnnouncements to store announcement batch.
        announcements := deDupedAnnouncements{}
        announcements.Reset()

        d.cfg.RetransmitTicker.Resume()
        defer d.cfg.RetransmitTicker.Stop()

        trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
        defer trickleTimer.Stop()

        // To start, we'll first check to see if there are any stale channel or
        // node announcements that we need to re-transmit.
        if err := d.retransmitStaleAnns(time.Now()); err != nil {
                log.Errorf("Unable to rebroadcast stale announcements: %v", err)
        }

        for {
                select {
                // A new policy update has arrived. We'll commit it to the
                // sub-systems below us, then craft, sign, and broadcast a new
                // ChannelUpdate for the set of affected clients.
                case policyUpdate := <-d.chanPolicyUpdates:
                        log.Tracef("Received channel %d policy update requests",
                                len(policyUpdate.edgesToUpdate))

                        // First, we'll now create new fully signed updates for
                        // the affected channels and also update the underlying
                        // graph with the new state.
                        newChanUpdates, err := d.processChanPolicyUpdate(
                                policyUpdate.edgesToUpdate,
                        )
                        policyUpdate.errChan <- err
                        if err != nil {
                                log.Errorf("Unable to craft policy updates: %v",
                                        err)
                                continue
                        }

                        // Finally, with the updates committed, we'll now add
                        // them to the announcement batch to be flushed at the
                        // start of the next epoch.
                        announcements.AddMsgs(newChanUpdates...)

                case announcement := <-d.networkMsgs:
                        log.Tracef("Received network message: "+
                                "peer=%v, msg=%s, is_remote=%v",
                                announcement.peer, announcement.msg.MsgType(),
                                announcement.isRemote)

                        switch announcement.msg.(type) {
                        // Channel announcement signatures are amongst the only
                        // messages that we'll process serially.
                        case *lnwire.AnnounceSignatures1:
                                emittedAnnouncements, _ := d.processNetworkAnnouncement(
                                        announcement,
                                )
                                log.Debugf("Processed network message %s, "+
                                        "returned len(announcements)=%v",
                                        announcement.msg.MsgType(),
                                        len(emittedAnnouncements))

                                if emittedAnnouncements != nil {
                                        announcements.AddMsgs(
                                                emittedAnnouncements...,
                                        )
                                }
                                continue
                        }

                        // If this message was recently rejected, then we won't
                        // attempt to re-process it.
                        if announcement.isRemote && d.isRecentlyRejectedMsg(
                                announcement.msg,
                                sourceToPub(announcement.source),
                        ) {

                                announcement.err <- fmt.Errorf("recently " +
                                        "rejected")
                                continue
                        }

                        // We'll set up any dependents, and wait until a free
                        // slot for this job opens up; this allows us to not
                        // have thousands of goroutines active.
                        annJobID, err := d.vb.InitJobDependencies(
                                announcement.msg,
                        )
                        if err != nil {
                                announcement.err <- err
                                continue
                        }

                        d.wg.Add(1)
                        go d.handleNetworkMessages(
                                announcement, &announcements, annJobID,
                        )

                // The trickle timer has ticked, which indicates we should
                // flush to the network the pending batch of new announcements
                // we've received since the last trickle tick.
                case <-trickleTimer.C:
                        // Emit the current batch of announcements from
                        // deDupedAnnouncements.
                        announcementBatch := announcements.Emit()

                        // If the current announcement batch is empty, then we
                        // have no further work here.
                        if announcementBatch.isEmpty() {
                                continue
                        }

                        // At this point, we have the set of local and remote
                        // announcements we want to send out. We'll do the
                        // batching as normal for both, but for local
                        // announcements, we'll blast them out w/o regard for
                        // our peer's policies so we ensure they propagate
                        // properly.
                        d.splitAndSendAnnBatch(announcementBatch)

                // The retransmission timer has ticked which indicates that we
                // should check if we need to prune or re-broadcast any of our
                // personal channels or node announcement. This addresses the
                // case of "zombie" channels and channel advertisements that
                // have been dropped, or not properly propagated through the
                // network.
                case tick := <-d.cfg.RetransmitTicker.Ticks():
                        if err := d.retransmitStaleAnns(tick); err != nil {
                                log.Errorf("unable to rebroadcast stale "+
                                        "announcements: %v", err)
                        }

                // The gossiper has been signalled to exit, so we exit our
                // main loop so the wait group can be decremented.
                case <-d.quit:
                        return
                }
        }
}

// handleNetworkMessages is responsible for waiting for dependencies for a
// given network message and processing the message. Once processed, it will
// signal its dependents and add the new announcements to the announce batch.
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
        deDuped *deDupedAnnouncements, jobID JobID) {

        defer d.wg.Done()
        defer d.vb.CompleteJob()

        // We should only broadcast this message forward if it originated from
        // us or it wasn't received as part of our initial historical sync.
        shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()

        // If this message has an existing dependency, then we'll wait until
        // that has been fully validated before we proceed.
        err := d.vb.WaitForParents(jobID, nMsg.msg)
        if err != nil {
                log.Debugf("Validating network message %s got err: %v",
                        nMsg.msg.MsgType(), err)

                if errors.Is(err, ErrVBarrierShuttingDown) {
                        log.Warnf("unexpected error during validation "+
                                "barrier shutdown: %v", err)
                }
                nMsg.err <- err

                return
        }

        // Process the network announcement to determine if this is either a
        // new announcement from our PoV or an update to a prior vertex/edge we
        // previously processed.
        newAnns, allow := d.processNetworkAnnouncement(nMsg)

        log.Tracef("Processed network message %s, returned "+
                "len(announcements)=%v, allowDependents=%v",
                nMsg.msg.MsgType(), len(newAnns), allow)

        // If this message had any dependencies, then we can now signal them to
        // continue.
        err = d.vb.SignalDependents(nMsg.msg, jobID)
        if err != nil {
                // Something is wrong if SignalDependents returns an error.
                log.Errorf("SignalDependents returned error for msg=%v with "+
                        "JobID=%v", spew.Sdump(nMsg.msg), jobID)

                nMsg.err <- err

                return
        }

        // If the announcement was accepted, then add the emitted announcements
        // to our announce batch to be broadcast once the trickle timer ticks
        // again.
        if newAnns != nil && shouldBroadcast {
                // TODO(roasbeef): exclude peer that sent.
                deDuped.AddMsgs(newAnns...)
        } else if newAnns != nil {
                log.Trace("Skipping broadcast of announcements received " +
                        "during initial graph sync")
        }
}

// TODO(roasbeef): d/c peers that send updates not on our chain

// InitSyncState is called by outside sub-systems when a connection is
// established to a new peer that understands how to perform channel range
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
// needed to handle new queries.
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
        d.syncMgr.InitSyncState(syncPeer)
}

// PruneSyncState is called by outside sub-systems once a peer that we were
// previously connected to has been disconnected. In this case we can stop the
// existing GossipSyncer assigned to the peer and free up resources.
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
        d.syncMgr.PruneSyncState(peer)
}

// isRecentlyRejectedMsg returns true if we recently rejected a message, and
// false otherwise. This avoids expensive reprocessing of the message.
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
        peerPub [33]byte) bool {

        var scid uint64
        switch m := msg.(type) {
        case *lnwire.ChannelUpdate1:
                scid = m.ShortChannelID.ToUint64()

        case *lnwire.ChannelAnnouncement1:
                scid = m.ShortChannelID.ToUint64()

        default:
                return false
        }

        _, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
        return err != cache.ErrElementNotFound
}

// retransmitStaleAnns examines all outgoing channels that the source node is
// known to maintain to check to see if any of them are "stale". A channel is
// stale iff the last timestamp of its rebroadcast is older than the
// RebroadcastInterval. We also check if a refreshed node announcement should
// be resent.
func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
        // Iterate over all of our channels and check if any of them fall
        // within the prune interval or re-broadcast interval.
        type updateTuple struct {
                info *models.ChannelEdgeInfo
                edge *models.ChannelEdgePolicy
        }

        var (
                havePublicChannels bool
                edgesToUpdate      []updateTuple
        )
        err := d.cfg.Graph.ForAllOutgoingChannels(func(
                info *models.ChannelEdgeInfo,
                edge *models.ChannelEdgePolicy) error {

                // If there's no auth proof attached to this edge, it means
                // that it is a private channel not meant to be announced to
                // the greater network, so avoid sending channel updates for
                // this channel to not leak its existence.
                if info.AuthProof == nil {
                        log.Debugf("Skipping retransmission of channel "+
                                "without AuthProof: %v", info.ChannelID)
                        return nil
                }

                // We make a note that we have at least one public channel. We
                // use this to determine whether we should send a node
                // announcement below.
                havePublicChannels = true

                // If this edge has a ChannelUpdate that was created before the
                // introduction of the MaxHTLC field, then we'll update this
                // edge to propagate this information in the network.
                if !edge.MessageFlags.HasMaxHtlc() {
                        // We'll make sure we support the new max_htlc field if
                        // not already present.
                        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
                        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)

                        edgesToUpdate = append(edgesToUpdate, updateTuple{
                                info: info,
                                edge: edge,
                        })
                        return nil
                }

                timeElapsed := now.Sub(edge.LastUpdate)

                // If it's been longer than RebroadcastInterval since we've
                // re-broadcasted the channel, add the channel to the set of
                // edges we need to update.
                if timeElapsed >= d.cfg.RebroadcastInterval {
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
                                info: info,
                                edge: edge,
                        })
                }

                return nil
        })
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
                return fmt.Errorf("unable to retrieve outgoing channels: %w",
                        err)
        }

        var signedUpdates []lnwire.Message
        for _, chanToUpdate := range edgesToUpdate {
                // Re-sign and update the channel on disk and retrieve our
                // ChannelUpdate to broadcast.
                chanAnn, chanUpdate, err := d.updateChannel(
                        chanToUpdate.info, chanToUpdate.edge,
                )
                if err != nil {
                        return fmt.Errorf("unable to update channel: %w", err)
                }

                // If we have a valid announcement to transmit, then we'll send
                // that along with the update.
                if chanAnn != nil {
                        signedUpdates = append(signedUpdates, chanAnn)
                }

                signedUpdates = append(signedUpdates, chanUpdate)
        }

        // If we don't have any public channels, we return as we don't want to
        // broadcast anything that would reveal our existence.
        if !havePublicChannels {
                return nil
        }

        // We'll also check that our NodeAnnouncement is not too old.
        currentNodeAnn := d.cfg.FetchSelfAnnouncement()
        timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
        timeElapsed := now.Sub(timestamp)

        // If it's been a full rebroadcast interval since we've re-broadcasted
        // the node announcement, refresh it and resend it.
        nodeAnnStr := ""
        if timeElapsed >= d.cfg.RebroadcastInterval {
                newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
                if err != nil {
                        return fmt.Errorf("unable to get refreshed node "+
                                "announcement: %v", err)
                }

                signedUpdates = append(signedUpdates, &newNodeAnn)
                nodeAnnStr = " and our refreshed node announcement"

                // Before broadcasting the refreshed node announcement, add it
                // to our own graph.
                if err := d.addNode(&newNodeAnn); err != nil {
                        log.Errorf("Unable to add refreshed node announcement "+
                                "to graph: %v", err)
                }
        }

        // If we don't have any updates to re-broadcast, then we'll exit
        // early.
        if len(signedUpdates) == 0 {
                return nil
        }

        log.Infof("Retransmitting %v outgoing channels%v",
                len(edgesToUpdate), nodeAnnStr)

        // With all the wire announcements properly crafted, we'll broadcast
        // our known outgoing channels to all our immediate peers.
        if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
                return fmt.Errorf("unable to re-broadcast channels: %w", err)
        }

        return nil
}

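// exampleIsStale is an illustrative sketch (not part of the original source)
// of the staleness rule described above: with a 24h rebroadcast interval, an
// edge whose ChannelUpdate was last refreshed 25h ago is due to be re-signed
// and re-broadcast, while one refreshed 1h ago is left alone.
func exampleIsStale(now, lastUpdate time.Time,
        rebroadcastInterval time.Duration) bool {

        return now.Sub(lastUpdate) >= rebroadcastInterval
}
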
// processChanPolicyUpdate generates a new set of channel updates for the
// provided list of edges and updates the backing ChannelGraphSource.
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
        edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {

        var chanUpdates []networkMsg
        for _, edgeInfo := range edgesToUpdate {
                // Now that we've collected all the channels we need to update,
                // we'll re-sign and update the backing ChannelGraphSource, and
                // retrieve our ChannelUpdate to broadcast.
                _, chanUpdate, err := d.updateChannel(
                        edgeInfo.Info, edgeInfo.Edge,
                )
                if err != nil {
                        return nil, err
                }

                // We'll avoid broadcasting any updates for private channels to
                // avoid directly giving away their existence. Instead, we'll
                // send the update directly to the remote party.
                if edgeInfo.Info.AuthProof == nil {
                        // If AuthProof is nil and an alias was found for this
                        // ChannelID (meaning the option-scid-alias feature was
                        // negotiated), we'll replace the ShortChannelID in the
                        // update with the peer's alias. We do this after
                        // updateChannel so that the alias isn't persisted to
                        // the database.
                        chanID := lnwire.NewChanIDFromOutPoint(
                                edgeInfo.Info.ChannelPoint,
                        )

                        var defaultAlias lnwire.ShortChannelID
                        foundAlias, _ := d.cfg.GetAlias(chanID)
                        if foundAlias != defaultAlias {
                                chanUpdate.ShortChannelID = foundAlias

                                sig, err := d.cfg.SignAliasUpdate(chanUpdate)
                                if err != nil {
                                        log.Errorf("Unable to sign alias "+
                                                "update: %v", err)
                                        continue
                                }

                                lnSig, err := lnwire.NewSigFromSignature(sig)
                                if err != nil {
                                        log.Errorf("Unable to create sig: %v",
                                                err)
                                        continue
                                }

                                chanUpdate.Signature = lnSig
                        }

                        remotePubKey := remotePubFromChanInfo(
                                edgeInfo.Info, chanUpdate.ChannelFlags,
                        )
                        err := d.reliableSender.sendMessage(
                                chanUpdate, remotePubKey,
                        )
                        if err != nil {
                                log.Errorf("Unable to reliably send %v for "+
                                        "channel=%v to peer=%x: %v",
                                        chanUpdate.MsgType(),
                                        chanUpdate.ShortChannelID,
                                        remotePubKey, err)
                        }
                        continue
                }

                // We set ourselves as the source of this message to indicate
                // that we shouldn't skip any peers when sending this message.
                chanUpdates = append(chanUpdates, networkMsg{
                        source:   d.selfKey,
                        isRemote: false,
                        msg:      chanUpdate,
                })
        }

        return chanUpdates, nil
}

// remotePubFromChanInfo returns the public key of the remote peer given a
// ChannelEdgeInfo that describes a channel we have with them.
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
        chanFlags lnwire.ChanUpdateChanFlags) [33]byte {

        var remotePubKey [33]byte
        switch {
        case chanFlags&lnwire.ChanUpdateDirection == 0:
                remotePubKey = chanInfo.NodeKey2Bytes
        case chanFlags&lnwire.ChanUpdateDirection == 1:
                remotePubKey = chanInfo.NodeKey1Bytes
        }

        return remotePubKey
}

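// exampleRemotePub is an illustrative sketch (not part of the original
// source): the direction bit of a channel update names its signer, so an
// update with the bit unset was signed by node 1, which makes node 2 the
// remote party from our perspective, and vice versa when the bit is set.
func exampleRemotePub(chanInfo *models.ChannelEdgeInfo) [33]byte {
        // Direction bit unset: the update came from node 1, so the remote
        // peer we would forward it to is node 2.
        return remotePubFromChanInfo(chanInfo, 0)
}
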
// processRejectedEdge examines a rejected edge to see if we can extract any
// new announcements from it. An edge will get rejected if we already added
// the same edge without AuthProof to the graph. If the received announcement
// contains a proof, we can add this proof to our edge. We can end up in this
// situation in the case where we create a channel, but for some reason fail
// to receive the remote peer's proof, while the remote peer is able to fully
// assemble the proof and craft the ChannelAnnouncement.
func (d *AuthenticatedGossiper) processRejectedEdge(
        chanAnnMsg *lnwire.ChannelAnnouncement1,
        proof *models.ChannelAuthProof) ([]networkMsg, error) {

        // First, we'll fetch the state of the channel as we know it from the
        // database.
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
                chanAnnMsg.ShortChannelID,
        )
        if err != nil {
                return nil, err
        }

        // If the edge is in the graph and already has a proof attached, then
        // we'll just reject it as normal.
        if chanInfo.AuthProof != nil {
                return nil, nil
        }

        // Otherwise, this means that the edge is within the graph, but it
        // doesn't yet have a proper proof attached. If we did not receive
        // the proof such that we now can add it, there's nothing more we
        // can do.
        if proof == nil {
                return nil, nil
        }

        // We'll then create and validate the new fully assembled
        // announcement.
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
                proof, chanInfo, e1, e2,
        )
        if err != nil {
                return nil, err
        }
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
        if err != nil {
                err := fmt.Errorf("assembled channel announcement proof "+
                        "for shortChanID=%v isn't valid: %v",
                        chanAnnMsg.ShortChannelID, err)
                log.Error(err)
                return nil, err
        }

        // If everything checks out, then we'll add the fully assembled proof
        // to the database.
        err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
        if err != nil {
                err := fmt.Errorf("unable to add proof to shortChanID=%v: %w",
                        chanAnnMsg.ShortChannelID, err)
                log.Error(err)
                return nil, err
        }

        // As we now have a complete channel announcement for this channel,
        // we'll construct the announcements so they can be broadcast out to
        // all our peers.
        announcements := make([]networkMsg, 0, 3)
        announcements = append(announcements, networkMsg{
                source: d.selfKey,
                msg:    chanAnn,
        })
        if e1Ann != nil {
                announcements = append(announcements, networkMsg{
                        source: d.selfKey,
                        msg:    e1Ann,
                })
        }
        if e2Ann != nil {
                announcements = append(announcements, networkMsg{
                        source: d.selfKey,
                        msg:    e2Ann,
                })
        }

        return announcements, nil
}

// fetchPKScript fetches the output script for the given SCID.
func (d *AuthenticatedGossiper) fetchPKScript(chanID *lnwire.ShortChannelID) (
        []byte, error) {

        return lnwallet.FetchPKScriptWithQuit(d.cfg.ChainIO, chanID, d.quit)
}

// addNode processes the given node announcement, and adds it to our channel
// graph.
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
        op ...batch.SchedulerOption) error {

        if err := netann.ValidateNodeAnn(msg); err != nil {
                return fmt.Errorf("unable to validate node announcement: %w",
                        err)
        }

        return d.cfg.Graph.AddNode(models.NodeFromWireAnnouncement(msg), op...)
}

// isPremature decides whether a given network message has a block height+delta
// value specified in the future. If so, the message will be added to the
// future message map and be processed when the block height has been reached.
//
// NOTE: must be used inside a lock.
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
        delta uint32, msg *networkMsg) bool {

        // The channel is already confirmed at chanID.BlockHeight so we
        // subtract one block. For instance, if the required confirmation for
        // this channel announcement is 6, we then only need to wait for 5 more
        // blocks once the funding tx is confirmed.
        if delta > 0 {
                delta--
        }

        msgHeight := chanID.BlockHeight + delta

        // The message height is smaller or equal to our best known height,
        // thus the message is mature.
        if msgHeight <= d.bestHeight {
                return false
        }

        // Add the premature message to our future messages which will be
        // resent once the block height has been reached.
        //
        // Copy the networkMsg since the old message's err chan will be
        // consumed.
        copied := &networkMsg{
                peer:              msg.peer,
                source:            msg.source,
                msg:               msg.msg,
                optionalMsgFields: msg.optionalMsgFields,
                isRemote:          msg.isRemote,
                err:               make(chan error, 1),
        }

        // Create the cached message.
        cachedMsg := &cachedFutureMsg{
                msg:    copied,
                height: msgHeight,
        }

        // Increment the msg ID and add it to the cache.
        nextMsgID := d.futureMsgs.nextMsgID()
        _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
        if err != nil {
                log.Errorf("Adding future message got error: %v", err)
        }

        log.Debugf("Network message: %v added to future messages for "+
                "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
                msgHeight, d.bestHeight)

        return true
}

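// exampleIsPremature is an illustrative sketch (not part of the original
// source) of the height arithmetic above: a channel confirmed at block
// 700,000 that requires 6 confirmations yields msgHeight = 700,000 + 5 =
// 700,005, so the message is premature while our best height is below that.
func exampleIsPremature(blockHeight, delta, bestHeight uint32) bool {
        if delta > 0 {
                // The confirmation at blockHeight already counts as one.
                delta--
        }

        return blockHeight+delta > bestHeight
}
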
// processNetworkAnnouncement processes a new network related authenticated
// channel or node announcement or announcement proofs. If the announcement
// didn't affect the internal state due to either being out of date, invalid,
// or redundant, then nil is returned. Otherwise, the set of announcements will
// be returned which should be broadcasted to the rest of the network. The
// boolean returned indicates whether any dependents of the announcement should
// attempt to be processed as well.
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
        nMsg *networkMsg) ([]networkMsg, bool) {

        // If this is a remote update, we set the scheduler option to lazily
        // add it to the graph.
        var schedulerOp []batch.SchedulerOption
        if nMsg.isRemote {
                schedulerOp = append(schedulerOp, batch.LazyAdd())
        }

        switch msg := nMsg.msg.(type) {
        // A new node announcement has arrived which either presents new
        // information about a node in one of the channels we know about, or
        // updates previously advertised information.
        case *lnwire.NodeAnnouncement:
                return d.handleNodeAnnouncement(nMsg, msg, schedulerOp)

        // A new channel announcement has arrived, this indicates the
        // *creation* of a new channel within the network. This only advertises
        // the existence of a channel and not yet the routing policies in
        // either direction of the channel.
        case *lnwire.ChannelAnnouncement1:
                return d.handleChanAnnouncement(nMsg, msg, schedulerOp...)

        // A new authenticated channel edge update has arrived. This indicates
        // that the directional information for an already known channel has
        // been updated.
        case *lnwire.ChannelUpdate1:
                return d.handleChanUpdate(nMsg, msg, schedulerOp)

        // A new signature announcement has been received. This indicates
        // willingness of nodes involved in the funding of a channel to
        // announce this new channel to the rest of the world.
        case *lnwire.AnnounceSignatures1:
                return d.handleAnnSig(nMsg, msg)

        default:
                err := errors.New("wrong type of the announcement")
                nMsg.err <- err
                return nil, false
        }
}

// processZombieUpdate determines whether the provided channel update should
// resurrect a given zombie edge.
//
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
// should be inspected.
func (d *AuthenticatedGossiper) processZombieUpdate(
        chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
        msg *lnwire.ChannelUpdate1) error {

        // The least-significant bit in the flag on the channel update tells us
        // which edge is being updated.
        isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0

        // Since we've deemed the update as not stale above, before marking it
        // live, we'll make sure it has been signed by the correct party. If we
        // have both pubkeys, either party can resurrect the channel. If we've
        // already marked this with the stricter, single-sided resurrection we
        // will only have the pubkey of the node with the oldest timestamp.
        var pubKey *btcec.PublicKey
        switch {
        case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
                pubKey, _ = chanInfo.NodeKey1()
        case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
                pubKey, _ = chanInfo.NodeKey2()
        }
        if pubKey == nil {
                return fmt.Errorf("incorrect pubkey to resurrect zombie "+
                        "with chan_id=%v", msg.ShortChannelID)
        }

        err := netann.VerifyChannelUpdateSignature(msg, pubKey)
        if err != nil {
                return fmt.Errorf("unable to verify channel "+
                        "update signature: %v", err)
        }

        // With the signature valid, we'll proceed to mark the
        // edge as live and wait for the channel announcement to
        // come through again.
        err = d.cfg.Graph.MarkEdgeLive(scid)
        switch {
        case errors.Is(err, graphdb.ErrZombieEdgeNotFound):
                log.Errorf("edge with chan_id=%v was not found in the "+
                        "zombie index: %v", scid, err)

                return nil

        case err != nil:
                return fmt.Errorf("unable to remove edge with "+
                        "chan_id=%v from zombie index: %v",
                        msg.ShortChannelID, err)

        default:
        }

        log.Debugf("Removed edge with chan_id=%v from zombie "+
                "index", msg.ShortChannelID)

        return nil
}

// fetchNodeAnn fetches the latest signed node announcement from our point of
// view for the node with the given public key.
func (d *AuthenticatedGossiper) fetchNodeAnn(
        pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {

        node, err := d.cfg.Graph.FetchLightningNode(pubKey)
        if err != nil {
                return nil, err
        }

        return node.NodeAnnouncement(true)
}

2197
// MessageStore is seen as stale by the current graph.
2198
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
15✔
2199
        switch msg := msg.(type) {
15✔
2200
        case *lnwire.AnnounceSignatures1:
5✔
2201
                chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
5✔
2202
                        msg.ShortChannelID,
5✔
2203
                )
5✔
2204

5✔
2205
                // If the channel cannot be found, it is most likely a leftover
5✔
2206
                // message for a channel that was closed, so we can consider it
5✔
2207
                // stale.
5✔
2208
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
8✔
2209
                        return true
3✔
2210
                }
3✔
2211
                if err != nil {
5✔
2212
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2213
                                "%v", chanInfo.ChannelID, err)
×
2214
                        return false
×
2215
                }
×
2216

2217
                // If the proof exists in the graph, then we have successfully
2218
                // received the remote proof and assembled the full proof, so we
2219
                // can safely delete the local proof from the database.
2220
                return chanInfo.AuthProof != nil
5✔
2221

2222
        case *lnwire.ChannelUpdate1:
13✔
2223
                _, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)
13✔
2224

13✔
2225
                // If the channel cannot be found, it is most likely a leftover
13✔
2226
                // message for a channel that was closed, so we can consider it
13✔
2227
                // stale.
13✔
2228
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
16✔
2229
                        return true
3✔
2230
                }
3✔
2231
                if err != nil {
13✔
2232
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2233
                                "%v", msg.ShortChannelID, err)
×
2234
                        return false
×
2235
                }
×
2236

2237
                // Otherwise, we'll retrieve the correct policy that we
2238
                // currently have stored within our graph to check if this
2239
                // message is stale by comparing its timestamp.
2240
                var p *models.ChannelEdgePolicy
13✔
2241
                if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
26✔
2242
                        p = p1
13✔
2243
                } else {
16✔
2244
                        p = p2
3✔
2245
                }
3✔
2246

2247
                // If the policy is still unknown, then we can consider this
2248
                // policy fresh.
2249
                if p == nil {
13✔
2250
                        return false
×
2251
                }
×
2252

2253
                timestamp := time.Unix(int64(msg.Timestamp), 0)
13✔
2254
                return p.LastUpdate.After(timestamp)
13✔
2255

2256
        default:
×
2257
                // We'll make sure to not mark any unsupported messages as stale
×
2258
                // to ensure they are not removed.
×
2259
                return false
×
2260
        }
2261
}
2262

2263
// updateChannel creates a new fully signed update for the channel, and updates
// the underlying graph with the new state.
func (d *AuthenticatedGossiper) updateChannel(info *models.ChannelEdgeInfo,
        edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
        *lnwire.ChannelUpdate1, error) {

        // Parse the unsigned edge into a channel update.
        chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)

        // We'll generate a new signature over a digest of the channel
        // announcement itself and update the timestamp to ensure it
        // propagates.
        err := netann.SignChannelUpdate(
                d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
                netann.ChanUpdSetTimestamp,
        )
        if err != nil {
                return nil, nil, err
        }

        // Next, we'll set the new signature in place, and update the reference
        // in the backing slice.
        edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
        edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()

        // To ensure that our signature is valid, we'll verify it ourselves
        // before committing it to the slice returned.
        err = netann.ValidateChannelUpdateAnn(
                d.selfKey, info.Capacity, chanUpdate,
        )
        if err != nil {
                return nil, nil, fmt.Errorf("generated invalid channel "+
                        "update sig: %v", err)
        }

        // Finally, we'll write the new edge policy to disk.
        if err := d.cfg.Graph.UpdateEdge(edge); err != nil {
                return nil, nil, err
        }

        // We'll also create the original channel announcement so the two can
        // be broadcast alongside each other (if necessary), but only if we
        // have a full channel announcement for this channel.
        var chanAnn *lnwire.ChannelAnnouncement1
        if info.AuthProof != nil {
                chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
                chanAnn = &lnwire.ChannelAnnouncement1{
                        ShortChannelID:  chanID,
                        NodeID1:         info.NodeKey1Bytes,
                        NodeID2:         info.NodeKey2Bytes,
                        ChainHash:       info.ChainHash,
                        BitcoinKey1:     info.BitcoinKey1Bytes,
                        Features:        lnwire.NewRawFeatureVector(),
                        BitcoinKey2:     info.BitcoinKey2Bytes,
                        ExtraOpaqueData: info.ExtraOpaqueData,
                }
                chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
                        info.AuthProof.NodeSig1Bytes,
                )
                if err != nil {
                        return nil, nil, err
                }
                chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
                        info.AuthProof.NodeSig2Bytes,
                )
                if err != nil {
                        return nil, nil, err
                }
                chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
                        info.AuthProof.BitcoinSig1Bytes,
                )
                if err != nil {
                        return nil, nil, err
                }
                chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
                        info.AuthProof.BitcoinSig2Bytes,
                )
                if err != nil {
                        return nil, nil, err
                }
        }

        return chanAnn, chanUpdate, err
}

// SyncManager returns the gossiper's SyncManager instance.
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
        return d.syncMgr
}

// IsKeepAliveUpdate determines whether this channel update is considered a
2353
// keep-alive update based on the previous channel update processed for the same
2354
// direction.
2355
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
2356
        prev *models.ChannelEdgePolicy) bool {
17✔
2357

17✔
2358
        // Both updates should be from the same direction.
17✔
2359
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
17✔
2360
                prev.ChannelFlags&lnwire.ChanUpdateDirection {
17✔
2361

×
2362
                return false
×
2363
        }
×
2364

2365
        // The timestamp should always increase for a keep-alive update.
2366
        timestamp := time.Unix(int64(update.Timestamp), 0)
17✔
2367
        if !timestamp.After(prev.LastUpdate) {
20✔
2368
                return false
3✔
2369
        }
3✔
2370

2371
        // None of the remaining fields should change for a keep-alive update.
2372
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
20✔
2373
                return false
3✔
2374
        }
3✔
2375
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
32✔
2376
                return false
15✔
2377
        }
15✔
2378
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
8✔
2379
                return false
3✔
2380
        }
3✔
2381
        if update.TimeLockDelta != prev.TimeLockDelta {
5✔
2382
                return false
×
2383
        }
×
2384
        if update.HtlcMinimumMsat != prev.MinHTLC {
5✔
2385
                return false
×
2386
        }
×
2387
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
5✔
2388
                return false
×
2389
        }
×
2390
        if update.HtlcMaximumMsat != prev.MaxHTLC {
5✔
2391
                return false
×
2392
        }
×
2393
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
8✔
2394
                return false
3✔
2395
        }
3✔
2396
        return true
5✔
2397
}
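
// NOTE (illustrative sketch, not part of the original file): a keep-alive
// update is one whose only change is a newer timestamp. Assuming `upd`
// mirrors every policy field of `prevPolicy`:
//
//        upd.Timestamp = uint32(prevPolicy.LastUpdate.Add(time.Hour).Unix())
//        _ = IsKeepAliveUpdate(upd, prevPolicy) // true: only time advanced
//
//        upd.BaseFee++
//        _ = IsKeepAliveUpdate(upd, prevPolicy) // false: the fee changed too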

// latestHeight returns the gossiper's latest height known of the chain.
func (d *AuthenticatedGossiper) latestHeight() uint32 {
        d.Lock()
        defer d.Unlock()
        return d.bestHeight
}

// handleNodeAnnouncement processes a new node announcement.
func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
        nodeAnn *lnwire.NodeAnnouncement,
        ops []batch.SchedulerOption) ([]networkMsg, bool) {

        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)

        log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
                nMsg.source.SerializeCompressed())

        // We'll quickly ask the router if it already has a newer update for
        // this node so we can skip validating signatures if not required.
        if d.cfg.Graph.IsStaleNode(nodeAnn.NodeID, timestamp) {
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
                nMsg.err <- nil
                return nil, true
        }

        if err := d.addNode(nodeAnn, ops...); err != nil {
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
                        err)

                if !graph.IsError(
                        err,
                        graph.ErrOutdated,
                        graph.ErrIgnored,
                ) {

                        log.Error(err)
                }

                nMsg.err <- err
                return nil, false
        }

        // In order to ensure we don't leak unadvertised nodes, we'll make a
        // quick check to ensure this node intends to publicly advertise itself
        // to the network.
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
        if err != nil {
                log.Errorf("Unable to determine if node %x is advertised: %v",
                        nodeAnn.NodeID, err)
                nMsg.err <- err
                return nil, false
        }

        var announcements []networkMsg

        // If it does, we'll add their announcement to our batch so that it can
        // be broadcast to the rest of our peers.
        if isPublic {
                announcements = append(announcements, networkMsg{
                        peer:     nMsg.peer,
                        isRemote: nMsg.isRemote,
                        source:   nMsg.source,
                        msg:      nodeAnn,
                })
        } else {
                log.Tracef("Skipping broadcasting node announcement for %x "+
                        "due to being unadvertised", nodeAnn.NodeID)
        }

        nMsg.err <- nil
        // TODO(roasbeef): get rid of the above

        log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
                "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
                nMsg.source.SerializeCompressed())

        return announcements, true
}
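
// NOTE (illustrative sketch, not part of the original file): every handler
// above replies exactly once on the message's err channel, which is what
// makes the gossiper's public entry points synchronous. A hypothetical
// caller feeding a message in and waiting for the verdict:
//
//        nMsg := &networkMsg{
//                msg:      nodeAnn,
//                isRemote: true,
//                err:      make(chan error, 1),
//        }
//        d.networkMsgs <- nMsg
//        if err := <-nMsg.err; err != nil {
//                // The announcement was rejected.
//        }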

// handleChanAnnouncement processes a new channel announcement.
func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
        ann *lnwire.ChannelAnnouncement1,
        ops ...batch.SchedulerOption) ([]networkMsg, bool) {

        scid := ann.ShortChannelID

        log.Debugf("Processing ChannelAnnouncement1: peer=%v, short_chan_id=%v",
                nMsg.peer, scid.ToUint64())

        // We'll ignore any channel announcements that target any chain other
        // than the set of chains we know of.
        if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) {
                err := fmt.Errorf("ignoring ChannelAnnouncement1 from chain=%v"+
                        ", gossiper on chain=%v", ann.ChainHash,
                        d.cfg.ChainHash)
                log.Errorf(err.Error())

                key := newRejectCacheKey(
                        scid.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                nMsg.err <- err
                return nil, false
        }

        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
        // reject the announcement. Since the router accepts alias SCIDs,
        // not erroring out would be a DoS vector.
        if nMsg.isRemote && d.cfg.IsAlias(scid) {
                err := fmt.Errorf("ignoring remote alias channel=%v", scid)
                log.Errorf(err.Error())

                key := newRejectCacheKey(
                        scid.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                nMsg.err <- err
                return nil, false
        }

        // If the advertised inclusionary block is beyond our knowledge of the
        // chain tip, then we'll ignore it for now.
        d.Lock()
        if nMsg.isRemote && d.isPremature(scid, 0, nMsg) {
                log.Warnf("Announcement for chan_id=(%v), is premature: "+
                        "advertises height %v, only height %v is known",
                        scid.ToUint64(), scid.BlockHeight, d.bestHeight)
                d.Unlock()
                nMsg.err <- nil
                return nil, false
        }
        d.Unlock()

        // At this point, we'll now ask the router if this is a zombie/known
        // edge. If so we can skip all the processing below.
        if d.cfg.Graph.IsKnownEdge(scid) {
                nMsg.err <- nil
                return nil, true
        }

        // Check if the channel is already closed in which case we can ignore
        // it.
        closed, err := d.cfg.ScidCloser.IsClosedScid(scid)
        if err != nil {
                log.Errorf("failed to check if scid %v is closed: %v", scid,
                        err)
                nMsg.err <- err

                return nil, false
        }

        if closed {
                err = fmt.Errorf("ignoring closed channel %v", scid)
                log.Error(err)

                // If this is an announcement from us, we'll just ignore it.
                if !nMsg.isRemote {
                        nMsg.err <- err
                        return nil, false
                }

                // Increment the peer's ban score if they are sending closed
                // channel announcements.
                d.banman.incrementBanScore(nMsg.peer.PubKey())

                // If the peer is banned and not a channel peer, we'll
                // disconnect them.
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
                if dcErr != nil {
                        log.Errorf("failed to check if we should disconnect "+
                                "peer: %v", dcErr)
                        nMsg.err <- dcErr

                        return nil, false
                }

                if shouldDc {
                        nMsg.peer.Disconnect(ErrPeerBanned)
                }

                nMsg.err <- err

                return nil, false
        }

        // If this is a remote channel announcement, then we'll validate all
        // the signatures within the proof as it should be well formed.
        var proof *models.ChannelAuthProof
        if nMsg.isRemote {
                err := netann.ValidateChannelAnn(ann, d.fetchPKScript)
                if err != nil {
                        err := fmt.Errorf("unable to validate announcement: "+
                                "%v", err)

                        key := newRejectCacheKey(
                                scid.ToUint64(),
                                sourceToPub(nMsg.source),
                        )
                        _, _ = d.recentRejects.Put(key, &cachedReject{})

                        log.Error(err)
                        nMsg.err <- err
                        return nil, false
                }

                // If the proof checks out, then we'll save the proof itself to
                // the database so we can fetch it later when gossiping with
                // other nodes.
                proof = &models.ChannelAuthProof{
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
                }
        }

        // With the proof validated (if necessary), we can now store it within
        // the database for our path finding and syncing needs.
        var featureBuf bytes.Buffer
        if err := ann.Features.Encode(&featureBuf); err != nil {
                log.Errorf("unable to encode features: %v", err)
                nMsg.err <- err
                return nil, false
        }

        edge := &models.ChannelEdgeInfo{
                ChannelID:        scid.ToUint64(),
                ChainHash:        ann.ChainHash,
                NodeKey1Bytes:    ann.NodeID1,
                NodeKey2Bytes:    ann.NodeID2,
                BitcoinKey1Bytes: ann.BitcoinKey1,
                BitcoinKey2Bytes: ann.BitcoinKey2,
                AuthProof:        proof,
                Features:         featureBuf.Bytes(),
                ExtraOpaqueData:  ann.ExtraOpaqueData,
        }

        // If there were any optional message fields provided, we'll include
        // them in its serialized disk representation now.
        var tapscriptRoot fn.Option[chainhash.Hash]
        if nMsg.optionalMsgFields != nil {
                if nMsg.optionalMsgFields.capacity != nil {
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
                }
                if nMsg.optionalMsgFields.channelPoint != nil {
                        cp := *nMsg.optionalMsgFields.channelPoint
                        edge.ChannelPoint = cp
                }

                // Optional tapscript root for custom channels.
                tapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
        }

        // Before we start validation or add the edge to the database, we
        // obtain the mutex for this channel ID. We do this to ensure no other
        // goroutine has read the database and is now making decisions based on
        // this DB state, before it writes to the DB. It also ensures that we
        // don't perform the expensive validation check on the same channel
        // announcement at the same time.
        d.channelMtx.Lock(scid.ToUint64())

233✔
2665
        // If AssumeChannelValid is present, then we are unable to perform any
233✔
2666
        // of the expensive checks below, so we'll short-circuit our path
233✔
2667
        // straight to adding the edge to our graph. If the passed
233✔
2668
        // ShortChannelID is an alias, then we'll skip validation as it will
233✔
2669
        // not map to a legitimate tx. This is not a DoS vector as only we can
233✔
2670
        // add an alias ChannelAnnouncement from the gossiper.
233✔
2671
        if !(d.cfg.AssumeChannelValid || d.cfg.IsAlias(scid)) { //nolint:nestif
464✔
2672
                op, capacity, script, err := d.validateFundingTransaction(
231✔
2673
                        ann, tapscriptRoot,
231✔
2674
                )
231✔
2675
                if err != nil {
435✔
2676
                        defer d.channelMtx.Unlock(scid.ToUint64())
204✔
2677

204✔
2678
                        switch {
204✔
2679
                        case errors.Is(err, ErrNoFundingTransaction),
2680
                                errors.Is(err, ErrInvalidFundingOutput):
202✔
2681

202✔
2682
                                key := newRejectCacheKey(
202✔
2683
                                        scid.ToUint64(),
202✔
2684
                                        sourceToPub(nMsg.source),
202✔
2685
                                )
202✔
2686
                                _, _ = d.recentRejects.Put(
202✔
2687
                                        key, &cachedReject{},
202✔
2688
                                )
202✔
2689

202✔
2690
                                // Increment the peer's ban score. We check
202✔
2691
                                // isRemote so we don't actually ban the peer in
202✔
2692
                                // case of a local bug.
202✔
2693
                                if nMsg.isRemote {
404✔
2694
                                        d.banman.incrementBanScore(
202✔
2695
                                                nMsg.peer.PubKey(),
202✔
2696
                                        )
202✔
2697
                                }
202✔
2698

2699
                        case errors.Is(err, ErrChannelSpent):
2✔
2700
                                key := newRejectCacheKey(
2✔
2701
                                        scid.ToUint64(),
2✔
2702
                                        sourceToPub(nMsg.source),
2✔
2703
                                )
2✔
2704
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
2✔
2705

2✔
2706
                                // Since this channel has already been closed,
2✔
2707
                                // we'll add it to the graph's closed channel
2✔
2708
                                // index such that we won't attempt to do
2✔
2709
                                // expensive validation checks on it again.
2✔
2710
                                // TODO: Populate the ScidCloser by using closed
2✔
2711
                                // channel notifications.
2✔
2712
                                dbErr := d.cfg.ScidCloser.PutClosedScid(scid)
2✔
2713
                                if dbErr != nil {
2✔
2714
                                        log.Errorf("failed to mark scid(%v) "+
×
2715
                                                "as closed: %v", scid, dbErr)
×
2716

×
2717
                                        nMsg.err <- dbErr
×
2718

×
2719
                                        return nil, false
×
2720
                                }
×
2721

2722
                                // Increment the peer's ban score. We check
2723
                                // isRemote so we don't accidentally ban
2724
                                // ourselves in case of a bug.
2725
                                if nMsg.isRemote {
4✔
2726
                                        d.banman.incrementBanScore(
2✔
2727
                                                nMsg.peer.PubKey(),
2✔
2728
                                        )
2✔
2729
                                }
2✔
2730

2731
                        default:
×
2732
                                // Otherwise, this is just a regular rejected
×
2733
                                // edge.
×
2734
                                key := newRejectCacheKey(
×
2735
                                        scid.ToUint64(),
×
2736
                                        sourceToPub(nMsg.source),
×
2737
                                )
×
2738
                                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2739
                        }
2740

2741
                        if !nMsg.isRemote {
204✔
2742
                                log.Errorf("failed to add edge for local "+
×
2743
                                        "channel: %v", err)
×
2744
                                nMsg.err <- err
×
2745

×
2746
                                return nil, false
×
2747
                        }
×
2748

2749
                        shouldDc, dcErr := d.ShouldDisconnect(
204✔
2750
                                nMsg.peer.IdentityKey(),
204✔
2751
                        )
204✔
2752
                        if dcErr != nil {
204✔
2753
                                log.Errorf("failed to check if we should "+
×
2754
                                        "disconnect peer: %v", dcErr)
×
2755
                                nMsg.err <- dcErr
×
2756

×
2757
                                return nil, false
×
2758
                        }
×
2759

2760
                        if shouldDc {
205✔
2761
                                nMsg.peer.Disconnect(ErrPeerBanned)
1✔
2762
                        }
1✔
2763

2764
                        nMsg.err <- err
204✔
2765

204✔
2766
                        return nil, false
204✔
2767
                }
2768

2769
                edge.FundingScript = fn.Some(script)
27✔
2770

27✔
2771
                // TODO(roasbeef): this is a hack, needs to be removed after
27✔
2772
                //  commitment fees are dynamic.
27✔
2773
                edge.Capacity = capacity
27✔
2774
                edge.ChannelPoint = op
27✔
2775
        }
2776

2777
        log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64())
29✔
2778

29✔
2779
        // We will add the edge to the channel router. If the nodes present in
29✔
2780
        // this channel are not present in the database, a partial node will be
29✔
2781
        // added to represent each node while we wait for a node announcement.
29✔
2782
        err = d.cfg.Graph.AddEdge(edge, ops...)
29✔
2783
        if err != nil {
33✔
2784
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
4✔
2785
                        scid.ToUint64(), err)
4✔
2786

4✔
2787
                defer d.channelMtx.Unlock(scid.ToUint64())
4✔
2788

4✔
2789
                // If the edge was rejected due to already being known, then it
4✔
2790
                // may be the case that this new message has a fresh channel
4✔
2791
                // proof, so we'll check.
4✔
2792
                if graph.IsError(err, graph.ErrIgnored) {
7✔
2793
                        // Attempt to process the rejected message to see if we
3✔
2794
                        // get any new announcements.
3✔
2795
                        anns, rErr := d.processRejectedEdge(ann, proof)
3✔
2796
                        if rErr != nil {
3✔
2797
                                key := newRejectCacheKey(
×
2798
                                        scid.ToUint64(),
×
2799
                                        sourceToPub(nMsg.source),
×
2800
                                )
×
2801
                                cr := &cachedReject{}
×
2802
                                _, _ = d.recentRejects.Put(key, cr)
×
2803

×
2804
                                nMsg.err <- rErr
×
2805

×
2806
                                return nil, false
×
2807
                        }
×
2808

2809
                        log.Debugf("Extracted %v announcements from rejected "+
3✔
2810
                                "msgs", len(anns))
3✔
2811

3✔
2812
                        // If while processing this rejected edge, we realized
3✔
2813
                        // there's a set of announcements we could extract,
3✔
2814
                        // then we'll return those directly.
3✔
2815
                        //
3✔
2816
                        // NOTE: since this is an ErrIgnored, we can return
3✔
2817
                        // true here to signal "allow" to its dependants.
3✔
2818
                        nMsg.err <- nil
3✔
2819

3✔
2820
                        return anns, true
3✔
2821
                }
2822

2823
                // Otherwise, this is just a regular rejected edge.
2824
                key := newRejectCacheKey(
1✔
2825
                        scid.ToUint64(),
1✔
2826
                        sourceToPub(nMsg.source),
1✔
2827
                )
1✔
2828
                _, _ = d.recentRejects.Put(key, &cachedReject{})
1✔
2829

1✔
2830
                if !nMsg.isRemote {
1✔
2831
                        log.Errorf("failed to add edge for local channel: %v",
×
2832
                                err)
×
2833
                        nMsg.err <- err
×
2834

×
2835
                        return nil, false
×
2836
                }
×
2837

2838
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
1✔
2839
                if dcErr != nil {
1✔
2840
                        log.Errorf("failed to check if we should disconnect "+
×
2841
                                "peer: %v", dcErr)
×
2842
                        nMsg.err <- dcErr
×
2843

×
2844
                        return nil, false
×
2845
                }
×
2846

2847
                if shouldDc {
1✔
2848
                        nMsg.peer.Disconnect(ErrPeerBanned)
×
2849
                }
×
2850

2851
                nMsg.err <- err
1✔
2852

1✔
2853
                return nil, false
1✔
2854
        }
2855

2856
        // If err is nil, release the lock immediately.
2857
        d.channelMtx.Unlock(scid.ToUint64())
28✔
2858

28✔
2859
        log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64())
28✔
2860

28✔
2861
        // If we earlier received any ChannelUpdates for this channel, we can
28✔
2862
        // now process them, as the channel is added to the graph.
28✔
2863
        var channelUpdates []*processedNetworkMsg
28✔
2864

28✔
2865
        earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64())
28✔
2866
        if err == nil {
33✔
2867
                // There was actually an entry in the map, so we'll accumulate
5✔
2868
                // it. We don't worry about deletion, since it'll eventually
5✔
2869
                // fall out anyway.
5✔
2870
                chanMsgs := earlyChanUpdates
5✔
2871
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
5✔
2872
        }
5✔
2873

2874
        // Launch a new goroutine to handle each ChannelUpdate, this is to
2875
        // ensure we don't block here, as we can handle only one announcement
2876
        // at a time.
2877
        for _, cu := range channelUpdates {
33✔
2878
                // Skip if already processed.
5✔
2879
                if cu.processed {
6✔
2880
                        continue
1✔
2881
                }
2882

2883
                // Mark the ChannelUpdate as processed. This ensures that a
2884
                // subsequent announcement in the option-scid-alias case does
2885
                // not re-use an old ChannelUpdate.
2886
                cu.processed = true
5✔
2887

5✔
2888
                d.wg.Add(1)
5✔
2889
                go func(updMsg *networkMsg) {
10✔
2890
                        defer d.wg.Done()
5✔
2891

5✔
2892
                        switch msg := updMsg.msg.(type) {
5✔
2893
                        // Reprocess the message, making sure we return an
2894
                        // error to the original caller in case the gossiper
2895
                        // shuts down.
2896
                        case *lnwire.ChannelUpdate1:
5✔
2897
                                log.Debugf("Reprocessing ChannelUpdate for "+
5✔
2898
                                        "shortChanID=%v", scid.ToUint64())
5✔
2899

5✔
2900
                                select {
5✔
2901
                                case d.networkMsgs <- updMsg:
5✔
2902
                                case <-d.quit:
×
2903
                                        updMsg.err <- ErrGossiperShuttingDown
×
2904
                                }
2905

2906
                        // We don't expect any other message type than
2907
                        // ChannelUpdate to be in this cache.
2908
                        default:
×
2909
                                log.Errorf("Unsupported message type found "+
×
2910
                                        "among ChannelUpdates: %T", msg)
×
2911
                        }
2912
                }(cu.msg)
2913
        }
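
        // NOTE (illustrative, not part of the original file): the cache
        // entries drained by the loop above are created in handleChanUpdate
        // when an update arrives before its channel is in the graph,
        // roughly:
        //
        //        pMsg := &processedNetworkMsg{msg: nMsg}
        //        _, _ = d.prematureChannelUpdates.Put(
        //                shortChanID, &cachedNetworkMsg{
        //                        msgs: []*processedNetworkMsg{pMsg},
        //                })
        //
        // Re-submitting each stashed message onto d.networkMsgs replays it
        // through the normal handling path now that the edge exists.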

        // Channel announcement was successfully processed and now it might be
        // broadcast to other connected nodes if it was an announcement with
        // proof (remote).
        var announcements []networkMsg

        if proof != nil {
                announcements = append(announcements, networkMsg{
                        peer:     nMsg.peer,
                        isRemote: nMsg.isRemote,
                        source:   nMsg.source,
                        msg:      ann,
                })
        }

        nMsg.err <- nil

        log.Debugf("Processed ChannelAnnouncement1: peer=%v, short_chan_id=%v",
                nMsg.peer, scid.ToUint64())

        return announcements, true
}

// handleChanUpdate processes a new channel update.
func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
        upd *lnwire.ChannelUpdate1,
        ops []batch.SchedulerOption) ([]networkMsg, bool) {

        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v",
                nMsg.peer, upd.ShortChannelID.ToUint64())

        // We'll ignore any channel updates that target any chain other than
        // the set of chains we know of.
        if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
                        "gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash)
                log.Errorf(err.Error())

                key := newRejectCacheKey(
                        upd.ShortChannelID.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                nMsg.err <- err
                return nil, false
        }

        blockHeight := upd.ShortChannelID.BlockHeight
        shortChanID := upd.ShortChannelID.ToUint64()

        // If the advertised inclusionary block is beyond our knowledge of the
        // chain tip, then we'll put the announcement in limbo to be fully
        // verified once we advance forward in the chain. If the update has an
        // alias SCID, we'll skip the isPremature check. This is necessary
        // since aliases start at block height 16_000_000.
        d.Lock()
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
                d.isPremature(upd.ShortChannelID, 0, nMsg) {

                log.Warnf("Update announcement for short_chan_id(%v), is "+
                        "premature: advertises height %v, only height %v is "+
                        "known", shortChanID, blockHeight, d.bestHeight)
                d.Unlock()
                nMsg.err <- nil
                return nil, false
        }
        d.Unlock()

        // Before we perform any of the expensive checks below, we'll check
        // whether this update is stale or is for a zombie channel in order to
        // quickly reject it.
        timestamp := time.Unix(int64(upd.Timestamp), 0)

        // Fetch the SCID we should be using to lock the channelMtx and make
        // graph queries with.
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
        if err != nil {
                // Fallback and set the graphScid to the peer-provided SCID.
                // This will occur for non-option-scid-alias channels and for
                // public option-scid-alias channels after 6 confirmations.
                // Once public option-scid-alias channels have 6 confs, we'll
                // ignore ChannelUpdates with one of their aliases.
                graphScid = upd.ShortChannelID
        }

        if d.cfg.Graph.IsStaleEdgePolicy(
                graphScid, timestamp, upd.ChannelFlags,
        ) {

                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
                )

                nMsg.err <- nil
                return nil, true
        }

        // Check that the ChannelUpdate is not too far in the future; this
        // could reveal a faulty implementation, so we log an error.
        if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
                log.Errorf("Skewed timestamp (%v) for edge policy of "+
                        "short_chan_id(%v), timestamp too far in the future: "+
                        "peer=%v, msg=%s, is_remote=%v", timestamp.Unix(),
                        shortChanID, nMsg.peer, nMsg.msg.MsgType(),
                        nMsg.isRemote,
                )

                nMsg.err <- fmt.Errorf("skewed timestamp of edge policy, "+
                        "timestamp too far in the future: %v", timestamp.Unix())

                return nil, false
        }

        // Get the node pub key, since we don't have it in the channel update
        // announcement message. We'll need this to properly verify the
        // message's signature.
        //
        // We make sure to obtain the mutex for this channel ID before we
        // access the database. This ensures the state we read from the
        // database has not changed between this point and when we call
        // UpdateEdge() later.
        d.channelMtx.Lock(graphScid.ToUint64())
        defer d.channelMtx.Unlock(graphScid.ToUint64())

        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
        switch {
        // No error, break.
        case err == nil:
                break

        case errors.Is(err, graphdb.ErrZombieEdge):
                err = d.processZombieUpdate(chanInfo, graphScid, upd)
                if err != nil {
                        log.Debug(err)
                        nMsg.err <- err
                        return nil, false
                }

                // We'll fallthrough to ensure we stash the update until we
                // receive its corresponding ChannelAnnouncement. This is
                // needed to ensure the edge exists in the graph before
                // applying the update.
                fallthrough
        case errors.Is(err, graphdb.ErrGraphNotFound):
                fallthrough
        case errors.Is(err, graphdb.ErrGraphNoEdgesFound):
                fallthrough
        case errors.Is(err, graphdb.ErrEdgeNotFound):
                // If the edge corresponding to this ChannelUpdate was not
                // found in the graph, this might be a channel in the process
                // of being opened, and we haven't processed our own
                // ChannelAnnouncement yet, hence it is not found in the
                // graph. This usually gets resolved after the channel proofs
                // are exchanged and the channel is broadcasted to the rest of
                // the network, but in case this is a private channel this
                // won't ever happen. This can also happen in the case of a
                // zombie channel with a fresh update for which we don't have a
                // ChannelAnnouncement for since we reject them. Because of
                // this, we temporarily add it to a map, and reprocess it after
                // our own ChannelAnnouncement has been processed.
                //
                // The shortChanID may be an alias, but it is fine to use here
                // since we don't have an edge in the graph and if the peer is
                // not buggy, we should be able to use it once the gossiper
                // receives the local announcement.
                pMsg := &processedNetworkMsg{msg: nMsg}

                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
                switch {
                // Nothing in the cache yet, we can just directly insert this
                // element.
                case err == cache.ErrElementNotFound:
                        _, _ = d.prematureChannelUpdates.Put(
                                shortChanID, &cachedNetworkMsg{
                                        msgs: []*processedNetworkMsg{pMsg},
                                })

                // There's already something in the cache, so we'll combine the
                // set of messages into a single value.
                default:
                        msgs := earlyMsgs.msgs
                        msgs = append(msgs, pMsg)
                        _, _ = d.prematureChannelUpdates.Put(
                                shortChanID, &cachedNetworkMsg{
                                        msgs: msgs,
                                })
                }

                log.Debugf("Got ChannelUpdate for edge not found in graph "+
                        "(shortChanID=%v), saving for reprocessing later",
                        shortChanID)

                // NOTE: We don't return anything on the error channel for this
                // message, as we expect that will be done when this
                // ChannelUpdate is later reprocessed.
                return nil, false

        default:
                err := fmt.Errorf("unable to validate channel update "+
                        "short_chan_id=%v: %v", shortChanID, err)
                log.Error(err)
                nMsg.err <- err

                key := newRejectCacheKey(
                        upd.ShortChannelID.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                return nil, false
        }

        // The least-significant bit in the flag on the channel update
        // announcement tells us "which" side of the channel's directed edge
        // is being updated.
        var (
                pubKey       *btcec.PublicKey
                edgeToUpdate *models.ChannelEdgePolicy
        )
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
        switch direction {
        case 0:
                pubKey, _ = chanInfo.NodeKey1()
                edgeToUpdate = e1
        case 1:
                pubKey, _ = chanInfo.NodeKey2()
                edgeToUpdate = e2
        }
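
        // NOTE (illustrative, not part of the original file): per BOLT 7 the
        // direction bit selects which node's policy is being updated. A
        // sketch of the two cases, using lnwire's flag type:
        //
        //        flags := lnwire.ChanUpdateChanFlags(0x00)
        //        _ = flags & lnwire.ChanUpdateDirection // 0: node_1's policy
        //
        //        flags = lnwire.ChanUpdateChanFlags(0x01)
        //        _ = flags & lnwire.ChanUpdateDirection // 1: node_2's policy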

        log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
                "edge policy=%v", chanInfo.ChannelID,
                pubKey.SerializeCompressed(), edgeToUpdate != nil)

        // Validate the channel announcement with the expected public key and
        // channel capacity. In the case of an invalid channel update, we'll
        // return an error to the caller and exit early.
        err = netann.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
        if err != nil {
                rErr := fmt.Errorf("unable to validate channel update "+
                        "announcement for short_chan_id=%v: %v",
                        spew.Sdump(upd.ShortChannelID), err)

                log.Error(rErr)
                nMsg.err <- rErr
                return nil, false
        }

        // If we have a previous version of the edge being updated, we'll want
        // to rate limit its updates to prevent spam throughout the network.
        if nMsg.isRemote && edgeToUpdate != nil {
                // If it's a keep-alive update, we'll only propagate one if
                // it's been a day since the previous. This follows our own
                // heuristic of sending keep-alive updates after the same
                // duration (see retransmitStaleAnns).
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
                                log.Debugf("Ignoring keep alive update not "+
                                        "within %v period for channel %v",
                                        d.cfg.RebroadcastInterval, shortChanID)
                                nMsg.err <- nil
                                return nil, false
                        }
                } else {
                        // If it's not, we'll allow an update per minute with a
                        // maximum burst of 10. If we haven't seen an update
                        // for this channel before, we'll need to initialize a
                        // rate limiter for each direction.
                        //
                        // Since the edge exists in the graph, we'll create a
                        // rate limiter for chanInfo.ChannelID rather than the
                        // SCID the peer sent. This is because there may be
                        // multiple aliases for a channel and we may otherwise
                        // rate-limit only a single alias of the channel,
                        // instead of the whole channel.
                        baseScid := chanInfo.ChannelID
                        d.Lock()
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
                        if !ok {
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
                                b := d.cfg.MaxChannelUpdateBurst
                                rls = [2]*rate.Limiter{
                                        rate.NewLimiter(r, b),
                                        rate.NewLimiter(r, b),
                                }
                                d.chanUpdateRateLimiter[baseScid] = rls
                        }
                        d.Unlock()

                        if !rls[direction].Allow() {
                                log.Debugf("Rate limiting update for channel "+
                                        "%v from direction %x", shortChanID,
                                        pubKey.SerializeCompressed())
                                nMsg.err <- nil
                                return nil, false
                        }
                }
        }
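
        // NOTE (illustrative sketch, not part of the original file): the
        // limiter above comes from golang.org/x/time/rate. With the defaults
        // described in the comment (one update per ChannelUpdateInterval,
        // burst of MaxChannelUpdateBurst), a standalone equivalent is:
        //
        //        lim := rate.NewLimiter(rate.Every(time.Minute), 10)
        //        for i := 0; i < 10; i++ {
        //                _ = lim.Allow() // true: consumes the burst budget
        //        }
        //        _ = lim.Allow() // false until a new token accrues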
3214

3215
        // We'll use chanInfo.ChannelID rather than the peer-supplied
3216
        // ShortChannelID in the ChannelUpdate to avoid the router having to
3217
        // lookup the stored SCID. If we're sending the update, we'll always
3218
        // use the SCID stored in the database rather than a potentially
3219
        // different alias. This might mean that SigBytes is incorrect as it
3220
        // signs a different SCID than the database SCID, but since there will
3221
        // only be a difference if AuthProof == nil, this is fine.
3222
        update := &models.ChannelEdgePolicy{
42✔
3223
                SigBytes:                  upd.Signature.ToSignatureBytes(),
42✔
3224
                ChannelID:                 chanInfo.ChannelID,
42✔
3225
                LastUpdate:                timestamp,
42✔
3226
                MessageFlags:              upd.MessageFlags,
42✔
3227
                ChannelFlags:              upd.ChannelFlags,
42✔
3228
                TimeLockDelta:             upd.TimeLockDelta,
42✔
3229
                MinHTLC:                   upd.HtlcMinimumMsat,
42✔
3230
                MaxHTLC:                   upd.HtlcMaximumMsat,
42✔
3231
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
42✔
3232
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
42✔
3233
                ExtraOpaqueData:           upd.ExtraOpaqueData,
42✔
3234
        }
42✔
3235

42✔
3236
        if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil {
45✔
3237
                if graph.IsError(
3✔
3238
                        err, graph.ErrOutdated,
3✔
3239
                        graph.ErrIgnored,
3✔
3240
                ) {
6✔
3241

3✔
3242
                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
3✔
3243
                                shortChanID, err)
3✔
3244
                } else {
3✔
3245
                        // Since we know the stored SCID in the graph, we'll
×
3246
                        // cache that SCID.
×
3247
                        key := newRejectCacheKey(
×
3248
                                chanInfo.ChannelID,
×
3249
                                sourceToPub(nMsg.source),
×
3250
                        )
×
3251
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3252

×
3253
                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
×
3254
                                shortChanID, err)
×
3255
                }
×
3256

3257
                nMsg.err <- err
3✔
3258
                return nil, false
3✔
3259
        }
3260

3261
        // If this is a local ChannelUpdate without an AuthProof, it means it
3262
        // is an update to a channel that is not (yet) supposed to be announced
3263
        // to the greater network. However, our channel counter party will need
3264
        // to be given the update, so we'll try sending the update directly to
3265
        // the remote peer.
3266
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
56✔
3267
                if nMsg.optionalMsgFields != nil {
28✔
3268
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
14✔
3269
                        if remoteAlias != nil {
17✔
3270
                                // The remoteAlias field was specified, meaning
3✔
3271
                                // that we should replace the SCID in the
3✔
3272
                                // update with the remote's alias. We'll also
3✔
3273
                                // need to re-sign the channel update. This is
3✔
3274
                                // required for option-scid-alias feature-bit
3✔
3275
                                // negotiated channels.
3✔
3276
                                upd.ShortChannelID = *remoteAlias
3✔
3277

3✔
3278
                                sig, err := d.cfg.SignAliasUpdate(upd)
3✔
3279
                                if err != nil {
3✔
3280
                                        log.Error(err)
×
3281
                                        nMsg.err <- err
×
3282
                                        return nil, false
×
3283
                                }
×
3284

3285
                                lnSig, err := lnwire.NewSigFromSignature(sig)
3✔
3286
                                if err != nil {
3✔
3287
                                        log.Error(err)
×
3288
                                        nMsg.err <- err
×
3289
                                        return nil, false
×
3290
                                }
×
3291

3292
                                upd.Signature = lnSig
3✔
3293
                        }
3294
                }
3295

3296
                // Get our peer's public key.
3297
                remotePubKey := remotePubFromChanInfo(
14✔
3298
                        chanInfo, upd.ChannelFlags,
14✔
3299
                )
14✔
3300

14✔
3301
                log.Debugf("The message %v has no AuthProof, sending the "+
14✔
3302
                        "update to remote peer %x", upd.MsgType(), remotePubKey)
14✔
3303

14✔
3304
                // Now we'll attempt to send the channel update message
14✔
3305
                // reliably to the remote peer in the background, so that we
14✔
3306
                // don't block if the peer happens to be offline at the moment.
14✔
3307
                err := d.reliableSender.sendMessage(upd, remotePubKey)
14✔
3308
                if err != nil {
14✔
3309
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3310
                                "channel=%v to peer=%x: %v", upd.MsgType(),
×
3311
                                upd.ShortChannelID, remotePubKey, err)
×
3312
                        nMsg.err <- err
×
3313
                        return nil, false
×
3314
                }
×
3315
        }
3316

3317
        // Channel update announcement was successfully processed and now it
3318
        // can be broadcast to the rest of the network. However, we'll only
3319
        // broadcast the channel update announcement if it has an attached
3320
        // authentication proof. We also won't broadcast the update if it
3321
        // contains an alias because the network would reject this.
3322
        var announcements []networkMsg
42✔
3323
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
64✔
3324
                announcements = append(announcements, networkMsg{
22✔
3325
                        peer:     nMsg.peer,
22✔
3326
                        source:   nMsg.source,
22✔
3327
                        isRemote: nMsg.isRemote,
22✔
3328
                        msg:      upd,
22✔
3329
                })
22✔
3330
        }
22✔
3331

3332
        nMsg.err <- nil
42✔
3333

42✔
3334
        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
42✔
3335
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
42✔
3336
                timestamp)
42✔
3337
        return announcements, true
42✔
3338
}
3339

3340
// handleAnnSig processes a new announcement signatures message.
3341
func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
3342
        ann *lnwire.AnnounceSignatures1) ([]networkMsg, bool) {
24✔
3343

24✔
3344
        needBlockHeight := ann.ShortChannelID.BlockHeight +
24✔
3345
                d.cfg.ProofMatureDelta
24✔
3346
        shortChanID := ann.ShortChannelID.ToUint64()
24✔
3347

24✔
3348
        prefix := "local"
24✔
3349
        if nMsg.isRemote {
38✔
3350
                prefix = "remote"
14✔
3351
        }
14✔
3352

3353
        log.Infof("Received new %v announcement signature for %v", prefix,
24✔
3354
                ann.ShortChannelID)
24✔
3355

24✔
3356
        // By the specification, channel announcement proofs should be sent
24✔
3357
        // after some number of confirmations after channel was registered in
24✔
3358
        // bitcoin blockchain. Therefore, we check if the proof is mature.
24✔
3359
        d.Lock()
24✔
3360
        premature := d.isPremature(
24✔
3361
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
24✔
3362
        )
24✔
3363
        if premature {
27✔
3364
                log.Warnf("Premature proof announcement, current block height"+
3✔
3365
                        "lower than needed: %v < %v", d.bestHeight,
3✔
3366
                        needBlockHeight)
3✔
3367
                d.Unlock()
3✔
3368
                nMsg.err <- nil
3✔
3369
                return nil, false
3✔
3370
        }
3✔
3371
        d.Unlock()
24✔
3372

24✔
3373
        // Ensure that we know of a channel with the target channel ID before
24✔
3374
        // proceeding further.
24✔
3375
        //
24✔
3376
        // We must acquire the mutex for this channel ID before getting the
24✔
3377
        // channel from the database, to ensure what we read does not change
24✔
3378
        // before we call AddProof() later.
24✔
3379
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
24✔
3380
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
24✔
3381

24✔
3382
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
24✔
3383
                ann.ShortChannelID,
24✔
3384
        )
24✔
3385
        if err != nil {
28✔
3386
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
4✔
3387
                if err != nil {
7✔
3388
                        err := fmt.Errorf("unable to store the proof for "+
3✔
3389
                                "short_chan_id=%v: %v", shortChanID, err)
3✔
3390
                        log.Error(err)
3✔
3391
                        nMsg.err <- err
3✔
3392

3✔
3393
                        return nil, false
3✔
3394
                }
3✔
3395

3396
                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
4✔
3397
                err := d.cfg.WaitingProofStore.Add(proof)
4✔
3398
                if err != nil {
4✔
3399
                        err := fmt.Errorf("unable to store the proof for "+
×
3400
                                "short_chan_id=%v: %v", shortChanID, err)
×
3401
                        log.Error(err)
×
3402
                        nMsg.err <- err
×
3403
                        return nil, false
×
3404
                }
×
3405

3406
                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
4✔
3407
                        ", adding to waiting batch", prefix, shortChanID)
4✔
3408
                nMsg.err <- nil
4✔
3409
                return nil, false
4✔
3410
        }
3411

3412
        nodeID := nMsg.source.SerializeCompressed()
23✔
3413
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
23✔
3414
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
23✔
3415

23✔
3416
        // Ensure that channel that was retrieved belongs to the peer which
23✔
3417
        // sent the proof announcement.
23✔
3418
        if !(isFirstNode || isSecondNode) {
23✔
3419
                err := fmt.Errorf("channel that was received doesn't belong "+
×
3420
                        "to the peer which sent the proof, short_chan_id=%v",
×
3421
                        shortChanID)
×
3422
                log.Error(err)
×
3423
                nMsg.err <- err
×
3424
                return nil, false
×
3425
        }
×
3426

3427
        // If proof was sent by a local sub-system, then we'll send the
3428
        // announcement signature to the remote node so they can also
3429
        // reconstruct the full channel announcement.
3430
        if !nMsg.isRemote {
36✔
3431
                var remotePubKey [33]byte
13✔
3432
                if isFirstNode {
26✔
3433
                        remotePubKey = chanInfo.NodeKey2Bytes
13✔
3434
                } else {
16✔
3435
                        remotePubKey = chanInfo.NodeKey1Bytes
3✔
3436
                }
3✔
3437

3438
                // Since the remote peer might not be online we'll call a
3439
                // method that will attempt to deliver the proof when it comes
3440
                // online.
3441
                err := d.reliableSender.sendMessage(ann, remotePubKey)
13✔
3442
                if err != nil {
13✔
3443
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3444
                                "channel=%v to peer=%x: %v", ann.MsgType(),
×
3445
                                ann.ShortChannelID, remotePubKey, err)
×
3446
                        nMsg.err <- err
×
3447
                        return nil, false
×
3448
                }
×
3449
        }
3450

3451
        // Check if we already have the full proof for this channel.
        if chanInfo.AuthProof != nil {
                // If we already have the fully assembled proof, then the peer
                // sending us their proof has probably not received our local
                // proof yet. So be kind and send them the full proof.
                if nMsg.isRemote {
                        peerID := nMsg.source.SerializeCompressed()
                        log.Debugf("Got AnnounceSignatures for channel with " +
                                "full proof.")

                        d.wg.Add(1)
                        go func() {
                                defer d.wg.Done()

                                log.Debugf("Received half proof for channel "+
                                        "%v with existing full proof. Sending"+
                                        " full proof to peer=%x",
                                        ann.ChannelID, peerID)

                                ca, _, _, err := netann.CreateChanAnnouncement(
                                        chanInfo.AuthProof, chanInfo, e1, e2,
                                )
                                if err != nil {
                                        log.Errorf("unable to gen ann: %v",
                                                err)
                                        return
                                }

                                err = nMsg.peer.SendMessage(false, ca)
                                if err != nil {
                                        log.Errorf("Failed sending full proof"+
                                                " to peer=%x: %v", peerID, err)
                                        return
                                }

                                log.Debugf("Full proof sent to peer=%x for "+
                                        "chanID=%v", peerID, ann.ChannelID)
                        }()
                }

                log.Debugf("Already have proof for channel with chanID=%v",
                        ann.ChannelID)
                nMsg.err <- nil
                return nil, true
        }

        // Check that we received the opposite proof. If so, then we're now
        // able to construct the full proof, and create the channel
        // announcement. If we didn't receive the opposite half of the proof,
        // then we should store this one and wait for the opposite to be
        // received.
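        // Note on keying (a description of intent, not the exact layout): a
        // waiting proof is keyed by the short channel ID together with a
        // local/remote bit, so OppositeKey() below addresses the other half
        // of the same channel's proof.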
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
                err := fmt.Errorf("unable to get the opposite proof for "+
                        "short_chan_id=%v: %v", shortChanID, err)
                log.Error(err)
                nMsg.err <- err
                return nil, false
        }

        if err == channeldb.ErrWaitingProofNotFound {
                err := d.cfg.WaitingProofStore.Add(proof)
                if err != nil {
                        err := fmt.Errorf("unable to store the proof for "+
                                "short_chan_id=%v: %v", shortChanID, err)
                        log.Error(err)
                        nMsg.err <- err
                        return nil, false
                }

                log.Infof("1/2 of channel ann proof received for "+
                        "short_chan_id=%v, waiting for other half",
                        shortChanID)

                nMsg.err <- nil
                return nil, false
        }

        // We now have both halves of the channel announcement proof, so we'll
        // reconstruct the initial announcement in order to validate it
        // shortly below.
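        // Per the BOLT #7 convention, the "1" and "2" slots are ordered by
        // the nodes' serialized public keys, so which half lands in which
        // slot depends on the side of the channel the sender occupies.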
        var dbProof models.ChannelAuthProof
        if isFirstNode {
                dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
                dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
                dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
                dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
        } else {
                dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
                dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
                dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
                dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
        }

        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
                &dbProof, chanInfo, e1, e2,
        )
        if err != nil {
                log.Error(err)
                nMsg.err <- err
                return nil, false
        }

        // With all the necessary components assembled, validate the full
        // channel announcement proof.
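        // This verifies all four signatures carried by the announcement: the
        // two node signatures and the two bitcoin key signatures.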
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
        if err != nil {
                err := fmt.Errorf("channel announcement proof for "+
                        "short_chan_id=%v isn't valid: %v", shortChanID, err)

                log.Error(err)
                nMsg.err <- err
                return nil, false
        }

        // If the channel was returned by the router, then the existence of
        // the funding point and the inclusion of the nodes' bitcoin keys in
        // it have already been checked by the router. At this stage we should
        // check that the node keys attest to the bitcoin keys by validating
        // the signatures of the announcement. If the proof is valid, then
        // we'll populate the channel edge with it, so we can announce it on
        // peer connect.
        err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
        if err != nil {
                err := fmt.Errorf("unable to add proof to the channel "+
                        "chanID=%v: %v", ann.ChannelID, err)
                log.Error(err)
                nMsg.err <- err
                return nil, false
        }

        err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
        if err != nil {
                err := fmt.Errorf("unable to remove opposite proof for the "+
                        "channel with chanID=%v: %v", ann.ChannelID, err)
                log.Error(err)
                nMsg.err <- err
                return nil, false
        }

        // The proof was successfully created, and we can now announce the
        // channel to the rest of the network.
        log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
                ", adding to next ann batch", shortChanID)

        // Assemble the necessary announcements to add to the next
        // broadcasting batch.
        var announcements []networkMsg
        announcements = append(announcements, networkMsg{
                peer:   nMsg.peer,
                source: nMsg.source,
                msg:    chanAnn,
        })
        if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
                announcements = append(announcements, networkMsg{
                        peer:   nMsg.peer,
                        source: src,
                        msg:    e1Ann,
                })
        }
        if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
                announcements = append(announcements, networkMsg{
                        peer:   nMsg.peer,
                        source: src,
                        msg:    e2Ann,
                })
        }

        // We'll also send along the node announcements for each channel
        // participant if we know of them. To ensure our node announcement
        // propagates to our channel counterparty, we'll set the source for
        // each announcement to the node it belongs to, otherwise we won't
        // send it since the source gets skipped. This isn't necessary for
        // channel updates and announcement signatures since we send those
        // directly to our channel counterparty through the gossiper's
        // reliable sender.
        node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
        if err != nil {
                log.Debugf("Unable to fetch node announcement for %x: %v",
                        chanInfo.NodeKey1Bytes, err)
        } else {
                if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
                        announcements = append(announcements, networkMsg{
                                peer:   nMsg.peer,
                                source: nodeKey1,
                                msg:    node1Ann,
                        })
                }
        }

        node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
        if err != nil {
                log.Debugf("Unable to fetch node announcement for %x: %v",
                        chanInfo.NodeKey2Bytes, err)
        } else {
                if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
                        announcements = append(announcements, networkMsg{
                                peer:   nMsg.peer,
                                source: nodeKey2,
                                msg:    node2Ann,
                        })
                }
        }

        nMsg.err <- nil
        return announcements, true
}

// isBanned returns true if the peer identified by pubkey is banned for
// sending invalid channel announcements.
func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
        return d.banman.isBanned(pubkey)
}

// ShouldDisconnect returns true if we should disconnect the peer identified
// by pubkey.
func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
        bool, error) {

        pubkeySer := pubkey.SerializeCompressed()

        var pubkeyBytes [33]byte
        copy(pubkeyBytes[:], pubkeySer)

        // If the public key is banned, check whether or not this is a channel
        // peer.
        if d.isBanned(pubkeyBytes) {
                isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
                if err != nil {
                        return false, err
                }

                // We should only disconnect non-channel peers.
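                // Disconnecting a peer we still have an open channel with
                // would needlessly disrupt that channel, so channel peers
                // remain connected even while banned.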
                if !isChanPeer {
                        return true, nil
                }
        }

        return false, nil
}

// validateFundingTransaction fetches the channel announcement's claimed
// funding transaction from chain to ensure that it exists, is not spent, and
// matches the channel announcement proof. The transaction's outpoint and
// value are returned if we can glean them from the work done in this method.
func (d *AuthenticatedGossiper) validateFundingTransaction(
        ann *lnwire.ChannelAnnouncement1,
        tapscriptRoot fn.Option[chainhash.Hash]) (wire.OutPoint, btcutil.Amount,
        []byte, error) {

        scid := ann.ShortChannelID

        // Before we can add the channel to the channel graph, we need to
        // obtain the full funding outpoint that's encoded within the channel
        // ID.
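        // A short channel ID encodes the funding output's on-chain location:
        // the block height, the transaction index within that block, and the
        // output index within that transaction.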
        fundingTx, err := lnwallet.FetchFundingTxWrapper(
                d.cfg.ChainIO, &scid, d.quit,
        )
        if err != nil {
                //nolint:ll
                //
                // In order to ensure we don't erroneously mark a channel as a
                // zombie due to an RPC failure, we'll attempt to string match
                // for the relevant errors.
                //
                // * btcd:
                //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316
                //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086
                // * bitcoind:
                //    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770
                //    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954
                switch {
                case strings.Contains(err.Error(), "not found"):
                        fallthrough

                case strings.Contains(err.Error(), "out of range"):
                        // If the funding transaction isn't found at all, then
                        // we'll mark the edge itself as a zombie so we don't
                        // continue to request it. We use the "zero key" for
                        // both node pubkeys so this edge can't be resurrected.
                        zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
                        if zErr != nil {
                                return wire.OutPoint{}, 0, nil, zErr
                        }

                default:
                }

                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
                        ErrNoFundingTransaction, err)
        }

        // Recreate the witness output to ensure that the bitcoin keys and
        // channel value declared in the channel edge correspond to reality.
        fundingPkScript, err := makeFundingScript(
                ann.BitcoinKey1[:], ann.BitcoinKey2[:], ann.Features,
                tapscriptRoot,
        )
        if err != nil {
                return wire.OutPoint{}, 0, nil, err
        }

        // Next we'll validate that this channel is actually well formed. If
        // this check fails, then this channel either doesn't exist, or isn't
        // the one that was meant to be created according to the passed
        // channel proofs.
        fundingPoint, err := chanvalidate.Validate(
                &chanvalidate.Context{
                        Locator: &chanvalidate.ShortChanIDChanLocator{
                                ID: scid,
                        },
                        MultiSigPkScript: fundingPkScript,
                        FundingTx:        fundingTx,
                },
        )
        if err != nil {
                // Mark the edge as a zombie so we won't try to re-validate it
                // on start up.
                zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
                if zErr != nil {
                        return wire.OutPoint{}, 0, nil, zErr
                }

                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: %w",
                        ErrInvalidFundingOutput, err)
        }

        // Now that we have the funding outpoint of the channel, ensure that
        // it hasn't yet been spent. If so, then this channel has been closed,
        // so we'll ignore it.
        chanUtxo, err := d.cfg.ChainIO.GetUtxo(
                fundingPoint, fundingPkScript, scid.BlockHeight, d.quit,
        )
        if err != nil {
                if errors.Is(err, btcwallet.ErrOutputSpent) {
                        zErr := d.cfg.Graph.MarkZombieEdge(scid.ToUint64())
                        if zErr != nil {
                                return wire.OutPoint{}, 0, nil, zErr
                        }
                }

                return wire.OutPoint{}, 0, nil, fmt.Errorf("%w: unable to "+
                        "fetch utxo for chan_id=%v, chan_point=%v: %w",
                        ErrChannelSpent, scid.ToUint64(), fundingPoint, err)
        }

        return *fundingPoint, btcutil.Amount(chanUtxo.Value), fundingPkScript,
                nil
}

// makeFundingScript is used to make the funding script for both segwit v0
// and segwit v1 (taproot) channels.
func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte,
        features *lnwire.RawFeatureVector,
        tapscriptRoot fn.Option[chainhash.Hash]) ([]byte, error) {

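        // For segwit v0 channels the funding output is a P2WSH output
        // wrapping a 2-of-2 multisig over the two bitcoin keys.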
        legacyFundingScript := func() ([]byte, error) {
                witnessScript, err := input.GenMultiSigScript(
                        bitcoinKey1, bitcoinKey2,
                )
                if err != nil {
                        return nil, err
                }
                pkScript, err := input.WitnessScriptHash(witnessScript)
                if err != nil {
                        return nil, err
                }

                return pkScript, nil
        }

        if features.IsEmpty() {
                return legacyFundingScript()
        }

        chanFeatureBits := lnwire.NewFeatureVector(features, lnwire.Features)
        if chanFeatureBits.HasFeature(
                lnwire.SimpleTaprootChannelsOptionalStaging,
        ) {

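                // For taproot channels, GenTaprootFundingScript aggregates
                // the two bitcoin keys into a single internal key (via MuSig2
                // key aggregation) and applies the optional tapscript root
                // tweak.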
                pubKey1, err := btcec.ParsePubKey(bitcoinKey1)
                if err != nil {
                        return nil, err
                }
                pubKey2, err := btcec.ParsePubKey(bitcoinKey2)
                if err != nil {
                        return nil, err
                }

                fundingScript, _, err := input.GenTaprootFundingScript(
                        pubKey1, pubKey2, 0, tapscriptRoot,
                )
                if err != nil {
                        return nil, err
                }

                // TODO(roasbeef): add tapscript root to gossip v1.5

                return fundingScript, nil
        }

        return legacyFundingScript()
}