• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11364879019

16 Oct 2024 11:39AM UTC coverage: 49.778% (+0.5%) from 49.297%
11364879019

Pull #9175

github

ellemouton
netann: update ChanAnn2 validation to work for P2WSH channels

This commit expands the ChannelAnnouncement2 validation for the case
where it is announcing a P2WSH channel.
Pull Request #9175: lnwire+netann: update structure of g175 messages to be pure TLV

6 of 314 new or added lines in 9 files covered. (1.91%)

99 existing lines in 18 files now uncovered.

98794 of 198470 relevant lines covered (49.78%)

2.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.73
/discovery/gossiper.go
1
package discovery
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
13
        "github.com/btcsuite/btcd/btcutil"
14
        "github.com/btcsuite/btcd/chaincfg"
15
        "github.com/btcsuite/btcd/chaincfg/chainhash"
16
        "github.com/btcsuite/btcd/txscript"
17
        "github.com/btcsuite/btcd/wire"
18
        "github.com/davecgh/go-spew/spew"
19
        "github.com/lightninglabs/neutrino/cache"
20
        "github.com/lightninglabs/neutrino/cache/lru"
21
        "github.com/lightningnetwork/lnd/batch"
22
        "github.com/lightningnetwork/lnd/chainntnfs"
23
        "github.com/lightningnetwork/lnd/channeldb"
24
        "github.com/lightningnetwork/lnd/channeldb/models"
25
        "github.com/lightningnetwork/lnd/fn"
26
        "github.com/lightningnetwork/lnd/graph"
27
        "github.com/lightningnetwork/lnd/keychain"
28
        "github.com/lightningnetwork/lnd/kvdb"
29
        "github.com/lightningnetwork/lnd/lnpeer"
30
        "github.com/lightningnetwork/lnd/lnutils"
31
        "github.com/lightningnetwork/lnd/lnwallet"
32
        "github.com/lightningnetwork/lnd/lnwire"
33
        "github.com/lightningnetwork/lnd/multimutex"
34
        "github.com/lightningnetwork/lnd/netann"
35
        "github.com/lightningnetwork/lnd/routing/route"
36
        "github.com/lightningnetwork/lnd/ticker"
37
        "golang.org/x/time/rate"
38
)
39

40
const (
41
        // DefaultMaxChannelUpdateBurst is the default maximum number of updates
42
        // for a specific channel and direction that we'll accept over an
43
        // interval.
44
        DefaultMaxChannelUpdateBurst = 10
45

46
        // DefaultChannelUpdateInterval is the default interval we'll use to
47
        // determine how often we should allow a new update for a specific
48
        // channel and direction.
49
        DefaultChannelUpdateInterval = time.Minute
50

51
        // maxPrematureUpdates tracks the max amount of premature channel
52
        // updates that we'll hold onto.
53
        maxPrematureUpdates = 100
54

55
        // maxFutureMessages tracks the max amount of future messages that
56
        // we'll hold onto.
57
        maxFutureMessages = 1000
58

59
        // DefaultSubBatchDelay is the default delay we'll use when
60
        // broadcasting the next announcement batch.
61
        DefaultSubBatchDelay = 5 * time.Second
62

63
        // maxRejectedUpdates tracks the max amount of rejected channel updates
64
        // we'll maintain. This is the global size across all peers. We'll
65
        // allocate ~3 MB max to the cache.
66
        maxRejectedUpdates = 10_000
67
)
68

69
var (
70
        // ErrGossiperShuttingDown is an error that is returned if the gossiper
71
        // is in the process of being shut down.
72
        ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
73

74
        // ErrGossipSyncerNotFound signals that we were unable to find an active
75
        // gossip syncer corresponding to a gossip query message received from
76
        // the remote peer.
77
        ErrGossipSyncerNotFound = errors.New("gossip syncer not found")
78

79
        // emptyPubkey is used to compare compressed pubkeys against an empty
80
        // byte array.
81
        emptyPubkey [33]byte
82
)
83

84
// optionalMsgFields is a set of optional message fields that external callers
85
// can provide that serve useful when processing a specific network
86
// announcement.
87
type optionalMsgFields struct {
88
        capacity      *btcutil.Amount
89
        channelPoint  *wire.OutPoint
90
        remoteAlias   *lnwire.ShortChannelID
91
        tapscriptRoot fn.Option[chainhash.Hash]
92
}
93

94
// apply applies the optional fields within the functional options.
95
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
4✔
96
        for _, optionalMsgField := range optionalMsgFields {
8✔
97
                optionalMsgField(f)
4✔
98
        }
4✔
99
}
100

101
// OptionalMsgField is a functional option parameter that can be used to provide
102
// external information that is not included within a network message but serves
103
// useful when processing it.
104
type OptionalMsgField func(*optionalMsgFields)
105

106
// ChannelCapacity is an optional field that lets the gossiper know of the
107
// capacity of a channel.
108
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
4✔
109
        return func(f *optionalMsgFields) {
8✔
110
                f.capacity = &capacity
4✔
111
        }
4✔
112
}
113

114
// ChannelPoint is an optional field that lets the gossiper know of the outpoint
115
// of a channel.
116
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
4✔
117
        return func(f *optionalMsgFields) {
8✔
118
                f.channelPoint = &op
4✔
119
        }
4✔
120
}
121

122
// TapscriptRoot is an optional field that lets the gossiper know of the root of
123
// the tapscript tree for a custom channel.
124
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
4✔
125
        return func(f *optionalMsgFields) {
8✔
126
                f.tapscriptRoot = root
4✔
127
        }
4✔
128
}
129

130
// RemoteAlias is an optional field that lets the gossiper know that a locally
131
// sent channel update is actually an update for the peer that should replace
132
// the ShortChannelID field with the remote's alias. This is only used for
133
// channels with peers where the option-scid-alias feature bit was negotiated.
134
// The channel update will be added to the graph under the original SCID, but
135
// will be modified and re-signed with this alias.
136
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
4✔
137
        return func(f *optionalMsgFields) {
8✔
138
                f.remoteAlias = alias
4✔
139
        }
4✔
140
}
141

142
// networkMsg couples a routing related wire message with the peer that
143
// originally sent it.
144
type networkMsg struct {
145
        peer              lnpeer.Peer
146
        source            *btcec.PublicKey
147
        msg               lnwire.Message
148
        optionalMsgFields *optionalMsgFields
149

150
        isRemote bool
151

152
        err chan error
153
}
154

155
// chanPolicyUpdateRequest is a request that is sent to the server when a caller
156
// wishes to update a particular set of channels. New ChannelUpdate messages
157
// will be crafted to be sent out during the next broadcast epoch and the fee
158
// updates committed to the lower layer.
159
type chanPolicyUpdateRequest struct {
160
        edgesToUpdate []EdgeWithInfo
161
        errChan       chan error
162
}
163

164
// PinnedSyncers is a set of node pubkeys for which we will maintain an active
165
// syncer at all times.
166
type PinnedSyncers map[route.Vertex]struct{}
167

168
// Config defines the configuration for the service. ALL elements within the
169
// configuration MUST be non-nil for the service to carry out its duties.
170
type Config struct {
171
        // ChainParams holds the chain parameters for the active network this
172
        // node is participating on.
173
        ChainParams *chaincfg.Params
174

175
        // Graph is the subsystem which is responsible for managing the
176
        // topology of lightning network. After incoming channel, node, channel
177
        // updates announcements are validated they are sent to the router in
178
        // order to be included in the LN graph.
179
        Graph graph.ChannelGraphSource
180

181
        // ChainIO represents an abstraction over a source that can query the
182
        // blockchain.
183
        ChainIO lnwallet.BlockChainIO
184

185
        // ChanSeries is an interfaces that provides access to a time series
186
        // view of the current known channel graph. Each GossipSyncer enabled
187
        // peer will utilize this in order to create and respond to channel
188
        // graph time series queries.
189
        ChanSeries ChannelGraphTimeSeries
190

191
        // Notifier is used for receiving notifications of incoming blocks.
192
        // With each new incoming block found we process previously premature
193
        // announcements.
194
        //
195
        // TODO(roasbeef): could possibly just replace this with an epoch
196
        // channel.
197
        Notifier chainntnfs.ChainNotifier
198

199
        // Broadcast broadcasts a particular set of announcements to all peers
200
        // that the daemon is connected to. If supplied, the exclude parameter
201
        // indicates that the target peer should be excluded from the
202
        // broadcast.
203
        Broadcast func(skips map[route.Vertex]struct{},
204
                msg ...lnwire.Message) error
205

206
        // NotifyWhenOnline is a function that allows the gossiper to be
207
        // notified when a certain peer comes online, allowing it to
208
        // retry sending a peer message.
209
        //
210
        // NOTE: The peerChan channel must be buffered.
211
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
212

213
        // NotifyWhenOffline is a function that allows the gossiper to be
214
        // notified when a certain peer disconnects, allowing it to request a
215
        // notification for when it reconnects.
216
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
217

218
        // FetchSelfAnnouncement retrieves our current node announcement, for
219
        // use when determining whether we should update our peers about our
220
        // presence in the network.
221
        FetchSelfAnnouncement func() lnwire.NodeAnnouncement
222

223
        // UpdateSelfAnnouncement produces a new announcement for our node with
224
        // an updated timestamp which can be broadcast to our peers.
225
        UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement, error)
226

227
        // ProofMatureDelta the number of confirmations which is needed before
228
        // exchange the channel announcement proofs.
229
        ProofMatureDelta uint32
230

231
        // TrickleDelay the period of trickle timer which flushes to the
232
        // network the pending batch of new announcements we've received since
233
        // the last trickle tick.
234
        TrickleDelay time.Duration
235

236
        // RetransmitTicker is a ticker that ticks with a period which
237
        // indicates that we should check if we need re-broadcast any of our
238
        // personal channels.
239
        RetransmitTicker ticker.Ticker
240

241
        // RebroadcastInterval is the maximum time we wait between sending out
242
        // channel updates for our active channels and our own node
243
        // announcement. We do this to ensure our active presence on the
244
        // network is known, and we are not being considered a zombie node or
245
        // having zombie channels.
246
        RebroadcastInterval time.Duration
247

248
        // WaitingProofStore is a persistent storage of partial channel proof
249
        // announcement messages. We use it to buffer half of the material
250
        // needed to reconstruct a full authenticated channel announcement.
251
        // Once we receive the other half the channel proof, we'll be able to
252
        // properly validate it and re-broadcast it out to the network.
253
        //
254
        // TODO(wilmer): make interface to prevent channeldb dependency.
255
        WaitingProofStore *channeldb.WaitingProofStore
256

257
        // MessageStore is a persistent storage of gossip messages which we will
258
        // use to determine which messages need to be resent for a given peer.
259
        MessageStore GossipMessageStore
260

261
        // AnnSigner is an instance of the MessageSigner interface which will
262
        // be used to manually sign any outgoing channel updates. The signer
263
        // implementation should be backed by the public key of the backing
264
        // Lightning node.
265
        //
266
        // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
267
        // here?
268
        AnnSigner lnwallet.MessageSigner
269

270
        // ScidCloser is an instance of ClosedChannelTracker that helps the
271
        // gossiper cut down on spam channel announcements for already closed
272
        // channels.
273
        ScidCloser ClosedChannelTracker
274

275
        // NumActiveSyncers is the number of peers for which we should have
276
        // active syncers with. After reaching NumActiveSyncers, any future
277
        // gossip syncers will be passive.
278
        NumActiveSyncers int
279

280
        // NoTimestampQueries will prevent the GossipSyncer from querying
281
        // timestamps of announcement messages from the peer and from replying
282
        // to timestamp queries.
283
        NoTimestampQueries bool
284

285
        // RotateTicker is a ticker responsible for notifying the SyncManager
286
        // when it should rotate its active syncers. A single active syncer with
287
        // a chansSynced state will be exchanged for a passive syncer in order
288
        // to ensure we don't keep syncing with the same peers.
289
        RotateTicker ticker.Ticker
290

291
        // HistoricalSyncTicker is a ticker responsible for notifying the
292
        // syncManager when it should attempt a historical sync with a gossip
293
        // sync peer.
294
        HistoricalSyncTicker ticker.Ticker
295

296
        // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
297
        // syncManager when it should attempt to start the next pending
298
        // activeSyncer due to the current one not completing its state machine
299
        // within the timeout.
300
        ActiveSyncerTimeoutTicker ticker.Ticker
301

302
        // MinimumBatchSize is minimum size of a sub batch of announcement
303
        // messages.
304
        MinimumBatchSize int
305

306
        // SubBatchDelay is the delay between sending sub batches of
307
        // gossip messages.
308
        SubBatchDelay time.Duration
309

310
        // IgnoreHistoricalFilters will prevent syncers from replying with
311
        // historical data when the remote peer sets a gossip_timestamp_range.
312
        // This prevents ranges with old start times from causing us to dump the
313
        // graph on connect.
314
        IgnoreHistoricalFilters bool
315

316
        // PinnedSyncers is a set of peers that will always transition to
317
        // ActiveSync upon connection. These peers will never transition to
318
        // PassiveSync.
319
        PinnedSyncers PinnedSyncers
320

321
        // MaxChannelUpdateBurst specifies the maximum number of updates for a
322
        // specific channel and direction that we'll accept over an interval.
323
        MaxChannelUpdateBurst int
324

325
        // ChannelUpdateInterval specifies the interval we'll use to determine
326
        // how often we should allow a new update for a specific channel and
327
        // direction.
328
        ChannelUpdateInterval time.Duration
329

330
        // IsAlias returns true if a given ShortChannelID is an alias for
331
        // option_scid_alias channels.
332
        IsAlias func(scid lnwire.ShortChannelID) bool
333

334
        // SignAliasUpdate is used to re-sign a channel update using the
335
        // remote's alias if the option-scid-alias feature bit was negotiated.
336
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
337
                error)
338

339
        // FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
340
        // This is used for channels that have negotiated the option-scid-alias
341
        // feature bit.
342
        FindBaseByAlias func(alias lnwire.ShortChannelID) (
343
                lnwire.ShortChannelID, error)
344

345
        // GetAlias allows the gossiper to look up the peer's alias for a given
346
        // ChannelID. This is used to sign updates for them if the channel has
347
        // no AuthProof and the option-scid-alias feature bit was negotiated.
348
        GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)
349

350
        // FindChannel allows the gossiper to find a channel that we're party
351
        // to without iterating over the entire set of open channels.
352
        FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
353
                *channeldb.OpenChannel, error)
354

355
        // IsStillZombieChannel takes the timestamps of the latest channel
356
        // updates for a channel and returns true if the channel should be
357
        // considered a zombie based on these timestamps.
358
        IsStillZombieChannel func(time.Time, time.Time) bool
359

360
        // chainHash is a hash that indicates which resident chain of the
361
        // AuthenticatedGossiper. Any announcements that don't match this
362
        // chain hash will be ignored. This is an internal config value obtained
363
        // from ChainParams.
364
        chainHash *chainhash.Hash
365
}
366

367
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
368
// used to let the caller of the lru.Cache know if a message has already been
369
// processed or not.
370
type processedNetworkMsg struct {
371
        processed bool
372
        msg       *networkMsg
373
}
374

375
// cachedNetworkMsg is a wrapper around a network message that can be used with
376
// *lru.Cache.
377
type cachedNetworkMsg struct {
378
        msgs []*processedNetworkMsg
379
}
380

381
// Size returns the "size" of an entry. We return the number of items as we
382
// just want to limit the total amount of entries rather than do accurate size
383
// accounting.
384
func (c *cachedNetworkMsg) Size() (uint64, error) {
4✔
385
        return uint64(len(c.msgs)), nil
4✔
386
}
4✔
387

388
// rejectCacheKey is the cache key that we'll use to track announcements we've
389
// recently rejected.
390
type rejectCacheKey struct {
391
        pubkey [33]byte
392
        chanID uint64
393
}
394

395
// newRejectCacheKey returns a new cache key for the reject cache.
396
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
4✔
397
        k := rejectCacheKey{
4✔
398
                chanID: cid,
4✔
399
                pubkey: pub,
4✔
400
        }
4✔
401

4✔
402
        return k
4✔
403
}
4✔
404

405
// sourceToPub returns a serialized-compressed public key for use in the reject
406
// cache.
407
func sourceToPub(pk *btcec.PublicKey) [33]byte {
4✔
408
        var pub [33]byte
4✔
409
        copy(pub[:], pk.SerializeCompressed())
4✔
410
        return pub
4✔
411
}
4✔
412

413
// cachedReject is the empty value used to track the value for rejects.
414
type cachedReject struct {
415
}
416

417
// Size returns the "size" of an entry. We return 1 as we just want to limit
418
// the total size.
419
func (c *cachedReject) Size() (uint64, error) {
×
420
        return 1, nil
×
421
}
×
422

423
// AuthenticatedGossiper is a subsystem which is responsible for receiving
424
// announcements, validating them and applying the changes to router, syncing
425
// lightning network with newly connected nodes, broadcasting announcements
426
// after validation, negotiating the channel announcement proofs exchange and
427
// handling the premature announcements. All outgoing announcements are
428
// expected to be properly signed as dictated in BOLT#7, additionally, all
429
// incoming message are expected to be well formed and signed. Invalid messages
430
// will be rejected by this struct.
431
type AuthenticatedGossiper struct {
432
        // Parameters which are needed to properly handle the start and stop of
433
        // the service.
434
        started sync.Once
435
        stopped sync.Once
436

437
        // bestHeight is the height of the block at the tip of the main chain
438
        // as we know it. Accesses *MUST* be done with the gossiper's lock
439
        // held.
440
        bestHeight uint32
441

442
        quit chan struct{}
443
        wg   sync.WaitGroup
444

445
        // cfg is a copy of the configuration struct that the gossiper service
446
        // was initialized with.
447
        cfg *Config
448

449
        // blockEpochs encapsulates a stream of block epochs that are sent at
450
        // every new block height.
451
        blockEpochs *chainntnfs.BlockEpochEvent
452

453
        // prematureChannelUpdates is a map of ChannelUpdates we have received
454
        // that wasn't associated with any channel we know about.  We store
455
        // them temporarily, such that we can reprocess them when a
456
        // ChannelAnnouncement for the channel is received.
457
        prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]
458

459
        // banman tracks our peer's ban status.
460
        banman *banman
461

462
        // networkMsgs is a channel that carries new network broadcasted
463
        // message from outside the gossiper service to be processed by the
464
        // networkHandler.
465
        networkMsgs chan *networkMsg
466

467
        // futureMsgs is a list of premature network messages that have a block
468
        // height specified in the future. We will save them and resend it to
469
        // the chan networkMsgs once the block height has reached. The cached
470
        // map format is,
471
        //   {msgID1: msg1, msgID2: msg2, ...}
