lightningnetwork / lnd / 12312390362

13 Dec 2024 08:44AM UTC coverage: 57.458% (+8.5%) from 48.92%

Pull #9343

ellemouton
fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is re-worked such that the
context that its new main WithCtx method provides is cancelled in sync
with a parent context being cancelled or with its quit channel being
closed. Tests are added to assert the behaviour. In order for the close
of the quit channel to be consistent with the cancelling of the derived
context, the quit channel _must_ be kept internal to the ContextGuard
so that callers are only able to close the channel via the exposed Quit
method, which will then take care to first cancel any derived contexts
that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests

101853 of 177264 relevant lines covered (57.46%)

24972.93 hits per line
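The guard described in the commit message can be pictured with a small sketch. This is an editor's illustration of the pattern only, assuming simplified names and signatures (ctxGuard, newCtxGuard); it is not the actual fn.ContextGuard API from the PR:

package ctxguard

import (
        "context"
        "sync"
)

// ctxGuard ties every derived context to a parent context and to an
// internal quit channel, mirroring the behaviour the commit describes.
type ctxGuard struct {
        mu      sync.Mutex
        quit    chan struct{}
        cancels []context.CancelFunc
}

func newCtxGuard() *ctxGuard {
        return &ctxGuard{quit: make(chan struct{})}
}

// WithCtx derives a context that is cancelled either when the parent is
// cancelled or when Quit is called on the guard.
func (g *ctxGuard) WithCtx(parent context.Context) (context.Context,
        context.CancelFunc) {

        ctx, cancel := context.WithCancel(parent)

        g.mu.Lock()
        defer g.mu.Unlock()

        select {
        case <-g.quit:
                // The guard has already quit, so hand back an
                // already-cancelled context.
                cancel()
        default:
                g.cancels = append(g.cancels, cancel)
        }

        return ctx, cancel
}

// Quit cancels all derived contexts first and only then closes the quit
// channel, keeping the two signals consistent. Because the channel is
// internal to the guard, this is the only way it can be closed.
func (g *ctxGuard) Quit() {
        g.mu.Lock()
        defer g.mu.Unlock()

        select {
        case <-g.quit:
                return
        default:
        }

        for _, cancel := range g.cancels {
                cancel()
        }
        g.cancels = nil

        close(g.quit)
}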

Source File

/discovery/gossiper.go: 71.26% covered
package discovery

import (
        "bytes"
        "errors"
        "fmt"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcec/v2"
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/btcsuite/btcd/wire"
        "github.com/davecgh/go-spew/spew"
        "github.com/lightninglabs/neutrino/cache"
        "github.com/lightninglabs/neutrino/cache/lru"
        "github.com/lightningnetwork/lnd/batch"
        "github.com/lightningnetwork/lnd/chainntnfs"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph"
        graphdb "github.com/lightningnetwork/lnd/graph/db"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/keychain"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/multimutex"
        "github.com/lightningnetwork/lnd/netann"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/ticker"
        "golang.org/x/time/rate"
)

const (
        // DefaultMaxChannelUpdateBurst is the default maximum number of updates
        // for a specific channel and direction that we'll accept over an
        // interval.
        DefaultMaxChannelUpdateBurst = 10

        // DefaultChannelUpdateInterval is the default interval we'll use to
        // determine how often we should allow a new update for a specific
        // channel and direction.
        DefaultChannelUpdateInterval = time.Minute

        // maxPrematureUpdates tracks the max amount of premature channel
        // updates that we'll hold onto.
        maxPrematureUpdates = 100

        // maxFutureMessages tracks the max amount of future messages that
        // we'll hold onto.
        maxFutureMessages = 1000

        // DefaultSubBatchDelay is the default delay we'll use when
        // broadcasting the next announcement batch.
        DefaultSubBatchDelay = 5 * time.Second

        // maxRejectedUpdates tracks the max amount of rejected channel updates
        // we'll maintain. This is the global size across all peers. We'll
        // allocate ~3 MB max to the cache.
        maxRejectedUpdates = 10_000
)
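
// Editor's sketch (hypothetical, not part of the original file): the two
// exported constants above are the defaults behind the gossiper's
// per-channel, per-direction update rate limiting (see Config and the
// chanUpdateRateLimiter field below), which is built on
// golang.org/x/time/rate. A plausible construction:
//
//      limiter := rate.NewLimiter(
//              rate.Every(DefaultChannelUpdateInterval),
//              DefaultMaxChannelUpdateBurst,
//      )
//      if !limiter.Allow() {
//              // Too many updates for this channel and direction;
//              // reject the new one.
//      }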

var (
        // ErrGossiperShuttingDown is an error that is returned if the gossiper
        // is in the process of being shut down.
        ErrGossiperShuttingDown = errors.New("gossiper is shutting down")

        // ErrGossipSyncerNotFound signals that we were unable to find an active
        // gossip syncer corresponding to a gossip query message received from
        // the remote peer.
        ErrGossipSyncerNotFound = errors.New("gossip syncer not found")

        // emptyPubkey is used to compare compressed pubkeys against an empty
        // byte array.
        emptyPubkey [33]byte
)

// optionalMsgFields is a set of optional message fields that external callers
// can provide that are useful when processing a specific network
// announcement.
type optionalMsgFields struct {
        capacity      *btcutil.Amount
        channelPoint  *wire.OutPoint
        remoteAlias   *lnwire.ShortChannelID
        tapscriptRoot fn.Option[chainhash.Hash]
}

// apply applies the optional fields within the functional options.
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
        for _, optionalMsgField := range optionalMsgFields {
                optionalMsgField(f)
        }
}

// OptionalMsgField is a functional option parameter that can be used to provide
// external information that is not included within a network message but is
// useful when processing it.
type OptionalMsgField func(*optionalMsgFields)

// ChannelCapacity is an optional field that lets the gossiper know of the
// capacity of a channel.
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
        return func(f *optionalMsgFields) {
                f.capacity = &capacity
        }
}

// ChannelPoint is an optional field that lets the gossiper know of the outpoint
// of a channel.
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
        return func(f *optionalMsgFields) {
                f.channelPoint = &op
        }
}

// TapscriptRoot is an optional field that lets the gossiper know of the root of
// the tapscript tree for a custom channel.
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
        return func(f *optionalMsgFields) {
                f.tapscriptRoot = root
        }
}

// RemoteAlias is an optional field that lets the gossiper know that a locally
// sent channel update is actually an update for the peer that should replace
// the ShortChannelID field with the remote's alias. This is only used for
// channels with peers where the option-scid-alias feature bit was negotiated.
// The channel update will be added to the graph under the original SCID, but
// will be modified and re-signed with this alias.
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
        return func(f *optionalMsgFields) {
                f.remoteAlias = alias
        }
}
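
// Editor's sketch (hypothetical call, not part of the original file): these
// functional options are handed to ProcessLocalAnnouncement, defined further
// below, when a caller submits a locally generated announcement:
//
//      errChan := d.ProcessLocalAnnouncement(
//              chanAnn, // a local *lnwire.ChannelAnnouncement1
//              ChannelCapacity(btcutil.Amount(1_000_000)),
//              ChannelPoint(fundingOutpoint),
//      )
//      if err := <-errChan; err != nil {
//              log.Errorf("local announcement rejected: %v", err)
//      }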

// networkMsg couples a routing related wire message with the peer that
// originally sent it.
type networkMsg struct {
        peer              lnpeer.Peer
        source            *btcec.PublicKey
        msg               lnwire.Message
        optionalMsgFields *optionalMsgFields

        isRemote bool

        err chan error
}

// chanPolicyUpdateRequest is a request that is sent to the server when a caller
// wishes to update a particular set of channels. New ChannelUpdate messages
// will be crafted to be sent out during the next broadcast epoch and the fee
// updates committed to the lower layer.
type chanPolicyUpdateRequest struct {
        edgesToUpdate []EdgeWithInfo
        errChan       chan error
}

// PinnedSyncers is a set of node pubkeys for which we will maintain an active
// syncer at all times.
type PinnedSyncers map[route.Vertex]struct{}

// Config defines the configuration for the service. ALL elements within the
// configuration MUST be non-nil for the service to carry out its duties.
type Config struct {
        // ChainHash is a hash that indicates the resident chain of the
        // AuthenticatedGossiper. Any announcements that don't match this
        // chain hash will be ignored.
        //
        // TODO(roasbeef): eventually make into map so can de-multiplex
        // incoming announcements
        //   * also need to do same for Notifier
        ChainHash chainhash.Hash

        // Graph is the subsystem which is responsible for managing the
        // topology of the lightning network. After incoming channel, node,
        // and channel update announcements are validated, they are sent to
        // the router in order to be included in the LN graph.
        Graph graph.ChannelGraphSource

        // ChainIO represents an abstraction over a source that can query the
        // blockchain.
        ChainIO lnwallet.BlockChainIO

        // ChanSeries is an interface that provides access to a time series
        // view of the current known channel graph. Each GossipSyncer enabled
        // peer will utilize this in order to create and respond to channel
        // graph time series queries.
        ChanSeries ChannelGraphTimeSeries

        // Notifier is used for receiving notifications of incoming blocks.
        // With each new incoming block found we process previously premature
        // announcements.
        //
        // TODO(roasbeef): could possibly just replace this with an epoch
        // channel.
        Notifier chainntnfs.ChainNotifier

        // Broadcast broadcasts a particular set of announcements to all peers
        // that the daemon is connected to. If supplied, the exclude parameter
        // indicates that the target peer should be excluded from the
        // broadcast.
        Broadcast func(skips map[route.Vertex]struct{},
                msg ...lnwire.Message) error

        // NotifyWhenOnline is a function that allows the gossiper to be
        // notified when a certain peer comes online, allowing it to
        // retry sending a peer message.
        //
        // NOTE: The peerChan channel must be buffered.
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)

        // NotifyWhenOffline is a function that allows the gossiper to be
        // notified when a certain peer disconnects, allowing it to request a
        // notification for when it reconnects.
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}

        // FetchSelfAnnouncement retrieves our current node announcement, for
        // use when determining whether we should update our peers about our
        // presence in the network.
        FetchSelfAnnouncement func() lnwire.NodeAnnouncement

        // UpdateSelfAnnouncement produces a new announcement for our node with
        // an updated timestamp which can be broadcast to our peers.
        UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement, error)

        // ProofMatureDelta is the number of confirmations needed before we
        // exchange the channel announcement proofs.
        ProofMatureDelta uint32

        // TrickleDelay is the period of the trickle timer, which flushes to
        // the network the pending batch of new announcements we've received
        // since the last trickle tick.
        TrickleDelay time.Duration

        // RetransmitTicker is a ticker that ticks with a period which
        // indicates that we should check if we need to re-broadcast any of
        // our personal channels.
        RetransmitTicker ticker.Ticker

        // RebroadcastInterval is the maximum time we wait between sending out
        // channel updates for our active channels and our own node
        // announcement. We do this to ensure our active presence on the
        // network is known, and we are not being considered a zombie node or
        // having zombie channels.
        RebroadcastInterval time.Duration

        // WaitingProofStore is a persistent storage of partial channel proof
        // announcement messages. We use it to buffer half of the material
        // needed to reconstruct a full authenticated channel announcement.
        // Once we receive the other half of the channel proof, we'll be able
        // to properly validate it and re-broadcast it out to the network.
        //
        // TODO(wilmer): make interface to prevent channeldb dependency.
        WaitingProofStore *channeldb.WaitingProofStore

        // MessageStore is a persistent storage of gossip messages which we will
        // use to determine which messages need to be resent for a given peer.
        MessageStore GossipMessageStore

        // AnnSigner is an instance of the MessageSigner interface which will
        // be used to manually sign any outgoing channel updates. The signer
        // implementation should be backed by the public key of the backing
        // Lightning node.
        //
        // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
        // here?
        AnnSigner lnwallet.MessageSigner

        // ScidCloser is an instance of ClosedChannelTracker that helps the
        // gossiper cut down on spam channel announcements for already closed
        // channels.
        ScidCloser ClosedChannelTracker

        // NumActiveSyncers is the number of peers we should have active
        // syncers with. After reaching NumActiveSyncers, any future gossip
        // syncers will be passive.
        NumActiveSyncers int

        // NoTimestampQueries will prevent the GossipSyncer from querying
        // timestamps of announcement messages from the peer and from replying
        // to timestamp queries.
        NoTimestampQueries bool

        // RotateTicker is a ticker responsible for notifying the SyncManager
        // when it should rotate its active syncers. A single active syncer with
        // a chansSynced state will be exchanged for a passive syncer in order
        // to ensure we don't keep syncing with the same peers.
        RotateTicker ticker.Ticker

        // HistoricalSyncTicker is a ticker responsible for notifying the
        // syncManager when it should attempt a historical sync with a gossip
        // sync peer.
        HistoricalSyncTicker ticker.Ticker

        // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
        // syncManager when it should attempt to start the next pending
        // activeSyncer due to the current one not completing its state machine
        // within the timeout.
        ActiveSyncerTimeoutTicker ticker.Ticker

        // MinimumBatchSize is the minimum size of a sub batch of announcement
        // messages.
        MinimumBatchSize int

        // SubBatchDelay is the delay between sending sub batches of
        // gossip messages.
        SubBatchDelay time.Duration

        // IgnoreHistoricalFilters will prevent syncers from replying with
        // historical data when the remote peer sets a gossip_timestamp_range.
        // This prevents ranges with old start times from causing us to dump the
        // graph on connect.
        IgnoreHistoricalFilters bool

        // PinnedSyncers is a set of peers that will always transition to
        // ActiveSync upon connection. These peers will never transition to
        // PassiveSync.
        PinnedSyncers PinnedSyncers

        // MaxChannelUpdateBurst specifies the maximum number of updates for a
        // specific channel and direction that we'll accept over an interval.
        MaxChannelUpdateBurst int

        // ChannelUpdateInterval specifies the interval we'll use to determine
        // how often we should allow a new update for a specific channel and
        // direction.
        ChannelUpdateInterval time.Duration

        // IsAlias returns true if a given ShortChannelID is an alias for
        // option_scid_alias channels.
        IsAlias func(scid lnwire.ShortChannelID) bool

        // SignAliasUpdate is used to re-sign a channel update using the
        // remote's alias if the option-scid-alias feature bit was negotiated.
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
                error)

        // FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
        // This is used for channels that have negotiated the option-scid-alias
        // feature bit.
        FindBaseByAlias func(alias lnwire.ShortChannelID) (
                lnwire.ShortChannelID, error)

        // GetAlias allows the gossiper to look up the peer's alias for a given
        // ChannelID. This is used to sign updates for them if the channel has
        // no AuthProof and the option-scid-alias feature bit was negotiated.
        GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)

        // FindChannel allows the gossiper to find a channel that we're party
        // to without iterating over the entire set of open channels.
        FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
                *channeldb.OpenChannel, error)

        // IsStillZombieChannel takes the timestamps of the latest channel
        // updates for a channel and returns true if the channel should be
        // considered a zombie based on these timestamps.
        IsStillZombieChannel func(time.Time, time.Time) bool
}

// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
// used to let the caller of the lru.Cache know if a message has already been
// processed or not.
type processedNetworkMsg struct {
        processed bool
        msg       *networkMsg
}

// cachedNetworkMsg is a wrapper around a network message that can be used with
// *lru.Cache.
type cachedNetworkMsg struct {
        msgs []*processedNetworkMsg
}

// Size returns the "size" of an entry. We return the number of items as we
// just want to limit the total amount of entries rather than do accurate size
// accounting.
func (c *cachedNetworkMsg) Size() (uint64, error) {
        return uint64(len(c.msgs)), nil
}

// rejectCacheKey is the cache key that we'll use to track announcements we've
// recently rejected.
type rejectCacheKey struct {
        pubkey [33]byte
        chanID uint64
}

// newRejectCacheKey returns a new cache key for the reject cache.
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
        k := rejectCacheKey{
                chanID: cid,
                pubkey: pub,
        }

        return k
}

// sourceToPub returns a serialized-compressed public key for use in the reject
// cache.
func sourceToPub(pk *btcec.PublicKey) [33]byte {
        var pub [33]byte
        copy(pub[:], pk.SerializeCompressed())
        return pub
}
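
// Editor's sketch (hypothetical, not part of the original file): the two
// helpers above build the key under which a rejected announcement is
// remembered in the gossiper's recentRejects cache, defined below. A
// plausible call:
//
//      key := newRejectCacheKey(
//              scid.ToUint64(),                 // the announcement's SCID
//              sourceToPub(peer.IdentityKey()), // the sending peer
//      )
//      _, _ = d.recentRejects.Put(key, &cachedReject{})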

// cachedReject is the empty value used to track the value for rejects.
type cachedReject struct {
}

// Size returns the "size" of an entry. We return 1 as we just want to limit
// the total size.
func (c *cachedReject) Size() (uint64, error) {
        return 1, nil
}

// AuthenticatedGossiper is a subsystem which is responsible for receiving
// announcements, validating them and applying the changes to the router,
// syncing the lightning network with newly connected nodes, broadcasting
// announcements after validation, negotiating the channel announcement proofs
// exchange and handling premature announcements. All outgoing announcements
// are expected to be properly signed as dictated in BOLT #7. Additionally,
// all incoming messages are expected to be well formed and signed. Invalid
// messages will be rejected by this struct.
type AuthenticatedGossiper struct {
        // Parameters which are needed to properly handle the start and stop of
        // the service.
        started sync.Once
        stopped sync.Once

        // bestHeight is the height of the block at the tip of the main chain
        // as we know it. Accesses *MUST* be done with the gossiper's lock
        // held.
        bestHeight uint32

        quit chan struct{}
        wg   sync.WaitGroup

        // cfg is a copy of the configuration struct that the gossiper service
        // was initialized with.
        cfg *Config

        // blockEpochs encapsulates a stream of block epochs that are sent at
        // every new block height.
        blockEpochs *chainntnfs.BlockEpochEvent

        // prematureChannelUpdates is a map of ChannelUpdates we have received
        // that weren't associated with any channel we know about. We store
        // them temporarily, such that we can reprocess them when a
        // ChannelAnnouncement for the channel is received.
        prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]

        // banman tracks our peers' ban status.
        banman *banman

        // networkMsgs is a channel that carries new network broadcasted
        // messages from outside the gossiper service to be processed by the
        // networkHandler.
        networkMsgs chan *networkMsg

        // futureMsgs is a list of premature network messages that have a block
        // height specified in the future. We will save them and resend them to
        // the networkMsgs channel once the block height has been reached. The
        // cached map format is,
        //   {msgID1: msg1, msgID2: msg2, ...}
        futureMsgs *futureMsgCache

        // chanPolicyUpdates is a channel over which requests to update the
        // forwarding policy of a set of channels are sent.
        chanPolicyUpdates chan *chanPolicyUpdateRequest

        // selfKey is the identity public key of the backing Lightning node.
        selfKey *btcec.PublicKey

        // selfKeyLoc is the locator for the identity public key of the backing
        // Lightning node.
        selfKeyLoc keychain.KeyLocator

        // channelMtx is used to restrict the database access to one
        // goroutine per channel ID. This is done to ensure that when
        // the gossiper is handling an announcement, the db state stays
        // consistent between when the DB is first read until it's written.
        channelMtx *multimutex.Mutex[uint64]

        recentRejects *lru.Cache[rejectCacheKey, *cachedReject]

        // syncMgr is a subsystem responsible for managing the gossip syncers
        // for peers currently connected. When a new peer is connected, the
        // manager will create its accompanying gossip syncer and determine
        // whether it should have an activeSync or passiveSync sync type based
        // on how many other gossip syncers are currently active. Any activeSync
        // gossip syncers are started in a round-robin manner to ensure we're
        // not syncing with multiple peers at the same time.
        syncMgr *SyncManager

        // reliableSender is a subsystem responsible for handling reliable
        // message send requests to peers. This should only be used for channels
        // that are unadvertised at the time of handling the message since if it
        // is advertised, then peers should be able to get the message from the
        // network.
        reliableSender *reliableSender

        // chanUpdateRateLimiter contains rate limiters for each direction of
        // a channel update we've processed. We'll use these to determine
        // whether we should accept a new update for a specific channel and
        // direction.
        //
        // NOTE: This map must be synchronized with the main
        // AuthenticatedGossiper lock.
        chanUpdateRateLimiter map[uint64][2]*rate.Limiter

        sync.Mutex
}

// New creates a new AuthenticatedGossiper instance, initialized with the
// passed configuration parameters.
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
        gossiper := &AuthenticatedGossiper{
                selfKey:           selfKeyDesc.PubKey,
                selfKeyLoc:        selfKeyDesc.KeyLocator,
                cfg:               &cfg,
                networkMsgs:       make(chan *networkMsg),
                futureMsgs:        newFutureMsgCache(maxFutureMessages),
                quit:              make(chan struct{}),
                chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
                prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
                        maxPrematureUpdates,
                ),
                channelMtx: multimutex.NewMutex[uint64](),
                recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
                        maxRejectedUpdates,
                ),
                chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
                banman:                newBanman(),
        }

        gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
                ChainHash:               cfg.ChainHash,
                ChanSeries:              cfg.ChanSeries,
                RotateTicker:            cfg.RotateTicker,
                HistoricalSyncTicker:    cfg.HistoricalSyncTicker,
                NumActiveSyncers:        cfg.NumActiveSyncers,
                NoTimestampQueries:      cfg.NoTimestampQueries,
                IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
                BestHeight:              gossiper.latestHeight,
                PinnedSyncers:           cfg.PinnedSyncers,
                IsStillZombieChannel:    cfg.IsStillZombieChannel,
        })

        gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
                NotifyWhenOnline:  cfg.NotifyWhenOnline,
                NotifyWhenOffline: cfg.NotifyWhenOffline,
                MessageStore:      cfg.MessageStore,
                IsMsgStale:        gossiper.isMsgStale,
        })

        return gossiper
}
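
// Editor's sketch (hypothetical wiring, not part of the original file): a
// caller constructs the gossiper with a fully populated Config (every field
// must be non-nil) and the node's key descriptor, then starts it:
//
//      gossiper := New(cfg, &keychain.KeyDescriptor{
//              KeyLocator: nodeKeyLoc,
//              PubKey:     nodePub,
//      })
//      if err := gossiper.Start(); err != nil {
//              // handle startup failure
//      }
//      defer func() { _ = gossiper.Stop() }()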

// EdgeWithInfo contains the information that is required to update an edge.
type EdgeWithInfo struct {
        // Info describes the channel.
        Info *models.ChannelEdgeInfo

        // Edge describes the policy in one direction of the channel.
        Edge *models.ChannelEdgePolicy
}

// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
// specified edge updates. Updates are done in two stages: first, the
// AuthenticatedGossiper ensures the update has been committed by dependent
// sub-systems, then it signs and broadcasts new updates to the network. A
// mapping between outpoints and updated channel policies is returned, which is
// used to update the forwarding policies of the underlying links.
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
        edgesToUpdate []EdgeWithInfo) error {

        errChan := make(chan error, 1)
        policyUpdate := &chanPolicyUpdateRequest{
                edgesToUpdate: edgesToUpdate,
                errChan:       errChan,
        }

        select {
        case d.chanPolicyUpdates <- policyUpdate:
                err := <-errChan
                return err
        case <-d.quit:
                return fmt.Errorf("AuthenticatedGossiper shutting down")
        }
}

// Start spawns the network message handler goroutine and registers for new
// block notifications in order to properly handle premature announcements.
func (d *AuthenticatedGossiper) Start() error {
        var err error
        d.started.Do(func() {
                log.Info("Authenticated Gossiper starting")
                err = d.start()
        })
        return err
}

func (d *AuthenticatedGossiper) start() error {
        // First we register for new notifications of newly discovered blocks.
        // We do this immediately so we'll later be able to consume any/all
        // blocks which were discovered.
        blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
        if err != nil {
                return err
        }
        d.blockEpochs = blockEpochs

        height, err := d.cfg.Graph.CurrentBlockHeight()
        if err != nil {
                return err
        }
        d.bestHeight = height

        // Start the reliable sender. In case we had any pending messages ready
        // to be sent when the gossiper was last shut down, we must continue on
        // our quest to deliver them to their respective peers.
        if err := d.reliableSender.Start(); err != nil {
                return err
        }

        d.syncMgr.Start()

        d.banman.start()

        // Start receiving blocks in its dedicated goroutine.
        d.wg.Add(2)
        go d.syncBlockHeight()
        go d.networkHandler()

        return nil
}

// syncBlockHeight syncs the best block height for the gossiper by reading
// blockEpochs.
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) syncBlockHeight() {
        defer d.wg.Done()

        for {
                select {
                // A new block has arrived, so we can re-process the previously
                // premature announcements.
                case newBlock, ok := <-d.blockEpochs.Epochs:
                        // If the channel has been closed, then this indicates
                        // the daemon is shutting down, so we exit ourselves.
                        if !ok {
                                return
                        }

                        // Once a new block arrives, we update our running
                        // track of the height of the chain tip.
                        d.Lock()
                        blockHeight := uint32(newBlock.Height)
                        d.bestHeight = blockHeight
                        d.Unlock()

                        log.Debugf("New block: height=%d, hash=%s", blockHeight,
                                newBlock.Hash)

                        // Resend future messages, if any.
                        d.resendFutureMessages(blockHeight)

                case <-d.quit:
                        return
                }
        }
}

// futureMsgCache embeds a `lru.Cache` with a message counter that serves as
// the unique ID when saving the message.
type futureMsgCache struct {
        *lru.Cache[uint64, *cachedFutureMsg]

        // msgID is a monotonically increasing integer.
        msgID atomic.Uint64
}

// nextMsgID returns a unique message ID.
func (f *futureMsgCache) nextMsgID() uint64 {
        return f.msgID.Add(1)
}

// newFutureMsgCache creates a new future message cache with the underlying lru
// cache being initialized with the specified capacity.
func newFutureMsgCache(capacity uint64) *futureMsgCache {
        // Create a new cache.
        cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)

        return &futureMsgCache{
                Cache: cache,
        }
}

// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
type cachedFutureMsg struct {
        // msg is the network message.
        msg *networkMsg

        // height is the block height.
        height uint32
}

// Size returns the size of the message.
func (c *cachedFutureMsg) Size() (uint64, error) {
        // Return a constant 1.
        return 1, nil
}
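
// Editor's sketch (hypothetical, not part of the original file): a premature
// message would be saved under a fresh monotonic ID, which is the usage
// pattern the cache above exists for. The Put call on the embedded lru.Cache
// is an assumption about the neutrino cache API:
//
//      msgID := d.futureMsgs.nextMsgID()
//      cmsg := &cachedFutureMsg{msg: nMsg, height: msgHeight}
//      _, _ = d.futureMsgs.Put(msgID, cmsg)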

// resendFutureMessages takes a block height, resends all the future messages
// found at or below that height and deletes those messages found in the
// gossiper's futureMsgs.
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
        var (
                // msgs are the target messages.
                msgs []*networkMsg

                // keys are the target messages' caching keys.
                keys []uint64
        )

        // filterMsgs is the visitor used when iterating the future cache.
        filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
                if cmsg.height <= height {
                        msgs = append(msgs, cmsg.msg)
                        keys = append(keys, k)
                }

                return true
        }

        // Filter out the target messages.
        d.futureMsgs.Range(filterMsgs)

        // Return early if no messages found.
        if len(msgs) == 0 {
                return
        }

        // Remove the filtered messages.
        for _, key := range keys {
                d.futureMsgs.Delete(key)
        }

        log.Debugf("Resending %d network messages at height %d",
                len(msgs), height)

        for _, msg := range msgs {
                select {
                case d.networkMsgs <- msg:
                case <-d.quit:
                        msg.err <- ErrGossiperShuttingDown
                }
        }
}

// Stop signals any active goroutines for a graceful closure.
func (d *AuthenticatedGossiper) Stop() error {
        d.stopped.Do(func() {
                log.Info("Authenticated gossiper shutting down...")
                defer log.Debug("Authenticated gossiper shutdown complete")

                d.stop()
        })
        return nil
}

func (d *AuthenticatedGossiper) stop() {
        log.Debug("Authenticated Gossiper is stopping")
        defer log.Debug("Authenticated Gossiper stopped")

        // `blockEpochs` is only initialized in the start routine so we make
        // sure we don't panic here in the case where the `Stop` method is
        // called when the `Start` method does not complete.
        if d.blockEpochs != nil {
                d.blockEpochs.Cancel()
        }

        d.syncMgr.Stop()

        d.banman.stop()

        close(d.quit)
        d.wg.Wait()

        // We'll stop our reliable sender after all of the gossiper's goroutines
        // have exited to ensure nothing can cause it to continue executing.
        d.reliableSender.Stop()
}

// TODO(roasbeef): need method to get current gossip timestamp?
//  * using mtx, check time rotate forward is needed?

// ProcessRemoteAnnouncement sends a new remote announcement message along with
// the peer that sent the routing message. The announcement will be processed
// then added to a queue for batched trickled announcement to all connected
// peers.  Remote channel announcements should contain the announcement proof
// and be fully validated.
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
        peer lnpeer.Peer) chan error {

        errChan := make(chan error, 1)

        // For messages in the known set of channel series queries, we'll
        // dispatch the message directly to the GossipSyncer, and skip the main
        // processing loop.
        switch m := msg.(type) {
        case *lnwire.QueryShortChanIDs,
                *lnwire.QueryChannelRange,
                *lnwire.ReplyChannelRange,
                *lnwire.ReplyShortChanIDsEnd:

                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
                if !ok {
                        log.Warnf("Gossip syncer for peer=%x not found",
                                peer.PubKey())

                        errChan <- ErrGossipSyncerNotFound
                        return errChan
                }

                // If we've found the message target, then we'll dispatch the
                // message directly to it.
                syncer.ProcessQueryMsg(m, peer.QuitSignal())

                errChan <- nil
                return errChan

        // If a peer is updating its current update horizon, then we'll dispatch
        // that directly to the proper GossipSyncer.
        case *lnwire.GossipTimestampRange:
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
                if !ok {
                        log.Warnf("Gossip syncer for peer=%x not found",
                                peer.PubKey())

                        errChan <- ErrGossipSyncerNotFound
                        return errChan
                }

                // If we've found the message target, then we'll dispatch the
                // message directly to it.
                if err := syncer.ApplyGossipFilter(m); err != nil {
                        log.Warnf("Unable to apply gossip filter for peer=%x: "+
                                "%v", peer.PubKey(), err)

                        errChan <- err
                        return errChan
                }

                errChan <- nil
                return errChan

        // To avoid inserting edges in the graph for our own channels that we
        // have already closed, we ignore such channel announcements coming
        // from the remote.
        case *lnwire.ChannelAnnouncement1:
                ownKey := d.selfKey.SerializeCompressed()
                ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement1 " +
                        "for own channel")

                if bytes.Equal(m.NodeID1[:], ownKey) ||
                        bytes.Equal(m.NodeID2[:], ownKey) {

                        log.Warn(ownErr)
                        errChan <- ownErr
                        return errChan
                }
        }

        nMsg := &networkMsg{
                msg:      msg,
                isRemote: true,
                peer:     peer,
                source:   peer.IdentityKey(),
                err:      errChan,
        }

        select {
        case d.networkMsgs <- nMsg:

        // If the peer that sent us this message is quitting, then we don't
        // need to send back an error and can return immediately.
        case <-peer.QuitSignal():
                return nil
        case <-d.quit:
                nMsg.err <- ErrGossiperShuttingDown
        }

        return nMsg.err
}

// ProcessLocalAnnouncement sends a new local announcement message to the
// gossiper. The announcement will be processed then added to a queue for
// batched trickled announcement to all connected peers. Local channel
// announcements don't contain the announcement proof and will not be fully
// validated. Once the channel proofs are received, the entire channel
// announcement and update messages will be re-constructed and broadcast to the
// rest of the network.
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
        optionalFields ...OptionalMsgField) chan error {

        optionalMsgFields := &optionalMsgFields{}
        optionalMsgFields.apply(optionalFields...)

        nMsg := &networkMsg{
                msg:               msg,
                optionalMsgFields: optionalMsgFields,
                isRemote:          false,
                source:            d.selfKey,
                err:               make(chan error, 1),
        }

        select {
        case d.networkMsgs <- nMsg:
        case <-d.quit:
                nMsg.err <- ErrGossiperShuttingDown
        }

        return nMsg.err
}

// channelUpdateID is a unique identifier for ChannelUpdate messages, as
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
// tuple.
type channelUpdateID struct {
        // channelID represents the data which is needed to retrieve all the
        // information required to validate the channel's existence.
        channelID lnwire.ShortChannelID

        // flags holds the channel flags; the least-significant bit must be
        // set to 0 if the creating node corresponds to the first node in the
        // previously sent channel announcement and 1 otherwise.
        flags lnwire.ChanUpdateChanFlags
}

// msgWithSenders is a wrapper struct around a message, and the set of peers
// that originally sent us this message. Using this struct, we can ensure that
// we don't re-send a message to the peer that sent it to us in the first
// place.
type msgWithSenders struct {
        // msg is the wire message itself.
        msg lnwire.Message

        // isLocal is true if this was a message that originated locally. We'll
        // use this to bypass our normal checks to ensure we prioritize sending
        // out our own updates.
        isLocal bool

        // senders is the set of peers that sent us this message.
        senders map[route.Vertex]struct{}
}

// mergeSyncerMap is used to merge the set of senders of a particular message
// with peers that we have an active GossipSyncer with. We do this to ensure
// that we don't broadcast messages to any peers that we have active gossip
// syncers for.
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
        for peerPub := range syncers {
                m.senders[peerPub] = struct{}{}
        }
}

// deDupedAnnouncements de-duplicates announcements that have been added to the
// batch. Internally, announcements are stored in three maps
// (one each for channel announcements, channel updates, and node
// announcements). These maps keep track of unique announcements and ensure no
// announcements are duplicated. We keep the three message types separate, such
// that we can send channel announcements first, then channel updates, and
// finally node announcements when it's time to broadcast them.
type deDupedAnnouncements struct {
        // channelAnnouncements are identified by the short channel id field.
        channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders

        // channelUpdates are identified by the channel update id field.
        channelUpdates map[channelUpdateID]msgWithSenders

        // nodeAnnouncements are identified by the Vertex field.
        nodeAnnouncements map[route.Vertex]msgWithSenders

        sync.Mutex
}

// Reset operates on deDupedAnnouncements to reset the storage of
// announcements.
func (d *deDupedAnnouncements) Reset() {
        d.Lock()
        defer d.Unlock()

        d.reset()
}

// reset is the private version of the Reset method. We have this so we can
// call this method within methods that are already holding the lock.
func (d *deDupedAnnouncements) reset() {
        // Storage of each type of announcement (channel announcements, channel
        // updates, node announcements) is set to an empty map where the
        // appropriate key points to the corresponding lnwire.Message.
        d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
        d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
        d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
}

// addMsg adds a new message to the current batch. If the message is already
// present in the current batch, then this new instance replaces the latter,
// and the set of senders is updated to reflect which node sent us this
// message.
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
        log.Tracef("Adding network message: %v to batch", message.msg.MsgType())

        // Depending on the message type (channel announcement, channel update,
        // or node announcement), the message is added to the corresponding map
        // in deDupedAnnouncements. Because each identifying key can have at
        // most one value, the announcements are de-duplicated, with newer ones
        // replacing older ones.
        switch msg := message.msg.(type) {

        // Channel announcements are identified by the short channel id field.
        case *lnwire.ChannelAnnouncement1:
                deDupKey := msg.ShortChannelID
                sender := route.NewVertex(message.source)

                mws, ok := d.channelAnnouncements[deDupKey]
                if !ok {
                        mws = msgWithSenders{
                                msg:     msg,
                                isLocal: !message.isRemote,
                                senders: make(map[route.Vertex]struct{}),
                        }
                        mws.senders[sender] = struct{}{}

                        d.channelAnnouncements[deDupKey] = mws

                        return
                }

                mws.msg = msg
                mws.senders[sender] = struct{}{}
                d.channelAnnouncements[deDupKey] = mws

        // Channel updates are identified by the (short channel id,
        // channel flags) tuple.
        case *lnwire.ChannelUpdate1:
                sender := route.NewVertex(message.source)
                deDupKey := channelUpdateID{
                        msg.ShortChannelID,
                        msg.ChannelFlags,
                }

                oldTimestamp := uint32(0)
                mws, ok := d.channelUpdates[deDupKey]
                if ok {
                        // If we already have seen this message, record its
                        // timestamp.
                        update, ok := mws.msg.(*lnwire.ChannelUpdate1)
                        if !ok {
                                log.Errorf("Expected *lnwire.ChannelUpdate1, "+
                                        "got: %T", mws.msg)

                                return
                        }

                        oldTimestamp = update.Timestamp
                }

                // If we already had this message with a strictly newer
                // timestamp, then we'll just discard the message we got.
                if oldTimestamp > msg.Timestamp {
                        log.Debugf("Ignored outdated network message: "+
                                "peer=%v, msg=%s", message.peer, msg.MsgType())
                        return
                }

                // If the message we just got is newer than what we previously
                // have seen, or this is the first time we see it, then we'll
                // add it to our map of announcements.
                if oldTimestamp < msg.Timestamp {
                        mws = msgWithSenders{
                                msg:     msg,
                                isLocal: !message.isRemote,
                                senders: make(map[route.Vertex]struct{}),
                        }

                        // We'll mark the sender of the message in the
                        // senders map.
                        mws.senders[sender] = struct{}{}

                        d.channelUpdates[deDupKey] = mws

                        return
                }

                // Lastly, if we had seen this exact message from before, with
                // the same timestamp, we'll add the sender to the map of
                // senders, such that we can skip sending this message back in
                // the next batch.
                mws.msg = msg
                mws.senders[sender] = struct{}{}
                d.channelUpdates[deDupKey] = mws

        // Node announcements are identified by the Vertex field.  Use the
        // NodeID to create the corresponding Vertex.
        case *lnwire.NodeAnnouncement:
                sender := route.NewVertex(message.source)
                deDupKey := route.Vertex(msg.NodeID)

                // We do the same for node announcements as we did for channel
                // updates, as they also carry a timestamp.
                oldTimestamp := uint32(0)
                mws, ok := d.nodeAnnouncements[deDupKey]
                if ok {
                        oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
                }

                // Discard the message if it's old.
                if oldTimestamp > msg.Timestamp {
                        return
                }

                // Replace if it's newer.
                if oldTimestamp < msg.Timestamp {
                        mws = msgWithSenders{
                                msg:     msg,
                                isLocal: !message.isRemote,
                                senders: make(map[route.Vertex]struct{}),
                        }

                        mws.senders[sender] = struct{}{}

                        d.nodeAnnouncements[deDupKey] = mws

                        return
                }

                // Add to senders map if it's the same as we had.
                mws.msg = msg
                mws.senders[sender] = struct{}{}
                d.nodeAnnouncements[deDupKey] = mws
        }
}

// AddMsgs is a helper method to add multiple messages to the announcement
// batch.
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
        d.Lock()
        defer d.Unlock()

        for _, msg := range msgs {
                d.addMsg(msg)
        }
}

// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
// to broadcast next into messages that are locally sourced and those that are
// sourced remotely.
type msgsToBroadcast struct {
        // localMsgs is the set of messages we created locally.
        localMsgs []msgWithSenders

        // remoteMsgs is the set of messages that we received from a remote
        // party.
        remoteMsgs []msgWithSenders
}

// addMsg adds a new message to the appropriate sub-slice.
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
        if msg.isLocal {
                m.localMsgs = append(m.localMsgs, msg)
        } else {
                m.remoteMsgs = append(m.remoteMsgs, msg)
        }
}

// isEmpty returns true if the batch is empty.
func (m *msgsToBroadcast) isEmpty() bool {
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
}

// length returns the length of the combined message set.
func (m *msgsToBroadcast) length() int {
        return len(m.localMsgs) + len(m.remoteMsgs)
}

// Emit returns the set of de-duplicated announcements to be sent out during
// the next announcement epoch, in the order of channel announcements, channel
// updates, and node announcements. Each message emitted contains the set of
// peers that sent us the message. This way, we can ensure that we don't waste
// bandwidth by re-sending a message to the peer that sent it to us in the
// first place. Additionally, the set of stored messages is reset.
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
        d.Lock()
        defer d.Unlock()

        // Get the total number of announcements.
        numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
                len(d.nodeAnnouncements)

        // Create an empty slice of messages with a capacity equal to the
        // total number of announcements.
        msgs := msgsToBroadcast{
                localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
                remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
        }

        // Add the channel announcements to the array first.
        for _, message := range d.channelAnnouncements {
                msgs.addMsg(message)
        }

        // Then add the channel updates.
        for _, message := range d.channelUpdates {
                msgs.addMsg(message)
        }

        // Finally add the node announcements.
        for _, message := range d.nodeAnnouncements {
                msgs.addMsg(message)
        }

        d.reset()

        // Return the batch of messages.
        return msgs
}
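
// Editor's sketch (hypothetical, not part of the original file): the dedup
// batch is filled as announcements are validated and drained once per
// trickle tick:
//
//      announcements := deDupedAnnouncements{}
//      announcements.Reset()
//      announcements.AddMsgs(newAnns...)
//
//      batch := announcements.Emit() // de-duplicated, local + remote
//      if !batch.isEmpty() {
//              d.splitAndSendAnnBatch(batch)
//      }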

// calculateSubBatchSize is a helper function that calculates the size to break
// down the batchSize into.
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
        minimumBatchSize, batchSize int) int {

        if subBatchDelay > totalDelay {
                return batchSize
        }

        subBatchSize := (batchSize*int(subBatchDelay) +
                int(totalDelay) - 1) / int(totalDelay)

        if subBatchSize < minimumBatchSize {
                return minimumBatchSize
        }

        return subBatchSize
}
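
// As a worked example of the ceiling division above (example values, not
// taken from this file's defaults): with totalDelay=90s, subBatchDelay=5s,
// minimumBatchSize=10 and batchSize=100, the raw sub batch size is
// ceil(100*5s/90s) = 6. Since 6 < 10, the minimum of 10 is returned, so the
// batch drains in ceil(100/10) = 10 sub batches, one per sub batch delay.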

// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
// this variable so the function can be mocked in our test.
var batchSizeCalculator = calculateSubBatchSize
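
// A test could stub the calculation like so (hypothetical sketch, not a
// snippet from our test suite):
//
//	batchSizeCalculator = func(_, _ time.Duration, _, batchSize int) int {
//	        // Disable sub batching entirely by emitting one batch.
//	        return batchSize
//	}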

// splitAnnouncementBatches takes an existing list of announcements and
// decomposes it into sub batches controlled by the `subBatchSize`.
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
        announcementBatch []msgWithSenders) [][]msgWithSenders {

        subBatchSize := batchSizeCalculator(
                d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
                d.cfg.MinimumBatchSize, len(announcementBatch),
        )

        var splitAnnouncementBatch [][]msgWithSenders

        for subBatchSize < len(announcementBatch) {
                // For slicing with minimal allocation
                // https://github.com/golang/go/wiki/SliceTricks
                announcementBatch, splitAnnouncementBatch =
                        announcementBatch[subBatchSize:],
                        append(splitAnnouncementBatch,
                                announcementBatch[0:subBatchSize:subBatchSize])
        }
        splitAnnouncementBatch = append(
                splitAnnouncementBatch, announcementBatch,
        )

        return splitAnnouncementBatch
}
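
// For instance (assumed values), a batch of 10 announcements with a computed
// subBatchSize of 3 is decomposed into sub batches of sizes 3, 3, 3 and 1;
// the full-cap re-slice above ensures appends to one sub batch can never
// clobber the elements of the next.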

// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
// split size, and then sends out all items to the set of target peers. Locally
// generated announcements are always sent before remotely generated
// announcements.
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
        annBatch msgsToBroadcast) {

        // delayNextBatch is a helper closure that blocks for `SubBatchDelay`
        // duration to delay the sending of the next announcement batch.
        delayNextBatch := func() {
                select {
                case <-time.After(d.cfg.SubBatchDelay):
                case <-d.quit:
                        return
                }
        }

        // Fetch the local and remote announcements.
        localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
        remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)

        d.wg.Add(1)
        go func() {
                defer d.wg.Done()

                log.Debugf("Broadcasting %v new local announcements in %d "+
                        "sub batches", len(annBatch.localMsgs),
                        len(localBatches))

                // Send out the local announcements first.
                for _, annBatch := range localBatches {
                        d.sendLocalBatch(annBatch)
                        delayNextBatch()
                }

                log.Debugf("Broadcasting %v new remote announcements in %d "+
                        "sub batches", len(annBatch.remoteMsgs),
                        len(remoteBatches))

                // Now send the remote announcements.
                for _, annBatch := range remoteBatches {
                        d.sendRemoteBatch(annBatch)
                        delayNextBatch()
                }
        }()
}
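
// Back-of-the-envelope timing (illustrative, assuming SubBatchDelay=5s): with
// 3 local and 2 remote sub batches, the goroutine above finishes after
// roughly 5 sub batch delays, i.e. ~25s, with every local sub batch on the
// wire before the first remote one.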

// sendLocalBatch broadcasts a list of locally generated announcements to our
// peers. For local announcements, we skip the filter and dedup logic and just
// send the announcements out to all our connected peers.
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
        msgsToSend := lnutils.Map(
                annBatch, func(m msgWithSenders) lnwire.Message {
                        return m.msg
                },
        )

        err := d.cfg.Broadcast(nil, msgsToSend...)
        if err != nil {
                log.Errorf("Unable to send local batch announcements: %v", err)
        }
}

// sendRemoteBatch broadcasts a list of remotely generated announcements to our
// peers.
func (d *AuthenticatedGossiper) sendRemoteBatch(annBatch []msgWithSenders) {
        syncerPeers := d.syncMgr.GossipSyncers()

        // We'll first attempt to filter out this new message for all peers
        // that have active gossip syncers.
        for pub, syncer := range syncerPeers {
                log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
                syncer.FilterGossipMsgs(annBatch...)
        }

        for _, msgChunk := range annBatch {
                msgChunk := msgChunk

                // With the syncers taken care of, we'll merge the sender map
                // with the set of syncers, so we don't send out duplicate
                // messages.
                msgChunk.mergeSyncerMap(syncerPeers)

                err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
                if err != nil {
                        log.Errorf("Unable to send batch "+
                                "announcements: %v", err)
                        continue
                }
        }
}

// networkHandler is the primary goroutine that drives this service. The roles
// of this goroutine include answering queries related to the state of the
// network, syncing up newly connected peers, and also periodically
// broadcasting our latest topology state to all connected peers.
//
// NOTE: This MUST be run as a goroutine.
func (d *AuthenticatedGossiper) networkHandler() {
        defer d.wg.Done()

        // Initialize empty deDupedAnnouncements to store announcement batch.
        announcements := deDupedAnnouncements{}
        announcements.Reset()

        d.cfg.RetransmitTicker.Resume()
        defer d.cfg.RetransmitTicker.Stop()

        trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
        defer trickleTimer.Stop()

        // To start, we'll first check to see if there are any stale channel or
        // node announcements that we need to re-transmit.
        if err := d.retransmitStaleAnns(time.Now()); err != nil {
                log.Errorf("Unable to rebroadcast stale announcements: %v", err)
        }

        // We'll use this validation barrier to ensure that we process jobs in
        // their dependency order during parallel validation.
        validationBarrier := graph.NewValidationBarrier(1000, d.quit)

        for {
                select {
                // A new policy update has arrived. We'll commit it to the
                // sub-systems below us, then craft, sign, and broadcast a new
                // ChannelUpdate for the set of affected clients.
                case policyUpdate := <-d.chanPolicyUpdates:
                        log.Tracef("Received %d channel policy update requests",
                                len(policyUpdate.edgesToUpdate))

                        // First, we'll create new fully signed updates for the
                        // affected channels and also update the underlying
                        // graph with the new state.
                        newChanUpdates, err := d.processChanPolicyUpdate(
                                policyUpdate.edgesToUpdate,
                        )
                        policyUpdate.errChan <- err
                        if err != nil {
                                log.Errorf("Unable to craft policy updates: %v",
                                        err)
                                continue
                        }

                        // Finally, with the updates committed, we'll now add
                        // them to the announcement batch to be flushed at the
                        // start of the next epoch.
                        announcements.AddMsgs(newChanUpdates...)

                case announcement := <-d.networkMsgs:
                        log.Tracef("Received network message: "+
                                "peer=%v, msg=%s, is_remote=%v",
                                announcement.peer, announcement.msg.MsgType(),
                                announcement.isRemote)

                        switch announcement.msg.(type) {
                        // Channel announcement signatures are amongst the only
                        // messages that we'll process serially.
                        case *lnwire.AnnounceSignatures1:
                                emittedAnnouncements, _ := d.processNetworkAnnouncement(
                                        announcement,
                                )
                                log.Debugf("Processed network message %s, "+
                                        "returned len(announcements)=%v",
                                        announcement.msg.MsgType(),
                                        len(emittedAnnouncements))

                                if emittedAnnouncements != nil {
                                        announcements.AddMsgs(
                                                emittedAnnouncements...,
                                        )
                                }
                                continue
                        }

                        // If this message was recently rejected, then we won't
                        // attempt to re-process it.
                        if announcement.isRemote && d.isRecentlyRejectedMsg(
                                announcement.msg,
                                sourceToPub(announcement.source),
                        ) {

                                announcement.err <- fmt.Errorf("recently " +
                                        "rejected")
                                continue
                        }

                        // We'll set up any dependents and wait until a free
                        // slot for this job opens up; this allows us to avoid
                        // having thousands of goroutines active.
                        validationBarrier.InitJobDependencies(announcement.msg)

                        d.wg.Add(1)
                        go d.handleNetworkMessages(
                                announcement, &announcements, validationBarrier,
                        )

                // The trickle timer has ticked, which indicates we should
                // flush to the network the pending batch of new announcements
                // we've received since the last trickle tick.
                case <-trickleTimer.C:
                        // Emit the current batch of announcements from
                        // deDupedAnnouncements.
                        announcementBatch := announcements.Emit()

                        // If the current announcements batch is empty, then we
                        // have no further work here.
                        if announcementBatch.isEmpty() {
                                continue
                        }

                        // At this point, we have the set of local and remote
                        // announcements we want to send out. We'll do the
                        // batching as normal for both, but for local
                        // announcements, we'll blast them out w/o regard for
                        // our peer's policies so we ensure they propagate
                        // properly.
                        d.splitAndSendAnnBatch(announcementBatch)

                // The retransmission timer has ticked which indicates that we
                // should check if we need to prune or re-broadcast any of our
                // personal channels or node announcement. This addresses the
                // case of "zombie" channels and channel advertisements that
                // have been dropped, or not properly propagated through the
                // network.
                case tick := <-d.cfg.RetransmitTicker.Ticks():
                        if err := d.retransmitStaleAnns(tick); err != nil {
                                log.Errorf("unable to rebroadcast stale "+
                                        "announcements: %v", err)
                        }

                // The gossiper has been signalled to exit, so we exit our
                // main loop so the wait group can be decremented.
                case <-d.quit:
                        return
                }
        }
}

// handleNetworkMessages is responsible for waiting for dependencies for a
// given network message and processing the message. Once processed, it will
// signal its dependants and add the new announcements to the announce batch.
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
        deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) {

        defer d.wg.Done()
        defer vb.CompleteJob()

        // We should only broadcast this message forward if it originated from
        // us or it wasn't received as part of our initial historical sync.
        shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()

        // If this message has an existing dependency, then we'll wait until
        // that has been fully validated before we proceed.
        err := vb.WaitForDependants(nMsg.msg)
        if err != nil {
                log.Debugf("Validating network message %s got err: %v",
                        nMsg.msg.MsgType(), err)

                if !graph.IsError(
                        err,
                        graph.ErrVBarrierShuttingDown,
                        graph.ErrParentValidationFailed,
                ) {

                        log.Warnf("unexpected error during validation "+
                                "barrier shutdown: %v", err)
                }
                nMsg.err <- err

                return
        }

        // Process the network announcement to determine if this is either a
        // new announcement from our PoV or an update to a prior vertex/edge we
        // previously processed.
        newAnns, allow := d.processNetworkAnnouncement(nMsg)

        log.Tracef("Processed network message %s, returned "+
                "len(announcements)=%v, allowDependents=%v",
                nMsg.msg.MsgType(), len(newAnns), allow)

        // If this message had any dependencies, then we can now signal them to
        // continue.
        vb.SignalDependants(nMsg.msg, allow)

        // If the announcement was accepted, then add the emitted announcements
        // to our announce batch to be broadcast once the trickle timer ticks
        // again.
        if newAnns != nil && shouldBroadcast {
                // TODO(roasbeef): exclude peer that sent.
                deDuped.AddMsgs(newAnns...)
        } else if newAnns != nil {
                log.Trace("Skipping broadcast of announcements received " +
                        "during initial graph sync")
        }
}

// TODO(roasbeef): d/c peers that send updates not on our chain

// InitSyncState is called by outside sub-systems when a connection is
// established to a new peer that understands how to perform channel range
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
// needed to handle new queries.
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
        d.syncMgr.InitSyncState(syncPeer)
}

// PruneSyncState is called by outside sub-systems once a peer that we were
// previously connected to has been disconnected. In this case we can stop the
// existing GossipSyncer assigned to the peer and free up resources.
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
        d.syncMgr.PruneSyncState(peer)
}

// isRecentlyRejectedMsg returns true if we recently rejected a message, and
// false otherwise. This avoids expensive reprocessing of the message.
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
        peerPub [33]byte) bool {

        var scid uint64
        switch m := msg.(type) {
        case *lnwire.ChannelUpdate1:
                scid = m.ShortChannelID.ToUint64()

        case *lnwire.ChannelAnnouncement1:
                scid = m.ShortChannelID.ToUint64()

        default:
                return false
        }

        _, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
        return err != cache.ErrElementNotFound
}
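
// Note that the reject cache is keyed on both the SCID and the sending peer's
// pubkey, so (illustrative example) a ChannelUpdate1 for scid 700000x1x0 that
// was rejected when relayed by peer A is still processed in full when a
// different peer B later relays the very same update.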

// retransmitStaleAnns examines all outgoing channels that the source node is
// known to maintain to check to see if any of them are "stale". A channel is
// stale iff the last timestamp of its rebroadcast is older than the
// RebroadcastInterval. We also check if a refreshed node announcement should
// be resent.
func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
        // Iterate over all of our channels and check if any of them fall
        // within the prune interval or re-broadcast interval.
        type updateTuple struct {
                info *models.ChannelEdgeInfo
                edge *models.ChannelEdgePolicy
        }

        var (
                havePublicChannels bool
                edgesToUpdate      []updateTuple
        )
        err := d.cfg.Graph.ForAllOutgoingChannels(func(
                info *models.ChannelEdgeInfo,
                edge *models.ChannelEdgePolicy) error {

                // If there's no auth proof attached to this edge, it means
                // that it is a private channel not meant to be announced to
                // the greater network, so avoid sending channel updates for
                // this channel to not leak its existence.
                if info.AuthProof == nil {
                        log.Debugf("Skipping retransmission of channel "+
                                "without AuthProof: %v", info.ChannelID)
                        return nil
                }

                // We make a note that we have at least one public channel. We
                // use this to determine whether we should send a node
                // announcement below.
                havePublicChannels = true

                // If this edge has a ChannelUpdate that was created before the
                // introduction of the MaxHTLC field, then we'll update this
                // edge to propagate this information in the network.
                if !edge.MessageFlags.HasMaxHtlc() {
                        // We'll make sure we support the new max_htlc field if
                        // not already present.
                        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
                        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)

                        edgesToUpdate = append(edgesToUpdate, updateTuple{
                                info: info,
                                edge: edge,
                        })
                        return nil
                }

                timeElapsed := now.Sub(edge.LastUpdate)

                // If it's been longer than RebroadcastInterval since we've
                // re-broadcasted the channel, add the channel to the set of
                // edges we need to update.
                if timeElapsed >= d.cfg.RebroadcastInterval {
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
                                info: info,
                                edge: edge,
                        })
                }

                return nil
        })
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
                return fmt.Errorf("unable to retrieve outgoing channels: %w",
                        err)
        }

        var signedUpdates []lnwire.Message
        for _, chanToUpdate := range edgesToUpdate {
                // Re-sign and update the channel on disk and retrieve our
                // ChannelUpdate to broadcast.
                chanAnn, chanUpdate, err := d.updateChannel(
                        chanToUpdate.info, chanToUpdate.edge,
                )
                if err != nil {
                        return fmt.Errorf("unable to update channel: %w", err)
                }

                // If we have a valid announcement to transmit, then we'll send
                // that along with the update.
                if chanAnn != nil {
                        signedUpdates = append(signedUpdates, chanAnn)
                }

                signedUpdates = append(signedUpdates, chanUpdate)
        }

        // If we don't have any public channels, we return as we don't want to
        // broadcast anything that would reveal our existence.
        if !havePublicChannels {
                return nil
        }

        // We'll also check that our NodeAnnouncement is not too old.
        currentNodeAnn := d.cfg.FetchSelfAnnouncement()
        timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
        timeElapsed := now.Sub(timestamp)

        // If it's been longer than RebroadcastInterval since we've
        // re-broadcasted the node announcement, refresh it and resend it.
        nodeAnnStr := ""
        if timeElapsed >= d.cfg.RebroadcastInterval {
                newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
                if err != nil {
                        return fmt.Errorf("unable to get refreshed node "+
                                "announcement: %v", err)
                }

                signedUpdates = append(signedUpdates, &newNodeAnn)
                nodeAnnStr = " and our refreshed node announcement"

                // Before broadcasting the refreshed node announcement, add it
                // to our own graph.
                if err := d.addNode(&newNodeAnn); err != nil {
                        log.Errorf("Unable to add refreshed node announcement "+
                                "to graph: %v", err)
                }
        }

        // If we don't have any updates to re-broadcast, then we'll exit
        // early.
        if len(signedUpdates) == 0 {
                return nil
        }

        log.Infof("Retransmitting %v outgoing channels%v",
                len(edgesToUpdate), nodeAnnStr)

        // With all the wire announcements properly crafted, we'll broadcast
        // our known outgoing channels to all our immediate peers.
        if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
                return fmt.Errorf("unable to re-broadcast channels: %w", err)
        }

        return nil
}

// processChanPolicyUpdate generates a new set of channel updates for the
// provided list of edges and updates the backing ChannelGraphSource.
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
        edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {

        var chanUpdates []networkMsg
        for _, edgeInfo := range edgesToUpdate {
                // Now that we've collected all the channels we need to update,
                // we'll re-sign and update the backing ChannelGraphSource, and
                // retrieve our ChannelUpdate to broadcast.
                _, chanUpdate, err := d.updateChannel(
                        edgeInfo.Info, edgeInfo.Edge,
                )
                if err != nil {
                        return nil, err
                }

                // We'll avoid broadcasting any updates for private channels to
                // avoid directly giving away their existence. Instead, we'll
                // send the update directly to the remote party.
                if edgeInfo.Info.AuthProof == nil {
                        // If AuthProof is nil and an alias was found for this
                        // ChannelID (meaning the option-scid-alias feature was
                        // negotiated), we'll replace the ShortChannelID in the
                        // update with the peer's alias. We do this after
                        // updateChannel so that the alias isn't persisted to
                        // the database.
                        chanID := lnwire.NewChanIDFromOutPoint(
                                edgeInfo.Info.ChannelPoint,
                        )

                        var defaultAlias lnwire.ShortChannelID
                        foundAlias, _ := d.cfg.GetAlias(chanID)
                        if foundAlias != defaultAlias {
                                chanUpdate.ShortChannelID = foundAlias

                                sig, err := d.cfg.SignAliasUpdate(chanUpdate)
                                if err != nil {
                                        log.Errorf("Unable to sign alias "+
                                                "update: %v", err)
                                        continue
                                }

                                lnSig, err := lnwire.NewSigFromSignature(sig)
                                if err != nil {
                                        log.Errorf("Unable to create sig: %v",
                                                err)
                                        continue
                                }

                                chanUpdate.Signature = lnSig
                        }

                        remotePubKey := remotePubFromChanInfo(
                                edgeInfo.Info, chanUpdate.ChannelFlags,
                        )
                        err := d.reliableSender.sendMessage(
                                chanUpdate, remotePubKey,
                        )
                        if err != nil {
                                log.Errorf("Unable to reliably send %v for "+
                                        "channel=%v to peer=%x: %v",
                                        chanUpdate.MsgType(),
                                        chanUpdate.ShortChannelID,
                                        remotePubKey, err)
                        }
                        continue
                }

                // We set ourselves as the source of this message to indicate
                // that we shouldn't skip any peers when sending this message.
                chanUpdates = append(chanUpdates, networkMsg{
                        source:   d.selfKey,
                        isRemote: false,
                        msg:      chanUpdate,
                })
        }

        return chanUpdates, nil
}

// remotePubFromChanInfo returns the public key of the remote peer given a
// ChannelEdgeInfo that describes a channel we have with them.
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
        chanFlags lnwire.ChanUpdateChanFlags) [33]byte {

        var remotePubKey [33]byte
        switch {
        case chanFlags&lnwire.ChanUpdateDirection == 0:
                remotePubKey = chanInfo.NodeKey2Bytes
        case chanFlags&lnwire.ChanUpdateDirection == 1:
                remotePubKey = chanInfo.NodeKey1Bytes
        }

        return remotePubKey
}
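
// To make the direction bit concrete (illustrative example): if we are node 1
// in the channel, our own updates carry a cleared direction bit, so the first
// case selects NodeKey2Bytes, which is exactly the remote peer the update
// should be delivered to.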

// processRejectedEdge examines a rejected edge to see if we can extract any
// new announcements from it. An edge will get rejected if we already added
// the same edge without AuthProof to the graph. If the received announcement
// contains a proof, we can add this proof to our edge. We can end up in this
// situation in the case where we create a channel, but for some reason fail
// to receive the remote peer's proof, while the remote peer is able to fully
// assemble the proof and craft the ChannelAnnouncement.
func (d *AuthenticatedGossiper) processRejectedEdge(
        chanAnnMsg *lnwire.ChannelAnnouncement1,
        proof *models.ChannelAuthProof) ([]networkMsg, error) {

        // First, we'll fetch the state of the channel as we know it from the
        // database.
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
                chanAnnMsg.ShortChannelID,
        )
        if err != nil {
                return nil, err
        }

        // If the edge is in the graph and already has a proof attached, then
        // we'll just reject it as normal.
        if chanInfo.AuthProof != nil {
                return nil, nil
        }

        // Otherwise, this means that the edge is within the graph, but it
        // doesn't yet have a proper proof attached. If we did not receive
        // the proof such that we now can add it, there's nothing more we
        // can do.
        if proof == nil {
                return nil, nil
        }

        // We'll then create and validate the new fully assembled
        // announcement.
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
                proof, chanInfo, e1, e2,
        )
        if err != nil {
                return nil, err
        }
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
        if err != nil {
                err := fmt.Errorf("assembled channel announcement proof "+
                        "for shortChanID=%v isn't valid: %v",
                        chanAnnMsg.ShortChannelID, err)
                log.Error(err)
                return nil, err
        }

        // If everything checks out, then we'll add the fully assembled proof
        // to the database.
        err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
        if err != nil {
                err := fmt.Errorf("unable to add proof to shortChanID=%v: %w",
                        chanAnnMsg.ShortChannelID, err)
                log.Error(err)
                return nil, err
        }

        // As we now have a complete channel announcement for this channel,
        // we'll construct the announcements so they can be broadcast out to
        // all our peers.
        announcements := make([]networkMsg, 0, 3)
        announcements = append(announcements, networkMsg{
                source: d.selfKey,
                msg:    chanAnn,
        })
        if e1Ann != nil {
                announcements = append(announcements, networkMsg{
                        source: d.selfKey,
                        msg:    e1Ann,
                })
        }
        if e2Ann != nil {
                announcements = append(announcements, networkMsg{
                        source: d.selfKey,
                        msg:    e2Ann,
                })
        }

        return announcements, nil
}

// fetchPKScript fetches the output script for the given SCID.
func (d *AuthenticatedGossiper) fetchPKScript(chanID *lnwire.ShortChannelID) (
        []byte, error) {

        return lnwallet.FetchPKScriptWithQuit(d.cfg.ChainIO, chanID, d.quit)
}

// addNode processes the given node announcement, and adds it to our channel
// graph.
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
        op ...batch.SchedulerOption) error {

        if err := graph.ValidateNodeAnn(msg); err != nil {
                return fmt.Errorf("unable to validate node announcement: %w",
                        err)
        }

        timestamp := time.Unix(int64(msg.Timestamp), 0)
        features := lnwire.NewFeatureVector(msg.Features, lnwire.Features)
        node := &models.LightningNode{
                HaveNodeAnnouncement: true,
                LastUpdate:           timestamp,
                Addresses:            msg.Addresses,
                PubKeyBytes:          msg.NodeID,
                Alias:                msg.Alias.String(),
                AuthSigBytes:         msg.Signature.ToSignatureBytes(),
                Features:             features,
                Color:                msg.RGBColor,
                ExtraOpaqueData:      msg.ExtraOpaqueData,
        }

        return d.cfg.Graph.AddNode(node, op...)
}

// isPremature decides whether a given network message has a block height+delta
// value specified in the future. If so, the message will be added to the
// future message map and be processed when the block height has been reached.
//
// NOTE: must be used inside a lock.
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
        delta uint32, msg *networkMsg) bool {
        // TODO(roasbeef) make height delta 6
        //  * or configurable

        msgHeight := chanID.BlockHeight + delta

        // The message height is smaller or equal to our best known height,
        // thus the message is mature.
        if msgHeight <= d.bestHeight {
                return false
        }

        // Add the premature message to our future messages which will be
        // resent once the block height has been reached.
        //
        // Copy the networkMsg since the old message's err chan will be
        // consumed.
        copied := &networkMsg{
                peer:              msg.peer,
                source:            msg.source,
                msg:               msg.msg,
                optionalMsgFields: msg.optionalMsgFields,
                isRemote:          msg.isRemote,
                err:               make(chan error, 1),
        }

        // Create the cached message.
        cachedMsg := &cachedFutureMsg{
                msg:    copied,
                height: msgHeight,
        }

        // Increment the msg ID and add it to the cache.
        nextMsgID := d.futureMsgs.nextMsgID()
        _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
        if err != nil {
                log.Errorf("Adding future message got error: %v", err)
        }

        log.Debugf("Network message: %v added to future messages for "+
                "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
                msgHeight, d.bestHeight)

        return true
}
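
// Worked example (assumed heights): with bestHeight=800000, an announcement
// whose SCID points at block 800001 with delta=0 has msgHeight=800001 >
// 800000, so it is cached as a future message and replayed once our best
// height reaches 800001; with delta=0 and block 799999 it is processed
// immediately.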

// processNetworkAnnouncement processes a new network related authenticated
// channel or node announcement or announcement proofs. If the announcement
// didn't affect the internal state due to either being out of date, invalid,
// or redundant, then nil is returned. Otherwise, the set of announcements will
// be returned which should be broadcasted to the rest of the network. The
// boolean returned indicates whether any dependents of the announcement should
// attempt to be processed as well.
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
        nMsg *networkMsg) ([]networkMsg, bool) {

        // If this is a remote update, we set the scheduler option to lazily
        // add it to the graph.
        var schedulerOp []batch.SchedulerOption
        if nMsg.isRemote {
                schedulerOp = append(schedulerOp, batch.LazyAdd())
        }

        switch msg := nMsg.msg.(type) {
        // A new node announcement has arrived which either presents new
        // information about a node in one of the channels we know about, or
        // an update to previously advertised information.
        case *lnwire.NodeAnnouncement:
                return d.handleNodeAnnouncement(nMsg, msg, schedulerOp)

        // A new channel announcement has arrived; this indicates the
        // *creation* of a new channel within the network. This only advertises
        // the existence of a channel and not yet the routing policies in
        // either direction of the channel.
        case *lnwire.ChannelAnnouncement1:
                return d.handleChanAnnouncement(nMsg, msg, schedulerOp)

        // A new authenticated channel edge update has arrived. This indicates
        // that the directional information for an already known channel has
        // been updated.
        case *lnwire.ChannelUpdate1:
                return d.handleChanUpdate(nMsg, msg, schedulerOp)

        // A new signature announcement has been received. This indicates
        // willingness of nodes involved in the funding of a channel to
        // announce this new channel to the rest of the world.
        case *lnwire.AnnounceSignatures1:
                return d.handleAnnSig(nMsg, msg)

        default:
                err := errors.New("wrong type of the announcement")
                nMsg.err <- err
                return nil, false
        }
}

// processZombieUpdate determines whether the provided channel update should
// resurrect a given zombie edge.
//
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
// should be inspected.
func (d *AuthenticatedGossiper) processZombieUpdate(
        chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
        msg *lnwire.ChannelUpdate1) error {

        // The least-significant bit in the flag on the channel update tells us
        // which edge is being updated.
        isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0

        // Since we've deemed the update as not stale above, before marking it
        // live, we'll make sure it has been signed by the correct party. If we
        // have both pubkeys, either party can resurrect the channel. If we've
        // already marked this with the stricter, single-sided resurrection we
        // will only have the pubkey of the node with the oldest timestamp.
        var pubKey *btcec.PublicKey
        switch {
        case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
                pubKey, _ = chanInfo.NodeKey1()
        case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
                pubKey, _ = chanInfo.NodeKey2()
        }
        if pubKey == nil {
                return fmt.Errorf("incorrect pubkey to resurrect zombie "+
                        "with chan_id=%v", msg.ShortChannelID)
        }

        err := netann.VerifyChannelUpdateSignature(msg, pubKey)
        if err != nil {
                return fmt.Errorf("unable to verify channel "+
                        "update signature: %v", err)
        }

        // With the signature valid, we'll proceed to mark the
        // edge as live and wait for the channel announcement to
        // come through again.
        err = d.cfg.Graph.MarkEdgeLive(scid)
        switch {
        case errors.Is(err, graphdb.ErrZombieEdgeNotFound):
                log.Errorf("edge with chan_id=%v was not found in the "+
                        "zombie index: %v", msg.ShortChannelID, err)

                return nil

        case err != nil:
                return fmt.Errorf("unable to remove edge with "+
                        "chan_id=%v from zombie index: %v",
                        msg.ShortChannelID, err)

        default:
        }

        log.Debugf("Removed edge with chan_id=%v from zombie "+
                "index", msg.ShortChannelID)

        return nil
}

// fetchNodeAnn fetches the latest signed node announcement from our point of
// view for the node with the given public key.
func (d *AuthenticatedGossiper) fetchNodeAnn(
        pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {

        node, err := d.cfg.Graph.FetchLightningNode(pubKey)
        if err != nil {
                return nil, err
        }

        return node.NodeAnnouncement(true)
}

// isMsgStale determines whether a message retrieved from the backing
// MessageStore is seen as stale by the current graph.
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
        switch msg := msg.(type) {
        case *lnwire.AnnounceSignatures1:
                chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
                        msg.ShortChannelID,
                )

                // If the channel cannot be found, it is most likely a leftover
                // message for a channel that was closed, so we can consider it
                // stale.
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
                        return true
                }
                if err != nil {
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
                                "%v", msg.ShortChannelID, err)
                        return false
                }

                // If the proof exists in the graph, then we have successfully
                // received the remote proof and assembled the full proof, so we
                // can safely delete the local proof from the database.
                return chanInfo.AuthProof != nil

        case *lnwire.ChannelUpdate1:
                _, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)

                // If the channel cannot be found, it is most likely a leftover
                // message for a channel that was closed, so we can consider it
                // stale.
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
                        return true
                }
                if err != nil {
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
                                "%v", msg.ShortChannelID, err)
                        return false
                }

                // Otherwise, we'll retrieve the correct policy that we
                // currently have stored within our graph to check if this
                // message is stale by comparing its timestamp.
                var p *models.ChannelEdgePolicy
                if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
                        p = p1
                } else {
                        p = p2
                }

                // If the policy is still unknown, then we can consider this
                // policy fresh.
                if p == nil {
                        return false
                }

                timestamp := time.Unix(int64(msg.Timestamp), 0)
                return p.LastUpdate.After(timestamp)

        default:
                // We'll make sure to not mark any unsupported messages as stale
                // to ensure they are not removed.
                return false
        }
}
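
// Staleness example (assumed timestamps): if the stored policy for the
// update's direction has LastUpdate=2024-12-01 and a queued ChannelUpdate1
// carries Timestamp=2024-11-30, the stored policy is newer and the queued
// message is reported stale, so the message store can safely drop it.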

// updateChannel creates a new fully signed update for the channel, and updates
// the underlying graph with the new state.
func (d *AuthenticatedGossiper) updateChannel(info *models.ChannelEdgeInfo,
        edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
        *lnwire.ChannelUpdate1, error) {

        // Parse the unsigned edge into a channel update.
        chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)

        // We'll generate a new signature over a digest of the channel
        // announcement itself and update the timestamp to ensure it
        // propagates.
        err := netann.SignChannelUpdate(
                d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
                netann.ChanUpdSetTimestamp,
        )
        if err != nil {
                return nil, nil, err
        }

        // Next, we'll set the new signature in place, and update the reference
        // in the backing slice.
        edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
        edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()

        // To ensure that our signature is valid, we'll verify it ourselves
        // before committing it to the slice returned.
        err = netann.ValidateChannelUpdateAnn(
                d.selfKey, info.Capacity, chanUpdate,
        )
        if err != nil {
                return nil, nil, fmt.Errorf("generated invalid channel "+
                        "update sig: %v", err)
        }

        // Finally, we'll write the new edge policy to disk.
        if err := d.cfg.Graph.UpdateEdge(edge); err != nil {
                return nil, nil, err
        }

        // We'll also create the original channel announcement so the two can
        // be broadcast alongside each other (if necessary), but only if we
        // have a full channel announcement for this channel.
        var chanAnn *lnwire.ChannelAnnouncement1
        if info.AuthProof != nil {
                chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
                chanAnn = &lnwire.ChannelAnnouncement1{
                        ShortChannelID:  chanID,
                        NodeID1:         info.NodeKey1Bytes,
                        NodeID2:         info.NodeKey2Bytes,
                        ChainHash:       info.ChainHash,
                        BitcoinKey1:     info.BitcoinKey1Bytes,
                        Features:        lnwire.NewRawFeatureVector(),
                        BitcoinKey2:     info.BitcoinKey2Bytes,
                        ExtraOpaqueData: info.ExtraOpaqueData,
                }
                chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
                        info.AuthProof.NodeSig1Bytes,
                )
                if err != nil {
                        return nil, nil, err
                }
                chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
                        info.AuthProof.NodeSig2Bytes,
                )
                if err != nil {
                        return nil, nil, err
                }
                chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
                        info.AuthProof.BitcoinSig1Bytes,
                )
                if err != nil {
                        return nil, nil, err
                }
                chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
                        info.AuthProof.BitcoinSig2Bytes,
                )
                if err != nil {
                        return nil, nil, err
                }
        }

        return chanAnn, chanUpdate, err
}

// SyncManager returns the gossiper's SyncManager instance.
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
        return d.syncMgr
}

// IsKeepAliveUpdate determines whether this channel update is considered a
// keep-alive update based on the previous channel update processed for the same
// direction.
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
        prev *models.ChannelEdgePolicy) bool {

        // Both updates should be from the same direction.
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
                prev.ChannelFlags&lnwire.ChanUpdateDirection {

                return false
        }

        // The timestamp should always increase for a keep-alive update.
        timestamp := time.Unix(int64(update.Timestamp), 0)
        if !timestamp.After(prev.LastUpdate) {
                return false
        }

        // None of the remaining fields should change for a keep-alive update.
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
                return false
        }
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
                return false
        }
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
                return false
        }
        if update.TimeLockDelta != prev.TimeLockDelta {
                return false
        }
        if update.HtlcMinimumMsat != prev.MinHTLC {
                return false
        }
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
                return false
        }
        if update.HtlcMaximumMsat != prev.MaxHTLC {
                return false
        }
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
                return false
        }

        return true
}
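
// Keep-alive illustration (assumed values): an update whose only change
// relative to the stored policy is a newer Timestamp is classified as
// keep-alive, whereas the same update with BaseFee bumped from 1000 to 1100
// msat fails the BaseFee check above and is treated as a genuine policy
// change.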

// latestHeight returns the gossiper's latest height known of the chain.
func (d *AuthenticatedGossiper) latestHeight() uint32 {
        d.Lock()
        defer d.Unlock()
        return d.bestHeight
}

// handleNodeAnnouncement processes a new node announcement.
func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
        nodeAnn *lnwire.NodeAnnouncement,
        ops []batch.SchedulerOption) ([]networkMsg, bool) {

        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)

        log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
                "node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)

        // We'll quickly ask the router if it already has a newer update for
        // this node so we can skip validating signatures if not required.
        if d.cfg.Graph.IsStaleNode(nodeAnn.NodeID, timestamp) {
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
                nMsg.err <- nil
                return nil, true
        }

        if err := d.addNode(nodeAnn, ops...); err != nil {
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
                        err)

                if !graph.IsError(
                        err,
                        graph.ErrOutdated,
                        graph.ErrIgnored,
                        graph.ErrVBarrierShuttingDown,
                ) {

                        log.Error(err)
                }

                nMsg.err <- err
                return nil, false
        }

        // In order to ensure we don't leak unadvertised nodes, we'll make a
        // quick check to ensure this node intends to publicly advertise itself
        // to the network.
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
        if err != nil {
                log.Errorf("Unable to determine if node %x is advertised: %v",
                        nodeAnn.NodeID, err)
                nMsg.err <- err
                return nil, false
        }

        var announcements []networkMsg

        // If it does, we'll add their announcement to our batch so that it can
        // be broadcast to the rest of our peers.
        if isPublic {
                announcements = append(announcements, networkMsg{
                        peer:     nMsg.peer,
                        isRemote: nMsg.isRemote,
                        source:   nMsg.source,
                        msg:      nodeAnn,
                })
        } else {
                log.Tracef("Skipping broadcasting node announcement for %x "+
                        "due to being unadvertised", nodeAnn.NodeID)
        }

        nMsg.err <- nil
        // TODO(roasbeef): get rid of the above

        log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
                "node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)

        return announcements, true
}
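
Every handler above answers its caller over the message's err channel, sending exactly one value on each return path. A minimal sketch of that request/reply contract (illustrative only, not a function in this file; it reuses the select pattern the handlers themselves use against d.quit):

// Sketch: submit a message to the gossiper and wait for the single
// reply every handler guarantees on the err channel.
func exampleAwaitReply(d *AuthenticatedGossiper, msg *networkMsg) error {
        select {
        case d.networkMsgs <- msg:
        case <-d.quit:
                return ErrGossiperShuttingDown
        }

        select {
        case err := <-msg.err:
                return err
        case <-d.quit:
                return ErrGossiperShuttingDown
        }
}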

// handleChanAnnouncement processes a new channel announcement.
func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
        ann *lnwire.ChannelAnnouncement1,
        ops []batch.SchedulerOption) ([]networkMsg, bool) {

        scid := ann.ShortChannelID

        log.Debugf("Processing ChannelAnnouncement1: peer=%v, short_chan_id=%v",
                nMsg.peer, scid.ToUint64())

        // We'll ignore any channel announcements that target any chain other
        // than the set of chains we know of.
        if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) {
                err := fmt.Errorf("ignoring ChannelAnnouncement1 from chain=%v"+
                        ", gossiper on chain=%v", ann.ChainHash,
                        d.cfg.ChainHash)
                log.Errorf(err.Error())

                key := newRejectCacheKey(
                        scid.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                nMsg.err <- err
                return nil, false
        }

        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
        // reject the announcement. Since the router accepts alias SCIDs,
        // not erroring out would be a DoS vector.
        if nMsg.isRemote && d.cfg.IsAlias(scid) {
                err := fmt.Errorf("ignoring remote alias channel=%v", scid)
                log.Errorf(err.Error())

                key := newRejectCacheKey(
                        scid.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                nMsg.err <- err
                return nil, false
        }

        // If the advertised inclusionary block is beyond our knowledge of the
        // chain tip, then we'll ignore it for now.
        d.Lock()
        if nMsg.isRemote && d.isPremature(scid, 0, nMsg) {
                log.Warnf("Announcement for chan_id=(%v), is premature: "+
                        "advertises height %v, only height %v is known",
                        scid.ToUint64(), scid.BlockHeight, d.bestHeight)
                d.Unlock()
                nMsg.err <- nil
                return nil, false
        }
        d.Unlock()

        // At this point, we'll now ask the router if this is a zombie/known
        // edge. If so we can skip all the processing below.
        if d.cfg.Graph.IsKnownEdge(scid) {
                nMsg.err <- nil
                return nil, true
        }

        // Check if the channel is already closed in which case we can ignore
        // it.
        closed, err := d.cfg.ScidCloser.IsClosedScid(scid)
        if err != nil {
                log.Errorf("failed to check if scid %v is closed: %v", scid,
                        err)
                nMsg.err <- err

                return nil, false
        }

        if closed {
                err = fmt.Errorf("ignoring closed channel %v", scid)
                log.Error(err)

                // If this is an announcement from us, we'll just ignore it.
                if !nMsg.isRemote {
                        nMsg.err <- err
                        return nil, false
                }

                // Increment the peer's ban score if they are sending closed
                // channel announcements.
                d.banman.incrementBanScore(nMsg.peer.PubKey())

                // If the peer is banned and not a channel peer, we'll
                // disconnect them.
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
                if dcErr != nil {
                        log.Errorf("failed to check if we should disconnect "+
                                "peer: %v", dcErr)
                        nMsg.err <- dcErr

                        return nil, false
                }

                if shouldDc {
                        nMsg.peer.Disconnect(ErrPeerBanned)
                }

                nMsg.err <- err

                return nil, false
        }

        // If this is a remote channel announcement, then we'll validate all
        // the signatures within the proof as it should be well formed.
        var proof *models.ChannelAuthProof
        if nMsg.isRemote {
                err := netann.ValidateChannelAnn(ann, d.fetchPKScript)
                if err != nil {
                        err := fmt.Errorf("unable to validate announcement: "+
                                "%v", err)

                        key := newRejectCacheKey(
                                scid.ToUint64(),
                                sourceToPub(nMsg.source),
                        )
                        _, _ = d.recentRejects.Put(key, &cachedReject{})

                        log.Error(err)
                        nMsg.err <- err
                        return nil, false
                }

                // If the proof checks out, then we'll save the proof itself to
                // the database so we can fetch it later when gossiping with
                // other nodes.
                proof = &models.ChannelAuthProof{
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
                }
        }

        // With the proof validated (if necessary), we can now store it within
        // the database for our path finding and syncing needs.
        var featureBuf bytes.Buffer
        if err := ann.Features.Encode(&featureBuf); err != nil {
                log.Errorf("unable to encode features: %v", err)
                nMsg.err <- err
                return nil, false
        }

        edge := &models.ChannelEdgeInfo{
                ChannelID:        scid.ToUint64(),
                ChainHash:        ann.ChainHash,
                NodeKey1Bytes:    ann.NodeID1,
                NodeKey2Bytes:    ann.NodeID2,
                BitcoinKey1Bytes: ann.BitcoinKey1,
                BitcoinKey2Bytes: ann.BitcoinKey2,
                AuthProof:        proof,
                Features:         featureBuf.Bytes(),
                ExtraOpaqueData:  ann.ExtraOpaqueData,
        }

        // If there were any optional message fields provided, we'll include
        // them in its serialized disk representation now.
        if nMsg.optionalMsgFields != nil {
                if nMsg.optionalMsgFields.capacity != nil {
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
                }
                if nMsg.optionalMsgFields.channelPoint != nil {
                        cp := *nMsg.optionalMsgFields.channelPoint
                        edge.ChannelPoint = cp
                }

                // Optional tapscript root for custom channels.
                edge.TapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
        }

        log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64())

        // We will add the edge to the channel router. If the nodes present in
        // this channel are not present in the database, a partial node will be
        // added to represent each node while we wait for a node announcement.
        //
        // Before we add the edge to the database, we obtain the mutex for this
        // channel ID. We do this to ensure no other goroutine has read the
        // database and is now making decisions based on this DB state, before
        // it writes to the DB.
        d.channelMtx.Lock(scid.ToUint64())
        err = d.cfg.Graph.AddEdge(edge, ops...)
        if err != nil {
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
                        scid.ToUint64(), err)

                defer d.channelMtx.Unlock(scid.ToUint64())

                // If the edge was rejected due to already being known, then it
                // may be the case that this new message has a fresh channel
                // proof, so we'll check.
                switch {
                case graph.IsError(err, graph.ErrIgnored):
                        // Attempt to process the rejected message to see if we
                        // get any new announcements.
                        anns, rErr := d.processRejectedEdge(ann, proof)
                        if rErr != nil {
                                key := newRejectCacheKey(
                                        scid.ToUint64(),
                                        sourceToPub(nMsg.source),
                                )
                                cr := &cachedReject{}
                                _, _ = d.recentRejects.Put(key, cr)

                                nMsg.err <- rErr
                                return nil, false
                        }

                        log.Debugf("Extracted %v announcements from rejected "+
                                "msgs", len(anns))

                        // If while processing this rejected edge, we realized
                        // there's a set of announcements we could extract,
                        // then we'll return those directly.
                        //
                        // NOTE: since this is an ErrIgnored, we can return
                        // true here to signal "allow" to its dependants.
                        nMsg.err <- nil

                        return anns, true

                case graph.IsError(
                        err, graph.ErrNoFundingTransaction,
                        graph.ErrInvalidFundingOutput,
                ):
                        key := newRejectCacheKey(
                                scid.ToUint64(),
                                sourceToPub(nMsg.source),
                        )
                        _, _ = d.recentRejects.Put(key, &cachedReject{})

                        // Increment the peer's ban score. We check isRemote
                        // so we don't actually ban the peer in case of a local
                        // bug.
                        if nMsg.isRemote {
                                d.banman.incrementBanScore(nMsg.peer.PubKey())
                        }

                case graph.IsError(err, graph.ErrChannelSpent):
                        key := newRejectCacheKey(
                                scid.ToUint64(),
                                sourceToPub(nMsg.source),
                        )
                        _, _ = d.recentRejects.Put(key, &cachedReject{})

                        // Since this channel has already been closed, we'll
                        // add it to the graph's closed channel index such that
                        // we won't attempt to do expensive validation checks
                        // on it again.
                        // TODO: Populate the ScidCloser by using closed
                        // channel notifications.
                        dbErr := d.cfg.ScidCloser.PutClosedScid(scid)
                        if dbErr != nil {
                                log.Errorf("failed to mark scid(%v) as "+
                                        "closed: %v", scid, dbErr)

                                nMsg.err <- dbErr

                                return nil, false
                        }

                        // Increment the peer's ban score. We check isRemote
                        // so we don't accidentally ban ourselves in case of a
                        // bug.
                        if nMsg.isRemote {
                                d.banman.incrementBanScore(nMsg.peer.PubKey())
                        }

                default:
                        // Otherwise, this is just a regular rejected edge.
                        key := newRejectCacheKey(
                                scid.ToUint64(),
                                sourceToPub(nMsg.source),
                        )
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
                }

                if !nMsg.isRemote {
                        log.Errorf("failed to add edge for local channel: %v",
                                err)
                        nMsg.err <- err

                        return nil, false
                }

                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
                if dcErr != nil {
                        log.Errorf("failed to check if we should disconnect "+
                                "peer: %v", dcErr)
                        nMsg.err <- dcErr

                        return nil, false
                }

                if shouldDc {
                        nMsg.peer.Disconnect(ErrPeerBanned)
                }

                nMsg.err <- err

                return nil, false
        }

        // If err is nil, release the lock immediately.
        d.channelMtx.Unlock(scid.ToUint64())

        log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64())

        // If we earlier received any ChannelUpdates for this channel, we can
        // now process them, as the channel is added to the graph.
        var channelUpdates []*processedNetworkMsg

        earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64())
        if err == nil {
                // There was actually an entry in the map, so we'll accumulate
                // it. We don't worry about deletion, since it'll eventually
                // fall out anyway.
                chanMsgs := earlyChanUpdates
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
        }

        // Launch a new goroutine to handle each ChannelUpdate; this is to
        // ensure we don't block here, as we can handle only one announcement
        // at a time.
        for _, cu := range channelUpdates {
                // Skip if already processed.
                if cu.processed {
                        continue
                }

                // Mark the ChannelUpdate as processed. This ensures that a
                // subsequent announcement in the option-scid-alias case does
                // not re-use an old ChannelUpdate.
                cu.processed = true

                d.wg.Add(1)
                go func(updMsg *networkMsg) {
                        defer d.wg.Done()

                        switch msg := updMsg.msg.(type) {
                        // Reprocess the message, making sure we return an
                        // error to the original caller in case the gossiper
                        // shuts down.
                        case *lnwire.ChannelUpdate1:
                                log.Debugf("Reprocessing ChannelUpdate for "+
                                        "shortChanID=%v", scid.ToUint64())

                                select {
                                case d.networkMsgs <- updMsg:
                                case <-d.quit:
                                        updMsg.err <- ErrGossiperShuttingDown
                                }

                        // We don't expect any other message type than
                        // ChannelUpdate to be in this cache.
                        default:
                                log.Errorf("Unsupported message type found "+
                                        "among ChannelUpdates: %T", msg)
                        }
                }(cu.msg)
        }

        // Channel announcement was successfully processed and now it might be
        // broadcast to other connected nodes if it was an announcement with
        // proof (remote).
        var announcements []networkMsg

        if proof != nil {
                announcements = append(announcements, networkMsg{
                        peer:     nMsg.peer,
                        isRemote: nMsg.isRemote,
                        source:   nMsg.source,
                        msg:      ann,
                })
        }

        nMsg.err <- nil

        log.Debugf("Processed ChannelAnnouncement1: peer=%v, short_chan_id=%v",
                nMsg.peer, scid.ToUint64())

        return announcements, true
}
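
The channelMtx choreography above (lock the SCID, write the edge, unlock) relies on a mutex that is scoped per channel ID, so that unrelated channels never contend. A small sketch of that pattern, assuming the generic constructor of lnd's multimutex package and a hypothetical channel ID:

// Sketch: one logical lock per channel ID. Work on different channels
// runs concurrently; work on the same channel is serialized, which is
// what makes the read-then-AddEdge sequence above safe.
func exampleChannelScopedLock() {
        chanMtx := multimutex.NewMutex[uint64]()

        scid := uint64(123456) // hypothetical channel ID
        chanMtx.Lock(scid)
        defer chanMtx.Unlock(scid)

        // ... read graph state and write the edge for scid here ...
}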

// handleChanUpdate processes a new channel update.
func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
        upd *lnwire.ChannelUpdate1,
        ops []batch.SchedulerOption) ([]networkMsg, bool) {

        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v",
                nMsg.peer, upd.ShortChannelID.ToUint64())

        // We'll ignore any channel updates that target any chain other than
        // the set of chains we know of.
        if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
                        "gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash)
                log.Errorf(err.Error())

                key := newRejectCacheKey(
                        upd.ShortChannelID.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                nMsg.err <- err
                return nil, false
        }

        blockHeight := upd.ShortChannelID.BlockHeight
        shortChanID := upd.ShortChannelID.ToUint64()

        // If the advertised inclusionary block is beyond our knowledge of the
        // chain tip, then we'll put the announcement in limbo to be fully
        // verified once we advance forward in the chain. If the update has an
        // alias SCID, we'll skip the isPremature check. This is necessary
        // since aliases start at block height 16_000_000.
        d.Lock()
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
                d.isPremature(upd.ShortChannelID, 0, nMsg) {

                log.Warnf("Update announcement for short_chan_id(%v), is "+
                        "premature: advertises height %v, only height %v is "+
                        "known", shortChanID, blockHeight, d.bestHeight)
                d.Unlock()
                nMsg.err <- nil
                return nil, false
        }
        d.Unlock()

        // Before we perform any of the expensive checks below, we'll check
        // whether this update is stale or is for a zombie channel in order to
        // quickly reject it.
        timestamp := time.Unix(int64(upd.Timestamp), 0)

        // Fetch the SCID we should be using to lock the channelMtx and make
        // graph queries with.
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
        if err != nil {
                // Fallback and set the graphScid to the peer-provided SCID.
                // This will occur for non-option-scid-alias channels and for
                // public option-scid-alias channels after 6 confirmations.
                // Once public option-scid-alias channels have 6 confs, we'll
                // ignore ChannelUpdates with one of their aliases.
                graphScid = upd.ShortChannelID
        }

        if d.cfg.Graph.IsStaleEdgePolicy(
                graphScid, timestamp, upd.ChannelFlags,
        ) {

                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
                )

                nMsg.err <- nil
                return nil, true
        }

        // Check that the ChanUpdate is not too far into the future; this
        // could reveal a faulty implementation, so we log an error.
        if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
                log.Errorf("Skewed timestamp (%v) for edge policy of "+
                        "short_chan_id(%v), timestamp too far in the future: "+
                        "peer=%v, msg=%s, is_remote=%v", timestamp.Unix(),
                        shortChanID, nMsg.peer, nMsg.msg.MsgType(),
                        nMsg.isRemote,
                )

                nMsg.err <- fmt.Errorf("skewed timestamp of edge policy, "+
                        "timestamp too far in the future: %v", timestamp.Unix())

                return nil, false
        }

        // Get the node pub key since we don't have it in the channel update
        // announcement message. We'll need this to properly verify the
        // message's signature.
        //
        // We make sure to obtain the mutex for this channel ID before we
        // access the database. This ensures the state we read from the
        // database has not changed between this point and when we call
        // UpdateEdge() later.
        d.channelMtx.Lock(graphScid.ToUint64())
        defer d.channelMtx.Unlock(graphScid.ToUint64())

        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
        switch {
        // No error, break.
        case err == nil:
                break

        case errors.Is(err, graphdb.ErrZombieEdge):
                err = d.processZombieUpdate(chanInfo, graphScid, upd)
                if err != nil {
                        log.Debug(err)
                        nMsg.err <- err
                        return nil, false
                }

                // We'll fallthrough to ensure we stash the update until we
                // receive its corresponding ChannelAnnouncement. This is
                // needed to ensure the edge exists in the graph before
                // applying the update.
                fallthrough
        case errors.Is(err, graphdb.ErrGraphNotFound):
                fallthrough
        case errors.Is(err, graphdb.ErrGraphNoEdgesFound):
                fallthrough
        case errors.Is(err, graphdb.ErrEdgeNotFound):
                // If the edge corresponding to this ChannelUpdate was not
                // found in the graph, this might be a channel in the process
                // of being opened, and we haven't processed our own
                // ChannelAnnouncement yet, hence it is not yet found in the
                // graph. This usually gets resolved after the channel proofs
                // are exchanged and the channel is broadcasted to the rest of
                // the network, but in case this is a private channel this
                // won't ever happen. This can also happen in the case of a
                // zombie channel with a fresh update for which we don't have a
                // ChannelAnnouncement for since we reject them. Because of
                // this, we temporarily add it to a map, and reprocess it after
                // our own ChannelAnnouncement has been processed.
                //
                // The shortChanID may be an alias, but it is fine to use here
                // since we don't have an edge in the graph and if the peer is
                // not buggy, we should be able to use it once the gossiper
                // receives the local announcement.
                pMsg := &processedNetworkMsg{msg: nMsg}

                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
                switch {
                // Nothing in the cache yet, we can just directly insert this
                // element.
                case err == cache.ErrElementNotFound:
                        _, _ = d.prematureChannelUpdates.Put(
                                shortChanID, &cachedNetworkMsg{
                                        msgs: []*processedNetworkMsg{pMsg},
                                })

                // There's already something in the cache, so we'll combine the
                // set of messages into a single value.
                default:
                        msgs := earlyMsgs.msgs
                        msgs = append(msgs, pMsg)
                        _, _ = d.prematureChannelUpdates.Put(
                                shortChanID, &cachedNetworkMsg{
                                        msgs: msgs,
                                })
                }

                log.Debugf("Got ChannelUpdate for edge not found in graph"+
                        "(shortChanID=%v), saving for reprocessing later",
                        shortChanID)

                // NOTE: We don't return anything on the error channel for this
                // message, as we expect that will be done when this
                // ChannelUpdate is later reprocessed.
                return nil, false

        default:
                err := fmt.Errorf("unable to validate channel update "+
                        "short_chan_id=%v: %v", shortChanID, err)
                log.Error(err)
                nMsg.err <- err

                key := newRejectCacheKey(
                        upd.ShortChannelID.ToUint64(),
                        sourceToPub(nMsg.source),
                )
                _, _ = d.recentRejects.Put(key, &cachedReject{})

                return nil, false
        }

        // The least-significant bit in the flag on the channel update
        // announcement tells us "which" side of the channel's directed edge is
        // being updated.
        var (
                pubKey       *btcec.PublicKey
                edgeToUpdate *models.ChannelEdgePolicy
        )
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
        switch direction {
        case 0:
                pubKey, _ = chanInfo.NodeKey1()
                edgeToUpdate = e1
        case 1:
                pubKey, _ = chanInfo.NodeKey2()
                edgeToUpdate = e2
        }

        log.Debugf("Validating ChannelUpdate: channel=%v, from node=%x, has "+
                "edge=%v", chanInfo.ChannelID, pubKey.SerializeCompressed(),
                edgeToUpdate != nil)

        // Validate the channel announcement with the expected public key and
        // channel capacity. In the case of an invalid channel update, we'll
        // return an error to the caller and exit early.
        err = netann.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
        if err != nil {
                rErr := fmt.Errorf("unable to validate channel update "+
                        "announcement for short_chan_id=%v: %v",
                        spew.Sdump(upd.ShortChannelID), err)

                log.Error(rErr)
                nMsg.err <- rErr
                return nil, false
        }

        // If we have a previous version of the edge being updated, we'll want
        // to rate limit its updates to prevent spam throughout the network.
        if nMsg.isRemote && edgeToUpdate != nil {
                // If it's a keep-alive update, we'll only propagate one if
                // it's been a day since the previous. This follows our own
                // heuristic of sending keep-alive updates after the same
                // duration (see retransmitStaleAnns).
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
                                log.Debugf("Ignoring keep alive update not "+
                                        "within %v period for channel %v",
                                        d.cfg.RebroadcastInterval, shortChanID)
                                nMsg.err <- nil
                                return nil, false
                        }
                } else {
                        // If it's not, we'll allow an update per minute with a
                        // maximum burst of 10. If we haven't seen an update
                        // for this channel before, we'll need to initialize a
                        // rate limiter for each direction.
                        //
                        // Since the edge exists in the graph, we'll create a
                        // rate limiter for chanInfo.ChannelID rather than the
                        // SCID the peer sent. This is because there may be
                        // multiple aliases for a channel and we may otherwise
                        // rate-limit only a single alias of the channel,
                        // instead of the whole channel.
                        baseScid := chanInfo.ChannelID
                        d.Lock()
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
                        if !ok {
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
                                b := d.cfg.MaxChannelUpdateBurst
                                rls = [2]*rate.Limiter{
                                        rate.NewLimiter(r, b),
                                        rate.NewLimiter(r, b),
                                }
                                d.chanUpdateRateLimiter[baseScid] = rls
                        }
                        d.Unlock()

                        if !rls[direction].Allow() {
                                log.Debugf("Rate limiting update for channel "+
                                        "%v from direction %x", shortChanID,
                                        pubKey.SerializeCompressed())
                                nMsg.err <- nil
                                return nil, false
                        }
                }
        }

        // We'll use chanInfo.ChannelID rather than the peer-supplied
        // ShortChannelID in the ChannelUpdate to avoid the router having to
        // lookup the stored SCID. If we're sending the update, we'll always
        // use the SCID stored in the database rather than a potentially
        // different alias. This might mean that SigBytes is incorrect as it
        // signs a different SCID than the database SCID, but since there will
        // only be a difference if AuthProof == nil, this is fine.
        update := &models.ChannelEdgePolicy{
                SigBytes:                  upd.Signature.ToSignatureBytes(),
                ChannelID:                 chanInfo.ChannelID,
                LastUpdate:                timestamp,
                MessageFlags:              upd.MessageFlags,
                ChannelFlags:              upd.ChannelFlags,
                TimeLockDelta:             upd.TimeLockDelta,
                MinHTLC:                   upd.HtlcMinimumMsat,
                MaxHTLC:                   upd.HtlcMaximumMsat,
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
                ExtraOpaqueData:           upd.ExtraOpaqueData,
        }

        if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil {
                if graph.IsError(
                        err, graph.ErrOutdated,
                        graph.ErrIgnored,
                        graph.ErrVBarrierShuttingDown,
                ) {

                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
                                shortChanID, err)
                } else {
                        // Since we know the stored SCID in the graph, we'll
                        // cache that SCID.
                        key := newRejectCacheKey(
                                chanInfo.ChannelID,
                                sourceToPub(nMsg.source),
                        )
                        _, _ = d.recentRejects.Put(key, &cachedReject{})

                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
                                shortChanID, err)
                }

                nMsg.err <- err
                return nil, false
        }

        // If this is a local ChannelUpdate without an AuthProof, it means it
        // is an update to a channel that is not (yet) supposed to be announced
        // to the greater network. However, our channel counterparty will need
        // to be given the update, so we'll try sending the update directly to
        // the remote peer.
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
                if nMsg.optionalMsgFields != nil {
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
                        if remoteAlias != nil {
                                // The remoteAlias field was specified, meaning
                                // that we should replace the SCID in the
                                // update with the remote's alias. We'll also
                                // need to re-sign the channel update. This is
                                // required for option-scid-alias feature-bit
                                // negotiated channels.
                                upd.ShortChannelID = *remoteAlias

                                sig, err := d.cfg.SignAliasUpdate(upd)
                                if err != nil {
                                        log.Error(err)
                                        nMsg.err <- err
                                        return nil, false
                                }

                                lnSig, err := lnwire.NewSigFromSignature(sig)
                                if err != nil {
                                        log.Error(err)
                                        nMsg.err <- err
                                        return nil, false
                                }

                                upd.Signature = lnSig
                        }
                }

                // Get our peer's public key.
                remotePubKey := remotePubFromChanInfo(
                        chanInfo, upd.ChannelFlags,
                )

                log.Debugf("The message %v has no AuthProof, sending the "+
                        "update to remote peer %x", upd.MsgType(), remotePubKey)

                // Now we'll attempt to send the channel update message
                // reliably to the remote peer in the background, so that we
                // don't block if the peer happens to be offline at the moment.
                err := d.reliableSender.sendMessage(upd, remotePubKey)
                if err != nil {
                        err := fmt.Errorf("unable to reliably send %v for "+
                                "channel=%v to peer=%x: %v", upd.MsgType(),
                                upd.ShortChannelID, remotePubKey, err)
                        nMsg.err <- err
                        return nil, false
                }
        }

        // Channel update announcement was successfully processed and now it
        // can be broadcast to the rest of the network. However, we'll only
        // broadcast the channel update announcement if it has an attached
        // authentication proof. We also won't broadcast the update if it
        // contains an alias because the network would reject this.
        var announcements []networkMsg
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
                announcements = append(announcements, networkMsg{
                        peer:     nMsg.peer,
                        source:   nMsg.source,
                        isRemote: nMsg.isRemote,
                        msg:      upd,
                })
        }

        nMsg.err <- nil

        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
                timestamp)
        return announcements, true
}
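
The non-keep-alive branch above is a classic token bucket from golang.org/x/time/rate: one token refills per ChannelUpdateInterval, with MaxChannelUpdateBurst tokens of headroom per direction. A standalone sketch (illustrative only) using the package defaults defined at the top of this file:

// Sketch: the limiter admits a burst of DefaultMaxChannelUpdateBurst
// updates, then refills one token per DefaultChannelUpdateInterval.
func exampleUpdateRateLimit() {
        r := rate.Every(DefaultChannelUpdateInterval)
        limiter := rate.NewLimiter(r, DefaultMaxChannelUpdateBurst)

        // The first ten non-keep-alive updates in a minute pass...
        for i := 0; i < DefaultMaxChannelUpdateBurst; i++ {
                fmt.Println(limiter.Allow()) // true
        }

        // ...and the eleventh is dropped until a token refills.
        fmt.Println(limiter.Allow()) // false
}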
3232

3233
// handleAnnSig processes a new announcement signatures message.
3234
func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
3235
        ann *lnwire.AnnounceSignatures1) ([]networkMsg, bool) {
21✔
3236

21✔
3237
        needBlockHeight := ann.ShortChannelID.BlockHeight +
21✔
3238
                d.cfg.ProofMatureDelta
21✔
3239
        shortChanID := ann.ShortChannelID.ToUint64()
21✔
3240

21✔
3241
        prefix := "local"
21✔
3242
        if nMsg.isRemote {
32✔
3243
                prefix = "remote"
11✔
3244
        }
11✔
3245

3246
        log.Infof("Received new %v announcement signature for %v", prefix,
21✔
3247
                ann.ShortChannelID)
21✔
3248

21✔
3249
        // By the specification, channel announcement proofs should be sent
21✔
3250
        // after some number of confirmations after channel was registered in
21✔
3251
        // bitcoin blockchain. Therefore, we check if the proof is mature.
21✔
3252
        d.Lock()
21✔
3253
        premature := d.isPremature(
21✔
3254
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
21✔
3255
        )
21✔
3256
        if premature {
21✔
3257
                log.Warnf("Premature proof announcement, current block height"+
×
3258
                        "lower than needed: %v < %v", d.bestHeight,
×
3259
                        needBlockHeight)
×
3260
                d.Unlock()
×
3261
                nMsg.err <- nil
×
3262
                return nil, false
×
3263
        }
×
3264
        d.Unlock()
21✔
3265

21✔
3266
        // Ensure that we know of a channel with the target channel ID before
21✔
3267
        // proceeding further.
21✔
3268
        //
21✔
3269
        // We must acquire the mutex for this channel ID before getting the
21✔
3270
        // channel from the database, to ensure what we read does not change
21✔
3271
        // before we call AddProof() later.
21✔
3272
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
21✔
3273
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
21✔
3274

21✔
3275
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
21✔
3276
                ann.ShortChannelID,
21✔
3277
        )
21✔
3278
        if err != nil {
22✔
3279
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
1✔
3280
                if err != nil {
1✔
3281
                        err := fmt.Errorf("unable to store the proof for "+
×
3282
                                "short_chan_id=%v: %v", shortChanID, err)
×
3283
                        log.Error(err)
×
3284
                        nMsg.err <- err
×
3285

×
3286
                        return nil, false
×
3287
                }
×
3288

3289
                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
1✔
3290
                err := d.cfg.WaitingProofStore.Add(proof)
1✔
3291
                if err != nil {
1✔
3292
                        err := fmt.Errorf("unable to store the proof for "+
×
3293
                                "short_chan_id=%v: %v", shortChanID, err)
×
3294
                        log.Error(err)
×
3295
                        nMsg.err <- err
×
3296
                        return nil, false
×
3297
                }
×
3298

3299
                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
1✔
3300
                        ", adding to waiting batch", prefix, shortChanID)
1✔
3301
                nMsg.err <- nil
1✔
3302
                return nil, false
1✔
3303
        }
3304

3305
        nodeID := nMsg.source.SerializeCompressed()
20✔
3306
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
20✔
3307
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
20✔
3308

20✔
3309
        // Ensure that channel that was retrieved belongs to the peer which
20✔
3310
        // sent the proof announcement.
20✔
3311
        if !(isFirstNode || isSecondNode) {
20✔
3312
                err := fmt.Errorf("channel that was received doesn't belong "+
×
3313
                        "to the peer which sent the proof, short_chan_id=%v",
×
3314
                        shortChanID)
×
3315
                log.Error(err)
×
3316
                nMsg.err <- err
×
3317
                return nil, false
×
3318
        }
×
3319

3320
        // If proof was sent by a local sub-system, then we'll send the
3321
        // announcement signature to the remote node so they can also
3322
        // reconstruct the full channel announcement.
3323
        if !nMsg.isRemote {
30✔
3324
                var remotePubKey [33]byte
10✔
3325
                if isFirstNode {
20✔
3326
                        remotePubKey = chanInfo.NodeKey2Bytes
10✔
3327
                } else {
10✔
3328
                        remotePubKey = chanInfo.NodeKey1Bytes
×
3329
                }
×
3330

3331
                // Since the remote peer might not be online we'll call a
3332
                // method that will attempt to deliver the proof when it comes
3333
                // online.
3334
                err := d.reliableSender.sendMessage(ann, remotePubKey)
10✔
3335
                if err != nil {
10✔
3336
                        err := fmt.Errorf("unable to reliably send %v for "+
×
3337
                                "channel=%v to peer=%x: %v", ann.MsgType(),
×
3338
                                ann.ShortChannelID, remotePubKey, err)
×
3339
                        nMsg.err <- err
×
3340
                        return nil, false
×
3341
                }
×
3342
        }
3343

3344
        // Check if we already have the full proof for this channel.
3345
        if chanInfo.AuthProof != nil {
21✔
3346
                // If we already have the fully assembled proof, then the peer
1✔
3347
                // sending us their proof has probably not received our local
1✔
3348
                // proof yet. So be kind and send them the full proof.
1✔
3349
                if nMsg.isRemote {
2✔
3350
                        peerID := nMsg.source.SerializeCompressed()
1✔
3351
                        log.Debugf("Got AnnounceSignatures for channel with " +
1✔
3352
                                "full proof.")
1✔
3353

1✔
3354
                        d.wg.Add(1)
1✔
3355
                        go func() {
2✔
3356
                                defer d.wg.Done()
1✔
3357

1✔
3358
                                log.Debugf("Received half proof for channel "+
1✔
3359
                                        "%v with existing full proof. Sending"+
1✔
3360
                                        " full proof to peer=%x",
1✔
3361
                                        ann.ChannelID, peerID)
1✔
3362

1✔
3363
                                ca, _, _, err := netann.CreateChanAnnouncement(
1✔
3364
                                        chanInfo.AuthProof, chanInfo, e1, e2,
1✔
3365
                                )
1✔
3366
                                if err != nil {
1✔
3367
                                        log.Errorf("unable to gen ann: %v",
×
3368
                                                err)
×
3369
                                        return
×
3370
                                }
×
3371

3372
                                err = nMsg.peer.SendMessage(false, ca)
1✔
3373
                                if err != nil {
1✔
3374
                                        log.Errorf("Failed sending full proof"+
×
3375
                                                " to peer=%x: %v", peerID, err)
×
3376
                                        return
×
3377
                                }
×
3378

3379
                                log.Debugf("Full proof sent to peer=%x for "+
1✔
3380
                                        "chanID=%v", peerID, ann.ChannelID)
1✔
3381
                        }()
3382
                }
3383

3384
                log.Debugf("Already have proof for channel with chanID=%v",
1✔
3385
                        ann.ChannelID)
1✔
3386
                nMsg.err <- nil
1✔
3387
                return nil, true
1✔
3388
        }
3389

3390
        // Check that we received the opposite proof. If so, then we're now
3391
        // able to construct the full proof, and create the channel
3392
        // announcement. If we didn't receive the opposite half of the proof
3393
        // then we should store this one, and wait for the opposite to be
3394
        // received.
3395
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
19✔
3396
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
19✔
3397
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
19✔
3398
                err := fmt.Errorf("unable to get the opposite proof for "+
×
3399
                        "short_chan_id=%v: %v", shortChanID, err)
×
3400
                log.Error(err)
×
3401
                nMsg.err <- err
×
3402
                return nil, false
×
3403
        }
×
3404

3405
        if err == channeldb.ErrWaitingProofNotFound {
28✔
3406
                err := d.cfg.WaitingProofStore.Add(proof)
9✔
3407
                if err != nil {
9✔
3408
                        err := fmt.Errorf("unable to store the proof for "+
×
3409
                                "short_chan_id=%v: %v", shortChanID, err)
×
3410
                        log.Error(err)
×
3411
                        nMsg.err <- err
×
3412
                        return nil, false
×
3413
                }
×
3414

3415
                log.Infof("1/2 of channel ann proof received for "+
9✔
3416
                        "short_chan_id=%v, waiting for other half",
9✔
3417
                        shortChanID)
9✔
3418

9✔
3419
                nMsg.err <- nil
9✔
3420
                return nil, false
9✔
3421
        }
3422

3423
        // We now have both halves of the channel announcement proof, then
3424
        // we'll reconstruct the initial announcement so we can validate it
3425
        // shortly below.
3426
        var dbProof models.ChannelAuthProof
10✔
3427
        if isFirstNode {
11✔
3428
                dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
1✔
3429
                dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
1✔
3430
                dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
1✔
3431
                dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
1✔
3432
        } else {
10✔
3433
                dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
9✔
3434
                dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
9✔
3435
                dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
9✔
3436
                dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
9✔
3437
        }
9✔
3438

3439
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
10✔
3440
                &dbProof, chanInfo, e1, e2,
10✔
3441
        )
10✔
3442
        if err != nil {
10✔
3443
                log.Error(err)
×
3444
                nMsg.err <- err
×
3445
                return nil, false
×
3446
        }
×
3447

3448
        // With all the necessary components assembled validate the full
3449
        // channel announcement proof.
3450
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
10✔
3451
        if err != nil {
10✔
3452
                err := fmt.Errorf("channel announcement proof for "+
×
3453
                        "short_chan_id=%v isn't valid: %v", shortChanID, err)
×
3454

×
3455
                log.Error(err)
×
3456
                nMsg.err <- err
×
3457
                return nil, false
×
3458
        }
×
3459

3460
        // If the channel was returned by the router it means that existence of
3461
        // funding point and inclusion of nodes bitcoin keys in it already
3462
        // checked by the router. In this stage we should check that node keys
3463
        // attest to the bitcoin keys by validating the signatures of
3464
        // announcement. If proof is valid then we'll populate the channel edge
3465
        // with it, so we can announce it on peer connect.
3466
        err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
10✔
3467
        if err != nil {
10✔
3468
                err := fmt.Errorf("unable add proof to the channel chanID=%v:"+
×
3469
                        " %v", ann.ChannelID, err)
×
3470
                log.Error(err)
×
3471
                nMsg.err <- err
×
3472
                return nil, false
×
3473
        }
×
3474

3475
        err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
10✔
3476
        if err != nil {
10✔
3477
                err := fmt.Errorf("unable to remove opposite proof for the "+
×
3478
                        "channel with chanID=%v: %v", ann.ChannelID, err)
×
3479
                log.Error(err)
×
3480
                nMsg.err <- err
×
3481
                return nil, false
×
3482
        }
×
3483

3484
        // Proof was successfully created and now can announce the channel to
3485
        // the remain network.
3486
        log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
10✔
3487
                ", adding to next ann batch", shortChanID)
10✔

	// Assemble the necessary announcements to add to the next
	// broadcasting batch.
	var announcements []networkMsg
	announcements = append(announcements, networkMsg{
		peer:   nMsg.peer,
		source: nMsg.source,
		msg:    chanAnn,
	})
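	// If an edge policy is known for either direction, broadcast the
	// corresponding channel update as well, with the source set to the
	// node the policy belongs to.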
	if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
		announcements = append(announcements, networkMsg{
			peer:   nMsg.peer,
			source: src,
			msg:    e1Ann,
		})
	}
	if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
		announcements = append(announcements, networkMsg{
			peer:   nMsg.peer,
			source: src,
			msg:    e2Ann,
		})
	}

	// We'll also send along the node announcements for each channel
	// participant if we know of them. To ensure our node announcement
	// propagates to our channel counterparty, we'll set the source for
	// each announcement to the node it belongs to; otherwise, we won't
	// send it, since the source gets skipped. This isn't necessary for
	// channel updates and announcement signatures, since we send those
	// directly to our channel counterparty through the gossiper's
	// reliable sender.
	node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
	if err != nil {
		log.Debugf("Unable to fetch node announcement for %x: %v",
			chanInfo.NodeKey1Bytes, err)
	} else {
		if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
			announcements = append(announcements, networkMsg{
				peer:   nMsg.peer,
				source: nodeKey1,
				msg:    node1Ann,
			})
		}
	}

	node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
	if err != nil {
		log.Debugf("Unable to fetch node announcement for %x: %v",
			chanInfo.NodeKey2Bytes, err)
	} else {
		if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
			announcements = append(announcements, networkMsg{
				peer:   nMsg.peer,
				source: nodeKey2,
				msg:    node2Ann,
			})
		}
	}

	nMsg.err <- nil
	return announcements, true
}

// isBanned returns true if the peer identified by pubkey is banned for sending
// invalid channel announcements.
func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
	return d.banman.isBanned(pubkey)
}

// ShouldDisconnect returns true if we should disconnect the peer identified by
// pubkey.
func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
	bool, error) {

	pubkeySer := pubkey.SerializeCompressed()

	var pubkeyBytes [33]byte
	copy(pubkeyBytes[:], pubkeySer)

	// If the public key is banned, check whether or not this is a channel
	// peer.
	if d.isBanned(pubkeyBytes) {
		isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
		if err != nil {
			return false, err
		}

		// We should only disconnect non-channel peers.
		if !isChanPeer {
			return true, nil
		}
	}

	return false, nil
}
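
// The following is a rough sketch of how a caller could consume
// ShouldDisconnect when gating an incoming gossip peer. It is illustrative
// only: peerPub and disconnectPeer are hypothetical names, not part of this
// package.
//
//	shouldDc, err := d.ShouldDisconnect(peerPub)
//	if err != nil {
//		return err
//	}
//	if shouldDc {
//		disconnectPeer(peerPub)
//	}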