472
        futureMsgs *futureMsgCache
473

474
        // chanPolicyUpdates is a channel that requests to update the
475
        // forwarding policy of a set of channels is sent over.
476
        chanPolicyUpdates chan *chanPolicyUpdateRequest
477

478
        // selfKey is the identity public key of the backing Lightning node.
479
        selfKey *btcec.PublicKey
480

481
        // selfKeyLoc is the locator for the identity public key of the backing
482
        // Lightning node.
483
        selfKeyLoc keychain.KeyLocator
484

485
        // channelMtx is used to restrict the database access to one
486
        // goroutine per channel ID. This is done to ensure that when
487
        // the gossiper is handling an announcement, the db state stays
488
        // consistent between when the DB is first read until it's written.
489
        channelMtx *multimutex.Mutex[uint64]
490

491
        recentRejects *lru.Cache[rejectCacheKey, *cachedReject]
492

493
        // syncMgr is a subsystem responsible for managing the gossip syncers
494
        // for peers currently connected. When a new peer is connected, the
495
        // manager will create its accompanying gossip syncer and determine
496
        // whether it should have an activeSync or passiveSync sync type based
497
        // on how many other gossip syncers are currently active. Any activeSync
498
        // gossip syncers are started in a round-robin manner to ensure we're
499
        // not syncing with multiple peers at the same time.
500
        syncMgr *SyncManager
501

502
        // reliableSender is a subsystem responsible for handling reliable
503
        // message send requests to peers. This should only be used for channels
504
        // that are unadvertised at the time of handling the message since if it
505
        // is advertised, then peers should be able to get the message from the
506
        // network.
507
        reliableSender *reliableSender
508

509
        // chanUpdateRateLimiter contains rate limiters for each direction of
510
        // a channel update we've processed. We'll use these to determine
511
        // whether we should accept a new update for a specific channel and
512
        // direction.
513
        //
514
        // NOTE: This map must be synchronized with the main
515
        // AuthenticatedGossiper lock.
516
        chanUpdateRateLimiter map[uint64][2]*rate.Limiter
517

518
        sync.Mutex
519
}
520

521
// New creates a new AuthenticatedGossiper instance, initialized with the
522
// passed configuration parameters.
523
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
4✔
524
        cfg.chainHash = cfg.ChainParams.GenesisHash
4✔
525

4✔
526
        gossiper := &AuthenticatedGossiper{
4✔
527
                selfKey:           selfKeyDesc.PubKey,
4✔
528
                selfKeyLoc:        selfKeyDesc.KeyLocator,
4✔
529
                cfg:               &cfg,
4✔
530
                networkMsgs:       make(chan *networkMsg),
4✔
531
                futureMsgs:        newFutureMsgCache(maxFutureMessages),
4✔
532
                quit:              make(chan struct{}),
4✔
533
                chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
4✔
534
                prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: lll
4✔
535
                        maxPrematureUpdates,
4✔
536
                ),
4✔
537
                channelMtx: multimutex.NewMutex[uint64](),
4✔
538
                recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
4✔
539
                        maxRejectedUpdates,
4✔
540
                ),
4✔
541
                chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
4✔
542
                banman:                newBanman(),
4✔
543
        }
4✔
544

4✔
545
        gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
4✔
546
                ChainHash:               *cfg.chainHash,
4✔
547
                ChanSeries:              cfg.ChanSeries,
4✔
548
                RotateTicker:            cfg.RotateTicker,
4✔
549
                HistoricalSyncTicker:    cfg.HistoricalSyncTicker,
4✔
550
                NumActiveSyncers:        cfg.NumActiveSyncers,
4✔
551
                NoTimestampQueries:      cfg.NoTimestampQueries,
4✔
552
                IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
4✔
553
                BestHeight:              gossiper.latestHeight,
4✔
554
                PinnedSyncers:           cfg.PinnedSyncers,
4✔
555
                IsStillZombieChannel:    cfg.IsStillZombieChannel,
4✔
556
        })
4✔
557

4✔
558
        gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
4✔
559
                NotifyWhenOnline:  cfg.NotifyWhenOnline,
4✔
560
                NotifyWhenOffline: cfg.NotifyWhenOffline,
4✔
561
                MessageStore:      cfg.MessageStore,
4✔
562
                IsMsgStale:        gossiper.isMsgStale,
4✔
563
        })
4✔
564

4✔
565
        return gossiper
4✔
566
}
4✔
567

568
// EdgeWithInfo contains the information that is required to update an edge.
569
type EdgeWithInfo struct {
570
        // Info describes the channel.
571
        Info *models.ChannelEdgeInfo
572

573
        // Edge describes the policy in one direction of the channel.
574
        Edge *models.ChannelEdgePolicy
575
}
576

577
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
578
// specified edge updates. Updates are done in two stages: first, the
579
// AuthenticatedGossiper ensures the update has been committed by dependent
580
// sub-systems, then it signs and broadcasts new updates to the network. A
581
// mapping between outpoints and updated channel policies is returned, which is
582
// used to update the forwarding policies of the underlying links.
583
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
584
        edgesToUpdate []EdgeWithInfo) error {
4✔
585

4✔
586
        errChan := make(chan error, 1)
4✔
587
        policyUpdate := &chanPolicyUpdateRequest{
4✔
588
                edgesToUpdate: edgesToUpdate,
4✔
589
                errChan:       errChan,
4✔
590
        }
4✔
591

4✔
592
        select {
4✔
593
        case d.chanPolicyUpdates <- policyUpdate:
4✔
594
                err := <-errChan
4✔
595
                return err
4✔
596
        case <-d.quit:
×
597
                return fmt.Errorf("AuthenticatedGossiper shutting down")
×
598
        }
599
}
600

601
// Start spawns network messages handler goroutine and registers on new block
602
// notifications in order to properly handle the premature announcements.
603
func (d *AuthenticatedGossiper) Start() error {
4✔
604
        var err error
4✔
605
        d.started.Do(func() {
8✔
606
                log.Info("Authenticated Gossiper starting")
4✔
607
                err = d.start()
4✔
608
        })
4✔
609
        return err
4✔
610
}
611

612
func (d *AuthenticatedGossiper) start() error {
4✔
613
        // First we register for new notifications of newly discovered blocks.
4✔
614
        // We do this immediately so we'll later be able to consume any/all
4✔
615
        // blocks which were discovered.
4✔
616
        blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
4✔
617
        if err != nil {
4✔
618
                return err
×
619
        }
×
620
        d.blockEpochs = blockEpochs
4✔
621

4✔
622
        height, err := d.cfg.Graph.CurrentBlockHeight()
4✔
623
        if err != nil {
4✔
624
                return err
×
625
        }
×
626
        d.bestHeight = height
4✔
627

4✔
628
        // Start the reliable sender. In case we had any pending messages ready
4✔
629
        // to be sent when the gossiper was last shut down, we must continue on
4✔
630
        // our quest to deliver them to their respective peers.
4✔
631
        if err := d.reliableSender.Start(); err != nil {
4✔
632
                return err
×
633
        }
×
634

635
        d.syncMgr.Start()
4✔
636

4✔
637
        d.banman.start()
4✔
638

4✔
639
        // Start receiving blocks in its dedicated goroutine.
4✔
640
        d.wg.Add(2)
4✔
641
        go d.syncBlockHeight()
4✔
642
        go d.networkHandler()
4✔
643

4✔
644
        return nil
4✔
645
}
646

647
// syncBlockHeight syncs the best block height for the gossiper by reading
648
// blockEpochs.
649
//
650
// NOTE: must be run as a goroutine.
651
func (d *AuthenticatedGossiper) syncBlockHeight() {
4✔
652
        defer d.wg.Done()
4✔
653

4✔
654
        for {
8✔
655
                select {
4✔
656
                // A new block has arrived, so we can re-process the previously
657
                // premature announcements.
658
                case newBlock, ok := <-d.blockEpochs.Epochs:
4✔
659
                        // If the channel has been closed, then this indicates
4✔
660
                        // the daemon is shutting down, so we exit ourselves.
4✔
661
                        if !ok {
8✔
662
                                return
4✔
663
                        }
4✔
664

665
                        // Once a new block arrives, we update our running
666
                        // track of the height of the chain tip.
667
                        d.Lock()
4✔
668
                        blockHeight := uint32(newBlock.Height)
4✔
669
                        d.bestHeight = blockHeight
4✔
670
                        d.Unlock()
4✔
671

4✔
672
                        log.Debugf("New block: height=%d, hash=%s", blockHeight,
4✔
673
                                newBlock.Hash)
4✔
674

4✔
675
                        // Resend future messages, if any.
4✔
676
                        d.resendFutureMessages(blockHeight)
4✔
677

678
                case <-d.quit:
×
679
                        return
×
680
                }
681
        }
682
}
683

684
// futureMsgCache embeds a `lru.Cache` with a message counter that's served as
685
// the unique ID when saving the message.
686
type futureMsgCache struct {
687
        *lru.Cache[uint64, *cachedFutureMsg]
688

689
        // msgID is a monotonically increased integer.
690
        msgID atomic.Uint64
691
}
692

693
// nextMsgID returns a unique message ID.
694
func (f *futureMsgCache) nextMsgID() uint64 {
2✔
695
        return f.msgID.Add(1)
2✔
696
}
2✔
697

698
// newFutureMsgCache creates a new future message cache with the underlying lru
699
// cache being initialized with the specified capacity.
700
func newFutureMsgCache(capacity uint64) *futureMsgCache {
4✔
701
        // Create a new cache.
4✔
702
        cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)
4✔
703

4✔
704
        return &futureMsgCache{
4✔
705
                Cache: cache,
4✔
706
        }
4✔
707
}
4✔
708

709
// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
710
type cachedFutureMsg struct {
711
        // msg is the network message.
712
        msg *networkMsg
713

714
        // height is the block height.
715
        height uint32
716
}
717

718
// Size returns the size of the message.
719
func (c *cachedFutureMsg) Size() (uint64, error) {
2✔
720
        // Return a constant 1.
2✔
721
        return 1, nil
2✔
722
}
2✔
723

724
// resendFutureMessages takes a block height, resends all the future messages
725
// found below and equal to that height and deletes those messages found in the
726
// gossiper's futureMsgs.
727
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
4✔
728
        var (
4✔
729
                // msgs are the target messages.
4✔
730
                msgs []*networkMsg
4✔
731

4✔
732
                // keys are the target messages' caching keys.
4✔
733
                keys []uint64
4✔
734
        )
4✔
735

4✔
736
        // filterMsgs is the visitor used when iterating the future cache.
4✔
737
        filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
6✔
738
                if cmsg.height <= height {
4✔
739
                        msgs = append(msgs, cmsg.msg)
2✔
740
                        keys = append(keys, k)
2✔
741
                }
2✔
742

743
                return true
2✔
744
        }
745

746
        // Filter out the target messages.
747
        d.futureMsgs.Range(filterMsgs)
4✔
748

4✔
749
        // Return early if no messages found.
4✔
750
        if len(msgs) == 0 {
8✔
751
                return
4✔
752
        }
4✔
753

754
        // Remove the filtered messages.
755
        for _, key := range keys {
4✔
756
                d.futureMsgs.Delete(key)
2✔
757
        }
2✔
758

759
        log.Debugf("Resending %d network messages at height %d",
2✔
760
                len(msgs), height)
2✔
761

2✔
762
        for _, msg := range msgs {
4✔
763
                select {
2✔
764
                case d.networkMsgs <- msg:
2✔
765
                case <-d.quit:
×
766
                        msg.err <- ErrGossiperShuttingDown
×
767
                }
768
        }
769
}
770

771
// Stop signals any active goroutines for a graceful closure.
772
func (d *AuthenticatedGossiper) Stop() error {
4✔
773
        d.stopped.Do(func() {
8✔
774
                log.Info("Authenticated gossiper shutting down...")
4✔
775
                defer log.Debug("Authenticated gossiper shutdown complete")
4✔
776

4✔
777
                d.stop()
4✔
778
        })
4✔
779
        return nil
4✔
780
}
781

782
func (d *AuthenticatedGossiper) stop() {
4✔
783
        log.Debug("Authenticated Gossiper is stopping")
4✔
784
        defer log.Debug("Authenticated Gossiper stopped")
4✔
785

4✔
786
        // `blockEpochs` is only initialized in the start routine so we make
4✔
787
        // sure we don't panic here in the case where the `Stop` method is
4✔
788
        // called when the `Start` method does not complete.
4✔
789
        if d.blockEpochs != nil {
8✔
790
                d.blockEpochs.Cancel()
4✔
791
        }
4✔
792

793
        d.syncMgr.Stop()
4✔
794

4✔
795
        d.banman.stop()
4✔
796

4✔
797
        close(d.quit)
4✔
798
        d.wg.Wait()
4✔
799

4✔
800
        // We'll stop our reliable sender after all of the gossiper's goroutines
4✔
801
        // have exited to ensure nothing can cause it to continue executing.
4✔
802
        d.reliableSender.Stop()
4✔
803
}
804

805
// TODO(roasbeef): need method to get current gossip timestamp?
806
//  * using mtx, check time rotate forward is needed?
807

808
// ProcessRemoteAnnouncement sends a new remote announcement message along with
809
// the peer that sent the routing message. The announcement will be processed
810
// then added to a queue for batched trickled announcement to all connected
811
// peers.  Remote channel announcements should contain the announcement proof
812
// and be fully validated.
813
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
814
        peer lnpeer.Peer) chan error {
4✔
815

4✔
816
        errChan := make(chan error, 1)
4✔
817

4✔
818
        // For messages in the known set of channel series queries, we'll
4✔
819
        // dispatch the message directly to the GossipSyncer, and skip the main
4✔
820
        // processing loop.
4✔
821
        switch m := msg.(type) {
4✔
822
        case *lnwire.QueryShortChanIDs,
823
                *lnwire.QueryChannelRange,
824
                *lnwire.ReplyChannelRange,
825
                *lnwire.ReplyShortChanIDsEnd:
4✔
826

4✔
827
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
4✔
828
                if !ok {
4✔
829
                        log.Warnf("Gossip syncer for peer=%x not found",
×
830
                                peer.PubKey())
×
831

×
832
                        errChan <- ErrGossipSyncerNotFound
×
833
                        return errChan
×
834
                }
×
835

836
                // If we've found the message target, then we'll dispatch the
837
                // message directly to it.
838
                syncer.ProcessQueryMsg(m, peer.QuitSignal())
4✔
839

4✔
840
                errChan <- nil
4✔
841
                return errChan
4✔
842

843
        // If a peer is updating its current update horizon, then we'll dispatch
844
        // that directly to the proper GossipSyncer.
845
        case *lnwire.GossipTimestampRange:
4✔
846
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
4✔
847
                if !ok {
4✔
848
                        log.Warnf("Gossip syncer for peer=%x not found",
×
849
                                peer.PubKey())
×
850

×
851
                        errChan <- ErrGossipSyncerNotFound
×
852
                        return errChan
×
853
                }
×
854

855
                // If we've found the message target, then we'll dispatch the
856
                // message directly to it.
857
                if err := syncer.ApplyGossipFilter(m); err != nil {
4✔
858
                        log.Warnf("Unable to apply gossip filter for peer=%x: "+
×
859
                                "%v", peer.PubKey(), err)
×
860

×
861
                        errChan <- err
×
862
                        return errChan
×
863
                }
×
864

865
                errChan <- nil
4✔
866
                return errChan
4✔
867

868
        // To avoid inserting edges in the graph for our own channels that we
869
        // have already closed, we ignore such channel announcements coming
870
        // from the remote.
871
        case *lnwire.ChannelAnnouncement1:
4✔
872
                ownKey := d.selfKey.SerializeCompressed()
4✔
873
                ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement1 " +
4✔
874
                        "for own channel")
4✔
875

4✔
876
                if bytes.Equal(m.NodeID1[:], ownKey) ||
4✔
877
                        bytes.Equal(m.NodeID2[:], ownKey) {
8✔
878

4✔
879
                        log.Warn(ownErr)
4✔
880
                        errChan <- ownErr
4✔
881
                        return errChan
4✔
882
                }
4✔
883
        }
884

885
        nMsg := &networkMsg{
4✔
886
                msg:      msg,
4✔
887
                isRemote: true,
4✔
888
                peer:     peer,
4✔
889
                source:   peer.IdentityKey(),
4✔
890
                err:      errChan,
4✔
891
        }
4✔
892

4✔
893
        select {
4✔
894
        case d.networkMsgs <- nMsg:
4✔
895

896
        // If the peer that sent us this error is quitting, then we don't need
897
        // to send back an error and can return immediately.
898
        case <-peer.QuitSignal():
×
899
                return nil
×
900
        case <-d.quit:
×
901
                nMsg.err <- ErrGossiperShuttingDown
×
902
        }
903

904
        return nMsg.err
4✔
905
}
906

907
// ProcessLocalAnnouncement sends a new remote announcement message along with
908
// the peer that sent the routing message. The announcement will be processed
909
// then added to a queue for batched trickled announcement to all connected
910
// peers.  Local channel announcements don't contain the announcement proof and
911
// will not be fully validated. Once the channel proofs are received, the
912
// entire channel announcement and update messages will be re-constructed and
913
// broadcast to the rest of the network.
914
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
915
        optionalFields ...OptionalMsgField) chan error {
4✔
916

4✔
917
        optionalMsgFields := &optionalMsgFields{}
4✔
918
        optionalMsgFields.apply(optionalFields...)
4✔
919

4✔
920
        nMsg := &networkMsg{
4✔
921
                msg:               msg,
4✔
922
                optionalMsgFields: optionalMsgFields,
4✔
923
                isRemote:          false,
4✔
924
                source:            d.selfKey,
4✔
925
                err:               make(chan error, 1),
4✔
926
        }
4✔
927

4✔
928
        select {
4✔
929
        case d.networkMsgs <- nMsg:
4✔
930
        case <-d.quit:
×
931
                nMsg.err <- ErrGossiperShuttingDown
×
932
        }
933

934
        return nMsg.err
4✔
935
}
936

937
// channelUpdateID is a unique identifier for ChannelUpdate messages, as
938
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
939
// tuple.
940
type channelUpdateID struct {
941
        // channelID represents the set of data which is needed to
942
        // retrieve all necessary data to validate the channel existence.
943
        channelID lnwire.ShortChannelID
944

945
        // Flags least-significant bit must be set to 0 if the creating node
946
        // corresponds to the first node in the previously sent channel
947
        // announcement and 1 otherwise.
948
        flags lnwire.ChanUpdateChanFlags
949
}
950

951
// msgWithSenders is a wrapper struct around a message, and the set of peers
952
// that originally sent us this message. Using this struct, we can ensure that
953
// we don't re-send a message to the peer that sent it to us in the first
954
// place.
955
type msgWithSenders struct {
956
        // msg is the wire message itself.
957
        msg lnwire.Message
958

959
        // isLocal is true if this was a message that originated locally. We'll
960
        // use this to bypass our normal checks to ensure we prioritize sending
961
        // out our own updates.
962
        isLocal bool
963

964
        // sender is the set of peers that sent us this message.
965
        senders map[route.Vertex]struct{}
966
}
967

968
// mergeSyncerMap is used to merge the set of senders of a particular message
969
// with peers that we have an active GossipSyncer with. We do this to ensure
970
// that we don't broadcast messages to any peers that we have active gossip
971
// syncers for.
972
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
4✔
973
        for peerPub := range syncers {
8✔
974
                m.senders[peerPub] = struct{}{}
4✔
975
        }
4✔
976
}
977

978
// deDupedAnnouncements de-duplicates announcements that have been added to the
979
// batch. Internally, announcements are stored in three maps
980
// (one each for channel announcements, channel updates, and node
981
// announcements). These maps keep track of unique announcements and ensure no
982
// announcements are duplicated. We keep the three message types separate, such
983
// that we can send channel announcements first, then channel updates, and
984
// finally node announcements when it's time to broadcast them.
985
type deDupedAnnouncements struct {
986
        // channelAnnouncements are identified by the short channel id field.
987
        channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
988

989
        // channelUpdates are identified by the channel update id field.
990
        channelUpdates map[channelUpdateID]msgWithSenders
991

992
        // nodeAnnouncements are identified by the Vertex field.
993
        nodeAnnouncements map[route.Vertex]msgWithSenders
994

995
        sync.Mutex
996
}
997

998
// Reset operates on deDupedAnnouncements to reset the storage of
999
// announcements.
1000
func (d *deDupedAnnouncements) Reset() {
4✔
1001
        d.Lock()
4✔
1002
        defer d.Unlock()
4✔
1003

4✔
1004
        d.reset()
4✔
1005
}
4✔
1006

1007
// reset is the private version of the Reset method. We have this so we can
1008
// call this method within method that are already holding the lock.
1009
func (d *deDupedAnnouncements) reset() {
4✔
1010
        // Storage of each type of announcement (channel announcements, channel
4✔
1011
        // updates, node announcements) is set to an empty map where the
4✔
1012
        // appropriate key points to the corresponding lnwire.Message.
4✔
1013
        d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
4✔
1014
        d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
4✔
1015
        d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
4✔
1016
}
4✔
1017

1018
// addMsg adds a new message to the current batch. If the message is already
1019
// present in the current batch, then this new instance replaces the latter,
1020
// and the set of senders is updated to reflect which node sent us this
1021
// message.
1022
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
4✔
1023
        log.Tracef("Adding network message: %v to batch", message.msg.MsgType())
4✔
1024

4✔
1025
        // Depending on the message type (channel announcement, channel update,
4✔
1026
        // or node announcement), the message is added to the corresponding map
4✔
1027
        // in deDupedAnnouncements. Because each identifying key can have at
4✔
1028
        // most one value, the announcements are de-duplicated, with newer ones
4✔
1029
        // replacing older ones.
4✔
1030
        switch msg := message.msg.(type) {
4✔
1031

1032
        // Channel announcements are identified by the short channel id field.
1033
        case *lnwire.ChannelAnnouncement1:
4✔
1034
                deDupKey := msg.ShortChannelID
4✔
1035
                sender := route.NewVertex(message.source)
4✔
1036

4✔
1037
                mws, ok := d.channelAnnouncements[deDupKey]
4✔
1038
                if !ok {
8✔
1039
                        mws = msgWithSenders{
4✔
1040
                                msg:     msg,
4✔
1041
                                isLocal: !message.isRemote,
4✔
1042
                                senders: make(map[route.Vertex]struct{}),
4✔
1043
                        }
4✔
1044
                        mws.senders[sender] = struct{}{}
4✔
1045

4✔
1046
                        d.channelAnnouncements[deDupKey] = mws
4✔
1047

4✔
1048
                        return
4✔
1049
                }
4✔
1050

1051
                mws.msg = msg
×
1052
                mws.senders[sender] = struct{}{}
×
1053
                d.channelAnnouncements[deDupKey] = mws
×
1054

1055
        // Channel updates are identified by the (short channel id,
1056
        // channelflags) tuple.
1057
        case *lnwire.ChannelUpdate1:
4✔
1058
                sender := route.NewVertex(message.source)
4✔
1059
                deDupKey := channelUpdateID{
4✔
1060
                        msg.ShortChannelID,
4✔
1061
                        msg.ChannelFlags,
4✔
1062
                }
4✔
1063

4✔
1064
                oldTimestamp := uint32(0)
4✔
1065
                mws, ok := d.channelUpdates[deDupKey]
4✔
1066
                if ok {
4✔
1067
                        // If we already have seen this message, record its
×
1068
                        // timestamp.
×
1069
                        update, ok := mws.msg.(*lnwire.ChannelUpdate1)
×
1070
                        if !ok {
×
1071
                                log.Errorf("Expected *lnwire.ChannelUpdate1, "+
×
1072
                                        "got: %T", mws.msg)
×
1073

×
1074
                                return
×
1075
                        }
×
1076

1077
                        oldTimestamp = update.Timestamp
×
1078
                }
1079

1080
                // If we already had this message with a strictly newer
1081
                // timestamp, then we'll just discard the message we got.
1082
                if oldTimestamp > msg.Timestamp {
4✔
1083
                        log.Debugf("Ignored outdated network message: "+
×
1084
                                "peer=%v, msg=%s", message.peer, msg.MsgType())
×
1085
                        return
×
1086
                }
×
1087

1088
                // If the message we just got is newer than what we previously
1089
                // have seen, or this is the first time we see it, then we'll
1090
                // add it to our map of announcements.
1091
                if oldTimestamp < msg.Timestamp {
8✔
1092
                        mws = msgWithSenders{
4✔
1093
                                msg:     msg,
4✔
1094
                                isLocal: !message.isRemote,
4✔
1095
                                senders: make(map[route.Vertex]struct{}),
4✔
1096
                        }
4✔
1097

4✔
1098
                        // We'll mark the sender of the message in the
4✔
1099
                        // senders map.
4✔
1100
                        mws.senders[sender] = struct{}{}
4✔
1101

4✔
1102
                        d.channelUpdates[deDupKey] = mws
4✔
1103

4✔
1104
                        return
4✔
1105
                }
4✔
1106

1107
                // Lastly, if we had seen this exact message from before, with
1108
                // the same timestamp, we'll add the sender to the map of
1109
                // senders, such that we can skip sending this message back in
1110
                // the next batch.
1111
                mws.msg = msg
×
1112
                mws.senders[sender] = struct{}{}
×
1113
                d.channelUpdates[deDupKey] = mws
×
1114

1115
        // Node announcements are identified by the Vertex field.  Use the
1116
        // NodeID to create the corresponding Vertex.
1117
        case *lnwire.NodeAnnouncement:
4✔
1118
                sender := route.NewVertex(message.source)
4✔
1119
                deDupKey := route.Vertex(msg.NodeID)
4✔
1120

4✔
1121
                // We do the same for node announcements as we did for channel
4✔
1122
                // updates, as they also carry a timestamp.
4✔
1123
                oldTimestamp := uint32(0)
4✔
1124
                mws, ok := d.nodeAnnouncements[deDupKey]
4✔
1125
                if ok {
8✔
1126
                        oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
4✔
1127
                }
4✔
1128

1129
                // Discard the message if it's old.
1130
                if oldTimestamp > msg.Timestamp {
8✔
1131
                        return
4✔
1132
                }
4✔
1133

1134
                // Replace if it's newer.
1135
                if oldTimestamp < msg.Timestamp {
8✔
1136
                        mws = msgWithSenders{
4✔
1137
                                msg:     msg,
4✔
1138
                                isLocal: !message.isRemote,
4✔
1139
                                senders: make(map[route.Vertex]struct{}),
4✔
1140
                        }
4✔
1141

4✔
1142
                        mws.senders[sender] = struct{}{}
4✔
1143

4✔
1144
                        d.nodeAnnouncements[deDupKey] = mws
4✔
1145

4✔
1146
                        return
4✔
1147
                }
4✔
1148

1149
                // Add to senders map if it's the same as we had.
1150
                mws.msg = msg
4✔
1151
                mws.senders[sender] = struct{}{}
4✔
1152
                d.nodeAnnouncements[deDupKey] = mws
4✔
1153
        }
1154
}
1155

1156
// AddMsgs is a helper method to add multiple messages to the announcement
1157
// batch.
1158
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
4✔
1159
        d.Lock()
4✔
1160
        defer d.Unlock()
4✔
1161

4✔
1162
        for _, msg := range msgs {
8✔
1163
                d.addMsg(msg)
4✔
1164
        }
4✔
1165
}
1166

1167
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
1168
// to broadcast next into messages that are locally sourced and those that are
1169
// sourced remotely.
1170
type msgsToBroadcast struct {
1171
        // localMsgs is the set of messages we created locally.
1172
        localMsgs []msgWithSenders
1173

1174
        // remoteMsgs is the set of messages that we received from a remote
1175
        // party.
1176
        remoteMsgs []msgWithSenders
1177
}
1178

1179
// addMsg adds a new message to the appropriate sub-slice.
1180
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
4✔
1181
        if msg.isLocal {
8✔
1182
                m.localMsgs = append(m.localMsgs, msg)
4✔
1183
        } else {
8✔
1184
                m.remoteMsgs = append(m.remoteMsgs, msg)
4✔
1185
        }
4✔
1186
}
1187

1188
// isEmpty returns true if the batch is empty.
1189
func (m *msgsToBroadcast) isEmpty() bool {
4✔
1190
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
4✔
1191
}
4✔
1192

1193
// length returns the length of the combined message set.
1194
func (m *msgsToBroadcast) length() int {
×
1195
        return len(m.localMsgs) + len(m.remoteMsgs)
×
1196
}
×
1197

1198
// Emit returns the set of de-duplicated announcements to be sent out during
1199
// the next announcement epoch, in the order of channel announcements, channel
1200
// updates, and node announcements. Each message emitted, contains the set of
1201
// peers that sent us the message. This way, we can ensure that we don't waste
1202
// bandwidth by re-sending a message to the peer that sent it to us in the
1203
// first place. Additionally, the set of stored messages are reset.
1204
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
4✔
1205
        d.Lock()
4✔
1206
        defer d.Unlock()
4✔
1207

4✔
1208
        // Get the total number of announcements.
4✔
1209
        numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
4✔
1210
                len(d.nodeAnnouncements)
4✔
1211

4✔
1212
        // Create an empty array of lnwire.Messages with a length equal to
4✔
1213
        // the total number of announcements.
4✔
1214
        msgs := msgsToBroadcast{
4✔
1215
                localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
4✔
1216
                remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
4✔
1217
        }
4✔
1218

4✔
1219
        // Add the channel announcements to the array first.
4✔
1220
        for _, message := range d.channelAnnouncements {
8✔
1221
                msgs.addMsg(message)
4✔
1222
        }
4✔
1223

1224
        // Then add the channel updates.
1225
        for _, message := range d.channelUpdates {
8✔
1226
                msgs.addMsg(message)
4✔
1227
        }
4✔
1228

1229
        // Finally add the node announcements.
1230
        for _, message := range d.nodeAnnouncements {
8✔
1231
                msgs.addMsg(message)
4✔
1232
        }
4✔
1233

1234
        d.reset()
4✔
1235

4✔
1236
        // Return the array of lnwire.messages.
4✔
1237
        return msgs
4✔
1238
}
1239

1240
// calculateSubBatchSize is a helper function that calculates the size to break
1241
// down the batchSize into.
1242
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
1243
        minimumBatchSize, batchSize int) int {
4✔
1244
        if subBatchDelay > totalDelay {
4✔
1245
                return batchSize
×
1246
        }
×
1247

1248
        subBatchSize := (batchSize*int(subBatchDelay) +
4✔
1249
                int(totalDelay) - 1) / int(totalDelay)
4✔
1250

4✔
1251
        if subBatchSize < minimumBatchSize {
8✔
1252
                return minimumBatchSize
4✔
1253
        }
4✔
1254

1255
        return subBatchSize
×
1256
}
1257

1258
// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
1259
// this variable so the function can be mocked in our test.
1260
var batchSizeCalculator = calculateSubBatchSize
1261

1262
// splitAnnouncementBatches takes an exiting list of announcements and
1263
// decomposes it into sub batches controlled by the `subBatchSize`.
1264
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
1265
        announcementBatch []msgWithSenders) [][]msgWithSenders {
4✔
1266

4✔
1267
        subBatchSize := batchSizeCalculator(
4✔
1268
                d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
4✔
1269
                d.cfg.MinimumBatchSize, len(announcementBatch),
4✔
1270
        )
4✔
1271

4✔
1272
        var splitAnnouncementBatch [][]msgWithSenders
4✔
1273

4✔
1274
        for subBatchSize < len(announcementBatch) {
8✔
1275
                // For slicing with minimal allocation
4✔
1276
                // https://github.com/golang/go/wiki/SliceTricks
4✔
1277
                announcementBatch, splitAnnouncementBatch =
4✔
1278
                        announcementBatch[subBatchSize:],
4✔
1279
                        append(splitAnnouncementBatch,
4✔
1280
                                announcementBatch[0:subBatchSize:subBatchSize])
4✔
1281
        }
4✔
1282
        splitAnnouncementBatch = append(
4✔
1283
                splitAnnouncementBatch, announcementBatch,
4✔
1284
        )
4✔
1285

4✔
1286
        return splitAnnouncementBatch
4✔
1287
}
1288

1289
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
1290
// split size, and then sends out all items to the set of target peers. Locally
1291
// generated announcements are always sent before remotely generated
1292
// announcements.
1293
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
1294
        annBatch msgsToBroadcast) {
4✔
1295

4✔
1296
        // delayNextBatch is a helper closure that blocks for `SubBatchDelay`
4✔
1297
        // duration to delay the sending of next announcement batch.
4✔
1298
        delayNextBatch := func() {
8✔
1299
                select {
4✔
1300
                case <-time.After(d.cfg.SubBatchDelay):
4✔
1301
                case <-d.quit:
×
1302
                        return
×
1303
                }
1304
        }
1305

1306
        // Fetch the local and remote announcements.
1307
        localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
4✔
1308
        remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)
4✔
1309

4✔
1310
        d.wg.Add(1)
4✔
1311
        go func() {
8✔
1312
                defer d.wg.Done()
4✔
1313

4✔
1314
                log.Debugf("Broadcasting %v new local announcements in %d "+
4✔
1315
                        "sub batches", len(annBatch.localMsgs),
4✔
1316
                        len(localBatches))
4✔
1317

4✔
1318
                // Send out the local announcements first.
4✔
1319
                for _, annBatch := range localBatches {
8✔
1320
                        d.sendLocalBatch(annBatch)
4✔
1321
                        delayNextBatch()
4✔
1322
                }
4✔
1323

1324
                log.Debugf("Broadcasting %v new remote announcements in %d "+
4✔
1325
                        "sub batches", len(annBatch.remoteMsgs),
4✔
1326
                        len(remoteBatches))
4✔
1327

4✔
1328
                // Now send the remote announcements.
4✔
1329
                for _, annBatch := range remoteBatches {
8✔
1330
                        d.sendRemoteBatch(annBatch)
4✔
1331
                        delayNextBatch()
4✔
1332
                }
4✔
1333
        }()
1334
}
1335

1336
// sendLocalBatch broadcasts a list of locally generated announcements to our
1337
// peers. For local announcements, we skip the filter and dedup logic and just
1338
// send the announcements out to all our coonnected peers.
1339
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
4✔
1340
        msgsToSend := lnutils.Map(
4✔
1341
                annBatch, func(m msgWithSenders) lnwire.Message {
8✔
1342
                        return m.msg
4✔
1343
                },
4✔
1344
        )
1345

1346
        err := d.cfg.Broadcast(nil, msgsToSend...)
4✔
1347
        if err != nil {
4✔
1348
                log.Errorf("Unable to send local batch announcements: %v", err)
×
1349
        }
×
1350
}
1351

1352
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
1353
// peers.
1354
func (d *AuthenticatedGossiper) sendRemoteBatch(annBatch []msgWithSenders) {
4✔
1355
        syncerPeers := d.syncMgr.GossipSyncers()
4✔
1356

4✔
1357
        // We'll first attempt to filter out this new message for all peers
4✔
1358
        // that have active gossip syncers active.
4✔
1359
        for pub, syncer := range syncerPeers {
8✔
1360
                log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
4✔
1361
                syncer.FilterGossipMsgs(annBatch...)
4✔
1362
        }
4✔
1363

1364
        for _, msgChunk := range annBatch {
8✔
1365
                msgChunk := msgChunk
4✔
1366

4✔
1367
                // With the syncers taken care of, we'll merge the sender map
4✔
1368
                // with the set of syncers, so we don't send out duplicate
4✔
1369
                // messages.
4✔
1370
                msgChunk.mergeSyncerMap(syncerPeers)
4✔
1371

4✔
1372
                err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
4✔
1373
                if err != nil {
4✔
1374
                        log.Errorf("Unable to send batch "+
×
1375
                                "announcements: %v", err)
×
1376
                        continue
×
1377
                }
1378
        }
1379
}
1380

1381
// networkHandler is the primary goroutine that drives this service. The roles
1382
// of this goroutine includes answering queries related to the state of the
1383
// network, syncing up newly connected peers, and also periodically
1384
// broadcasting our latest topology state to all connected peers.
1385
//
1386
// NOTE: This MUST be run as a goroutine.
1387
func (d *AuthenticatedGossiper) networkHandler() {
4✔
1388
        defer d.wg.Done()
4✔
1389

4✔
1390
        // Initialize empty deDupedAnnouncements to store announcement batch.
4✔
1391
        announcements := deDupedAnnouncements{}
4✔
1392
        announcements.Reset()
4✔
1393

4✔
1394
        d.cfg.RetransmitTicker.Resume()
4✔
1395
        defer d.cfg.RetransmitTicker.Stop()
4✔
1396

4✔
1397
        trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
4✔
1398
        defer trickleTimer.Stop()
4✔
1399

4✔
1400
        // To start, we'll first check to see if there are any stale channel or
4✔
1401
        // node announcements that we need to re-transmit.
4✔
1402
        if err := d.retransmitStaleAnns(time.Now()); err != nil {
4✔
1403
                log.Errorf("Unable to rebroadcast stale announcements: %v", err)
×
1404
        }
×
1405

1406
        // We'll use this validation to ensure that we process jobs in their
1407
        // dependency order during parallel validation.
1408
        validationBarrier := graph.NewValidationBarrier(1000, d.quit)
4✔
1409

4✔
1410
        for {
8✔
1411
                select {
4✔
1412
                // A new policy update has arrived. We'll commit it to the
1413
                // sub-systems below us, then craft, sign, and broadcast a new
1414
                // ChannelUpdate for the set of affected clients.
1415
                case policyUpdate := <-d.chanPolicyUpdates:
4✔
1416
                        log.Tracef("Received channel %d policy update requests",
4✔
1417
                                len(policyUpdate.edgesToUpdate))
4✔
1418

4✔
1419
                        // First, we'll now create new fully signed updates for
4✔
1420
                        // the affected channels and also update the underlying
4✔
1421
                        // graph with the new state.
4✔
1422
                        newChanUpdates, err := d.processChanPolicyUpdate(
4✔
1423
                                policyUpdate.edgesToUpdate,
4✔
1424
                        )
4✔
1425
                        policyUpdate.errChan <- err
4✔
1426
                        if err != nil {
4✔
1427
                                log.Errorf("Unable to craft policy updates: %v",
×
1428
                                        err)
×
1429
                                continue
×
1430
                        }
1431

1432
                        // Finally, with the updates committed, we'll now add
1433
                        // them to the announcement batch to be flushed at the
1434
                        // start of the next epoch.
1435
                        announcements.AddMsgs(newChanUpdates...)
4✔
1436

1437
                case announcement := <-d.networkMsgs:
4✔
1438
                        log.Tracef("Received network message: "+
4✔
1439
                                "peer=%v, msg=%s, is_remote=%v",
4✔
1440
                                announcement.peer, announcement.msg.MsgType(),
4✔
1441
                                announcement.isRemote)
4✔
1442

4✔
1443
                        switch announcement.msg.(type) {
4✔
1444
                        // Channel announcement signatures are amongst the only
1445
                        // messages that we'll process serially.
1446
                        case *lnwire.AnnounceSignatures1:
4✔
1447
                                emittedAnnouncements, _ := d.processNetworkAnnouncement(
4✔
1448
                                        announcement,
4✔
1449
                                )
4✔
1450
                                log.Debugf("Processed network message %s, "+
4✔
1451
                                        "returned len(announcements)=%v",
4✔
1452
                                        announcement.msg.MsgType(),
4✔
1453
                                        len(emittedAnnouncements))
4✔
1454

4✔
1455
                                if emittedAnnouncements != nil {
8✔
1456
                                        announcements.AddMsgs(
4✔
1457
                                                emittedAnnouncements...,
4✔
1458
                                        )
4✔
1459
                                }
4✔
1460
                                continue
4✔
1461
                        }
1462

1463
                        // If this message was recently rejected, then we won't
1464
                        // attempt to re-process it.
1465
                        if announcement.isRemote && d.isRecentlyRejectedMsg(
4✔
1466
                                announcement.msg,
4✔
1467
                                sourceToPub(announcement.source),
4✔
1468
                        ) {
4✔
1469

×
1470
                                announcement.err <- fmt.Errorf("recently " +
×
1471
                                        "rejected")
×
1472
                                continue
×
1473
                        }
1474

1475
                        // We'll set up any dependent, and wait until a free
1476
                        // slot for this job opens up, this allow us to not
1477
                        // have thousands of goroutines active.
1478
                        validationBarrier.InitJobDependencies(announcement.msg)
4✔
1479

4✔
1480
                        d.wg.Add(1)
4✔
1481
                        go d.handleNetworkMessages(
4✔
1482
                                announcement, &announcements, validationBarrier,
4✔
1483
                        )
4✔
1484

1485
                // The trickle timer has ticked, which indicates we should
1486
                // flush to the network the pending batch of new announcements
1487
                // we've received since the last trickle tick.
1488
                case <-trickleTimer.C:
4✔
1489
                        // Emit the current batch of announcements from
4✔
1490
                        // deDupedAnnouncements.
4✔
1491
                        announcementBatch := announcements.Emit()
4✔
1492

4✔
1493
                        // If the current announcements batch is nil, then we
4✔
1494
                        // have no further work here.
4✔
1495
                        if announcementBatch.isEmpty() {
8✔
1496
                                continue
4✔
1497
                        }
1498

1499
                        // At this point, we have the set of local and remote
1500
                        // announcements we want to send out. We'll do the
1501
                        // batching as normal for both, but for local
1502
                        // announcements, we'll blast them out w/o regard for
1503
                        // our peer's policies so we ensure they propagate
1504
                        // properly.
1505
                        d.splitAndSendAnnBatch(announcementBatch)
4✔
1506

1507
                // The retransmission timer has ticked which indicates that we
1508
                // should check if we need to prune or re-broadcast any of our
1509
                // personal channels or node announcement. This addresses the
1510
                // case of "zombie" channels and channel advertisements that
1511
                // have been dropped, or not properly propagated through the
1512
                // network.
1513
                case tick := <-d.cfg.RetransmitTicker.Ticks():
×
1514
                        if err := d.retransmitStaleAnns(tick); err != nil {
×
1515
                                log.Errorf("unable to rebroadcast stale "+
×
1516
                                        "announcements: %v", err)
×
1517
                        }
×
1518

1519
                // The gossiper has been signalled to exit, to we exit our
1520
                // main loop so the wait group can be decremented.
1521
                case <-d.quit:
4✔
1522
                        return
4✔
1523
                }
1524
        }
1525
}
1526

1527
// handleNetworkMessages is responsible for waiting for dependencies for a
1528
// given network message and processing the message. Once processed, it will
1529
// signal its dependants and add the new announcements to the announce batch.
1530
//
1531
// NOTE: must be run as a goroutine.
1532
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
1533
        deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) {
4✔
1534

4✔
1535
        defer d.wg.Done()
4✔
1536
        defer vb.CompleteJob()
4✔
1537

4✔
1538
        // We should only broadcast this message forward if it originated from
4✔
1539
        // us or it wasn't received as part of our initial historical sync.
4✔
1540
        shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
4✔
1541

4✔
1542
        // If this message has an existing dependency, then we'll wait until
4✔
1543
        // that has been fully validated before we proceed.
4✔
1544
        err := vb.WaitForDependants(nMsg.msg)
4✔
1545
        if err != nil {
4✔
1546
                log.Debugf("Validating network message %s got err: %v",
×
1547
                        nMsg.msg.MsgType(), err)
×
1548

×
1549
                if !graph.IsError(
×
1550
                        err,
×
1551
                        graph.ErrVBarrierShuttingDown,
×
1552
                        graph.ErrParentValidationFailed,
×
1553
                ) {
×
1554

×
1555
                        log.Warnf("unexpected error during validation "+
×
1556
                                "barrier shutdown: %v", err)
×
1557
                }
×
1558
                nMsg.err <- err
×
1559

×
1560
                return
×
1561
        }
1562

1563
        // Process the network announcement to determine if this is either a
1564
        // new announcement from our PoV or an edges to a prior vertex/edge we
1565
        // previously proceeded.
1566
        newAnns, allow := d.processNetworkAnnouncement(nMsg)
4✔
1567

4✔
1568
        log.Tracef("Processed network message %s, returned "+
4✔
1569
                "len(announcements)=%v, allowDependents=%v",
4✔
1570
                nMsg.msg.MsgType(), len(newAnns), allow)
4✔
1571

4✔
1572
        // If this message had any dependencies, then we can now signal them to
4✔
1573
        // continue.
4✔
1574
        vb.SignalDependants(nMsg.msg, allow)
4✔
1575

4✔
1576
        // If the announcement was accepted, then add the emitted announcements
4✔
1577
        // to our announce batch to be broadcast once the trickle timer ticks
4✔
1578
        // gain.
4✔
1579
        if newAnns != nil && shouldBroadcast {
8✔
1580
                // TODO(roasbeef): exclude peer that sent.
4✔
1581
                deDuped.AddMsgs(newAnns...)
4✔
1582
        } else if newAnns != nil {
12✔
1583
                log.Trace("Skipping broadcast of announcements received " +
4✔
1584
                        "during initial graph sync")
4✔
1585
        }
4✔
1586
}
1587

1588
// TODO(roasbeef): d/c peers that send updates not on our chain
1589

1590
// InitSyncState is called by outside sub-systems when a connection is
1591
// established to a new peer that understands how to perform channel range
1592
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
1593
// needed to handle new queries.
1594
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
4✔
1595
        d.syncMgr.InitSyncState(syncPeer)
4✔
1596
}
4✔
1597

1598
// PruneSyncState is called by outside sub-systems once a peer that we were
1599
// previously connected to has been disconnected. In this case we can stop the
1600
// existing GossipSyncer assigned to the peer and free up resources.
1601
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
4✔
1602
        d.syncMgr.PruneSyncState(peer)
4✔
1603
}
4✔
1604

1605
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
1606
// false otherwise, This avoids expensive reprocessing of the message.
1607
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
1608
        peerPub [33]byte) bool {
4✔
1609

4✔
1610
        var scid uint64
4✔
1611
        switch m := msg.(type) {
4✔
1612
        case *lnwire.ChannelUpdate1:
4✔
1613
                scid = m.ShortChannelID.ToUint64()
4✔
1614

1615
        case *lnwire.ChannelAnnouncement1:
4✔
1616
                scid = m.ShortChannelID.ToUint64()
4✔
1617

1618
        default:
4✔
1619
                return false
4✔
1620
        }
1621

1622
        _, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
4✔
1623
        return err != cache.ErrElementNotFound
4✔
1624
}
1625

1626
// retransmitStaleAnns examines all outgoing channels that the source node is
1627
// known to maintain to check to see if any of them are "stale". A channel is
1628
// stale iff, the last timestamp of its rebroadcast is older than the
1629
// RebroadcastInterval. We also check if a refreshed node announcement should
1630
// be resent.
1631
func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
4✔
1632
        // Iterate over all of our channels and check if any of them fall
4✔
1633
        // within the prune interval or re-broadcast interval.
4✔
1634
        type updateTuple struct {
4✔
1635
                info *models.ChannelEdgeInfo
4✔
1636
                edge *models.ChannelEdgePolicy
4✔
1637
        }
4✔
1638

4✔
1639
        var (
4✔
1640
                havePublicChannels bool
4✔
1641
                edgesToUpdate      []updateTuple
4✔
1642
        )
4✔
1643
        err := d.cfg.Graph.ForAllOutgoingChannels(func(
4✔
1644
                _ kvdb.RTx,
4✔
1645
                info *models.ChannelEdgeInfo,
4✔
1646
                edge *models.ChannelEdgePolicy) error {
8✔
1647

4✔
1648
                // If there's no auth proof attached to this edge, it means
4✔
1649
                // that it is a private channel not meant to be announced to
4✔
1650
                // the greater network, so avoid sending channel updates for
4✔
1651
                // this channel to not leak its
4✔
1652
                // existence.
4✔
1653
                if info.AuthProof == nil {
8✔
1654
                        log.Debugf("Skipping retransmission of channel "+
4✔
1655
                                "without AuthProof: %v", info.ChannelID)
4✔
1656
                        return nil
4✔
1657
                }
4✔
1658

1659
                // We make a note that we have at least one public channel. We
1660
                // use this to determine whether we should send a node
1661
                // announcement below.
1662
                havePublicChannels = true
4✔
1663

4✔
1664
                // If this edge has a ChannelUpdate that was created before the
4✔
1665
                // introduction of the MaxHTLC field, then we'll update this
4✔
1666
                // edge to propagate this information in the network.
4✔
1667
                if !edge.MessageFlags.HasMaxHtlc() {
4✔
1668
                        // We'll make sure we support the new max_htlc field if
×
1669
                        // not already present.
×
1670
                        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
×
1671
                        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
×
1672

×
1673
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
1674
                                info: info,
×
1675
                                edge: edge,
×
1676
                        })
×
1677
                        return nil
×
1678
                }
×
1679

1680
                timeElapsed := now.Sub(edge.LastUpdate)
4✔
1681

4✔
1682
                // If it's been longer than RebroadcastInterval since we've
4✔
1683
                // re-broadcasted the channel, add the channel to the set of
4✔
1684
                // edges we need to update.
4✔
1685
                if timeElapsed >= d.cfg.RebroadcastInterval {
4✔
1686
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
1687
                                info: info,
×
1688
                                edge: edge,
×
1689
                        })
×
1690
                }
×
1691

1692
                return nil
4✔
1693
        })
1694
        if err != nil && err != channeldb.ErrGraphNoEdgesFound {
4✔
1695
                return fmt.Errorf("unable to retrieve outgoing channels: %w",
×
1696
                        err)
×
1697
        }
×
1698

1699
        var signedUpdates []lnwire.Message
4✔
1700
        for _, chanToUpdate := range edgesToUpdate {
4✔
1701
                // Re-sign and update the channel on disk and retrieve our
×
1702
                // ChannelUpdate to broadcast.
×
1703
                chanAnn, chanUpdate, err := d.updateChannel(
×
1704
                        chanToUpdate.info, chanToUpdate.edge,
×
1705
                )
×
1706
                if err != nil {
×
1707
                        return fmt.Errorf("unable to update channel: %w", err)
×
1708
                }
×
1709

1710
                // If we have a valid announcement to transmit, then we'll send
1711
                // that along with the update.
1712
                if chanAnn != nil {
×
1713
                        signedUpdates = append(signedUpdates, chanAnn)
×
1714
                }
×
1715

1716
                signedUpdates = append(signedUpdates, chanUpdate)
×
1717
        }
1718

1719
        // If we don't have any public channels, we return as we don't want to
1720
        // broadcast anything that would reveal our existence.
1721
        if !havePublicChannels {
8✔
1722
                return nil
4✔
1723
        }
4✔
1724

1725
        // We'll also check that our NodeAnnouncement is not too old.
1726
        currentNodeAnn := d.cfg.FetchSelfAnnouncement()
4✔
1727
        timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
4✔
1728
        timeElapsed := now.Sub(timestamp)
4✔
1729

4✔
1730
        // If it's been a full day since we've re-broadcasted the
4✔
1731
        // node announcement, refresh it and resend it.
4✔
1732
        nodeAnnStr := ""
4✔
1733
        if timeElapsed >= d.cfg.RebroadcastInterval {
4✔
1734
                newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
×
1735
                if err != nil {
×
1736
                        return fmt.Errorf("unable to get refreshed node "+
×
1737
                                "announcement: %v", err)
×
1738
                }
×
1739

1740
                signedUpdates = append(signedUpdates, &newNodeAnn)
×
1741
                nodeAnnStr = " and our refreshed node announcement"
×
1742

×
1743
                // Before broadcasting the refreshed node announcement, add it
×
1744
                // to our own graph.
×
1745
                if err := d.addNode(&newNodeAnn); err != nil {
×
1746
                        log.Errorf("Unable to add refreshed node announcement "+
×
1747
                                "to graph: %v", err)
×
1748
                }
×
1749
        }
1750

1751
        // If we don't have any updates to re-broadcast, then we'll exit
1752
        // early.
1753
        if len(signedUpdates) == 0 {
8✔
1754
                return nil
4✔
1755
        }
4✔
1756

1757
        log.Infof("Retransmitting %v outgoing channels%v",
×
1758
                len(edgesToUpdate), nodeAnnStr)
×
1759

×
1760
        // With all the wire announcements properly crafted, we'll broadcast
×
1761
        // our known outgoing channels to all our immediate peers.
×
1762
        if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
×
1763
                return fmt.Errorf("unable to re-broadcast channels: %w", err)
×
1764
        }
×
1765

1766
        return nil
×
1767
}
1768

1769
// processChanPolicyUpdate generates a new set of channel updates for the
1770
// provided list of edges and updates the backing ChannelGraphSource.
1771
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
1772
        edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
4✔
1773

4✔
1774
        var chanUpdates []networkMsg
4✔
1775
        for _, edgeInfo := range edgesToUpdate {
8✔
1776
                // Now that we've collected all the channels we need to update,
4✔
1777
                // we'll re-sign and update the backing ChannelGraphSource, and
4✔
1778
                // retrieve our ChannelUpdate to broadcast.
4✔
1779
                _, chanUpdate, err := d.updateChannel(
4✔
1780
                        edgeInfo.Info, edgeInfo.Edge,
4✔
1781
                )
4✔
1782
                if err != nil {
4✔
1783
                        return nil, err
×
1784
                }
×
1785

1786
                // We'll avoid broadcasting any updates for private channels to
1787
                // avoid directly giving away their existence. Instead, we'll
1788
                // send the update directly to the remote party.
1789
                if edgeInfo.Info.AuthProof == nil {
8✔
1790
                        // If AuthProof is nil and an alias was found for this
4✔
1791
                        // ChannelID (meaning the option-scid-alias feature was
4✔
1792
                        // negotiated), we'll replace the ShortChannelID in the
4✔
1793
                        // update with the peer's alias. We do this after
4✔
1794
                        // updateChannel so that the alias isn't persisted to
4✔
1795
                        // the database.
4✔
1796
                        chanID := lnwire.NewChanIDFromOutPoint(
4✔
1797
                                edgeInfo.Info.ChannelPoint,
4✔
1798
                        )
4✔
1799

4✔
1800
                        var defaultAlias lnwire.ShortChannelID
4✔
1801
                        foundAlias, _ := d.cfg.GetAlias(chanID)
4✔
1802
                        if foundAlias != defaultAlias {
8✔
1803
                                chanUpdate.ShortChannelID = foundAlias
4✔
1804

4✔
1805
                                sig, err := d.cfg.SignAliasUpdate(chanUpdate)
4✔
1806
                                if err != nil {
4✔
1807
                                        log.Errorf("Unable to sign alias "+
×
1808
                                                "update: %v", err)
×
1809
                                        continue
×
1810
                                }
1811

1812
                                lnSig, err := lnwire.NewSigFromSignature(sig)
4✔
1813
                                if err != nil {
4✔
1814
                                        log.Errorf("Unable to create sig: %v",
×
1815
                                                err)
×
1816
                                        continue
×
1817
                                }
1818

1819
                                chanUpdate.Signature = lnSig
4✔
1820
                        }
1821

1822
                        remotePubKey := remotePubFromChanInfo(
4✔
1823
                                edgeInfo.Info, chanUpdate.ChannelFlags,
4✔
1824
                        )
4✔
1825
                        err := d.reliableSender.sendMessage(
4✔
1826
                                chanUpdate, remotePubKey,
4✔
1827
                        )
4✔
1828
                        if err != nil {
4✔
1829
                                log.Errorf("Unable to reliably send %v for "+
×
1830
                                        "channel=%v to peer=%x: %v",
×
1831
                                        chanUpdate.MsgType(),
×
1832
                                        chanUpdate.ShortChannelID,
×
1833
                                        remotePubKey, err)
×
1834
                        }
×
1835
                        continue
4✔
1836
                }
1837

1838
                // We set ourselves as the source of this message to indicate
1839
                // that we shouldn't skip any peers when sending this message.
1840
                chanUpdates = append(chanUpdates, networkMsg{
4✔
1841
                        source:   d.selfKey,
4✔
1842
                        isRemote: false,
4✔
1843
                        msg:      chanUpdate,
4✔
1844
                })
4✔
1845
        }
1846

1847
        return chanUpdates, nil
4✔
1848
}
1849

1850
// remotePubFromChanInfo returns the public key of the remote peer given a
1851
// ChannelEdgeInfo that describe a channel we have with them.
1852
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
1853
        chanFlags lnwire.ChanUpdateChanFlags) [33]byte {
4✔
1854

4✔
1855
        var remotePubKey [33]byte
4✔
1856
        switch {
4✔
1857
        case chanFlags&lnwire.ChanUpdateDirection == 0:
4✔
1858
                remotePubKey = chanInfo.NodeKey2Bytes
4✔
1859
        case chanFlags&lnwire.ChanUpdateDirection == 1:
4✔
1860
                remotePubKey = chanInfo.NodeKey1Bytes
4✔
1861
        }
1862

1863
        return remotePubKey
4✔
1864
}
1865

1866
// processRejectedEdge examines a rejected edge to see if we can extract any
1867
// new announcements from it.  An edge will get rejected if we already added
1868
// the same edge without AuthProof to the graph. If the received announcement
1869
// contains a proof, we can add this proof to our edge.  We can end up in this
1870
// situation in the case where we create a channel, but for some reason fail
1871
// to receive the remote peer's proof, while the remote peer is able to fully
1872
// assemble the proof and craft the ChannelAnnouncement.
1873
func (d *AuthenticatedGossiper) processRejectedEdge(
1874
        chanAnnMsg *lnwire.ChannelAnnouncement1,
1875
        proof *models.ChannelAuthProof) ([]networkMsg, error) {
4✔
1876

4✔
1877
        // First, we'll fetch the state of the channel as we know if from the
4✔
1878
        // database.
4✔
1879
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
4✔
1880
                chanAnnMsg.ShortChannelID,
4✔
1881
        )
4✔
1882
        if err != nil {
4✔
1883
                return nil, err
×
1884
        }
×
1885

1886
        // The edge is in the graph, and has a proof attached, then we'll just
1887
        // reject it as normal.
1888
        if chanInfo.AuthProof != nil {
8✔
1889
                return nil, nil
4✔
1890
        }
4✔
1891

1892
        // Otherwise, this means that the edge is within the graph, but it
1893
        // doesn't yet have a proper proof attached. If we did not receive
1894
        // the proof such that we now can add it, there's nothing more we
1895
        // can do.
1896
        if proof == nil {
×
1897
                return nil, nil
×
1898
        }
×
1899

1900
        // We'll then create then validate the new fully assembled
1901
        // announcement.
1902
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
×
1903
                proof, chanInfo, e1, e2,
×
1904
        )
×
1905
        if err != nil {
×
1906
                return nil, err
×
1907
        }
×
1908
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
×
1909
        if err != nil {
×
1910
                err := fmt.Errorf("assembled channel announcement proof "+
×
1911
                        "for shortChanID=%v isn't valid: %v",
×
1912
                        chanAnnMsg.ShortChannelID, err)
×
1913
                log.Error(err)
×
1914
                return nil, err
×
1915
        }
×
1916

1917
        // If everything checks out, then we'll add the fully assembled proof
1918
        // to the database.
1919
        err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
×
1920
        if err != nil {
×
1921
                err := fmt.Errorf("unable add proof to shortChanID=%v: %w",
×
1922
                        chanAnnMsg.ShortChannelID, err)
×
1923
                log.Error(err)
×
1924
                return nil, err
×
1925
        }
×
1926

1927
        // As we now have a complete channel announcement for this channel,
1928
        // we'll construct the announcement so they can be broadcast out to all
1929
        // our peers.
1930
        announcements := make([]networkMsg, 0, 3)
×
1931
        announcements = append(announcements, networkMsg{
×
1932
                source: d.selfKey,
×
1933
                msg:    chanAnn,
×
1934
        })
×
1935
        if e1Ann != nil {
×
1936
                announcements = append(announcements, networkMsg{
×
1937
                        source: d.selfKey,
×
1938
                        msg:    e1Ann,
×
1939
                })
×
1940
        }
×
1941
        if e2Ann != nil {
×
1942
                announcements = append(announcements, networkMsg{
×
1943
                        source: d.selfKey,
×
1944
                        msg:    e2Ann,
×
1945
                })
×
1946

×
1947
        }
×
1948

1949
        return announcements, nil
×
1950
}
1951

1952
// fetchPKScript fetches the output script for the given SCID.
1953
func (d *AuthenticatedGossiper) fetchPKScript(chanID *lnwire.ShortChannelID) (
NEW
1954
        txscript.ScriptClass, btcutil.Address, error) {
×
NEW
1955

×
NEW
1956
        pkScript, err := lnwallet.FetchPKScriptWithQuit(
×
NEW
1957
                d.cfg.ChainIO, chanID, d.quit,
×
NEW
1958
        )
×
NEW
1959
        if err != nil {
×
NEW
1960
                return txscript.WitnessUnknownTy, nil, err
×
NEW
1961
        }
×
1962

NEW
1963
        scriptClass, addrs, _, err := txscript.ExtractPkScriptAddrs(
×
NEW
1964
                pkScript, d.cfg.ChainParams,
×
NEW
1965
        )
×
NEW
1966
        if err != nil {
×
NEW
1967
                return txscript.WitnessUnknownTy, nil, err
×
NEW
1968
        }
×
1969

NEW
1970
        if len(addrs) != 1 {
×
NEW
1971
                return txscript.WitnessUnknownTy, nil, fmt.Errorf("expected "+
×
NEW
1972
                        "1 address, got: %d", len(addrs))
×
NEW
1973
        }
×
1974

NEW
1975
        return scriptClass, addrs[0], nil
×
1976
}
1977

1978
// addNode processes the given node announcement, and adds it to our channel
1979
// graph.
1980
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
1981
        op ...batch.SchedulerOption) error {
4✔
1982

4✔
1983
        if err := graph.ValidateNodeAnn(msg); err != nil {
4✔
1984
                return fmt.Errorf("unable to validate node announcement: %w",
×
1985
                        err)
×
1986
        }
×
1987

1988
        timestamp := time.Unix(int64(msg.Timestamp), 0)
4✔
1989
        features := lnwire.NewFeatureVector(msg.Features, lnwire.Features)
4✔
1990
        node := &channeldb.LightningNode{
4✔
1991
                HaveNodeAnnouncement: true,
4✔
1992
                LastUpdate:           timestamp,
4✔
1993
                Addresses:            msg.Addresses,
4✔
1994
                PubKeyBytes:          msg.NodeID,
4✔
1995
                Alias:                msg.Alias.String(),
4✔
1996
                AuthSigBytes:         msg.Signature.ToSignatureBytes(),
4✔
1997
                Features:             features,
4✔
1998
                Color:                msg.RGBColor,
4✔
1999
                ExtraOpaqueData:      msg.ExtraOpaqueData,
4✔
2000
        }
4✔
2001

4✔
2002
        return d.cfg.Graph.AddNode(node, op...)
4✔
2003
}
2004

2005
// isPremature decides whether a given network message has a block height+delta
2006
// value specified in the future. If so, the message will be added to the
2007
// future message map and be processed when the block height as reached.
2008
//
2009
// NOTE: must be used inside a lock.
2010
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
2011
        delta uint32, msg *networkMsg) bool {
4✔
2012
        // TODO(roasbeef) make height delta 6
4✔
2013
        //  * or configurable
4✔
2014

4✔
2015
        msgHeight := chanID.BlockHeight + delta
4✔
2016

4✔
2017
        // The message height is smaller or equal to our best known height,
4✔
2018
        // thus the message is mature.
4✔
2019
        if msgHeight <= d.bestHeight {
8✔
2020
                return false
4✔
2021
        }
4✔
2022

2023
        // Add the premature message to our future messages which will be
2024
        // resent once the block height has reached.
2025
        //
2026
        // Copy the networkMsgs since the old message's err chan will be
2027
        // consumed.
2028
        copied := &networkMsg{
2✔
2029
                peer:              msg.peer,
2✔
2030
                source:            msg.source,
2✔
2031
                msg:               msg.msg,
2✔
2032
                optionalMsgFields: msg.optionalMsgFields,
2✔
2033
                isRemote:          msg.isRemote,
2✔
2034
                err:               make(chan error, 1),
2✔
2035
        }
2✔
2036

2✔
2037
        // Create the cached message.
2✔
2038
        cachedMsg := &cachedFutureMsg{
2✔
2039
                msg:    copied,
2✔
2040
                height: msgHeight,
2✔
2041
        }
2✔
2042

2✔
2043
        // Increment the msg ID and add it to the cache.
2✔
2044
        nextMsgID := d.futureMsgs.nextMsgID()
2✔
2045
        _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
2✔
2046
        if err != nil {
2✔
2047
                log.Errorf("Adding future message got error: %v", err)
×
2048
        }
×
2049

2050
        log.Debugf("Network message: %v added to future messages for "+
2✔
2051
                "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
2✔
2052
                msgHeight, d.bestHeight)
2✔
2053

2✔
2054
        return true
2✔
2055
}
2056

2057
// processNetworkAnnouncement processes a new network relate authenticated
2058
// channel or node announcement or announcements proofs. If the announcement
2059
// didn't affect the internal state due to either being out of date, invalid,
2060
// or redundant, then nil is returned. Otherwise, the set of announcements will
2061
// be returned which should be broadcasted to the rest of the network. The
2062
// boolean returned indicates whether any dependents of the announcement should
2063
// attempt to be processed as well.
2064
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
2065
        nMsg *networkMsg) ([]networkMsg, bool) {
4✔
2066

4✔
2067
        // If this is a remote update, we set the scheduler option to lazily
4✔
2068
        // add it to the graph.
4✔
2069
        var schedulerOp []batch.SchedulerOption
4✔
2070
        if nMsg.isRemote {
8✔
2071
                schedulerOp = append(schedulerOp, batch.LazyAdd())
4✔
2072
        }
4✔
2073

2074
        switch msg := nMsg.msg.(type) {
4✔
2075
        // A new node announcement has arrived which either presents new
2076
        // information about a node in one of the channels we know about, or a
2077
        // updating previously advertised information.
2078
        case *lnwire.NodeAnnouncement:
4✔
2079
                return d.handleNodeAnnouncement(nMsg, msg, schedulerOp)
4✔
2080

2081
        // A new channel announcement has arrived, this indicates the
2082
        // *creation* of a new channel within the network. This only advertises
2083
        // the existence of a channel and not yet the routing policies in
2084
        // either direction of the channel.
2085
        case *lnwire.ChannelAnnouncement1:
4✔
2086
                return d.handleChanAnnouncement(nMsg, msg, schedulerOp)
4✔
2087

2088
        // A new authenticated channel edge update has arrived. This indicates
2089
        // that the directional information for an already known channel has
2090
        // been updated.
2091
        case *lnwire.ChannelUpdate1:
4✔
2092
                return d.handleChanUpdate(nMsg, msg, schedulerOp)
4✔
2093

2094
        // A new signature announcement has been received. This indicates
2095
        // willingness of nodes involved in the funding of a channel to
2096
        // announce this new channel to the rest of the world.
2097
        case *lnwire.AnnounceSignatures1:
4✔
2098
                return d.handleAnnSig(nMsg, msg)
4✔
2099

2100
        default:
×
2101
                err := errors.New("wrong type of the announcement")
×
2102
                nMsg.err <- err
×
2103
                return nil, false
×
2104
        }
2105
}
2106

2107
// processZombieUpdate determines whether the provided channel update should
2108
// resurrect a given zombie edge.
2109
//
2110
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
2111
// should be inspected.
2112
func (d *AuthenticatedGossiper) processZombieUpdate(
2113
        chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
2114
        msg *lnwire.ChannelUpdate1) error {
4✔
2115

4✔
2116
        // The least-significant bit in the flag on the channel update tells us
4✔
2117
        // which edge is being updated.
4✔
2118
        isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0
4✔
2119

4✔
2120
        // Since we've deemed the update as not stale above, before marking it
4✔
2121
        // live, we'll make sure it has been signed by the correct party. If we
4✔
2122
        // have both pubkeys, either party can resurrect the channel. If we've
4✔
2123
        // already marked this with the stricter, single-sided resurrection we
4✔
2124
        // will only have the pubkey of the node with the oldest timestamp.
4✔
2125
        var pubKey *btcec.PublicKey
4✔
2126
        switch {
4✔
2127
        case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
3✔
2128
                pubKey, _ = chanInfo.NodeKey1()
3✔
2129
        case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
1✔
2130
                pubKey, _ = chanInfo.NodeKey2()
1✔
2131
        }
2132
        if pubKey == nil {
4✔
2133
                return fmt.Errorf("incorrect pubkey to resurrect zombie "+
×
2134
                        "with chan_id=%v", msg.ShortChannelID)
×
2135
        }
×
2136

2137
        err := netann.VerifyChannelUpdateSignature(msg, pubKey)
4✔
2138
        if err != nil {
4✔
2139
                return fmt.Errorf("unable to verify channel "+
×
2140
                        "update signature: %v", err)
×
2141
        }
×
2142

2143
        // With the signature valid, we'll proceed to mark the
2144
        // edge as live and wait for the channel announcement to
2145
        // come through again.
2146
        err = d.cfg.Graph.MarkEdgeLive(scid)
4✔
2147
        switch {
4✔
2148
        case errors.Is(err, channeldb.ErrZombieEdgeNotFound):
×
2149
                log.Errorf("edge with chan_id=%v was not found in the "+
×
2150
                        "zombie index: %v", err)
×
2151

×
2152
                return nil
×
2153

2154
        case err != nil:
×
2155
                return fmt.Errorf("unable to remove edge with "+
×
2156
                        "chan_id=%v from zombie index: %v",
×
2157
                        msg.ShortChannelID, err)
×
2158

2159
        default:
4✔
2160
        }
2161

2162
        log.Debugf("Removed edge with chan_id=%v from zombie "+
4✔
2163
                "index", msg.ShortChannelID)
4✔
2164

4✔
2165
        return nil
4✔
2166
}
2167

2168
// fetchNodeAnn fetches the latest signed node announcement from our point of
2169
// view for the node with the given public key.
2170
func (d *AuthenticatedGossiper) fetchNodeAnn(
2171
        pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {
4✔
2172

4✔
2173
        node, err := d.cfg.Graph.FetchLightningNode(pubKey)
4✔
2174
        if err != nil {
4✔
2175
                return nil, err
×
2176
        }
×
2177

2178
        return node.NodeAnnouncement(true)
4✔
2179
}
2180

2181
// isMsgStale determines whether a message retrieved from the backing
2182
// MessageStore is seen as stale by the current graph.
2183
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
4✔
2184
        switch msg := msg.(type) {
4✔
2185
        case *lnwire.AnnounceSignatures1:
4✔
2186
                chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
4✔
2187
                        msg.ShortChannelID,
4✔
2188
                )
4✔
2189

4✔
2190
                // If the channel cannot be found, it is most likely a leftover
4✔
2191
                // message for a channel that was closed, so we can consider it
4✔
2192
                // stale.
4✔
2193
                if errors.Is(err, channeldb.ErrEdgeNotFound) {
8✔
2194
                        return true
4✔
2195
                }
4✔
2196
                if err != nil {
4✔
2197
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2198
                                "%v", chanInfo.ChannelID, err)
×
2199
                        return false
×
2200
                }
×
2201

2202
                // If the proof exists in the graph, then we have successfully
2203
                // received the remote proof and assembled the full proof, so we
2204
                // can safely delete the local proof from the database.
2205
                return chanInfo.AuthProof != nil
4✔
2206

2207
        case *lnwire.ChannelUpdate1:
4✔
2208
                _, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)
4✔
2209

4✔
2210
                // If the channel cannot be found, it is most likely a leftover
4✔
2211
                // message for a channel that was closed, so we can consider it
4✔
2212
                // stale.
4✔
2213
                if errors.Is(err, channeldb.ErrEdgeNotFound) {
8✔
2214
                        return true
4✔
2215
                }
4✔
2216
                if err != nil {
4✔
2217
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
2218
                                "%v", msg.ShortChannelID, err)
×
2219
                        return false
×
2220
                }
×
2221

2222
                // Otherwise, we'll retrieve the correct policy that we
2223
                // currently have stored within our graph to check if this
2224
                // message is stale by comparing its timestamp.
2225
                var p *models.ChannelEdgePolicy
4✔
2226
                if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
8✔
2227
                        p = p1
4✔
2228
                } else {
8✔
2229
                        p = p2
4✔
2230
                }
4✔
2231

2232
                // If the policy is still unknown, then we can consider this
2233
                // policy fresh.
2234
                if p == nil {
4✔
2235
                        return false
×
2236
                }
×
2237

2238
                timestamp := time.Unix(int64(msg.Timestamp), 0)
4✔
2239
                return p.LastUpdate.After(timestamp)
4✔
2240

2241
        default:
×
2242
                // We'll make sure to not mark any unsupported messages as stale
×
2243
                // to ensure they are not removed.
×
2244
                return false
×
2245
        }
2246
}
2247

2248
// updateChannel creates a new fully signed update for the channel, and updates
2249
// the underlying graph with the new state.
2250
func (d *AuthenticatedGossiper) updateChannel(info *models.ChannelEdgeInfo,
2251
        edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
2252
        *lnwire.ChannelUpdate1, error) {
4✔
2253

4✔
2254
        // Parse the unsigned edge into a channel update.
4✔
2255
        chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)
4✔
2256

4✔
2257
        // We'll generate a new signature over a digest of the channel
4✔
2258
        // announcement itself and update the timestamp to ensure it propagate.
4✔
2259
        err := netann.SignChannelUpdate(
4✔
2260
                d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
4✔
2261
                netann.ChanUpdSetTimestamp,
4✔
2262
        )
4✔
2263
        if err != nil {
4✔
2264
                return nil, nil, err
×
2265
        }
×
2266

2267
        // Next, we'll set the new signature in place, and update the reference
2268
        // in the backing slice.
2269
        edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
4✔
2270
        edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()
4✔
2271

4✔
2272
        // To ensure that our signature is valid, we'll verify it ourself
4✔
2273
        // before committing it to the slice returned.
4✔
2274
        err = netann.ValidateChannelUpdateAnn(
4✔
2275
                d.selfKey, info.Capacity, chanUpdate,
4✔
2276
        )
4✔
2277
        if err != nil {
4✔
2278
                return nil, nil, fmt.Errorf("generated invalid channel "+
×
2279
                        "update sig: %v", err)
×
2280
        }
×
2281

2282
        // Finally, we'll write the new edge policy to disk.
2283
        if err := d.cfg.Graph.UpdateEdge(edge); err != nil {
4✔
2284
                return nil, nil, err
×
2285
        }
×
2286

2287
        // We'll also create the original channel announcement so the two can
2288
        // be broadcast along side each other (if necessary), but only if we
2289
        // have a full channel announcement for this channel.
2290
        var chanAnn *lnwire.ChannelAnnouncement1
4✔
2291
        if info.AuthProof != nil {
8✔
2292
                chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
4✔
2293
                chanAnn = &lnwire.ChannelAnnouncement1{
4✔
2294
                        ShortChannelID:  chanID,
4✔
2295
                        NodeID1:         info.NodeKey1Bytes,
4✔
2296
                        NodeID2:         info.NodeKey2Bytes,
4✔
2297
                        ChainHash:       info.ChainHash,
4✔
2298
                        BitcoinKey1:     info.BitcoinKey1Bytes,
4✔
2299
                        Features:        lnwire.NewRawFeatureVector(),
4✔
2300
                        BitcoinKey2:     info.BitcoinKey2Bytes,
4✔
2301
                        ExtraOpaqueData: info.ExtraOpaqueData,
4✔
2302
                }
4✔
2303
                chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
4✔
2304
                        info.AuthProof.NodeSig1Bytes,
4✔
2305
                )
4✔
2306
                if err != nil {
4✔
2307
                        return nil, nil, err
×
2308
                }
×
2309
                chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
4✔
2310
                        info.AuthProof.NodeSig2Bytes,
4✔
2311
                )
4✔
2312
                if err != nil {
4✔
2313
                        return nil, nil, err
×
2314
                }
×
2315
                chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
4✔
2316
                        info.AuthProof.BitcoinSig1Bytes,
4✔
2317
                )
4✔
2318
                if err != nil {
4✔
2319
                        return nil, nil, err
×
2320
                }
×
2321
                chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
4✔
2322
                        info.AuthProof.BitcoinSig2Bytes,
4✔
2323
                )
4✔
2324
                if err != nil {
4✔
2325
                        return nil, nil, err
×
2326
                }
×
2327
        }
2328

2329
        return chanAnn, chanUpdate, err
4✔
2330
}
2331

2332
// SyncManager returns the gossiper's SyncManager instance.
2333
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
4✔
2334
        return d.syncMgr
4✔
2335
}
4✔
2336

2337
// IsKeepAliveUpdate determines whether this channel update is considered a
2338
// keep-alive update based on the previous channel update processed for the same
2339
// direction.
2340
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
2341
        prev *models.ChannelEdgePolicy) bool {
4✔
2342

4✔
2343
        // Both updates should be from the same direction.
4✔
2344
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
4✔
2345
                prev.ChannelFlags&lnwire.ChanUpdateDirection {
4✔
2346

×
2347
                return false
×
2348
        }
×
2349

2350
        // The timestamp should always increase for a keep-alive update.
2351
        timestamp := time.Unix(int64(update.Timestamp), 0)
4✔
2352
        if !timestamp.After(prev.LastUpdate) {
8✔
2353
                return false
4✔
2354
        }
4✔
2355

2356
        // None of the remaining fields should change for a keep-alive update.
2357
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
8✔
2358
                return false
4✔
2359
        }
4✔
2360
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
8✔
2361
                return false
4✔
2362
        }
4✔
2363
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
8✔
2364
                return false
4✔
2365
        }
4✔
2366
        if update.TimeLockDelta != prev.TimeLockDelta {
4✔
2367
                return false
×
2368
        }
×
2369
        if update.HtlcMinimumMsat != prev.MinHTLC {
4✔
2370
                return false
×
2371
        }
×
2372
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
4✔
2373
                return false
×
2374
        }
×
2375
        if update.HtlcMaximumMsat != prev.MaxHTLC {
4✔
2376
                return false
×
2377
        }
×
2378
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
8✔
2379
                return false
4✔
2380
        }
4✔
2381
        return true
4✔
2382
}
2383

2384
// latestHeight returns the gossiper's latest height known of the chain.
2385
func (d *AuthenticatedGossiper) latestHeight() uint32 {
4✔
2386
        d.Lock()
4✔
2387
        defer d.Unlock()
4✔
2388
        return d.bestHeight
4✔
2389
}
4✔
2390

2391
// handleNodeAnnouncement processes a new node announcement.
2392
func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
2393
        nodeAnn *lnwire.NodeAnnouncement,
2394
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
4✔
2395

4✔
2396
        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
4✔
2397

4✔
2398
        log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
4✔
2399
                "node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
4✔
2400

4✔
2401
        // We'll quickly ask the router if it already has a newer update for
4✔
2402
        // this node so we can skip validating signatures if not required.
4✔
2403
        if d.cfg.Graph.IsStaleNode(nodeAnn.NodeID, timestamp) {
8✔
2404
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
4✔
2405
                nMsg.err <- nil
4✔
2406
                return nil, true
4✔
2407
        }
4✔
2408

2409
        if err := d.addNode(nodeAnn, ops...); err != nil {
8✔
2410
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
4✔
2411
                        err)
4✔
2412

4✔
2413
                if !graph.IsError(
4✔
2414
                        err,
4✔
2415
                        graph.ErrOutdated,
4✔
2416
                        graph.ErrIgnored,
4✔
2417
                        graph.ErrVBarrierShuttingDown,
4✔
2418
                ) {
4✔
2419

×
2420
                        log.Error(err)
×
2421
                }
×
2422

2423
                nMsg.err <- err
4✔
2424
                return nil, false
4✔
2425
        }
2426

2427
        // In order to ensure we don't leak unadvertised nodes, we'll make a
2428
        // quick check to ensure this node intends to publicly advertise itself
2429
        // to the network.
2430
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
4✔
2431
        if err != nil {
4✔
2432
                log.Errorf("Unable to determine if node %x is advertised: %v",
×
2433
                        nodeAnn.NodeID, err)
×
2434
                nMsg.err <- err
×
2435
                return nil, false
×
2436
        }
×
2437

2438
        var announcements []networkMsg
4✔
2439

4✔
2440
        // If it does, we'll add their announcement to our batch so that it can
4✔
2441
        // be broadcast to the rest of our peers.
4✔
2442
        if isPublic {
8✔
2443
                announcements = append(announcements, networkMsg{
4✔
2444
                        peer:     nMsg.peer,
4✔
2445
                        isRemote: nMsg.isRemote,
4✔
2446
                        source:   nMsg.source,
4✔
2447
                        msg:      nodeAnn,
4✔
2448
                })
4✔
2449
        } else {
8✔
2450
                log.Tracef("Skipping broadcasting node announcement for %x "+
4✔
2451
                        "due to being unadvertised", nodeAnn.NodeID)
4✔
2452
        }
4✔
2453

2454
        nMsg.err <- nil
4✔
2455
        // TODO(roasbeef): get rid of the above
4✔
2456

4✔
2457
        log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
4✔
2458
                "node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
4✔
2459

4✔
2460
        return announcements, true
4✔
2461
}
2462

2463
// handleChanAnnouncement processes a new channel announcement.
2464
func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
2465
        ann *lnwire.ChannelAnnouncement1,
2466
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
4✔
2467

4✔
2468
        scid := ann.ShortChannelID
4✔
2469

4✔
2470
        log.Debugf("Processing ChannelAnnouncement1: peer=%v, short_chan_id=%v",
4✔
2471
                nMsg.peer, scid.ToUint64())
4✔
2472

4✔
2473
        // We'll ignore any channel announcements that target any chain other
4✔
2474
        // than the set of chains we know of.
4✔
2475
        if !bytes.Equal(ann.ChainHash[:], d.cfg.chainHash[:]) {
4✔
2476
                err := fmt.Errorf("ignoring ChannelAnnouncement1 from chain=%v"+
×
2477
                        ", gossiper on chain=%v", ann.ChainHash,
×
NEW
2478
                        d.cfg.chainHash)
×
2479
                log.Errorf(err.Error())
×
2480

×
2481
                key := newRejectCacheKey(
×
2482
                        scid.ToUint64(),
×
2483
                        sourceToPub(nMsg.source),
×
2484
                )
×
2485
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2486

×
2487
                nMsg.err <- err
×
2488
                return nil, false
×
2489
        }
×
2490

2491
        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
2492
        // reject the announcement. Since the router accepts alias SCIDs,
2493
        // not erroring out would be a DoS vector.
2494
        if nMsg.isRemote && d.cfg.IsAlias(scid) {
4✔
2495
                err := fmt.Errorf("ignoring remote alias channel=%v", scid)
×
2496
                log.Errorf(err.Error())
×
2497

×
2498
                key := newRejectCacheKey(
×
2499
                        scid.ToUint64(),
×
2500
                        sourceToPub(nMsg.source),
×
2501
                )
×
2502
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2503

×
2504
                nMsg.err <- err
×
2505
                return nil, false
×
2506
        }
×
2507

2508
        // If the advertised inclusionary block is beyond our knowledge of the
2509
        // chain tip, then we'll ignore it for now.
2510
        d.Lock()
4✔
2511
        if nMsg.isRemote && d.isPremature(scid, 0, nMsg) {
4✔
2512
                log.Warnf("Announcement for chan_id=(%v), is premature: "+
×
2513
                        "advertises height %v, only height %v is known",
×
2514
                        scid.ToUint64(), scid.BlockHeight, d.bestHeight)
×
2515
                d.Unlock()
×
2516
                nMsg.err <- nil
×
2517
                return nil, false
×
2518
        }
×
2519
        d.Unlock()
4✔
2520

4✔
2521
        // At this point, we'll now ask the router if this is a zombie/known
4✔
2522
        // edge. If so we can skip all the processing below.
4✔
2523
        if d.cfg.Graph.IsKnownEdge(scid) {
8✔
2524
                nMsg.err <- nil
4✔
2525
                return nil, true
4✔
2526
        }
4✔
2527

2528
        // Check if the channel is already closed in which case we can ignore
2529
        // it.
2530
        closed, err := d.cfg.ScidCloser.IsClosedScid(scid)
4✔
2531
        if err != nil {
4✔
2532
                log.Errorf("failed to check if scid %v is closed: %v", scid,
×
2533
                        err)
×
2534
                nMsg.err <- err
×
2535

×
2536
                return nil, false
×
2537
        }
×
2538

2539
        if closed {
4✔
2540
                err = fmt.Errorf("ignoring closed channel %v", scid)
×
2541
                log.Error(err)
×
2542

×
2543
                // If this is an announcement from us, we'll just ignore it.
×
2544
                if !nMsg.isRemote {
×
2545
                        nMsg.err <- err
×
2546
                        return nil, false
×
2547
                }
×
2548

2549
                // Increment the peer's ban score if they are sending closed
2550
                // channel announcements.
2551
                d.banman.incrementBanScore(nMsg.peer.PubKey())
×
2552

×
2553
                // If the peer is banned and not a channel peer, we'll
×
2554
                // disconnect them.
×
2555
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
×
2556
                if dcErr != nil {
×
2557
                        log.Errorf("failed to check if we should disconnect "+
×
2558
                                "peer: %v", dcErr)
×
2559
                        nMsg.err <- dcErr
×
2560

×
2561
                        return nil, false
×
2562
                }
×
2563

2564
                if shouldDc {
×
2565
                        nMsg.peer.Disconnect(ErrPeerBanned)
×
2566
                }
×
2567

2568
                nMsg.err <- err
×
2569

×
2570
                return nil, false
×
2571
        }
2572

2573
        // If this is a remote channel announcement, then we'll validate all
2574
        // the signatures within the proof as it should be well formed.
2575
        var proof *models.ChannelAuthProof
4✔
2576
        if nMsg.isRemote {
8✔
2577
                err := netann.ValidateChannelAnn(ann, d.fetchPKScript)
4✔
2578
                if err != nil {
4✔
2579
                        err := fmt.Errorf("unable to validate announcement: "+
×
2580
                                "%v", err)
×
2581

×
2582
                        key := newRejectCacheKey(
×
2583
                                scid.ToUint64(),
×
2584
                                sourceToPub(nMsg.source),
×
2585
                        )
×
2586
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2587

×
2588
                        log.Error(err)
×
2589
                        nMsg.err <- err
×
2590
                        return nil, false
×
2591
                }
×
2592

2593
                // If the proof checks out, then we'll save the proof itself to
2594
                // the database so we can fetch it later when gossiping with
2595
                // other nodes.
2596
                proof = &models.ChannelAuthProof{
4✔
2597
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
4✔
2598
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
4✔
2599
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
4✔
2600
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
4✔
2601
                }
4✔
2602
        }
2603

2604
        // With the proof validated (if necessary), we can now store it within
2605
        // the database for our path finding and syncing needs.
2606
        var featureBuf bytes.Buffer
4✔
2607
        if err := ann.Features.Encode(&featureBuf); err != nil {
4✔
2608
                log.Errorf("unable to encode features: %v", err)
×
2609
                nMsg.err <- err
×
2610
                return nil, false
×
2611
        }
×
2612

2613
        edge := &models.ChannelEdgeInfo{
4✔
2614
                ChannelID:        scid.ToUint64(),
4✔
2615
                ChainHash:        ann.ChainHash,
4✔
2616
                NodeKey1Bytes:    ann.NodeID1,
4✔
2617
                NodeKey2Bytes:    ann.NodeID2,
4✔
2618
                BitcoinKey1Bytes: ann.BitcoinKey1,
4✔
2619
                BitcoinKey2Bytes: ann.BitcoinKey2,
4✔
2620
                AuthProof:        proof,
4✔
2621
                Features:         featureBuf.Bytes(),
4✔
2622
                ExtraOpaqueData:  ann.ExtraOpaqueData,
4✔
2623
        }
4✔
2624

4✔
2625
        // If there were any optional message fields provided, we'll include
4✔
2626
        // them in its serialized disk representation now.
4✔
2627
        if nMsg.optionalMsgFields != nil {
8✔
2628
                if nMsg.optionalMsgFields.capacity != nil {
8✔
2629
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
4✔
2630
                }
4✔
2631
                if nMsg.optionalMsgFields.channelPoint != nil {
8✔
2632
                        cp := *nMsg.optionalMsgFields.channelPoint
4✔
2633
                        edge.ChannelPoint = cp
4✔
2634
                }
4✔
2635

2636
                // Optional tapscript root for custom channels.
2637
                edge.TapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
4✔
2638
        }
2639

2640
        log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64())
4✔
2641

4✔
2642
        // We will add the edge to the channel router. If the nodes present in
4✔
2643
        // this channel are not present in the database, a partial node will be
4✔
2644
        // added to represent each node while we wait for a node announcement.
4✔
2645
        //
4✔
2646
        // Before we add the edge to the database, we obtain the mutex for this
4✔
2647
        // channel ID. We do this to ensure no other goroutine has read the
4✔
2648
        // database and is now making decisions based on this DB state, before
4✔
2649
        // it writes to the DB.
4✔
2650
        d.channelMtx.Lock(scid.ToUint64())
4✔
2651
        err = d.cfg.Graph.AddEdge(edge, ops...)
4✔
2652
        if err != nil {
8✔
2653
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
4✔
2654
                        scid.ToUint64(), err)
4✔
2655

4✔
2656
                defer d.channelMtx.Unlock(scid.ToUint64())
4✔
2657

4✔
2658
                // If the edge was rejected due to already being known, then it
4✔
2659
                // may be the case that this new message has a fresh channel
4✔
2660
                // proof, so we'll check.
4✔
2661
                switch {
4✔
2662
                case graph.IsError(err, graph.ErrIgnored):
4✔
2663
                        // Attempt to process the rejected message to see if we
4✔
2664
                        // get any new announcements.
4✔
2665
                        anns, rErr := d.processRejectedEdge(ann, proof)
4✔
2666
                        if rErr != nil {
4✔
2667
                                key := newRejectCacheKey(
×
2668
                                        scid.ToUint64(),
×
2669
                                        sourceToPub(nMsg.source),
×
2670
                                )
×
2671
                                cr := &cachedReject{}
×
2672
                                _, _ = d.recentRejects.Put(key, cr)
×
2673

×
2674
                                nMsg.err <- rErr
×
2675
                                return nil, false
×
2676
                        }
×
2677

2678
                        log.Debugf("Extracted %v announcements from rejected "+
4✔
2679
                                "msgs", len(anns))
4✔
2680

4✔
2681
                        // If while processing this rejected edge, we realized
4✔
2682
                        // there's a set of announcements we could extract,
4✔
2683
                        // then we'll return those directly.
4✔
2684
                        //
4✔
2685
                        // NOTE: since this is an ErrIgnored, we can return
4✔
2686
                        // true here to signal "allow" to its dependants.
4✔
2687
                        nMsg.err <- nil
4✔
2688

4✔
2689
                        return anns, true
4✔
2690

2691
                case graph.IsError(
2692
                        err, graph.ErrNoFundingTransaction,
2693
                        graph.ErrInvalidFundingOutput,
2694
                ):
×
2695
                        key := newRejectCacheKey(
×
2696
                                scid.ToUint64(),
×
2697
                                sourceToPub(nMsg.source),
×
2698
                        )
×
2699
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2700

×
2701
                        // Increment the peer's ban score. We check isRemote
×
2702
                        // so we don't actually ban the peer in case of a local
×
2703
                        // bug.
×
2704
                        if nMsg.isRemote {
×
2705
                                d.banman.incrementBanScore(nMsg.peer.PubKey())
×
2706
                        }
×
2707

2708
                case graph.IsError(err, graph.ErrChannelSpent):
×
2709
                        key := newRejectCacheKey(
×
2710
                                scid.ToUint64(),
×
2711
                                sourceToPub(nMsg.source),
×
2712
                        )
×
2713
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2714

×
2715
                        // Since this channel has already been closed, we'll
×
2716
                        // add it to the graph's closed channel index such that
×
2717
                        // we won't attempt to do expensive validation checks
×
2718
                        // on it again.
×
2719
                        // TODO: Populate the ScidCloser by using closed
×
2720
                        // channel notifications.
×
2721
                        dbErr := d.cfg.ScidCloser.PutClosedScid(scid)
×
2722
                        if dbErr != nil {
×
2723
                                log.Errorf("failed to mark scid(%v) as "+
×
2724
                                        "closed: %v", scid, dbErr)
×
2725

×
2726
                                nMsg.err <- dbErr
×
2727

×
2728
                                return nil, false
×
2729
                        }
×
2730

2731
                        // Increment the peer's ban score. We check isRemote
2732
                        // so we don't accidentally ban ourselves in case of a
2733
                        // bug.
2734
                        if nMsg.isRemote {
×
2735
                                d.banman.incrementBanScore(nMsg.peer.PubKey())
×
2736
                        }
×
2737

2738
                default:
×
2739
                        // Otherwise, this is just a regular rejected edge.
×
2740
                        key := newRejectCacheKey(
×
2741
                                scid.ToUint64(),
×
2742
                                sourceToPub(nMsg.source),
×
2743
                        )
×
2744
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2745
                }
2746

2747
                if !nMsg.isRemote {
×
2748
                        log.Errorf("failed to add edge for local channel: %v",
×
2749
                                err)
×
2750
                        nMsg.err <- err
×
2751

×
2752
                        return nil, false
×
2753
                }
×
2754

2755
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
×
2756
                if dcErr != nil {
×
2757
                        log.Errorf("failed to check if we should disconnect "+
×
2758
                                "peer: %v", dcErr)
×
2759
                        nMsg.err <- dcErr
×
2760

×
2761
                        return nil, false
×
2762
                }
×
2763

2764
                if shouldDc {
×
2765
                        nMsg.peer.Disconnect(ErrPeerBanned)
×
2766
                }
×
2767

2768
                nMsg.err <- err
×
2769

×
2770
                return nil, false
×
2771
        }
2772

2773
        // If err is nil, release the lock immediately.
2774
        d.channelMtx.Unlock(scid.ToUint64())
4✔
2775

4✔
2776
        log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64())
4✔
2777

4✔
2778
        // If we earlier received any ChannelUpdates for this channel, we can
4✔
2779
        // now process them, as the channel is added to the graph.
4✔
2780
        var channelUpdates []*processedNetworkMsg
4✔
2781

4✔
2782
        earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64())
4✔
2783
        if err == nil {
8✔
2784
                // There was actually an entry in the map, so we'll accumulate
4✔
2785
                // it. We don't worry about deletion, since it'll eventually
4✔
2786
                // fall out anyway.
4✔
2787
                chanMsgs := earlyChanUpdates
4✔
2788
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
4✔
2789
        }
4✔
2790

2791
        // Launch a new goroutine to handle each ChannelUpdate, this is to
2792
        // ensure we don't block here, as we can handle only one announcement
2793
        // at a time.
2794
        for _, cu := range channelUpdates {
8✔
2795
                // Skip if already processed.
4✔
2796
                if cu.processed {
4✔
UNCOV
2797
                        continue
×
2798
                }
2799

2800
                // Mark the ChannelUpdate as processed. This ensures that a
2801
                // subsequent announcement in the option-scid-alias case does
2802
                // not re-use an old ChannelUpdate.
2803
                cu.processed = true
4✔
2804

4✔
2805
                d.wg.Add(1)
4✔
2806
                go func(updMsg *networkMsg) {
8✔
2807
                        defer d.wg.Done()
4✔
2808

4✔
2809
                        switch msg := updMsg.msg.(type) {
4✔
2810
                        // Reprocess the message, making sure we return an
2811
                        // error to the original caller in case the gossiper
2812
                        // shuts down.
2813
                        case *lnwire.ChannelUpdate1:
4✔
2814
                                log.Debugf("Reprocessing ChannelUpdate for "+
4✔
2815
                                        "shortChanID=%v", scid.ToUint64())
4✔
2816

4✔
2817
                                select {
4✔
2818
                                case d.networkMsgs <- updMsg:
4✔
2819
                                case <-d.quit:
×
2820
                                        updMsg.err <- ErrGossiperShuttingDown
×
2821
                                }
2822

2823
                        // We don't expect any other message type than
2824
                        // ChannelUpdate to be in this cache.
2825
                        default:
×
2826
                                log.Errorf("Unsupported message type found "+
×
2827
                                        "among ChannelUpdates: %T", msg)
×
2828
                        }
2829
                }(cu.msg)
2830
        }
2831

2832
        // Channel announcement was successfully processed and now it might be
2833
        // broadcast to other connected nodes if it was an announcement with
2834
        // proof (remote).
2835
        var announcements []networkMsg
4✔
2836

4✔
2837
        if proof != nil {
8✔
2838
                announcements = append(announcements, networkMsg{
4✔
2839
                        peer:     nMsg.peer,
4✔
2840
                        isRemote: nMsg.isRemote,
4✔
2841
                        source:   nMsg.source,
4✔
2842
                        msg:      ann,
4✔
2843
                })
4✔
2844
        }
4✔
2845

2846
        nMsg.err <- nil
4✔
2847

4✔
2848
        log.Debugf("Processed ChannelAnnouncement1: peer=%v, short_chan_id=%v",
4✔
2849
                nMsg.peer, scid.ToUint64())
4✔
2850

4✔
2851
        return announcements, true
4✔
2852
}
2853

2854
// handleChanUpdate processes a new channel update.
2855
func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
2856
        upd *lnwire.ChannelUpdate1,
2857
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
4✔
2858

4✔
2859
        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
4✔
2860
                nMsg.peer, upd.ShortChannelID.ToUint64())
4✔
2861

4✔
2862
        // We'll ignore any channel updates that target any chain other than
4✔
2863
        // the set of chains we know of.
4✔
2864
        if !bytes.Equal(upd.ChainHash[:], d.cfg.chainHash[:]) {
4✔
2865
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
×
NEW
2866
                        "gossiper on chain=%v", upd.ChainHash, d.cfg.chainHash)
×
2867
                log.Errorf(err.Error())
×
2868

×
2869
                key := newRejectCacheKey(
×
2870
                        upd.ShortChannelID.ToUint64(),
×
2871
                        sourceToPub(nMsg.source),
×
2872
                )
×
2873
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
2874

×
2875
                nMsg.err <- err
×
2876
                return nil, false
×
2877
        }
×
2878

2879
        blockHeight := upd.ShortChannelID.BlockHeight
4✔
2880
        shortChanID := upd.ShortChannelID.ToUint64()
4✔
2881

4✔
2882
        // If the advertised inclusionary block is beyond our knowledge of the
4✔
2883
        // chain tip, then we'll put the announcement in limbo to be fully
4✔
2884
        // verified once we advance forward in the chain. If the update has an
4✔
2885
        // alias SCID, we'll skip the isPremature check. This is necessary
4✔
2886
        // since aliases start at block height 16_000_000.
4✔
2887
        d.Lock()
4✔
2888
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
4✔
2889
                d.isPremature(upd.ShortChannelID, 0, nMsg) {
6✔
2890

2✔
2891
                log.Warnf("Update announcement for short_chan_id(%v), is "+
2✔
2892
                        "premature: advertises height %v, only height %v is "+
2✔
2893
                        "known", shortChanID, blockHeight, d.bestHeight)
2✔
2894
                d.Unlock()
2✔
2895
                nMsg.err <- nil
2✔
2896
                return nil, false
2✔
2897
        }
2✔
2898
        d.Unlock()
4✔
2899

4✔
2900
        // Before we perform any of the expensive checks below, we'll check
4✔
2901
        // whether this update is stale or is for a zombie channel in order to
4✔
2902
        // quickly reject it.
4✔
2903
        timestamp := time.Unix(int64(upd.Timestamp), 0)
4✔
2904

4✔
2905
        // Fetch the SCID we should be using to lock the channelMtx and make
4✔
2906
        // graph queries with.
4✔
2907
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
4✔
2908
        if err != nil {
8✔
2909
                // Fallback and set the graphScid to the peer-provided SCID.
4✔
2910
                // This will occur for non-option-scid-alias channels and for
4✔
2911
                // public option-scid-alias channels after 6 confirmations.
4✔
2912
                // Once public option-scid-alias channels have 6 confs, we'll
4✔
2913
                // ignore ChannelUpdates with one of their aliases.
4✔
2914
                graphScid = upd.ShortChannelID
4✔
2915
        }
4✔
2916

2917
        if d.cfg.Graph.IsStaleEdgePolicy(
4✔
2918
                graphScid, timestamp, upd.ChannelFlags,
4✔
2919
        ) {
8✔
2920

4✔
2921
                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
4✔
2922
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
4✔
2923
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
4✔
2924
                )
4✔
2925

4✔
2926
                nMsg.err <- nil
4✔
2927
                return nil, true
4✔
2928
        }
4✔
2929

2930
        // Check that the ChanUpdate is not too far into the future, this could
2931
        // reveal some faulty implementation therefore we log an error.
2932
        if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
4✔
2933
                log.Errorf("Skewed timestamp (%v) for edge policy of "+
×
2934
                        "short_chan_id(%v), timestamp too far in the future: "+
×
2935
                        "peer=%v, msg=%s, is_remote=%v", timestamp.Unix(),
×
2936
                        shortChanID, nMsg.peer, nMsg.msg.MsgType(),
×
2937
                        nMsg.isRemote,
×
2938
                )
×
2939

×
2940
                nMsg.err <- fmt.Errorf("skewed timestamp of edge policy, "+
×
2941
                        "timestamp too far in the future: %v", timestamp.Unix())
×
2942

×
2943
                return nil, false
×
2944
        }
×
2945

2946
        // Get the node pub key as far since we don't have it in the channel
2947
        // update announcement message. We'll need this to properly verify the
2948
        // message's signature.
2949
        //
2950
        // We make sure to obtain the mutex for this channel ID before we
2951
        // access the database. This ensures the state we read from the
2952
        // database has not changed between this point and when we call
2953
        // UpdateEdge() later.
2954
        d.channelMtx.Lock(graphScid.ToUint64())
4✔
2955
        defer d.channelMtx.Unlock(graphScid.ToUint64())
4✔
2956

4✔
2957
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
4✔
2958
        switch {
4✔
2959
        // No error, break.
2960
        case err == nil:
4✔
2961
                break
4✔
2962

2963
        case errors.Is(err, channeldb.ErrZombieEdge):
4✔
2964
                err = d.processZombieUpdate(chanInfo, graphScid, upd)
4✔
2965
                if err != nil {
4✔
2966
                        log.Debug(err)
×
2967
                        nMsg.err <- err
×
2968
                        return nil, false
×
2969
                }
×
2970

2971
                // We'll fallthrough to ensure we stash the update until we
2972
                // receive its corresponding ChannelAnnouncement. This is
2973
                // needed to ensure the edge exists in the graph before
2974
                // applying the update.
2975
                fallthrough
4✔
2976
        case errors.Is(err, channeldb.ErrGraphNotFound):
4✔
2977
                fallthrough
4✔
2978
        case errors.Is(err, channeldb.ErrGraphNoEdgesFound):
4✔
2979
                fallthrough
4✔
2980
        case errors.Is(err, channeldb.ErrEdgeNotFound):
4✔
2981
                // If the edge corresponding to this ChannelUpdate was not
4✔
2982
                // found in the graph, this might be a channel in the process
4✔
2983
                // of being opened, and we haven't processed our own
4✔
2984
                // ChannelAnnouncement yet, hence it is not not found in the
4✔
2985
                // graph. This usually gets resolved after the channel proofs
4✔
2986
                // are exchanged and the channel is broadcasted to the rest of
4✔
2987
                // the network, but in case this is a private channel this
4✔
2988
                // won't ever happen. This can also happen in the case of a
4✔
2989
                // zombie channel with a fresh update for which we don't have a
4✔
2990
                // ChannelAnnouncement for since we reject them. Because of
4✔
2991
                // this, we temporarily add it to a map, and reprocess it after
4✔
2992
                // our own ChannelAnnouncement has been processed.
4✔
2993
                //
4✔
2994
                // The shortChanID may be an alias, but it is fine to use here
4✔
2995
                // since we don't have an edge in the graph and if the peer is
4✔
2996
                // not buggy, we should be able to use it once the gossiper
4✔
2997
                // receives the local announcement.
4✔
2998
                pMsg := &processedNetworkMsg{msg: nMsg}
4✔
2999

4✔
3000
                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
4✔
3001
                switch {
4✔
3002
                // Nothing in the cache yet, we can just directly insert this
3003
                // element.
3004
                case err == cache.ErrElementNotFound:
4✔
3005
                        _, _ = d.prematureChannelUpdates.Put(
4✔
3006
                                shortChanID, &cachedNetworkMsg{
4✔
3007
                                        msgs: []*processedNetworkMsg{pMsg},
4✔
3008
                                })
4✔
3009

3010
                // There's already something in the cache, so we'll combine the
3011
                // set of messages into a single value.
3012
                default:
4✔
3013
                        msgs := earlyMsgs.msgs
4✔
3014
                        msgs = append(msgs, pMsg)
4✔
3015
                        _, _ = d.prematureChannelUpdates.Put(
4✔
3016
                                shortChanID, &cachedNetworkMsg{
4✔
3017
                                        msgs: msgs,
4✔
3018
                                })
4✔
3019
                }
3020

3021
                log.Debugf("Got ChannelUpdate for edge not found in graph"+
4✔
3022
                        "(shortChanID=%v), saving for reprocessing later",
4✔
3023
                        shortChanID)
4✔
3024

4✔
3025
                // NOTE: We don't return anything on the error channel for this
4✔
3026
                // message, as we expect that will be done when this
4✔
3027
                // ChannelUpdate is later reprocessed.
4✔
3028
                return nil, false
4✔
3029

3030
        default:
×
3031
                err := fmt.Errorf("unable to validate channel update "+
×
3032
                        "short_chan_id=%v: %v", shortChanID, err)
×
3033
                log.Error(err)
×
3034
                nMsg.err <- err
×
3035

×
3036
                key := newRejectCacheKey(
×
3037
                        upd.ShortChannelID.ToUint64(),
×
3038
                        sourceToPub(nMsg.source),
×
3039
                )
×
3040
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3041

×
3042
                return nil, false
×
3043
        }
3044

3045
        // The least-significant bit in the flag on the channel update
3046
        // announcement tells us "which" side of the channels directed edge is
3047
        // being updated.
3048
        var (
4✔
3049
                pubKey       *btcec.PublicKey
4✔
3050
                edgeToUpdate *models.ChannelEdgePolicy
4✔
3051
        )
4✔
3052
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
4✔
3053
        switch direction {
4✔
3054
        case 0:
4✔
3055
                pubKey, _ = chanInfo.NodeKey1()
4✔
3056
                edgeToUpdate = e1
4✔
3057
        case 1:
4✔
3058
                pubKey, _ = chanInfo.NodeKey2()
4✔
3059
                edgeToUpdate = e2
4✔
3060
        }
3061

3062
        log.Debugf("Validating ChannelUpdate: channel=%v, from node=%x, has "+
4✔
3063
                "edge=%v", chanInfo.ChannelID, pubKey.SerializeCompressed(),
4✔
3064
                edgeToUpdate != nil)
4✔
3065

4✔
3066
        // Validate the channel announcement with the expected public key and
4✔
3067
        // channel capacity. In the case of an invalid channel update, we'll
4✔
3068
        // return an error to the caller and exit early.
4✔
3069
        err = netann.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
4✔
3070
        if err != nil {
4✔
3071
                rErr := fmt.Errorf("unable to validate channel update "+
×
3072
                        "announcement for short_chan_id=%v: %v",
×
3073
                        spew.Sdump(upd.ShortChannelID), err)
×
3074

×
3075
                log.Error(rErr)
×
3076
                nMsg.err <- rErr
×
3077
                return nil, false
×
3078
        }
×
3079

3080
        // If we have a previous version of the edge being updated, we'll want
3081
        // to rate limit its updates to prevent spam throughout the network.
3082
        if nMsg.isRemote && edgeToUpdate != nil {
8✔
3083
                // If it's a keep-alive update, we'll only propagate one if
4✔
3084
                // it's been a day since the previous. This follows our own
4✔
3085
                // heuristic of sending keep-alive updates after the same
4✔
3086
                // duration (see retransmitStaleAnns).
4✔
3087
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
4✔
3088
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
8✔
3089
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
8✔
3090
                                log.Debugf("Ignoring keep alive update not "+
4✔
3091
                                        "within %v period for channel %v",
4✔
3092
                                        d.cfg.RebroadcastInterval, shortChanID)
4✔
3093
                                nMsg.err <- nil
4✔
3094
                                return nil, false
4✔
3095
                        }
4✔
3096
                } else {
4✔
3097
                        // If it's not, we'll allow an update per minute with a
4✔
3098
                        // maximum burst of 10. If we haven't seen an update
4✔
3099
                        // for this channel before, we'll need to initialize a
4✔
3100
                        // rate limiter for each direction.
4✔
3101
                        //
4✔
3102
                        // Since the edge exists in the graph, we'll create a
4✔
3103
                        // rate limiter for chanInfo.ChannelID rather then the
4✔
3104
                        // SCID the peer sent. This is because there may be
4✔
3105
                        // multiple aliases for a channel and we may otherwise
4✔
3106
                        // rate-limit only a single alias of the channel,
4✔
3107
                        // instead of the whole channel.
4✔
3108
                        baseScid := chanInfo.ChannelID
4✔
3109
                        d.Lock()
4✔
3110
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
4✔
3111
                        if !ok {
8✔
3112
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
4✔
3113
                                b := d.cfg.MaxChannelUpdateBurst
4✔
3114
                                rls = [2]*rate.Limiter{
4✔
3115
                                        rate.NewLimiter(r, b),
4✔
3116
                                        rate.NewLimiter(r, b),
4✔
3117
                                }
4✔
3118
                                d.chanUpdateRateLimiter[baseScid] = rls
4✔
3119
                        }
4✔
3120
                        d.Unlock()
4✔
3121

4✔
3122
                        if !rls[direction].Allow() {
8✔
3123
                                log.Debugf("Rate limiting update for channel "+
4✔
3124
                                        "%v from direction %x", shortChanID,
4✔
3125
                                        pubKey.SerializeCompressed())
4✔
3126
                                nMsg.err <- nil
4✔
3127
                                return nil, false
4✔
3128
                        }
4✔
3129
                }
3130
        }
3131

3132
        // We'll use chanInfo.ChannelID rather than the peer-supplied
3133
        // ShortChannelID in the ChannelUpdate to avoid the router having to
3134
        // lookup the stored SCID. If we're sending the update, we'll always
3135
        // use the SCID stored in the database rather than a potentially
3136
        // different alias. This might mean that SigBytes is incorrect as it
3137
        // signs a different SCID than the database SCID, but since there will
3138
        // only be a difference if AuthProof == nil, this is fine.
3139
        update := &models.ChannelEdgePolicy{
4✔
3140
                SigBytes:                  upd.Signature.ToSignatureBytes(),
4✔
3141
                ChannelID:                 chanInfo.ChannelID,
4✔
3142
                LastUpdate:                timestamp,
4✔
3143
                MessageFlags:              upd.MessageFlags,
4✔
3144
                ChannelFlags:              upd.ChannelFlags,
4✔
3145
                TimeLockDelta:             upd.TimeLockDelta,
4✔
3146
                MinHTLC:                   upd.HtlcMinimumMsat,
4✔
3147
                MaxHTLC:                   upd.HtlcMaximumMsat,
4✔
3148
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
4✔
3149
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
4✔
3150
                ExtraOpaqueData:           upd.ExtraOpaqueData,
4✔
3151
        }
4✔
3152

4✔
3153
        if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil {
8✔
3154
                if graph.IsError(
4✔
3155
                        err, graph.ErrOutdated,
4✔
3156
                        graph.ErrIgnored,
4✔
3157
                        graph.ErrVBarrierShuttingDown,
4✔
3158
                ) {
8✔
3159

4✔
3160
                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
4✔
3161
                                shortChanID, err)
4✔
3162
                } else {
4✔
3163
                        // Since we know the stored SCID in the graph, we'll
×
3164
                        // cache that SCID.
×
3165
                        key := newRejectCacheKey(
×
3166
                                chanInfo.ChannelID,
×
3167
                                sourceToPub(nMsg.source),
×
3168
                        )
×
3169
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
3170

×
3171
                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
×
3172
                                shortChanID, err)
×
3173
                }
×
3174

3175
                nMsg.err <- err
4✔
3176
                return nil, false
4✔
3177
        }
3178

3179
        // If this is a local ChannelUpdate without an AuthProof, it means it
3180
        // is an update to a channel that is not (yet) supposed to be announced
3181
        // to the greater network. However, our channel counter party will need
3182
        // to be given the update, so we'll try sending the update directly to
3183
        // the remote peer.
3184
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
8✔
3185
                if nMsg.optionalMsgFields != nil {
8✔
3186
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
4✔
3187
                        if remoteAlias != nil {
8✔
3188
                                // The remoteAlias field was specified, meaning
4✔
3189
                                // that we should replace the SCID in the
4✔
3190
                                // update with the remote's alias. We'll also
4✔
3191
                                // need to re-sign the channel update. This is
4✔
3192
                                // required for option-scid-alias feature-bit
4✔
3193
                                // negotiated channels.
4✔
3194
                                upd.ShortChannelID = *remoteAlias
4✔
3195

4✔
3196
                                sig, err := d.cfg.SignAliasUpdate(upd)
4✔
3197
                                if err != nil {
4✔
3198
                                        log.Error(err)
×
3199
                                        nMsg.err <- err
×
3200
                                        return nil, false
×
3201
                                }
×
3202

3203
                                lnSig, err := lnwire.NewSigFromSignature(sig)
4✔
3204
                                if err != nil {
4✔
3205
                                        log.Error(err)
×
3206
                                        nMsg.err <- err
×
3207
                                        return nil, false
×
3208
                                }
×
3209

3210
                                upd.Signature = lnSig
4✔
3211
                        }
3212
                }
3213

3214
                // Get our peer's public key.
3215
                remotePubKey := remotePubFromChanInfo(
4✔
3216
                        chanInfo, upd.ChannelFlags,
4✔
3217
                )
4✔
3218

4✔
3219
                log.Debugf("The message %v has no AuthProof, sending the "+
4✔
3220
                        "update to remote peer %x", upd.MsgType(), remotePubKey)
4✔
3221

4✔
3222
                // Now we'll attempt to send the channel update message
4✔
3223
                // reliably to the remote peer in the background, so that we
4✔
3224
                // don't block if the peer happens to be offline at the moment.
4✔
3225
                err := d.reliableSender.sendMessage(upd, remotePubKey)
4✔
3226
                if err != nil {
4✔
3227
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3228
                                "channel=%v to peer=%x: %v", upd.MsgType(),
×
3229
                                upd.ShortChannelID, remotePubKey, err)
×
3230
                        nMsg.err <- err
×
3231
                        return nil, false
×
3232
                }
×
3233
        }
3234

3235
        // Channel update announcement was successfully processed and now it
3236
        // can be broadcast to the rest of the network. However, we'll only
3237
        // broadcast the channel update announcement if it has an attached
3238
        // authentication proof. We also won't broadcast the update if it
3239
        // contains an alias because the network would reject this.
3240
        var announcements []networkMsg
4✔
3241
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
8✔
3242
                announcements = append(announcements, networkMsg{
4✔
3243
                        peer:     nMsg.peer,
4✔
3244
                        source:   nMsg.source,
4✔
3245
                        isRemote: nMsg.isRemote,
4✔
3246
                        msg:      upd,
4✔
3247
                })
4✔
3248
        }
4✔
3249

3250
        nMsg.err <- nil
4✔
3251

4✔
3252
        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
4✔
3253
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
4✔
3254
                timestamp)
4✔
3255
        return announcements, true
4✔
3256
}
3257

3258
// handleAnnSig processes a new announcement signatures message.
3259
func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
3260
        ann *lnwire.AnnounceSignatures1) ([]networkMsg, bool) {
4✔
3261

4✔
3262
        needBlockHeight := ann.ShortChannelID.BlockHeight +
4✔
3263
                d.cfg.ProofMatureDelta
4✔
3264
        shortChanID := ann.ShortChannelID.ToUint64()
4✔
3265

4✔
3266
        prefix := "local"
4✔
3267
        if nMsg.isRemote {
8✔
3268
                prefix = "remote"
4✔
3269
        }
4✔
3270

3271
        log.Infof("Received new %v announcement signature for %v", prefix,
4✔
3272
                ann.ShortChannelID)
4✔
3273

4✔
3274
        // By the specification, channel announcement proofs should be sent
4✔
3275
        // after some number of confirmations after channel was registered in
4✔
3276
        // bitcoin blockchain. Therefore, we check if the proof is mature.
4✔
3277
        d.Lock()
4✔
3278
        premature := d.isPremature(
4✔
3279
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
4✔
3280
        )
4✔
3281
        if premature {
6✔
3282
                log.Warnf("Premature proof announcement, current block height"+
2✔
3283
                        "lower than needed: %v < %v", d.bestHeight,
2✔
3284
                        needBlockHeight)
2✔
3285
                d.Unlock()
2✔
3286
                nMsg.err <- nil
2✔
3287
                return nil, false
2✔
3288
        }
2✔
3289
        d.Unlock()
4✔
3290

4✔
3291
        // Ensure that we know of a channel with the target channel ID before
4✔
3292
        // proceeding further.
4✔
3293
        //
4✔
3294
        // We must acquire the mutex for this channel ID before getting the
4✔
3295
        // channel from the database, to ensure what we read does not change
4✔
3296
        // before we call AddProof() later.
4✔
3297
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
4✔
3298
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
4✔
3299

4✔
3300
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
4✔
3301
                ann.ShortChannelID,
4✔
3302
        )
4✔
3303
        if err != nil {
8✔
3304
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
4✔
3305
                if err != nil {
8✔
3306
                        err := fmt.Errorf("unable to store the proof for "+
4✔
3307
                                "short_chan_id=%v: %v", shortChanID, err)
4✔
3308
                        log.Error(err)
4✔
3309
                        nMsg.err <- err
4✔
3310

4✔
3311
                        return nil, false
4✔
3312
                }
4✔
3313

3314
                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
4✔
3315
                err := d.cfg.WaitingProofStore.Add(proof)
4✔
3316
                if err != nil {
4✔
3317
                        err := fmt.Errorf("unable to store the proof for "+
×
3318
                                "short_chan_id=%v: %v", shortChanID, err)
×
3319
                        log.Error(err)
×
3320
                        nMsg.err <- err
×
3321
                        return nil, false
×
3322
                }
×
3323

3324
                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
4✔
3325
                        ", adding to waiting batch", prefix, shortChanID)
4✔
3326
                nMsg.err <- nil
4✔
3327
                return nil, false
4✔
3328
        }
3329

3330
        nodeID := nMsg.source.SerializeCompressed()
4✔
3331
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
4✔
3332
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
4✔
3333

4✔
3334
        // Ensure that channel that was retrieved belongs to the peer which
4✔
3335
        // sent the proof announcement.
4✔
3336
        if !(isFirstNode || isSecondNode) {
4✔
3337
                err := fmt.Errorf("channel that was received doesn't belong "+
×
3338
                        "to the peer which sent the proof, short_chan_id=%v",
×
3339
                        shortChanID)
×
3340
                log.Error(err)
×
3341
                nMsg.err <- err
×
3342
                return nil, false
×
3343
        }
×
3344

3345
        // If proof was sent by a local sub-system, then we'll send the
3346
        // announcement signature to the remote node so they can also
3347
        // reconstruct the full channel announcement.
3348
        if !nMsg.isRemote {
8✔
3349
                var remotePubKey [33]byte
4✔
3350
                if isFirstNode {
8✔
3351
                        remotePubKey = chanInfo.NodeKey2Bytes
4✔
3352
                } else {
8✔
3353
                        remotePubKey = chanInfo.NodeKey1Bytes
4✔
3354
                }
4✔
3355

3356
                // Since the remote peer might not be online we'll call a
3357
                // method that will attempt to deliver the proof when it comes
3358
                // online.
3359
                err := d.reliableSender.sendMessage(ann, remotePubKey)
4✔
3360
                if err != nil {
4✔
3361
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3362
                                "channel=%v to peer=%x: %v", ann.MsgType(),
×
3363
                                ann.ShortChannelID, remotePubKey, err)
×
3364
                        nMsg.err <- err
×
3365
                        return nil, false
×
3366
                }
×
3367
        }
3368

3369
        // Check if we already have the full proof for this channel.
3370
        if chanInfo.AuthProof != nil {
8✔
3371
                // If we already have the fully assembled proof, then the peer
4✔
3372
                // sending us their proof has probably not received our local
4✔
3373
                // proof yet. So be kind and send them the full proof.
4✔
3374
                if nMsg.isRemote {
8✔
3375
                        peerID := nMsg.source.SerializeCompressed()
4✔
3376
                        log.Debugf("Got AnnounceSignatures for channel with " +
4✔
3377
                                "full proof.")
4✔
3378

4✔
3379
                        d.wg.Add(1)
4✔
3380
                        go func() {
8✔
3381
                                defer d.wg.Done()
4✔
3382

4✔
3383
                                log.Debugf("Received half proof for channel "+
4✔
3384
                                        "%v with existing full proof. Sending"+
4✔
3385
                                        " full proof to peer=%x",
4✔
3386
                                        ann.ChannelID, peerID)
4✔
3387

4✔
3388
                                ca, _, _, err := netann.CreateChanAnnouncement(
4✔
3389
                                        chanInfo.AuthProof, chanInfo, e1, e2,
4✔
3390
                                )
4✔
3391
                                if err != nil {
4✔
3392
                                        log.Errorf("unable to gen ann: %v",
×
3393
                                                err)
×
3394
                                        return
×
3395
                                }
×
3396

3397
                                err = nMsg.peer.SendMessage(false, ca)
4✔
3398
                                if err != nil {
4✔
3399
                                        log.Errorf("Failed sending full proof"+
×
3400
                                                " to peer=%x: %v", peerID, err)
×
3401
                                        return
×
3402
                                }
×
3403

3404
                                log.Debugf("Full proof sent to peer=%x for "+
4✔
3405
                                        "chanID=%v", peerID, ann.ChannelID)
4✔
3406
                        }()
3407
                }
3408

3409
                log.Debugf("Already have proof for channel with chanID=%v",
4✔
3410
                        ann.ChannelID)
4✔
3411
                nMsg.err <- nil
4✔
3412
                return nil, true
4✔
3413
        }
3414

3415
        // Check that we received the opposite proof. If so, then we're now
3416
        // able to construct the full proof, and create the channel
3417
        // announcement. If we didn't receive the opposite half of the proof
3418
        // then we should store this one, and wait for the opposite to be
3419
        // received.
3420
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
4✔
3421
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
4✔
3422
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
4✔
3423
                err := fmt.Errorf("unable to get the opposite proof for "+
×
3424
                        "short_chan_id=%v: %v", shortChanID, err)
×
3425
                log.Error(err)
×
3426
                nMsg.err <- err
×
3427
                return nil, false
×
3428
        }
×
3429

3430
        if err == channeldb.ErrWaitingProofNotFound {
8✔
3431
                err := d.cfg.WaitingProofStore.Add(proof)
4✔
3432
                if err != nil {
4✔
3433
                        err := fmt.Errorf("unable to store the proof for "+
×
3434
                                "short_chan_id=%v: %v", shortChanID, err)
×
3435
                        log.Error(err)
×
3436
                        nMsg.err <- err
×
3437
                        return nil, false
×
3438
                }
×
3439

3440
                log.Infof("1/2 of channel ann proof received for "+
4✔
3441
                        "short_chan_id=%v, waiting for other half",
4✔
3442
                        shortChanID)
4✔
3443

4✔
3444
                nMsg.err <- nil
4✔
3445
                return nil, false
4✔
3446
        }
3447

3448
        // We now have both halves of the channel announcement proof, then
3449
        // we'll reconstruct the initial announcement so we can validate it
3450
        // shortly below.
3451
        var dbProof models.ChannelAuthProof
4✔
3452
        if isFirstNode {
8✔
3453
                dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
4✔
3454
                dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
4✔
3455
                dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
4✔
3456
                dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
4✔
3457
        } else {
8✔
3458
                dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
4✔
3459
                dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
4✔
3460
                dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
4✔
3461
                dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
4✔
3462
        }
4✔
3463

3464
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
4✔
3465
                &dbProof, chanInfo, e1, e2,
4✔
3466
        )
4✔
3467
        if err != nil {
4✔
3468
                log.Error(err)
×
3469
                nMsg.err <- err
×
3470
                return nil, false
×
3471
        }
×
3472

3473
        // With all the necessary components assembled validate the full
3474
        // channel announcement proof.
3475
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
4✔
3476
        if err != nil {
4✔
3477
                err := fmt.Errorf("channel announcement proof for "+
×
3478
                        "short_chan_id=%v isn't valid: %v", shortChanID, err)
×
3479

×
3480
                log.Error(err)
×
3481
                nMsg.err <- err
×
3482
                return nil, false
×
3483
        }
×
3484

3485
        // If the channel was returned by the router it means that existence of
3486
        // funding point and inclusion of nodes bitcoin keys in it already
3487
        // checked by the router. In this stage we should check that node keys
3488
        // attest to the bitcoin keys by validating the signatures of
3489
        // announcement. If proof is valid then we'll populate the channel edge
3490
        // with it, so we can announce it on peer connect.
3491
        err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
4✔
3492
        if err != nil {
4✔
3493
                err := fmt.Errorf("unable add proof to the channel chanID=%v:"+
×
3494
                        " %v", ann.ChannelID, err)
×
3495
                log.Error(err)
×
3496
                nMsg.err <- err
×
3497
                return nil, false
×
3498
        }
×
3499

3500
        err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
4✔
3501
        if err != nil {
4✔
3502
                err := fmt.Errorf("unable to remove opposite proof for the "+
×
3503
                        "channel with chanID=%v: %v", ann.ChannelID, err)
×
3504
                log.Error(err)
×
3505
                nMsg.err <- err
×
3506
                return nil, false
×
3507
        }
×
3508

3509
        // Proof was successfully created and now can announce the channel to
3510
        // the remain network.
3511
        log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
4✔
3512
                ", adding to next ann batch", shortChanID)
4✔
3513

4✔
3514
        // Assemble the necessary announcements to add to the next broadcasting
4✔
3515
        // batch.
4✔
3516
        var announcements []networkMsg
4✔
3517
        announcements = append(announcements, networkMsg{
4✔
3518
                peer:   nMsg.peer,
4✔
3519
                source: nMsg.source,
4✔
3520
                msg:    chanAnn,
4✔
3521
        })
4✔
3522
        if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
8✔
3523
                announcements = append(announcements, networkMsg{
4✔
3524
                        peer:   nMsg.peer,
4✔
3525
                        source: src,
4✔
3526
                        msg:    e1Ann,
4✔
3527
                })
4✔
3528
        }
4✔
3529
        if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
8✔
3530
                announcements = append(announcements, networkMsg{
4✔
3531
                        peer:   nMsg.peer,
4✔
3532
                        source: src,
4✔
3533
                        msg:    e2Ann,
4✔
3534
                })
4✔
3535
        }
4✔
3536

3537
        // We'll also send along the node announcements for each channel
3538
        // participant if we know of them. To ensure our node announcement
3539
        // propagates to our channel counterparty, we'll set the source for
3540
        // each announcement to the node it belongs to, otherwise we won't send
3541
        // it since the source gets skipped. This isn't necessary for channel
3542
        // updates and announcement signatures since we send those directly to
3543
        // our channel counterparty through the gossiper's reliable sender.
3544
        node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
4✔
3545
        if err != nil {
8✔
3546
                log.Debugf("Unable to fetch node announcement for %x: %v",
4✔
3547
                        chanInfo.NodeKey1Bytes, err)
4✔
3548
        } else {
8✔
3549
                if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
8✔
3550
                        announcements = append(announcements, networkMsg{
4✔
3551
                                peer:   nMsg.peer,
4✔
3552
                                source: nodeKey1,
4✔
3553
                                msg:    node1Ann,
4✔
3554
                        })
4✔
3555
                }
4✔
3556
        }
3557

3558
        node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
4✔
3559
        if err != nil {
8✔
3560
                log.Debugf("Unable to fetch node announcement for %x: %v",
4✔
3561
                        chanInfo.NodeKey2Bytes, err)
4✔
3562
        } else {
8✔
3563
                if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
8✔
3564
                        announcements = append(announcements, networkMsg{
4✔
3565
                                peer:   nMsg.peer,
4✔
3566
                                source: nodeKey2,
4✔
3567
                                msg:    node2Ann,
4✔
3568
                        })
4✔
3569
                }
4✔
3570
        }
3571

3572
        nMsg.err <- nil
4✔
3573
        return announcements, true
4✔
3574
}
3575

3576
// isBanned returns true if the peer identified by pubkey is banned for sending
3577
// invalid channel announcements.
3578
func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
4✔
3579
        return d.banman.isBanned(pubkey)
4✔
3580
}
4✔
3581

3582
// ShouldDisconnect returns true if we should disconnect the peer identified by
3583
// pubkey.
3584
func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
3585
        bool, error) {
4✔
3586

4✔
3587
        pubkeySer := pubkey.SerializeCompressed()
4✔
3588

4✔
3589
        var pubkeyBytes [33]byte
4✔
3590
        copy(pubkeyBytes[:], pubkeySer)
4✔
3591

4✔
3592
        // If the public key is banned, check whether or not this is a channel
4✔
3593
        // peer.
4✔
3594
        if d.isBanned(pubkeyBytes) {
4✔
3595
                isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
×
3596
                if err != nil {
×
3597
                        return false, err
×
3598
                }
×
3599

3600
                // We should only disconnect non-channel peers.
3601
                if !isChanPeer {
×
3602
                        return true, nil
×
3603
                }
×
3604
        }
3605

3606
        return false, nil
4✔
3607
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc