• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12913757673

22 Jan 2025 05:30PM UTC coverage: 58.788% (+0.07%) from 58.72%
12913757673

Pull #8831

github

bhandras
docs: update release notes for 0.19.0
Pull Request #8831: invoices: migrate KV invoices to native SQL for users of KV SQL backends

567 of 732 new or added lines in 12 files covered. (77.46%)

39 existing lines in 15 files now uncovered.

135956 of 231266 relevant lines covered (58.79%)

19252.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.11
/discovery/gossiper.go
1
package discovery
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
13
        "github.com/btcsuite/btcd/btcutil"
14
        "github.com/btcsuite/btcd/chaincfg/chainhash"
15
        "github.com/btcsuite/btcd/wire"
16
        "github.com/davecgh/go-spew/spew"
17
        "github.com/lightninglabs/neutrino/cache"
18
        "github.com/lightninglabs/neutrino/cache/lru"
19
        "github.com/lightningnetwork/lnd/batch"
20
        "github.com/lightningnetwork/lnd/chainntnfs"
21
        "github.com/lightningnetwork/lnd/channeldb"
22
        "github.com/lightningnetwork/lnd/fn/v2"
23
        "github.com/lightningnetwork/lnd/graph"
24
        graphdb "github.com/lightningnetwork/lnd/graph/db"
25
        "github.com/lightningnetwork/lnd/graph/db/models"
26
        "github.com/lightningnetwork/lnd/keychain"
27
        "github.com/lightningnetwork/lnd/lnpeer"
28
        "github.com/lightningnetwork/lnd/lnutils"
29
        "github.com/lightningnetwork/lnd/lnwallet"
30
        "github.com/lightningnetwork/lnd/lnwire"
31
        "github.com/lightningnetwork/lnd/multimutex"
32
        "github.com/lightningnetwork/lnd/netann"
33
        "github.com/lightningnetwork/lnd/routing/route"
34
        "github.com/lightningnetwork/lnd/ticker"
35
        "golang.org/x/time/rate"
36
)
37

38
const (
39
        // DefaultMaxChannelUpdateBurst is the default maximum number of updates
40
        // for a specific channel and direction that we'll accept over an
41
        // interval.
42
        DefaultMaxChannelUpdateBurst = 10
43

44
        // DefaultChannelUpdateInterval is the default interval we'll use to
45
        // determine how often we should allow a new update for a specific
46
        // channel and direction.
47
        DefaultChannelUpdateInterval = time.Minute
48

49
        // maxPrematureUpdates tracks the max amount of premature channel
50
        // updates that we'll hold onto.
51
        maxPrematureUpdates = 100
52

53
        // maxFutureMessages tracks the max amount of future messages that
54
        // we'll hold onto.
55
        maxFutureMessages = 1000
56

57
        // DefaultSubBatchDelay is the default delay we'll use when
58
        // broadcasting the next announcement batch.
59
        DefaultSubBatchDelay = 5 * time.Second
60

61
        // maxRejectedUpdates tracks the max amount of rejected channel updates
62
        // we'll maintain. This is the global size across all peers. We'll
63
        // allocate ~3 MB max to the cache.
64
        maxRejectedUpdates = 10_000
65
)
66

67
var (
68
        // ErrGossiperShuttingDown is an error that is returned if the gossiper
69
        // is in the process of being shut down.
70
        ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
71

72
        // ErrGossipSyncerNotFound signals that we were unable to find an active
73
        // gossip syncer corresponding to a gossip query message received from
74
        // the remote peer.
75
        ErrGossipSyncerNotFound = errors.New("gossip syncer not found")
76

77
        // emptyPubkey is used to compare compressed pubkeys against an empty
78
        // byte array.
79
        emptyPubkey [33]byte
80
)
81

82
// optionalMsgFields is a set of optional message fields that external callers
83
// can provide that serve useful when processing a specific network
84
// announcement.
85
type optionalMsgFields struct {
86
        capacity      *btcutil.Amount
87
        channelPoint  *wire.OutPoint
88
        remoteAlias   *lnwire.ShortChannelID
89
        tapscriptRoot fn.Option[chainhash.Hash]
90
}
91

92
// apply applies the optional fields within the functional options.
93
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
94
        for _, optionalMsgField := range optionalMsgFields {
95
                optionalMsgField(f)
96
        }
97
}
98

50✔
99
// OptionalMsgField is a functional option parameter that can be used to provide
58✔
100
// external information that is not included within a network message but serves
8✔
101
// useful when processing it.
8✔
102
type OptionalMsgField func(*optionalMsgFields)
103

104
// ChannelCapacity is an optional field that lets the gossiper know of the
105
// capacity of a channel.
106
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
107
        return func(f *optionalMsgFields) {
108
                f.capacity = &capacity
109
        }
110
}
111

30✔
112
// ChannelPoint is an optional field that lets the gossiper know of the outpoint
34✔
113
// of a channel.
4✔
114
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
4✔
115
        return func(f *optionalMsgFields) {
116
                f.channelPoint = &op
117
        }
118
}
119

33✔
120
// TapscriptRoot is an optional field that lets the gossiper know of the root of
40✔
121
// the tapscript tree for a custom channel.
7✔
122
func TapscriptRoot(root fn.Option[chainhash.Hash]) OptionalMsgField {
7✔
123
        return func(f *optionalMsgFields) {
124
                f.tapscriptRoot = root
125
        }
126
}
127

29✔
128
// RemoteAlias is an optional field that lets the gossiper know that a locally
32✔
129
// sent channel update is actually an update for the peer that should replace
3✔
130
// the ShortChannelID field with the remote's alias. This is only used for
3✔
131
// channels with peers where the option-scid-alias feature bit was negotiated.
132
// The channel update will be added to the graph under the original SCID, but
133
// will be modified and re-signed with this alias.
134
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
135
        return func(f *optionalMsgFields) {
136
                f.remoteAlias = alias
137
        }
138
}
139

29✔
140
// networkMsg couples a routing related wire message with the peer that
32✔
141
// originally sent it.
3✔
142
type networkMsg struct {
3✔
143
        peer              lnpeer.Peer
144
        source            *btcec.PublicKey
145
        msg               lnwire.Message
146
        optionalMsgFields *optionalMsgFields
147

148
        isRemote bool
149

150
        err chan error
151
}
152

153
// chanPolicyUpdateRequest is a request that is sent to the server when a caller
154
// wishes to update a particular set of channels. New ChannelUpdate messages
155
// will be crafted to be sent out during the next broadcast epoch and the fee
156
// updates committed to the lower layer.
157
type chanPolicyUpdateRequest struct {
158
        edgesToUpdate []EdgeWithInfo
159
        errChan       chan error
160
}
161

162
// PinnedSyncers is a set of node pubkeys for which we will maintain an active
163
// syncer at all times.
164
type PinnedSyncers map[route.Vertex]struct{}
165

166
// Config defines the configuration for the service. ALL elements within the
167
// configuration MUST be non-nil for the service to carry out its duties.
168
type Config struct {
169
        // ChainHash is a hash that indicates which resident chain of the
170
        // AuthenticatedGossiper. Any announcements that don't match this
171
        // chain hash will be ignored.
172
        //
173
        // TODO(roasbeef): eventually make into map so can de-multiplex
174
        // incoming announcements
175
        //   * also need to do same for Notifier
176
        ChainHash chainhash.Hash
177

178
        // Graph is the subsystem which is responsible for managing the
179
        // topology of lightning network. After incoming channel, node, channel
180
        // updates announcements are validated they are sent to the router in
181
        // order to be included in the LN graph.
182
        Graph graph.ChannelGraphSource
183

184
        // ChainIO represents an abstraction over a source that can query the
185
        // blockchain.
186
        ChainIO lnwallet.BlockChainIO
187

188
        // ChanSeries is an interfaces that provides access to a time series
189
        // view of the current known channel graph. Each GossipSyncer enabled
190
        // peer will utilize this in order to create and respond to channel
191
        // graph time series queries.
192
        ChanSeries ChannelGraphTimeSeries
193

194
        // Notifier is used for receiving notifications of incoming blocks.
195
        // With each new incoming block found we process previously premature
196
        // announcements.
197
        //
198
        // TODO(roasbeef): could possibly just replace this with an epoch
199
        // channel.
200
        Notifier chainntnfs.ChainNotifier
201

202
        // Broadcast broadcasts a particular set of announcements to all peers
203
        // that the daemon is connected to. If supplied, the exclude parameter
204
        // indicates that the target peer should be excluded from the
205
        // broadcast.
206
        Broadcast func(skips map[route.Vertex]struct{},
207
                msg ...lnwire.Message) error
208

209
        // NotifyWhenOnline is a function that allows the gossiper to be
210
        // notified when a certain peer comes online, allowing it to
211
        // retry sending a peer message.
212
        //
213
        // NOTE: The peerChan channel must be buffered.
214
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
215

216
        // NotifyWhenOffline is a function that allows the gossiper to be
217
        // notified when a certain peer disconnects, allowing it to request a
218
        // notification for when it reconnects.
219
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
220

221
        // FetchSelfAnnouncement retrieves our current node announcement, for
222
        // use when determining whether we should update our peers about our
223
        // presence in the network.
224
        FetchSelfAnnouncement func() lnwire.NodeAnnouncement
225

226
        // UpdateSelfAnnouncement produces a new announcement for our node with
227
        // an updated timestamp which can be broadcast to our peers.
228
        UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement, error)
229

230
        // ProofMatureDelta the number of confirmations which is needed before
231
        // exchange the channel announcement proofs.
232
        ProofMatureDelta uint32
233

234
        // TrickleDelay the period of trickle timer which flushes to the
235
        // network the pending batch of new announcements we've received since
236
        // the last trickle tick.
237
        TrickleDelay time.Duration
238

239
        // RetransmitTicker is a ticker that ticks with a period which
240
        // indicates that we should check if we need re-broadcast any of our
241
        // personal channels.
242
        RetransmitTicker ticker.Ticker
243

244
        // RebroadcastInterval is the maximum time we wait between sending out
245
        // channel updates for our active channels and our own node
246
        // announcement. We do this to ensure our active presence on the
247
        // network is known, and we are not being considered a zombie node or
248
        // having zombie channels.
249
        RebroadcastInterval time.Duration
250

251
        // WaitingProofStore is a persistent storage of partial channel proof
252
        // announcement messages. We use it to buffer half of the material
253
        // needed to reconstruct a full authenticated channel announcement.
254
        // Once we receive the other half the channel proof, we'll be able to
255
        // properly validate it and re-broadcast it out to the network.
256
        //
257
        // TODO(wilmer): make interface to prevent channeldb dependency.
258
        WaitingProofStore *channeldb.WaitingProofStore
259

260
        // MessageStore is a persistent storage of gossip messages which we will
261
        // use to determine which messages need to be resent for a given peer.
262
        MessageStore GossipMessageStore
263

264
        // AnnSigner is an instance of the MessageSigner interface which will
265
        // be used to manually sign any outgoing channel updates. The signer
266
        // implementation should be backed by the public key of the backing
267
        // Lightning node.
268
        //
269
        // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
270
        // here?
271
        AnnSigner lnwallet.MessageSigner
272

273
        // ScidCloser is an instance of ClosedChannelTracker that helps the
274
        // gossiper cut down on spam channel announcements for already closed
275
        // channels.
276
        ScidCloser ClosedChannelTracker
277

278
        // NumActiveSyncers is the number of peers for which we should have
279
        // active syncers with. After reaching NumActiveSyncers, any future
280
        // gossip syncers will be passive.
281
        NumActiveSyncers int
282

283
        // NoTimestampQueries will prevent the GossipSyncer from querying
284
        // timestamps of announcement messages from the peer and from replying
285
        // to timestamp queries.
286
        NoTimestampQueries bool
287

288
        // RotateTicker is a ticker responsible for notifying the SyncManager
289
        // when it should rotate its active syncers. A single active syncer with
290
        // a chansSynced state will be exchanged for a passive syncer in order
291
        // to ensure we don't keep syncing with the same peers.
292
        RotateTicker ticker.Ticker
293

294
        // HistoricalSyncTicker is a ticker responsible for notifying the
295
        // syncManager when it should attempt a historical sync with a gossip
296
        // sync peer.
297
        HistoricalSyncTicker ticker.Ticker
298

299
        // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
300
        // syncManager when it should attempt to start the next pending
301
        // activeSyncer due to the current one not completing its state machine
302
        // within the timeout.
303
        ActiveSyncerTimeoutTicker ticker.Ticker
304

305
        // MinimumBatchSize is minimum size of a sub batch of announcement
306
        // messages.
307
        MinimumBatchSize int
308

309
        // SubBatchDelay is the delay between sending sub batches of
310
        // gossip messages.
311
        SubBatchDelay time.Duration
312

313
        // IgnoreHistoricalFilters will prevent syncers from replying with
314
        // historical data when the remote peer sets a gossip_timestamp_range.
315
        // This prevents ranges with old start times from causing us to dump the
316
        // graph on connect.
317
        IgnoreHistoricalFilters bool
318

319
        // PinnedSyncers is a set of peers that will always transition to
320
        // ActiveSync upon connection. These peers will never transition to
321
        // PassiveSync.
322
        PinnedSyncers PinnedSyncers
323

324
        // MaxChannelUpdateBurst specifies the maximum number of updates for a
325
        // specific channel and direction that we'll accept over an interval.
326
        MaxChannelUpdateBurst int
327

328
        // ChannelUpdateInterval specifies the interval we'll use to determine
329
        // how often we should allow a new update for a specific channel and
330
        // direction.
331
        ChannelUpdateInterval time.Duration
332

333
        // IsAlias returns true if a given ShortChannelID is an alias for
334
        // option_scid_alias channels.
335
        IsAlias func(scid lnwire.ShortChannelID) bool
336

337
        // SignAliasUpdate is used to re-sign a channel update using the
338
        // remote's alias if the option-scid-alias feature bit was negotiated.
339
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
340
                error)
341

342
        // FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
343
        // This is used for channels that have negotiated the option-scid-alias
344
        // feature bit.
345
        FindBaseByAlias func(alias lnwire.ShortChannelID) (
346
                lnwire.ShortChannelID, error)
347

348
        // GetAlias allows the gossiper to look up the peer's alias for a given
349
        // ChannelID. This is used to sign updates for them if the channel has
350
        // no AuthProof and the option-scid-alias feature bit was negotiated.
351
        GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)
352

353
        // FindChannel allows the gossiper to find a channel that we're party
354
        // to without iterating over the entire set of open channels.
355
        FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
356
                *channeldb.OpenChannel, error)
357

358
        // IsStillZombieChannel takes the timestamps of the latest channel
359
        // updates for a channel and returns true if the channel should be
360
        // considered a zombie based on these timestamps.
361
        IsStillZombieChannel func(time.Time, time.Time) bool
362
}
363

364
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
365
// used to let the caller of the lru.Cache know if a message has already been
366
// processed or not.
367
type processedNetworkMsg struct {
368
        processed bool
369
        msg       *networkMsg
370
}
371

372
// cachedNetworkMsg is a wrapper around a network message that can be used with
373
// *lru.Cache.
374
type cachedNetworkMsg struct {
375
        msgs []*processedNetworkMsg
376
}
377

378
// Size returns the "size" of an entry. We return the number of items as we
379
// just want to limit the total amount of entries rather than do accurate size
380
// accounting.
381
func (c *cachedNetworkMsg) Size() (uint64, error) {
382
        return uint64(len(c.msgs)), nil
383
}
384

385
// rejectCacheKey is the cache key that we'll use to track announcements we've
386
// recently rejected.
5✔
387
type rejectCacheKey struct {
5✔
388
        pubkey [33]byte
5✔
389
        chanID uint64
390
}
391

392
// newRejectCacheKey returns a new cache key for the reject cache.
393
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
394
        k := rejectCacheKey{
395
                chanID: cid,
396
                pubkey: pub,
397
        }
398

465✔
399
        return k
465✔
400
}
465✔
401

465✔
402
// sourceToPub returns a serialized-compressed public key for use in the reject
465✔
403
// cache.
465✔
404
func sourceToPub(pk *btcec.PublicKey) [33]byte {
465✔
405
        var pub [33]byte
465✔
406
        copy(pub[:], pk.SerializeCompressed())
407
        return pub
408
}
409

479✔
410
// cachedReject is the empty value used to track the value for rejects.
479✔
411
type cachedReject struct {
479✔
412
}
479✔
413

479✔
414
// Size returns the "size" of an entry. We return 1 as we just want to limit
415
// the total size.
416
func (c *cachedReject) Size() (uint64, error) {
417
        return 1, nil
418
}
419

420
// AuthenticatedGossiper is a subsystem which is responsible for receiving
421
// announcements, validating them and applying the changes to router, syncing
203✔
422
// lightning network with newly connected nodes, broadcasting announcements
203✔
423
// after validation, negotiating the channel announcement proofs exchange and
203✔
424
// handling the premature announcements. All outgoing announcements are
425
// expected to be properly signed as dictated in BOLT#7, additionally, all
426
// incoming message are expected to be well formed and signed. Invalid messages
427
// will be rejected by this struct.
428
type AuthenticatedGossiper struct {
429
        // Parameters which are needed to properly handle the start and stop of
430
        // the service.
431
        started sync.Once
432
        stopped sync.Once
433

434
        // bestHeight is the height of the block at the tip of the main chain
435
        // as we know it. Accesses *MUST* be done with the gossiper's lock
436
        // held.
437
        bestHeight uint32
438

439
        quit chan struct{}
440
        wg   sync.WaitGroup
441

442
        // cfg is a copy of the configuration struct that the gossiper service
443
        // was initialized with.
444
        cfg *Config
445

446
        // blockEpochs encapsulates a stream of block epochs that are sent at
447
        // every new block height.
448
        blockEpochs *chainntnfs.BlockEpochEvent
449

450
        // prematureChannelUpdates is a map of ChannelUpdates we have received
451
        // that wasn't associated with any channel we know about.  We store
452
        // them temporarily, such that we can reprocess them when a
453
        // ChannelAnnouncement for the channel is received.
454
        prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]
455

456
        // banman tracks our peer's ban status.
457
        banman *banman
458

459
        // networkMsgs is a channel that carries new network broadcasted
460
        // message from outside the gossiper service to be processed by the
461
        // networkHandler.
462
        networkMsgs chan *networkMsg
463

464
        // futureMsgs is a list of premature network messages that have a block
465
        // height specified in the future. We will save them and resend it to
466
        // the chan networkMsgs once the block height has reached. The cached
467
        // map format is,
468
        //   {msgID1: msg1, msgID2: msg2, ...}
469
        futureMsgs *futureMsgCache
470

471
        // chanPolicyUpdates is a channel that requests to update the
472
        // forwarding policy of a set of channels is sent over.
473
        chanPolicyUpdates chan *chanPolicyUpdateRequest
474

475
        // selfKey is the identity public key of the backing Lightning node.
476
        selfKey *btcec.PublicKey
477

478
        // selfKeyLoc is the locator for the identity public key of the backing
479
        // Lightning node.
480
        selfKeyLoc keychain.KeyLocator
481

482
        // channelMtx is used to restrict the database access to one
483
        // goroutine per channel ID. This is done to ensure that when
484
        // the gossiper is handling an announcement, the db state stays
485
        // consistent between when the DB is first read until it's written.
486
        channelMtx *multimutex.Mutex[uint64]
487

488
        recentRejects *lru.Cache[rejectCacheKey, *cachedReject]
489

490
        // syncMgr is a subsystem responsible for managing the gossip syncers
491
        // for peers currently connected. When a new peer is connected, the
492
        // manager will create its accompanying gossip syncer and determine
493
        // whether it should have an activeSync or passiveSync sync type based
494
        // on how many other gossip syncers are currently active. Any activeSync
495
        // gossip syncers are started in a round-robin manner to ensure we're
496
        // not syncing with multiple peers at the same time.
497
        syncMgr *SyncManager
498

499
        // reliableSender is a subsystem responsible for handling reliable
500
        // message send requests to peers. This should only be used for channels
501
        // that are unadvertised at the time of handling the message since if it
502
        // is advertised, then peers should be able to get the message from the
503
        // network.
504
        reliableSender *reliableSender
505

506
        // chanUpdateRateLimiter contains rate limiters for each direction of
507
        // a channel update we've processed. We'll use these to determine
508
        // whether we should accept a new update for a specific channel and
509
        // direction.
510
        //
511
        // NOTE: This map must be synchronized with the main
512
        // AuthenticatedGossiper lock.
513
        chanUpdateRateLimiter map[uint64][2]*rate.Limiter
514

515
        sync.Mutex
516
}
517

518
// New creates a new AuthenticatedGossiper instance, initialized with the
519
// passed configuration parameters.
520
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
521
        gossiper := &AuthenticatedGossiper{
522
                selfKey:           selfKeyDesc.PubKey,
523
                selfKeyLoc:        selfKeyDesc.KeyLocator,
524
                cfg:               &cfg,
525
                networkMsgs:       make(chan *networkMsg),
30✔
526
                futureMsgs:        newFutureMsgCache(maxFutureMessages),
30✔
527
                quit:              make(chan struct{}),
30✔
528
                chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
30✔
529
                prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
30✔
530
                        maxPrematureUpdates,
30✔
531
                ),
30✔
532
                channelMtx: multimutex.NewMutex[uint64](),
30✔
533
                recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
30✔
534
                        maxRejectedUpdates,
30✔
535
                ),
30✔
536
                chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
30✔
537
                banman:                newBanman(),
30✔
538
        }
30✔
539

30✔
540
        gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
30✔
541
                ChainHash:               cfg.ChainHash,
30✔
542
                ChanSeries:              cfg.ChanSeries,
30✔
543
                RotateTicker:            cfg.RotateTicker,
30✔
544
                HistoricalSyncTicker:    cfg.HistoricalSyncTicker,
30✔
545
                NumActiveSyncers:        cfg.NumActiveSyncers,
30✔
546
                NoTimestampQueries:      cfg.NoTimestampQueries,
30✔
547
                IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
30✔
548
                BestHeight:              gossiper.latestHeight,
30✔
549
                PinnedSyncers:           cfg.PinnedSyncers,
30✔
550
                IsStillZombieChannel:    cfg.IsStillZombieChannel,
30✔
551
        })
30✔
552

30✔
553
        gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
30✔
554
                NotifyWhenOnline:  cfg.NotifyWhenOnline,
30✔
555
                NotifyWhenOffline: cfg.NotifyWhenOffline,
30✔
556
                MessageStore:      cfg.MessageStore,
30✔
557
                IsMsgStale:        gossiper.isMsgStale,
30✔
558
        })
30✔
559

30✔
560
        return gossiper
30✔
561
}
30✔
562

30✔
563
// EdgeWithInfo contains the information that is required to update an edge.
30✔
564
type EdgeWithInfo struct {
30✔
565
        // Info describes the channel.
30✔
566
        Info *models.ChannelEdgeInfo
30✔
567

568
        // Edge describes the policy in one direction of the channel.
569
        Edge *models.ChannelEdgePolicy
570
}
571

572
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
573
// specified edge updates. Updates are done in two stages: first, the
574
// AuthenticatedGossiper ensures the update has been committed by dependent
575
// sub-systems, then it signs and broadcasts new updates to the network. A
576
// mapping between outpoints and updated channel policies is returned, which is
577
// used to update the forwarding policies of the underlying links.
578
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
579
        edgesToUpdate []EdgeWithInfo) error {
580

581
        errChan := make(chan error, 1)
582
        policyUpdate := &chanPolicyUpdateRequest{
583
                edgesToUpdate: edgesToUpdate,
584
                errChan:       errChan,
4✔
585
        }
4✔
586

4✔
587
        select {
4✔
588
        case d.chanPolicyUpdates <- policyUpdate:
4✔
589
                err := <-errChan
4✔
590
                return err
4✔
591
        case <-d.quit:
4✔
592
                return fmt.Errorf("AuthenticatedGossiper shutting down")
4✔
593
        }
4✔
594
}
4✔
595

4✔
596
// Start spawns network messages handler goroutine and registers on new block
×
597
// notifications in order to properly handle the premature announcements.
×
598
func (d *AuthenticatedGossiper) Start() error {
599
        var err error
600
        d.started.Do(func() {
601
                log.Info("Authenticated Gossiper starting")
602
                err = d.start()
603
        })
30✔
604
        return err
30✔
605
}
60✔
606

30✔
607
func (d *AuthenticatedGossiper) start() error {
30✔
608
        // First we register for new notifications of newly discovered blocks.
30✔
609
        // We do this immediately so we'll later be able to consume any/all
30✔
610
        // blocks which were discovered.
611
        blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
612
        if err != nil {
30✔
613
                return err
30✔
614
        }
30✔
615
        d.blockEpochs = blockEpochs
30✔
616

30✔
617
        height, err := d.cfg.Graph.CurrentBlockHeight()
30✔
618
        if err != nil {
×
619
                return err
×
620
        }
30✔
621
        d.bestHeight = height
30✔
622

30✔
623
        // Start the reliable sender. In case we had any pending messages ready
30✔
624
        // to be sent when the gossiper was last shut down, we must continue on
×
625
        // our quest to deliver them to their respective peers.
×
626
        if err := d.reliableSender.Start(); err != nil {
30✔
627
                return err
30✔
628
        }
30✔
629

30✔
630
        d.syncMgr.Start()
30✔
631

30✔
632
        d.banman.start()
×
633

×
634
        // Start receiving blocks in its dedicated goroutine.
635
        d.wg.Add(2)
30✔
636
        go d.syncBlockHeight()
30✔
637
        go d.networkHandler()
30✔
638

30✔
639
        return nil
30✔
640
}
30✔
641

30✔
642
// syncBlockHeight syncs the best block height for the gossiper by reading
30✔
643
// blockEpochs.
30✔
644
//
30✔
645
// NOTE: must be run as a goroutine.
646
func (d *AuthenticatedGossiper) syncBlockHeight() {
647
        defer d.wg.Done()
648

649
        for {
650
                select {
651
                // A new block has arrived, so we can re-process the previously
30✔
652
                // premature announcements.
30✔
653
                case newBlock, ok := <-d.blockEpochs.Epochs:
30✔
654
                        // If the channel has been closed, then this indicates
60✔
655
                        // the daemon is shutting down, so we exit ourselves.
30✔
656
                        if !ok {
657
                                return
658
                        }
3✔
659

3✔
660
                        // Once a new block arrives, we update our running
3✔
661
                        // track of the height of the chain tip.
6✔
662
                        d.Lock()
3✔
663
                        blockHeight := uint32(newBlock.Height)
3✔
664
                        d.bestHeight = blockHeight
665
                        d.Unlock()
666

667
                        log.Debugf("New block: height=%d, hash=%s", blockHeight,
3✔
668
                                newBlock.Hash)
3✔
669

3✔
670
                        // Resend future messages, if any.
3✔
671
                        d.resendFutureMessages(blockHeight)
3✔
672

3✔
673
                case <-d.quit:
3✔
674
                        return
3✔
675
                }
3✔
676
        }
3✔
677
}
678

27✔
679
// futureMsgCache embeds a `lru.Cache` with a message counter that's served as
27✔
680
// the unique ID when saving the message.
681
type futureMsgCache struct {
682
        *lru.Cache[uint64, *cachedFutureMsg]
683

684
        // msgID is a monotonically increased integer.
685
        msgID atomic.Uint64
686
}
687

688
// nextMsgID returns a unique message ID.
689
func (f *futureMsgCache) nextMsgID() uint64 {
690
        return f.msgID.Add(1)
691
}
692

693
// newFutureMsgCache creates a new future message cache with the underlying lru
694
// cache being initialized with the specified capacity.
6✔
695
func newFutureMsgCache(capacity uint64) *futureMsgCache {
6✔
696
        // Create a new cache.
6✔
697
        cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)
698

699
        return &futureMsgCache{
700
                Cache: cache,
31✔
701
        }
31✔
702
}
31✔
703

31✔
704
// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
31✔
705
type cachedFutureMsg struct {
31✔
706
        // msg is the network message.
31✔
707
        msg *networkMsg
31✔
708

709
        // height is the block height.
710
        height uint32
711
}
712

713
// Size returns the size of the message.
714
func (c *cachedFutureMsg) Size() (uint64, error) {
715
        // Return a constant 1.
716
        return 1, nil
717
}
718

719
// resendFutureMessages takes a block height, resends all the future messages
7✔
720
// found below and equal to that height and deletes those messages found in the
7✔
721
// gossiper's futureMsgs.
7✔
722
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
7✔
723
        var (
724
                // msgs are the target messages.
725
                msgs []*networkMsg
726

727
                // keys are the target messages' caching keys.
3✔
728
                keys []uint64
3✔
729
        )
3✔
730

3✔
731
        // filterMsgs is the visitor used when iterating the future cache.
3✔
732
        filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
3✔
733
                if cmsg.height <= height {
3✔
734
                        msgs = append(msgs, cmsg.msg)
3✔
735
                        keys = append(keys, k)
3✔
736
                }
3✔
737

6✔
738
                return true
6✔
739
        }
3✔
740

3✔
741
        // Filter out the target messages.
3✔
742
        d.futureMsgs.Range(filterMsgs)
743

3✔
744
        // Return early if no messages found.
745
        if len(msgs) == 0 {
746
                return
747
        }
3✔
748

3✔
749
        // Remove the filtered messages.
3✔
750
        for _, key := range keys {
6✔
751
                d.futureMsgs.Delete(key)
3✔
752
        }
3✔
753

754
        log.Debugf("Resending %d network messages at height %d",
755
                len(msgs), height)
6✔
756

3✔
757
        for _, msg := range msgs {
3✔
758
                select {
759
                case d.networkMsgs <- msg:
3✔
760
                case <-d.quit:
3✔
761
                        msg.err <- ErrGossiperShuttingDown
3✔
762
                }
6✔
763
        }
3✔
764
}
3✔
765

×
766
// Stop signals any active goroutines for a graceful closure.
×
767
func (d *AuthenticatedGossiper) Stop() error {
768
        d.stopped.Do(func() {
769
                log.Info("Authenticated gossiper shutting down...")
770
                defer log.Debug("Authenticated gossiper shutdown complete")
771

772
                d.stop()
31✔
773
        })
61✔
774
        return nil
30✔
775
}
30✔
776

30✔
777
func (d *AuthenticatedGossiper) stop() {
30✔
778
        log.Debug("Authenticated Gossiper is stopping")
30✔
779
        defer log.Debug("Authenticated Gossiper stopped")
31✔
780

781
        // `blockEpochs` is only initialized in the start routine so we make
782
        // sure we don't panic here in the case where the `Stop` method is
30✔
783
        // called when the `Start` method does not complete.
30✔
784
        if d.blockEpochs != nil {
30✔
785
                d.blockEpochs.Cancel()
30✔
786
        }
30✔
787

30✔
788
        d.syncMgr.Stop()
30✔
789

60✔
790
        d.banman.stop()
30✔
791

30✔
792
        close(d.quit)
793
        d.wg.Wait()
30✔
794

30✔
795
        // We'll stop our reliable sender after all of the gossiper's goroutines
30✔
796
        // have exited to ensure nothing can cause it to continue executing.
30✔
797
        d.reliableSender.Stop()
30✔
798
}
30✔
799

30✔
800
// TODO(roasbeef): need method to get current gossip timestamp?
30✔
801
//  * using mtx, check time rotate forward is needed?
30✔
802

30✔
803
// ProcessRemoteAnnouncement sends a new remote announcement message along with
804
// the peer that sent the routing message. The announcement will be processed
805
// then added to a queue for batched trickled announcement to all connected
806
// peers.  Remote channel announcements should contain the announcement proof
807
// and be fully validated.
808
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
809
        peer lnpeer.Peer) chan error {
810

811
        errChan := make(chan error, 1)
812

813
        // For messages in the known set of channel series queries, we'll
814
        // dispatch the message directly to the GossipSyncer, and skip the main
287✔
815
        // processing loop.
287✔
816
        switch m := msg.(type) {
287✔
817
        case *lnwire.QueryShortChanIDs,
287✔
818
                *lnwire.QueryChannelRange,
287✔
819
                *lnwire.ReplyChannelRange,
287✔
820
                *lnwire.ReplyShortChanIDsEnd:
287✔
821

287✔
822
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
823
                if !ok {
824
                        log.Warnf("Gossip syncer for peer=%x not found",
825
                                peer.PubKey())
3✔
826

3✔
827
                        errChan <- ErrGossipSyncerNotFound
3✔
828
                        return errChan
3✔
829
                }
×
830

×
831
                // If we've found the message target, then we'll dispatch the
×
832
                // message directly to it.
×
833
                syncer.ProcessQueryMsg(m, peer.QuitSignal())
×
834

×
835
                errChan <- nil
836
                return errChan
837

838
        // If a peer is updating its current update horizon, then we'll dispatch
3✔
839
        // that directly to the proper GossipSyncer.
3✔
840
        case *lnwire.GossipTimestampRange:
3✔
841
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
3✔
842
                if !ok {
843
                        log.Warnf("Gossip syncer for peer=%x not found",
844
                                peer.PubKey())
845

3✔
846
                        errChan <- ErrGossipSyncerNotFound
3✔
847
                        return errChan
3✔
848
                }
×
849

×
850
                // If we've found the message target, then we'll dispatch the
×
851
                // message directly to it.
×
852
                if err := syncer.ApplyGossipFilter(m); err != nil {
×
853
                        log.Warnf("Unable to apply gossip filter for peer=%x: "+
×
854
                                "%v", peer.PubKey(), err)
855

856
                        errChan <- err
857
                        return errChan
3✔
858
                }
×
859

×
860
                errChan <- nil
×
861
                return errChan
×
862

×
863
        // To avoid inserting edges in the graph for our own channels that we
×
864
        // have already closed, we ignore such channel announcements coming
865
        // from the remote.
3✔
866
        case *lnwire.ChannelAnnouncement1:
3✔
867
                ownKey := d.selfKey.SerializeCompressed()
868
                ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement1 " +
869
                        "for own channel")
870

871
                if bytes.Equal(m.NodeID1[:], ownKey) ||
222✔
872
                        bytes.Equal(m.NodeID2[:], ownKey) {
222✔
873

222✔
874
                        log.Warn(ownErr)
222✔
875
                        errChan <- ownErr
222✔
876
                        return errChan
222✔
877
                }
227✔
878
        }
5✔
879

5✔
880
        nMsg := &networkMsg{
5✔
881
                msg:      msg,
5✔
882
                isRemote: true,
5✔
883
                peer:     peer,
884
                source:   peer.IdentityKey(),
885
                err:      errChan,
285✔
886
        }
285✔
887

285✔
888
        select {
285✔
889
        case d.networkMsgs <- nMsg:
285✔
890

285✔
891
        // If the peer that sent us this error is quitting, then we don't need
285✔
892
        // to send back an error and can return immediately.
285✔
893
        case <-peer.QuitSignal():
285✔
894
                return nil
285✔
895
        case <-d.quit:
896
                nMsg.err <- ErrGossiperShuttingDown
897
        }
898

×
899
        return nMsg.err
×
900
}
×
901

×
902
// ProcessLocalAnnouncement sends a new remote announcement message along with
903
// the peer that sent the routing message. The announcement will be processed
904
// then added to a queue for batched trickled announcement to all connected
285✔
905
// peers.  Local channel announcements don't contain the announcement proof and
906
// will not be fully validated. Once the channel proofs are received, the
907
// entire channel announcement and update messages will be re-constructed and
908
// broadcast to the rest of the network.
909
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
910
        optionalFields ...OptionalMsgField) chan error {
911

912
        optionalMsgFields := &optionalMsgFields{}
913
        optionalMsgFields.apply(optionalFields...)
914

915
        nMsg := &networkMsg{
50✔
916
                msg:               msg,
50✔
917
                optionalMsgFields: optionalMsgFields,
50✔
918
                isRemote:          false,
50✔
919
                source:            d.selfKey,
50✔
920
                err:               make(chan error, 1),
50✔
921
        }
50✔
922

50✔
923
        select {
50✔
924
        case d.networkMsgs <- nMsg:
50✔
925
        case <-d.quit:
50✔
926
                nMsg.err <- ErrGossiperShuttingDown
50✔
927
        }
50✔
928

50✔
929
        return nMsg.err
50✔
930
}
×
931

×
932
// channelUpdateID is a unique identifier for ChannelUpdate messages, as
933
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
934
// tuple.
50✔
935
type channelUpdateID struct {
936
        // channelID represents the set of data which is needed to
937
        // retrieve all necessary data to validate the channel existence.
938
        channelID lnwire.ShortChannelID
939

940
        // Flags least-significant bit must be set to 0 if the creating node
941
        // corresponds to the first node in the previously sent channel
942
        // announcement and 1 otherwise.
943
        flags lnwire.ChanUpdateChanFlags
944
}
945

946
// msgWithSenders is a wrapper struct around a message, and the set of peers
947
// that originally sent us this message. Using this struct, we can ensure that
948
// we don't re-send a message to the peer that sent it to us in the first
949
// place.
950
type msgWithSenders struct {
951
        // msg is the wire message itself.
952
        msg lnwire.Message
953

954
        // isLocal is true if this was a message that originated locally. We'll
955
        // use this to bypass our normal checks to ensure we prioritize sending
956
        // out our own updates.
957
        isLocal bool
958

959
        // sender is the set of peers that sent us this message.
960
        senders map[route.Vertex]struct{}
961
}
962

963
// mergeSyncerMap is used to merge the set of senders of a particular message
964
// with peers that we have an active GossipSyncer with. We do this to ensure
965
// that we don't broadcast messages to any peers that we have active gossip
966
// syncers for.
967
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
968
        for peerPub := range syncers {
969
                m.senders[peerPub] = struct{}{}
970
        }
971
}
972

27✔
973
// deDupedAnnouncements de-duplicates announcements that have been added to the
30✔
974
// batch. Internally, announcements are stored in three maps
3✔
975
// (one each for channel announcements, channel updates, and node
3✔
976
// announcements). These maps keep track of unique announcements and ensure no
977
// announcements are duplicated. We keep the three message types separate, such
978
// that we can send channel announcements first, then channel updates, and
979
// finally node announcements when it's time to broadcast them.
980
type deDupedAnnouncements struct {
981
        // channelAnnouncements are identified by the short channel id field.
982
        channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
983

984
        // channelUpdates are identified by the channel update id field.
985
        channelUpdates map[channelUpdateID]msgWithSenders
986

987
        // nodeAnnouncements are identified by the Vertex field.
988
        nodeAnnouncements map[route.Vertex]msgWithSenders
989

990
        sync.Mutex
991
}
992

993
// Reset operates on deDupedAnnouncements to reset the storage of
994
// announcements.
995
func (d *deDupedAnnouncements) Reset() {
996
        d.Lock()
997
        defer d.Unlock()
998

999
        d.reset()
1000
}
32✔
1001

32✔
1002
// reset is the private version of the Reset method. We have this so we can
32✔
1003
// call this method within method that are already holding the lock.
32✔
1004
func (d *deDupedAnnouncements) reset() {
32✔
1005
        // Storage of each type of announcement (channel announcements, channel
32✔
1006
        // updates, node announcements) is set to an empty map where the
1007
        // appropriate key points to the corresponding lnwire.Message.
1008
        d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
1009
        d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
319✔
1010
        d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
319✔
1011
}
319✔
1012

319✔
1013
// addMsg adds a new message to the current batch. If the message is already
319✔
1014
// present in the current batch, then this new instance replaces the latter,
319✔
1015
// and the set of senders is updated to reflect which node sent us this
319✔
1016
// message.
319✔
1017
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
1018
        log.Tracef("Adding network message: %v to batch", message.msg.MsgType())
1019

1020
        // Depending on the message type (channel announcement, channel update,
1021
        // or node announcement), the message is added to the corresponding map
1022
        // in deDupedAnnouncements. Because each identifying key can have at
89✔
1023
        // most one value, the announcements are de-duplicated, with newer ones
89✔
1024
        // replacing older ones.
89✔
1025
        switch msg := message.msg.(type) {
89✔
1026

89✔
1027
        // Channel announcements are identified by the short channel id field.
89✔
1028
        case *lnwire.ChannelAnnouncement1:
89✔
1029
                deDupKey := msg.ShortChannelID
89✔
1030
                sender := route.NewVertex(message.source)
89✔
1031

1032
                mws, ok := d.channelAnnouncements[deDupKey]
1033
                if !ok {
25✔
1034
                        mws = msgWithSenders{
25✔
1035
                                msg:     msg,
25✔
1036
                                isLocal: !message.isRemote,
25✔
1037
                                senders: make(map[route.Vertex]struct{}),
25✔
1038
                        }
49✔
1039
                        mws.senders[sender] = struct{}{}
24✔
1040

24✔
1041
                        d.channelAnnouncements[deDupKey] = mws
24✔
1042

24✔
1043
                        return
24✔
1044
                }
24✔
1045

24✔
1046
                mws.msg = msg
24✔
1047
                mws.senders[sender] = struct{}{}
24✔
1048
                d.channelAnnouncements[deDupKey] = mws
24✔
1049

24✔
1050
        // Channel updates are identified by the (short channel id,
1051
        // channelflags) tuple.
1✔
1052
        case *lnwire.ChannelUpdate1:
1✔
1053
                sender := route.NewVertex(message.source)
1✔
1054
                deDupKey := channelUpdateID{
1055
                        msg.ShortChannelID,
1056
                        msg.ChannelFlags,
1057
                }
45✔
1058

45✔
1059
                oldTimestamp := uint32(0)
45✔
1060
                mws, ok := d.channelUpdates[deDupKey]
45✔
1061
                if ok {
45✔
1062
                        // If we already have seen this message, record its
45✔
1063
                        // timestamp.
45✔
1064
                        update, ok := mws.msg.(*lnwire.ChannelUpdate1)
45✔
1065
                        if !ok {
45✔
1066
                                log.Errorf("Expected *lnwire.ChannelUpdate1, "+
48✔
1067
                                        "got: %T", mws.msg)
3✔
1068

3✔
1069
                                return
3✔
1070
                        }
3✔
1071

×
1072
                        oldTimestamp = update.Timestamp
×
1073
                }
×
1074

×
1075
                // If we already had this message with a strictly newer
×
1076
                // timestamp, then we'll just discard the message we got.
1077
                if oldTimestamp > msg.Timestamp {
3✔
1078
                        log.Debugf("Ignored outdated network message: "+
1079
                                "peer=%v, msg=%s", message.peer, msg.MsgType())
1080
                        return
1081
                }
1082

46✔
1083
                // If the message we just got is newer than what we previously
1✔
1084
                // have seen, or this is the first time we see it, then we'll
1✔
1085
                // add it to our map of announcements.
1✔
1086
                if oldTimestamp < msg.Timestamp {
1✔
1087
                        mws = msgWithSenders{
1088
                                msg:     msg,
1089
                                isLocal: !message.isRemote,
1090
                                senders: make(map[route.Vertex]struct{}),
1091
                        }
87✔
1092

43✔
1093
                        // We'll mark the sender of the message in the
43✔
1094
                        // senders map.
43✔
1095
                        mws.senders[sender] = struct{}{}
43✔
1096

43✔
1097
                        d.channelUpdates[deDupKey] = mws
43✔
1098

43✔
1099
                        return
43✔
1100
                }
43✔
1101

43✔
1102
                // Lastly, if we had seen this exact message from before, with
43✔
1103
                // the same timestamp, we'll add the sender to the map of
43✔
1104
                // senders, such that we can skip sending this message back in
43✔
1105
                // the next batch.
43✔
1106
                mws.msg = msg
1107
                mws.senders[sender] = struct{}{}
1108
                d.channelUpdates[deDupKey] = mws
1109

1110
        // Node announcements are identified by the Vertex field.  Use the
1111
        // NodeID to create the corresponding Vertex.
1✔
1112
        case *lnwire.NodeAnnouncement:
1✔
1113
                sender := route.NewVertex(message.source)
1✔
1114
                deDupKey := route.Vertex(msg.NodeID)
1115

1116
                // We do the same for node announcements as we did for channel
1117
                // updates, as they also carry a timestamp.
25✔
1118
                oldTimestamp := uint32(0)
25✔
1119
                mws, ok := d.nodeAnnouncements[deDupKey]
25✔
1120
                if ok {
25✔
1121
                        oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
25✔
1122
                }
25✔
1123

25✔
1124
                // Discard the message if it's old.
25✔
1125
                if oldTimestamp > msg.Timestamp {
33✔
1126
                        return
8✔
1127
                }
8✔
1128

1129
                // Replace if it's newer.
1130
                if oldTimestamp < msg.Timestamp {
28✔
1131
                        mws = msgWithSenders{
3✔
1132
                                msg:     msg,
3✔
1133
                                isLocal: !message.isRemote,
1134
                                senders: make(map[route.Vertex]struct{}),
1135
                        }
46✔
1136

21✔
1137
                        mws.senders[sender] = struct{}{}
21✔
1138

21✔
1139
                        d.nodeAnnouncements[deDupKey] = mws
21✔
1140

21✔
1141
                        return
21✔
1142
                }
21✔
1143

21✔
1144
                // Add to senders map if it's the same as we had.
21✔
1145
                mws.msg = msg
21✔
1146
                mws.senders[sender] = struct{}{}
21✔
1147
                d.nodeAnnouncements[deDupKey] = mws
21✔
1148
        }
1149
}
1150

7✔
1151
// AddMsgs is a helper method to add multiple messages to the announcement
7✔
1152
// batch.
7✔
1153
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
1154
        d.Lock()
1155
        defer d.Unlock()
1156

1157
        for _, msg := range msgs {
1158
                d.addMsg(msg)
57✔
1159
        }
57✔
1160
}
57✔
1161

57✔
1162
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
146✔
1163
// to broadcast next into messages that are locally sourced and those that are
89✔
1164
// sourced remotely.
89✔
1165
type msgsToBroadcast struct {
1166
        // localMsgs is the set of messages we created locally.
1167
        localMsgs []msgWithSenders
1168

1169
        // remoteMsgs is the set of messages that we received from a remote
1170
        // party.
1171
        remoteMsgs []msgWithSenders
1172
}
1173

1174
// addMsg adds a new message to the appropriate sub-slice.
1175
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
1176
        if msg.isLocal {
1177
                m.localMsgs = append(m.localMsgs, msg)
1178
        } else {
1179
                m.remoteMsgs = append(m.remoteMsgs, msg)
1180
        }
74✔
1181
}
124✔
1182

50✔
1183
// isEmpty returns true if the batch is empty.
77✔
1184
func (m *msgsToBroadcast) isEmpty() bool {
27✔
1185
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
27✔
1186
}
1187

1188
// length returns the length of the combined message set.
1189
func (m *msgsToBroadcast) length() int {
289✔
1190
        return len(m.localMsgs) + len(m.remoteMsgs)
289✔
1191
}
289✔
1192

1193
// Emit returns the set of de-duplicated announcements to be sent out during
1194
// the next announcement epoch, in the order of channel announcements, channel
1✔
1195
// updates, and node announcements. Each message emitted, contains the set of
1✔
1196
// peers that sent us the message. This way, we can ensure that we don't waste
1✔
1197
// bandwidth by re-sending a message to the peer that sent it to us in the
1198
// first place. Additionally, the set of stored messages are reset.
1199
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
1200
        d.Lock()
1201
        defer d.Unlock()
1202

1203
        // Get the total number of announcements.
1204
        numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
290✔
1205
                len(d.nodeAnnouncements)
290✔
1206

290✔
1207
        // Create an empty array of lnwire.Messages with a length equal to
290✔
1208
        // the total number of announcements.
290✔
1209
        msgs := msgsToBroadcast{
290✔
1210
                localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
290✔
1211
                remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
290✔
1212
        }
290✔
1213

290✔
1214
        // Add the channel announcements to the array first.
290✔
1215
        for _, message := range d.channelAnnouncements {
290✔
1216
                msgs.addMsg(message)
290✔
1217
        }
290✔
1218

290✔
1219
        // Then add the channel updates.
290✔
1220
        for _, message := range d.channelUpdates {
311✔
1221
                msgs.addMsg(message)
21✔
1222
        }
21✔
1223

1224
        // Finally add the node announcements.
1225
        for _, message := range d.nodeAnnouncements {
329✔
1226
                msgs.addMsg(message)
39✔
1227
        }
39✔
1228

1229
        d.reset()
1230

310✔
1231
        // Return the array of lnwire.messages.
20✔
1232
        return msgs
20✔
1233
}
1234

290✔
1235
// calculateSubBatchSize is a helper function that calculates the size to break
290✔
1236
// down the batchSize into.
290✔
1237
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
290✔
1238
        minimumBatchSize, batchSize int) int {
1239
        if subBatchDelay > totalDelay {
1240
                return batchSize
1241
        }
1242

1243
        subBatchSize := (batchSize*int(subBatchDelay) +
16✔
1244
                int(totalDelay) - 1) / int(totalDelay)
18✔
1245

2✔
1246
        if subBatchSize < minimumBatchSize {
2✔
1247
                return minimumBatchSize
1248
        }
14✔
1249

14✔
1250
        return subBatchSize
14✔
1251
}
18✔
1252

4✔
1253
// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
4✔
1254
// this variable so the function can be mocked in our test.
1255
var batchSizeCalculator = calculateSubBatchSize
10✔
1256

1257
// splitAnnouncementBatches takes an exiting list of announcements and
1258
// decomposes it into sub batches controlled by the `subBatchSize`.
1259
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
1260
        announcementBatch []msgWithSenders) [][]msgWithSenders {
1261

1262
        subBatchSize := batchSizeCalculator(
1263
                d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
1264
                d.cfg.MinimumBatchSize, len(announcementBatch),
1265
        )
72✔
1266

72✔
1267
        var splitAnnouncementBatch [][]msgWithSenders
72✔
1268

72✔
1269
        for subBatchSize < len(announcementBatch) {
72✔
1270
                // For slicing with minimal allocation
72✔
1271
                // https://github.com/golang/go/wiki/SliceTricks
72✔
1272
                announcementBatch, splitAnnouncementBatch =
72✔
1273
                        announcementBatch[subBatchSize:],
72✔
1274
                        append(splitAnnouncementBatch,
196✔
1275
                                announcementBatch[0:subBatchSize:subBatchSize])
124✔
1276
        }
124✔
1277
        splitAnnouncementBatch = append(
124✔
1278
                splitAnnouncementBatch, announcementBatch,
124✔
1279
        )
124✔
1280

124✔
1281
        return splitAnnouncementBatch
124✔
1282
}
72✔
1283

72✔
1284
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
72✔
1285
// split size, and then sends out all items to the set of target peers. Locally
72✔
1286
// generated announcements are always sent before remotely generated
72✔
1287
// announcements.
1288
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
1289
        annBatch msgsToBroadcast) {
1290

1291
        // delayNextBatch is a helper closure that blocks for `SubBatchDelay`
1292
        // duration to delay the sending of next announcement batch.
1293
        delayNextBatch := func() {
1294
                select {
34✔
1295
                case <-time.After(d.cfg.SubBatchDelay):
34✔
1296
                case <-d.quit:
34✔
1297
                        return
34✔
1298
                }
99✔
1299
        }
65✔
1300

48✔
1301
        // Fetch the local and remote announcements.
17✔
1302
        localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
17✔
1303
        remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)
1304

1305
        d.wg.Add(1)
1306
        go func() {
1307
                defer d.wg.Done()
34✔
1308

34✔
1309
                log.Debugf("Broadcasting %v new local announcements in %d "+
34✔
1310
                        "sub batches", len(annBatch.localMsgs),
34✔
1311
                        len(localBatches))
68✔
1312

34✔
1313
                // Send out the local announcements first.
34✔
1314
                for _, annBatch := range localBatches {
34✔
1315
                        d.sendLocalBatch(annBatch)
34✔
1316
                        delayNextBatch()
34✔
1317
                }
34✔
1318

34✔
1319
                log.Debugf("Broadcasting %v new remote announcements in %d "+
68✔
1320
                        "sub batches", len(annBatch.remoteMsgs),
34✔
1321
                        len(remoteBatches))
34✔
1322

34✔
1323
                // Now send the remote announcements.
1324
                for _, annBatch := range remoteBatches {
34✔
1325
                        d.sendRemoteBatch(annBatch)
34✔
1326
                        delayNextBatch()
34✔
1327
                }
34✔
1328
        }()
34✔
1329
}
68✔
1330

34✔
1331
// sendLocalBatch broadcasts a list of locally generated announcements to our
34✔
1332
// peers. For local announcements, we skip the filter and dedup logic and just
34✔
1333
// send the announcements out to all our coonnected peers.
1334
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
1335
        msgsToSend := lnutils.Map(
1336
                annBatch, func(m msgWithSenders) lnwire.Message {
1337
                        return m.msg
1338
                },
1339
        )
34✔
1340

34✔
1341
        err := d.cfg.Broadcast(nil, msgsToSend...)
80✔
1342
        if err != nil {
46✔
1343
                log.Errorf("Unable to send local batch announcements: %v", err)
46✔
1344
        }
1345
}
1346

34✔
1347
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
34✔
1348
// peers.
×
1349
func (d *AuthenticatedGossiper) sendRemoteBatch(annBatch []msgWithSenders) {
×
1350
        syncerPeers := d.syncMgr.GossipSyncers()
1351

1352
        // We'll first attempt to filter out this new message for all peers
1353
        // that have active gossip syncers active.
1354
        for pub, syncer := range syncerPeers {
34✔
1355
                log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
34✔
1356
                syncer.FilterGossipMsgs(annBatch...)
34✔
1357
        }
34✔
1358

34✔
1359
        for _, msgChunk := range annBatch {
37✔
1360
                msgChunk := msgChunk
3✔
1361

3✔
1362
                // With the syncers taken care of, we'll merge the sender map
3✔
1363
                // with the set of syncers, so we don't send out duplicate
1364
                // messages.
61✔
1365
                msgChunk.mergeSyncerMap(syncerPeers)
27✔
1366

27✔
1367
                err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
27✔
1368
                if err != nil {
27✔
1369
                        log.Errorf("Unable to send batch "+
27✔
1370
                                "announcements: %v", err)
27✔
1371
                        continue
27✔
1372
                }
27✔
1373
        }
27✔
1374
}
×
1375

×
1376
// networkHandler is the primary goroutine that drives this service. The roles
×
1377
// of this goroutine includes answering queries related to the state of the
1378
// network, syncing up newly connected peers, and also periodically
1379
// broadcasting our latest topology state to all connected peers.
1380
//
1381
// NOTE: This MUST be run as a goroutine.
1382
func (d *AuthenticatedGossiper) networkHandler() {
1383
        defer d.wg.Done()
1384

1385
        // Initialize empty deDupedAnnouncements to store announcement batch.
1386
        announcements := deDupedAnnouncements{}
1387
        announcements.Reset()
30✔
1388

30✔
1389
        d.cfg.RetransmitTicker.Resume()
30✔
1390
        defer d.cfg.RetransmitTicker.Stop()
30✔
1391

30✔
1392
        trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
30✔
1393
        defer trickleTimer.Stop()
30✔
1394

30✔
1395
        // To start, we'll first check to see if there are any stale channel or
30✔
1396
        // node announcements that we need to re-transmit.
30✔
1397
        if err := d.retransmitStaleAnns(time.Now()); err != nil {
30✔
1398
                log.Errorf("Unable to rebroadcast stale announcements: %v", err)
30✔
1399
        }
30✔
1400

30✔
1401
        // We'll use this validation to ensure that we process jobs in their
30✔
1402
        // dependency order during parallel validation.
30✔
1403
        validationBarrier := graph.NewValidationBarrier(1000, d.quit)
×
1404

×
1405
        for {
1406
                select {
1407
                // A new policy update has arrived. We'll commit it to the
1408
                // sub-systems below us, then craft, sign, and broadcast a new
30✔
1409
                // ChannelUpdate for the set of affected clients.
30✔
1410
                case policyUpdate := <-d.chanPolicyUpdates:
679✔
1411
                        log.Tracef("Received channel %d policy update requests",
649✔
1412
                                len(policyUpdate.edgesToUpdate))
1413

1414
                        // First, we'll now create new fully signed updates for
1415
                        // the affected channels and also update the underlying
4✔
1416
                        // graph with the new state.
4✔
1417
                        newChanUpdates, err := d.processChanPolicyUpdate(
4✔
1418
                                policyUpdate.edgesToUpdate,
4✔
1419
                        )
4✔
1420
                        policyUpdate.errChan <- err
4✔
1421
                        if err != nil {
4✔
1422
                                log.Errorf("Unable to craft policy updates: %v",
4✔
1423
                                        err)
4✔
1424
                                continue
4✔
1425
                        }
4✔
1426

4✔
1427
                        // Finally, with the updates committed, we'll now add
×
1428
                        // them to the announcement batch to be flushed at the
×
1429
                        // start of the next epoch.
×
1430
                        announcements.AddMsgs(newChanUpdates...)
1431

1432
                case announcement := <-d.networkMsgs:
1433
                        log.Tracef("Received network message: "+
1434
                                "peer=%v, msg=%s, is_remote=%v",
1435
                                announcement.peer, announcement.msg.MsgType(),
4✔
1436
                                announcement.isRemote)
1437

334✔
1438
                        switch announcement.msg.(type) {
334✔
1439
                        // Channel announcement signatures are amongst the only
334✔
1440
                        // messages that we'll process serially.
334✔
1441
                        case *lnwire.AnnounceSignatures1:
334✔
1442
                                emittedAnnouncements, _ := d.processNetworkAnnouncement(
334✔
1443
                                        announcement,
334✔
1444
                                )
1445
                                log.Debugf("Processed network message %s, "+
1446
                                        "returned len(announcements)=%v",
24✔
1447
                                        announcement.msg.MsgType(),
24✔
1448
                                        len(emittedAnnouncements))
24✔
1449

24✔
1450
                                if emittedAnnouncements != nil {
24✔
1451
                                        announcements.AddMsgs(
24✔
1452
                                                emittedAnnouncements...,
24✔
1453
                                        )
24✔
1454
                                }
24✔
1455
                                continue
37✔
1456
                        }
13✔
1457

13✔
1458
                        // If this message was recently rejected, then we won't
13✔
1459
                        // attempt to re-process it.
13✔
1460
                        if announcement.isRemote && d.isRecentlyRejectedMsg(
24✔
1461
                                announcement.msg,
1462
                                sourceToPub(announcement.source),
1463
                        ) {
1464

1465
                                announcement.err <- fmt.Errorf("recently " +
313✔
1466
                                        "rejected")
313✔
1467
                                continue
313✔
1468
                        }
314✔
1469

1✔
1470
                        // We'll set up any dependent, and wait until a free
1✔
1471
                        // slot for this job opens up, this allow us to not
1✔
1472
                        // have thousands of goroutines active.
1✔
1473
                        validationBarrier.InitJobDependencies(announcement.msg)
1474

1475
                        d.wg.Add(1)
1476
                        go d.handleNetworkMessages(
1477
                                announcement, &announcements, validationBarrier,
1478
                        )
312✔
1479

312✔
1480
                // The trickle timer has ticked, which indicates we should
312✔
1481
                // flush to the network the pending batch of new announcements
312✔
1482
                // we've received since the last trickle tick.
312✔
1483
                case <-trickleTimer.C:
312✔
1484
                        // Emit the current batch of announcements from
1485
                        // deDupedAnnouncements.
1486
                        announcementBatch := announcements.Emit()
1487

1488
                        // If the current announcements batch is nil, then we
289✔
1489
                        // have no further work here.
289✔
1490
                        if announcementBatch.isEmpty() {
289✔
1491
                                continue
289✔
1492
                        }
289✔
1493

289✔
1494
                        // At this point, we have the set of local and remote
289✔
1495
                        // announcements we want to send out. We'll do the
547✔
1496
                        // batching as normal for both, but for local
258✔
1497
                        // announcements, we'll blast them out w/o regard for
1498
                        // our peer's policies so we ensure they propagate
1499
                        // properly.
1500
                        d.splitAndSendAnnBatch(announcementBatch)
1501

1502
                // The retransmission timer has ticked which indicates that we
1503
                // should check if we need to prune or re-broadcast any of our
1504
                // personal channels or node announcement. This addresses the
1505
                // case of "zombie" channels and channel advertisements that
34✔
1506
                // have been dropped, or not properly propagated through the
1507
                // network.
1508
                case tick := <-d.cfg.RetransmitTicker.Ticks():
1509
                        if err := d.retransmitStaleAnns(tick); err != nil {
1510
                                log.Errorf("unable to rebroadcast stale "+
1511
                                        "announcements: %v", err)
1512
                        }
1513

1✔
1514
                // The gossiper has been signalled to exit, to we exit our
1✔
1515
                // main loop so the wait group can be decremented.
×
1516
                case <-d.quit:
×
1517
                        return
×
1518
                }
1519
        }
1520
}
1521

30✔
1522
// handleNetworkMessages is responsible for waiting for dependencies for a
30✔
1523
// given network message and processing the message. Once processed, it will
1524
// signal its dependants and add the new announcements to the announce batch.
1525
//
1526
// NOTE: must be run as a goroutine.
1527
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
1528
        deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) {
1529

1530
        defer d.wg.Done()
1531
        defer vb.CompleteJob()
1532

1533
        // We should only broadcast this message forward if it originated from
312✔
1534
        // us or it wasn't received as part of our initial historical sync.
312✔
1535
        shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
312✔
1536

312✔
1537
        // If this message has an existing dependency, then we'll wait until
312✔
1538
        // that has been fully validated before we proceed.
312✔
1539
        err := vb.WaitForDependants(nMsg.msg)
312✔
1540
        if err != nil {
312✔
1541
                log.Debugf("Validating network message %s got err: %v",
312✔
1542
                        nMsg.msg.MsgType(), err)
312✔
1543

312✔
1544
                if !graph.IsError(
312✔
1545
                        err,
312✔
1546
                        graph.ErrVBarrierShuttingDown,
×
1547
                        graph.ErrParentValidationFailed,
×
1548
                ) {
×
1549

×
1550
                        log.Warnf("unexpected error during validation "+
×
1551
                                "barrier shutdown: %v", err)
×
1552
                }
×
1553
                nMsg.err <- err
×
1554

×
1555
                return
×
1556
        }
×
1557

×
1558
        // Process the network announcement to determine if this is either a
×
1559
        // new announcement from our PoV or an edges to a prior vertex/edge we
×
1560
        // previously proceeded.
×
1561
        newAnns, allow := d.processNetworkAnnouncement(nMsg)
1562

1563
        log.Tracef("Processed network message %s, returned "+
1564
                "len(announcements)=%v, allowDependents=%v",
1565
                nMsg.msg.MsgType(), len(newAnns), allow)
1566

312✔
1567
        // If this message had any dependencies, then we can now signal them to
312✔
1568
        // continue.
312✔
1569
        vb.SignalDependants(nMsg.msg, allow)
312✔
1570

312✔
1571
        // If the announcement was accepted, then add the emitted announcements
312✔
1572
        // to our announce batch to be broadcast once the trickle timer ticks
312✔
1573
        // gain.
312✔
1574
        if newAnns != nil && shouldBroadcast {
312✔
1575
                // TODO(roasbeef): exclude peer that sent.
312✔
1576
                deDuped.AddMsgs(newAnns...)
312✔
1577
        } else if newAnns != nil {
312✔
1578
                log.Trace("Skipping broadcast of announcements received " +
312✔
1579
                        "during initial graph sync")
347✔
1580
        }
35✔
1581
}
35✔
1582

319✔
1583
// TODO(roasbeef): d/c peers that send updates not on our chain
4✔
1584

4✔
1585
// InitSyncState is called by outside sub-systems when a connection is
4✔
1586
// established to a new peer that understands how to perform channel range
1587
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
1588
// needed to handle new queries.
1589
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
1590
        d.syncMgr.InitSyncState(syncPeer)
1591
}
1592

1593
// PruneSyncState is called by outside sub-systems once a peer that we were
1594
// previously connected to has been disconnected. In this case we can stop the
3✔
1595
// existing GossipSyncer assigned to the peer and free up resources.
3✔
1596
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
3✔
1597
        d.syncMgr.PruneSyncState(peer)
1598
}
1599

1600
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
1601
// false otherwise, This avoids expensive reprocessing of the message.
3✔
1602
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
3✔
1603
        peerPub [33]byte) bool {
3✔
1604

1605
        var scid uint64
1606
        switch m := msg.(type) {
1607
        case *lnwire.ChannelUpdate1:
1608
                scid = m.ShortChannelID.ToUint64()
276✔
1609

276✔
1610
        case *lnwire.ChannelAnnouncement1:
276✔
1611
                scid = m.ShortChannelID.ToUint64()
276✔
1612

45✔
1613
        default:
45✔
1614
                return false
1615
        }
220✔
1616

220✔
1617
        _, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
1618
        return err != cache.ErrElementNotFound
17✔
1619
}
17✔
1620

1621
// retransmitStaleAnns examines all outgoing channels that the source node is
1622
// known to maintain to check to see if any of them are "stale". A channel is
262✔
1623
// stale iff, the last timestamp of its rebroadcast is older than the
262✔
1624
// RebroadcastInterval. We also check if a refreshed node announcement should
1625
// be resent.
1626
func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
1627
        // Iterate over all of our channels and check if any of them fall
1628
        // within the prune interval or re-broadcast interval.
1629
        type updateTuple struct {
1630
                info *models.ChannelEdgeInfo
1631
                edge *models.ChannelEdgePolicy
31✔
1632
        }
31✔
1633

31✔
1634
        var (
31✔
1635
                havePublicChannels bool
31✔
1636
                edgesToUpdate      []updateTuple
31✔
1637
        )
31✔
1638
        err := d.cfg.Graph.ForAllOutgoingChannels(func(
31✔
1639
                info *models.ChannelEdgeInfo,
31✔
1640
                edge *models.ChannelEdgePolicy) error {
31✔
1641

31✔
1642
                // If there's no auth proof attached to this edge, it means
31✔
1643
                // that it is a private channel not meant to be announced to
31✔
1644
                // the greater network, so avoid sending channel updates for
31✔
1645
                // this channel to not leak its
36✔
1646
                // existence.
5✔
1647
                if info.AuthProof == nil {
5✔
1648
                        log.Debugf("Skipping retransmission of channel "+
5✔
1649
                                "without AuthProof: %v", info.ChannelID)
5✔
1650
                        return nil
5✔
1651
                }
5✔
1652

9✔
1653
                // We make a note that we have at least one public channel. We
4✔
1654
                // use this to determine whether we should send a node
4✔
1655
                // announcement below.
4✔
1656
                havePublicChannels = true
4✔
1657

1658
                // If this edge has a ChannelUpdate that was created before the
1659
                // introduction of the MaxHTLC field, then we'll update this
1660
                // edge to propagate this information in the network.
1661
                if !edge.MessageFlags.HasMaxHtlc() {
4✔
1662
                        // We'll make sure we support the new max_htlc field if
4✔
1663
                        // not already present.
4✔
1664
                        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
4✔
1665
                        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
4✔
1666

4✔
1667
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
1668
                                info: info,
×
1669
                                edge: edge,
×
1670
                        })
×
1671
                        return nil
×
1672
                }
×
1673

×
1674
                timeElapsed := now.Sub(edge.LastUpdate)
×
1675

×
1676
                // If it's been longer than RebroadcastInterval since we've
×
1677
                // re-broadcasted the channel, add the channel to the set of
×
1678
                // edges we need to update.
1679
                if timeElapsed >= d.cfg.RebroadcastInterval {
4✔
1680
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
4✔
1681
                                info: info,
4✔
1682
                                edge: edge,
4✔
1683
                        })
4✔
1684
                }
5✔
1685

1✔
1686
                return nil
1✔
1687
        })
1✔
1688
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
1✔
1689
                return fmt.Errorf("unable to retrieve outgoing channels: %w",
1✔
1690
                        err)
1691
        }
4✔
1692

1693
        var signedUpdates []lnwire.Message
31✔
1694
        for _, chanToUpdate := range edgesToUpdate {
×
1695
                // Re-sign and update the channel on disk and retrieve our
×
1696
                // ChannelUpdate to broadcast.
×
1697
                chanAnn, chanUpdate, err := d.updateChannel(
1698
                        chanToUpdate.info, chanToUpdate.edge,
31✔
1699
                )
32✔
1700
                if err != nil {
1✔
1701
                        return fmt.Errorf("unable to update channel: %w", err)
1✔
1702
                }
1✔
1703

1✔
1704
                // If we have a valid announcement to transmit, then we'll send
1✔
1705
                // that along with the update.
1✔
1706
                if chanAnn != nil {
×
1707
                        signedUpdates = append(signedUpdates, chanAnn)
×
1708
                }
1709

1710
                signedUpdates = append(signedUpdates, chanUpdate)
1711
        }
2✔
1712

1✔
1713
        // If we don't have any public channels, we return as we don't want to
1✔
1714
        // broadcast anything that would reveal our existence.
1715
        if !havePublicChannels {
1✔
1716
                return nil
1717
        }
1718

1719
        // We'll also check that our NodeAnnouncement is not too old.
1720
        currentNodeAnn := d.cfg.FetchSelfAnnouncement()
61✔
1721
        timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
30✔
1722
        timeElapsed := now.Sub(timestamp)
30✔
1723

1724
        // If it's been a full day since we've re-broadcasted the
1725
        // node announcement, refresh it and resend it.
4✔
1726
        nodeAnnStr := ""
4✔
1727
        if timeElapsed >= d.cfg.RebroadcastInterval {
4✔
1728
                newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
4✔
1729
                if err != nil {
4✔
1730
                        return fmt.Errorf("unable to get refreshed node "+
4✔
1731
                                "announcement: %v", err)
4✔
1732
                }
5✔
1733

1✔
1734
                signedUpdates = append(signedUpdates, &newNodeAnn)
1✔
1735
                nodeAnnStr = " and our refreshed node announcement"
×
1736

×
1737
                // Before broadcasting the refreshed node announcement, add it
×
1738
                // to our own graph.
1739
                if err := d.addNode(&newNodeAnn); err != nil {
1✔
1740
                        log.Errorf("Unable to add refreshed node announcement "+
1✔
1741
                                "to graph: %v", err)
1✔
1742
                }
1✔
1743
        }
1✔
1744

2✔
1745
        // If we don't have any updates to re-broadcast, then we'll exit
1✔
1746
        // early.
1✔
1747
        if len(signedUpdates) == 0 {
1✔
1748
                return nil
1749
        }
1750

1751
        log.Infof("Retransmitting %v outgoing channels%v",
1752
                len(edgesToUpdate), nodeAnnStr)
7✔
1753

3✔
1754
        // With all the wire announcements properly crafted, we'll broadcast
3✔
1755
        // our known outgoing channels to all our immediate peers.
1756
        if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
1✔
1757
                return fmt.Errorf("unable to re-broadcast channels: %w", err)
1✔
1758
        }
1✔
1759

1✔
1760
        return nil
1✔
1761
}
1✔
1762

×
1763
// processChanPolicyUpdate generates a new set of channel updates for the
×
1764
// provided list of edges and updates the backing ChannelGraphSource.
1765
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
1✔
1766
        edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
1767

1768
        var chanUpdates []networkMsg
1769
        for _, edgeInfo := range edgesToUpdate {
1770
                // Now that we've collected all the channels we need to update,
1771
                // we'll re-sign and update the backing ChannelGraphSource, and
4✔
1772
                // retrieve our ChannelUpdate to broadcast.
4✔
1773
                _, chanUpdate, err := d.updateChannel(
4✔
1774
                        edgeInfo.Info, edgeInfo.Edge,
10✔
1775
                )
6✔
1776
                if err != nil {
6✔
1777
                        return nil, err
6✔
1778
                }
6✔
1779

6✔
1780
                // We'll avoid broadcasting any updates for private channels to
6✔
1781
                // avoid directly giving away their existence. Instead, we'll
6✔
1782
                // send the update directly to the remote party.
×
1783
                if edgeInfo.Info.AuthProof == nil {
×
1784
                        // If AuthProof is nil and an alias was found for this
1785
                        // ChannelID (meaning the option-scid-alias feature was
1786
                        // negotiated), we'll replace the ShortChannelID in the
1787
                        // update with the peer's alias. We do this after
1788
                        // updateChannel so that the alias isn't persisted to
10✔
1789
                        // the database.
4✔
1790
                        chanID := lnwire.NewChanIDFromOutPoint(
4✔
1791
                                edgeInfo.Info.ChannelPoint,
4✔
1792
                        )
4✔
1793

4✔
1794
                        var defaultAlias lnwire.ShortChannelID
4✔
1795
                        foundAlias, _ := d.cfg.GetAlias(chanID)
4✔
1796
                        if foundAlias != defaultAlias {
4✔
1797
                                chanUpdate.ShortChannelID = foundAlias
4✔
1798

4✔
1799
                                sig, err := d.cfg.SignAliasUpdate(chanUpdate)
4✔
1800
                                if err != nil {
4✔
1801
                                        log.Errorf("Unable to sign alias "+
7✔
1802
                                                "update: %v", err)
3✔
1803
                                        continue
3✔
1804
                                }
3✔
1805

3✔
1806
                                lnSig, err := lnwire.NewSigFromSignature(sig)
×
1807
                                if err != nil {
×
1808
                                        log.Errorf("Unable to create sig: %v",
×
1809
                                                err)
1810
                                        continue
1811
                                }
3✔
1812

3✔
1813
                                chanUpdate.Signature = lnSig
×
1814
                        }
×
1815

×
1816
                        remotePubKey := remotePubFromChanInfo(
1817
                                edgeInfo.Info, chanUpdate.ChannelFlags,
1818
                        )
3✔
1819
                        err := d.reliableSender.sendMessage(
1820
                                chanUpdate, remotePubKey,
1821
                        )
4✔
1822
                        if err != nil {
4✔
1823
                                log.Errorf("Unable to reliably send %v for "+
4✔
1824
                                        "channel=%v to peer=%x: %v",
4✔
1825
                                        chanUpdate.MsgType(),
4✔
1826
                                        chanUpdate.ShortChannelID,
4✔
1827
                                        remotePubKey, err)
4✔
1828
                        }
×
1829
                        continue
×
1830
                }
×
1831

×
1832
                // We set ourselves as the source of this message to indicate
×
1833
                // that we shouldn't skip any peers when sending this message.
×
1834
                chanUpdates = append(chanUpdates, networkMsg{
4✔
1835
                        source:   d.selfKey,
1836
                        isRemote: false,
1837
                        msg:      chanUpdate,
1838
                })
1839
        }
5✔
1840

5✔
1841
        return chanUpdates, nil
5✔
1842
}
5✔
1843

5✔
1844
// remotePubFromChanInfo returns the public key of the remote peer given a
1845
// ChannelEdgeInfo that describe a channel we have with them.
1846
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
4✔
1847
        chanFlags lnwire.ChanUpdateChanFlags) [33]byte {
1848

1849
        var remotePubKey [33]byte
1850
        switch {
1851
        case chanFlags&lnwire.ChanUpdateDirection == 0:
1852
                remotePubKey = chanInfo.NodeKey2Bytes
15✔
1853
        case chanFlags&lnwire.ChanUpdateDirection == 1:
15✔
1854
                remotePubKey = chanInfo.NodeKey1Bytes
15✔
1855
        }
15✔
1856

15✔
1857
        return remotePubKey
15✔
1858
}
3✔
1859

3✔
1860
// processRejectedEdge examines a rejected edge to see if we can extract any
1861
// new announcements from it.  An edge will get rejected if we already added
1862
// the same edge without AuthProof to the graph. If the received announcement
15✔
1863
// contains a proof, we can add this proof to our edge.  We can end up in this
1864
// situation in the case where we create a channel, but for some reason fail
1865
// to receive the remote peer's proof, while the remote peer is able to fully
1866
// assemble the proof and craft the ChannelAnnouncement.
1867
func (d *AuthenticatedGossiper) processRejectedEdge(
1868
        chanAnnMsg *lnwire.ChannelAnnouncement1,
1869
        proof *models.ChannelAuthProof) ([]networkMsg, error) {
1870

1871
        // First, we'll fetch the state of the channel as we know if from the
1872
        // database.
1873
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
1874
                chanAnnMsg.ShortChannelID,
3✔
1875
        )
3✔
1876
        if err != nil {
3✔
1877
                return nil, err
3✔
1878
        }
3✔
1879

3✔
1880
        // The edge is in the graph, and has a proof attached, then we'll just
3✔
1881
        // reject it as normal.
3✔
1882
        if chanInfo.AuthProof != nil {
×
1883
                return nil, nil
×
1884
        }
1885

1886
        // Otherwise, this means that the edge is within the graph, but it
1887
        // doesn't yet have a proper proof attached. If we did not receive
6✔
1888
        // the proof such that we now can add it, there's nothing more we
3✔
1889
        // can do.
3✔
1890
        if proof == nil {
1891
                return nil, nil
1892
        }
1893

1894
        // We'll then create then validate the new fully assembled
1895
        // announcement.
×
1896
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
×
1897
                proof, chanInfo, e1, e2,
×
1898
        )
1899
        if err != nil {
1900
                return nil, err
1901
        }
×
1902
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
×
1903
        if err != nil {
×
1904
                err := fmt.Errorf("assembled channel announcement proof "+
×
1905
                        "for shortChanID=%v isn't valid: %v",
×
1906
                        chanAnnMsg.ShortChannelID, err)
×
1907
                log.Error(err)
×
1908
                return nil, err
×
1909
        }
×
1910

×
1911
        // If everything checks out, then we'll add the fully assembled proof
×
1912
        // to the database.
×
1913
        err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
×
1914
        if err != nil {
×
1915
                err := fmt.Errorf("unable add proof to shortChanID=%v: %w",
1916
                        chanAnnMsg.ShortChannelID, err)
1917
                log.Error(err)
1918
                return nil, err
×
1919
        }
×
1920

×
1921
        // As we now have a complete channel announcement for this channel,
×
1922
        // we'll construct the announcement so they can be broadcast out to all
×
1923
        // our peers.
×
1924
        announcements := make([]networkMsg, 0, 3)
×
1925
        announcements = append(announcements, networkMsg{
1926
                source: d.selfKey,
1927
                msg:    chanAnn,
1928
        })
1929
        if e1Ann != nil {
×
1930
                announcements = append(announcements, networkMsg{
×
1931
                        source: d.selfKey,
×
1932
                        msg:    e1Ann,
×
1933
                })
×
1934
        }
×
1935
        if e2Ann != nil {
×
1936
                announcements = append(announcements, networkMsg{
×
1937
                        source: d.selfKey,
×
1938
                        msg:    e2Ann,
×
1939
                })
×
1940

×
1941
        }
×
1942

×
1943
        return announcements, nil
×
1944
}
×
1945

×
1946
// fetchPKScript fetches the output script for the given SCID.
×
1947
func (d *AuthenticatedGossiper) fetchPKScript(chanID *lnwire.ShortChannelID) (
1948
        []byte, error) {
×
1949

1950
        return lnwallet.FetchPKScriptWithQuit(d.cfg.ChainIO, chanID, d.quit)
1951
}
1952

1953
// addNode processes the given node announcement, and adds it to our channel
×
1954
// graph.
×
1955
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
×
1956
        op ...batch.SchedulerOption) error {
×
1957

1958
        if err := graph.ValidateNodeAnn(msg); err != nil {
1959
                return fmt.Errorf("unable to validate node announcement: %w",
1960
                        err)
1961
        }
20✔
1962

20✔
1963
        timestamp := time.Unix(int64(msg.Timestamp), 0)
21✔
1964
        features := lnwire.NewFeatureVector(msg.Features, lnwire.Features)
1✔
1965
        node := &models.LightningNode{
1✔
1966
                HaveNodeAnnouncement: true,
1✔
1967
                LastUpdate:           timestamp,
1968
                Addresses:            msg.Addresses,
19✔
1969
                PubKeyBytes:          msg.NodeID,
19✔
1970
                Alias:                msg.Alias.String(),
19✔
1971
                AuthSigBytes:         msg.Signature.ToSignatureBytes(),
19✔
1972
                Features:             features,
19✔
1973
                Color:                msg.RGBColor,
19✔
1974
                ExtraOpaqueData:      msg.ExtraOpaqueData,
19✔
1975
        }
19✔
1976

19✔
1977
        return d.cfg.Graph.AddNode(node, op...)
19✔
1978
}
19✔
1979

19✔
1980
// isPremature decides whether a given network message has a block height+delta
19✔
1981
// value specified in the future. If so, the message will be added to the
19✔
1982
// future message map and be processed when the block height as reached.
19✔
1983
//
1984
// NOTE: must be used inside a lock.
1985
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
1986
        delta uint32, msg *networkMsg) bool {
1987
        // TODO(roasbeef) make height delta 6
1988
        //  * or configurable
1989

1990
        msgHeight := chanID.BlockHeight + delta
1991

282✔
1992
        // The message height is smaller or equal to our best known height,
282✔
1993
        // thus the message is mature.
282✔
1994
        if msgHeight <= d.bestHeight {
282✔
1995
                return false
282✔
1996
        }
282✔
1997

285✔
1998
        // Add the premature message to our future messages which will be
3✔
1999
        // resent once the block height has reached.
3✔
2000
        //
2001
        // Copy the networkMsgs since the old message's err chan will be
282✔
2002
        // consumed.
282✔
2003
        copied := &networkMsg{
282✔
2004
                peer:              msg.peer,
282✔
2005
                source:            msg.source,
563✔
2006
                msg:               msg.msg,
281✔
2007
                optionalMsgFields: msg.optionalMsgFields,
281✔
2008
                isRemote:          msg.isRemote,
2009
                err:               make(chan error, 1),
2010
        }
2011

2012
        // Create the cached message.
2013
        cachedMsg := &cachedFutureMsg{
2014
                msg:    copied,
4✔
2015
                height: msgHeight,
4✔
2016
        }
4✔
2017

4✔
2018
        // Increment the msg ID and add it to the cache.
4✔
2019
        nextMsgID := d.futureMsgs.nextMsgID()
4✔
2020
        _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
4✔
2021
        if err != nil {
4✔
2022
                log.Errorf("Adding future message got error: %v", err)
4✔
2023
        }
4✔
2024

4✔
2025
        log.Debugf("Network message: %v added to future messages for "+
4✔
2026
                "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
4✔
2027
                msgHeight, d.bestHeight)
4✔
2028

4✔
2029
        return true
4✔
2030
}
4✔
2031

4✔
2032
// processNetworkAnnouncement processes a new network relate authenticated
4✔
2033
// channel or node announcement or announcements proofs. If the announcement
×
2034
// didn't affect the internal state due to either being out of date, invalid,
×
2035
// or redundant, then nil is returned. Otherwise, the set of announcements will
2036
// be returned which should be broadcasted to the rest of the network. The
4✔
2037
// boolean returned indicates whether any dependents of the announcement should
4✔
2038
// attempt to be processed as well.
4✔
2039
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
4✔
2040
        nMsg *networkMsg) ([]networkMsg, bool) {
4✔
2041

2042
        // If this is a remote update, we set the scheduler option to lazily
2043
        // add it to the graph.
2044
        var schedulerOp []batch.SchedulerOption
2045
        if nMsg.isRemote {
2046
                schedulerOp = append(schedulerOp, batch.LazyAdd())
2047
        }
2048

2049
        switch msg := nMsg.msg.(type) {
2050
        // A new node announcement has arrived which either presents new
2051
        // information about a node in one of the channels we know about, or a
333✔
2052
        // updating previously advertised information.
333✔
2053
        case *lnwire.NodeAnnouncement:
333✔
2054
                return d.handleNodeAnnouncement(nMsg, msg, schedulerOp)
333✔
2055

333✔
2056
        // A new channel announcement has arrived, this indicates the
619✔
2057
        // *creation* of a new channel within the network. This only advertises
286✔
2058
        // the existence of a channel and not yet the routing policies in
286✔
2059
        // either direction of the channel.
2060
        case *lnwire.ChannelAnnouncement1:
333✔
2061
                return d.handleChanAnnouncement(nMsg, msg, schedulerOp)
2062

2063
        // A new authenticated channel edge update has arrived. This indicates
2064
        // that the directional information for an already known channel has
27✔
2065
        // been updated.
27✔
2066
        case *lnwire.ChannelUpdate1:
2067
                return d.handleChanUpdate(nMsg, msg, schedulerOp)
2068

2069
        // A new signature announcement has been received. This indicates
2070
        // willingness of nodes involved in the funding of a channel to
2071
        // announce this new channel to the rest of the world.
233✔
2072
        case *lnwire.AnnounceSignatures1:
233✔
2073
                return d.handleAnnSig(nMsg, msg)
2074

2075
        default:
2076
                err := errors.New("wrong type of the announcement")
2077
                nMsg.err <- err
58✔
2078
                return nil, false
58✔
2079
        }
2080
}
2081

2082
// processZombieUpdate determines whether the provided channel update should
2083
// resurrect a given zombie edge.
24✔
2084
//
24✔
2085
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
2086
// should be inspected.
×
2087
func (d *AuthenticatedGossiper) processZombieUpdate(
×
2088
        chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
×
2089
        msg *lnwire.ChannelUpdate1) error {
×
2090

2091
        // The least-significant bit in the flag on the channel update tells us
2092
        // which edge is being updated.
2093
        isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0
2094

2095
        // Since we've deemed the update as not stale above, before marking it
2096
        // live, we'll make sure it has been signed by the correct party. If we
2097
        // have both pubkeys, either party can resurrect the channel. If we've
2098
        // already marked this with the stricter, single-sided resurrection we
2099
        // will only have the pubkey of the node with the oldest timestamp.
2100
        var pubKey *btcec.PublicKey
3✔
2101
        switch {
3✔
2102
        case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
3✔
2103
                pubKey, _ = chanInfo.NodeKey1()
3✔
2104
        case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
3✔
2105
                pubKey, _ = chanInfo.NodeKey2()
3✔
2106
        }
3✔
2107
        if pubKey == nil {
3✔
2108
                return fmt.Errorf("incorrect pubkey to resurrect zombie "+
3✔
2109
                        "with chan_id=%v", msg.ShortChannelID)
3✔
2110
        }
3✔
2111

3✔
2112
        err := netann.VerifyChannelUpdateSignature(msg, pubKey)
3✔
2113
        if err != nil {
×
2114
                return fmt.Errorf("unable to verify channel "+
×
2115
                        "update signature: %v", err)
2✔
2116
        }
2✔
2117

2118
        // With the signature valid, we'll proceed to mark the
4✔
2119
        // edge as live and wait for the channel announcement to
1✔
2120
        // come through again.
1✔
2121
        err = d.cfg.Graph.MarkEdgeLive(scid)
1✔
2122
        switch {
2123
        case errors.Is(err, graphdb.ErrZombieEdgeNotFound):
2✔
2124
                log.Errorf("edge with chan_id=%v was not found in the "+
3✔
2125
                        "zombie index: %v", err)
1✔
2126

1✔
2127
                return nil
1✔
2128

2129
        case err != nil:
2130
                return fmt.Errorf("unable to remove edge with "+
2131
                        "chan_id=%v from zombie index: %v",
2132
                        msg.ShortChannelID, err)
1✔
2133

1✔
2134
        default:
×
2135
        }
×
2136

×
2137
        log.Debugf("Removed edge with chan_id=%v from zombie "+
×
2138
                "index", msg.ShortChannelID)
×
2139

2140
        return nil
×
2141
}
×
2142

×
2143
// fetchNodeAnn fetches the latest signed node announcement from our point of
×
2144
// view for the node with the given public key.
2145
func (d *AuthenticatedGossiper) fetchNodeAnn(
1✔
2146
        pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {
2147

2148
        node, err := d.cfg.Graph.FetchLightningNode(pubKey)
1✔
2149
        if err != nil {
1✔
2150
                return nil, err
1✔
2151
        }
1✔
2152

2153
        return node.NodeAnnouncement(true)
2154
}
2155

2156
// isMsgStale determines whether a message retrieved from the backing
2157
// MessageStore is seen as stale by the current graph.
23✔
2158
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
23✔
2159
        switch msg := msg.(type) {
23✔
2160
        case *lnwire.AnnounceSignatures1:
29✔
2161
                chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
6✔
2162
                        msg.ShortChannelID,
6✔
2163
                )
2164

17✔
2165
                // If the channel cannot be found, it is most likely a leftover
2166
                // message for a channel that was closed, so we can consider it
2167
                // stale.
2168
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
2169
                        return true
15✔
2170
                }
15✔
2171
                if err != nil {
5✔
2172
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
5✔
2173
                                "%v", chanInfo.ChannelID, err)
5✔
2174
                        return false
5✔
2175
                }
5✔
2176

5✔
2177
                // If the proof exists in the graph, then we have successfully
5✔
2178
                // received the remote proof and assembled the full proof, so we
5✔
2179
                // can safely delete the local proof from the database.
8✔
2180
                return chanInfo.AuthProof != nil
3✔
2181

3✔
2182
        case *lnwire.ChannelUpdate1:
5✔
2183
                _, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)
×
2184

×
2185
                // If the channel cannot be found, it is most likely a leftover
×
2186
                // message for a channel that was closed, so we can consider it
×
2187
                // stale.
2188
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
2189
                        return true
2190
                }
2191
                if err != nil {
5✔
2192
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
2193
                                "%v", msg.ShortChannelID, err)
13✔
2194
                        return false
13✔
2195
                }
13✔
2196

13✔
2197
                // Otherwise, we'll retrieve the correct policy that we
13✔
2198
                // currently have stored within our graph to check if this
13✔
2199
                // message is stale by comparing its timestamp.
16✔
2200
                var p *models.ChannelEdgePolicy
3✔
2201
                if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
3✔
2202
                        p = p1
13✔
2203
                } else {
×
2204
                        p = p2
×
2205
                }
×
2206

×
2207
                // If the policy is still unknown, then we can consider this
2208
                // policy fresh.
2209
                if p == nil {
2210
                        return false
2211
                }
13✔
2212

26✔
2213
                timestamp := time.Unix(int64(msg.Timestamp), 0)
13✔
2214
                return p.LastUpdate.After(timestamp)
16✔
2215

3✔
2216
        default:
3✔
2217
                // We'll make sure to not mark any unsupported messages as stale
2218
                // to ensure they are not removed.
2219
                return false
2220
        }
13✔
2221
}
×
2222

×
2223
// updateChannel creates a new fully signed update for the channel, and updates
2224
// the underlying graph with the new state.
13✔
2225
func (d *AuthenticatedGossiper) updateChannel(info *models.ChannelEdgeInfo,
13✔
2226
        edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
2227
        *lnwire.ChannelUpdate1, error) {
×
2228

×
2229
        // Parse the unsigned edge into a channel update.
×
2230
        chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)
×
2231

2232
        // We'll generate a new signature over a digest of the channel
2233
        // announcement itself and update the timestamp to ensure it propagate.
2234
        err := netann.SignChannelUpdate(
2235
                d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
2236
                netann.ChanUpdSetTimestamp,
2237
        )
2238
        if err != nil {
7✔
2239
                return nil, nil, err
7✔
2240
        }
7✔
2241

7✔
2242
        // Next, we'll set the new signature in place, and update the reference
7✔
2243
        // in the backing slice.
7✔
2244
        edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
7✔
2245
        edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()
7✔
2246

7✔
2247
        // To ensure that our signature is valid, we'll verify it ourself
7✔
2248
        // before committing it to the slice returned.
7✔
2249
        err = netann.ValidateChannelUpdateAnn(
7✔
2250
                d.selfKey, info.Capacity, chanUpdate,
×
2251
        )
×
2252
        if err != nil {
2253
                return nil, nil, fmt.Errorf("generated invalid channel "+
2254
                        "update sig: %v", err)
2255
        }
7✔
2256

7✔
2257
        // Finally, we'll write the new edge policy to disk.
7✔
2258
        if err := d.cfg.Graph.UpdateEdge(edge); err != nil {
7✔
2259
                return nil, nil, err
7✔
2260
        }
7✔
2261

7✔
2262
        // We'll also create the original channel announcement so the two can
7✔
2263
        // be broadcast along side each other (if necessary), but only if we
7✔
2264
        // have a full channel announcement for this channel.
×
2265
        var chanAnn *lnwire.ChannelAnnouncement1
×
2266
        if info.AuthProof != nil {
×
2267
                chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
2268
                chanAnn = &lnwire.ChannelAnnouncement1{
2269
                        ShortChannelID:  chanID,
7✔
2270
                        NodeID1:         info.NodeKey1Bytes,
×
2271
                        NodeID2:         info.NodeKey2Bytes,
×
2272
                        ChainHash:       info.ChainHash,
2273
                        BitcoinKey1:     info.BitcoinKey1Bytes,
2274
                        Features:        lnwire.NewRawFeatureVector(),
2275
                        BitcoinKey2:     info.BitcoinKey2Bytes,
2276
                        ExtraOpaqueData: info.ExtraOpaqueData,
7✔
2277
                }
13✔
2278
                chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
6✔
2279
                        info.AuthProof.NodeSig1Bytes,
6✔
2280
                )
6✔
2281
                if err != nil {
6✔
2282
                        return nil, nil, err
6✔
2283
                }
6✔
2284
                chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
6✔
2285
                        info.AuthProof.NodeSig2Bytes,
6✔
2286
                )
6✔
2287
                if err != nil {
6✔
2288
                        return nil, nil, err
6✔
2289
                }
6✔
2290
                chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
6✔
2291
                        info.AuthProof.BitcoinSig1Bytes,
6✔
2292
                )
6✔
2293
                if err != nil {
×
2294
                        return nil, nil, err
×
2295
                }
6✔
2296
                chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
6✔
2297
                        info.AuthProof.BitcoinSig2Bytes,
6✔
2298
                )
6✔
2299
                if err != nil {
×
2300
                        return nil, nil, err
×
2301
                }
6✔
2302
        }
6✔
2303

6✔
2304
        return chanAnn, chanUpdate, err
6✔
2305
}
×
2306

×
2307
// SyncManager returns the gossiper's SyncManager instance.
6✔
2308
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
6✔
2309
        return d.syncMgr
6✔
2310
}
6✔
2311

×
2312
// IsKeepAliveUpdate determines whether this channel update is considered a
×
2313
// keep-alive update based on the previous channel update processed for the same
2314
// direction.
2315
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
7✔
2316
        prev *models.ChannelEdgePolicy) bool {
2317

2318
        // Both updates should be from the same direction.
2319
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
3✔
2320
                prev.ChannelFlags&lnwire.ChanUpdateDirection {
3✔
2321

3✔
2322
                return false
2323
        }
2324

2325
        // The timestamp should always increase for a keep-alive update.
2326
        timestamp := time.Unix(int64(update.Timestamp), 0)
2327
        if !timestamp.After(prev.LastUpdate) {
17✔
2328
                return false
17✔
2329
        }
17✔
2330

17✔
2331
        // None of the remaining fields should change for a keep-alive update.
17✔
2332
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
×
2333
                return false
×
2334
        }
×
2335
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
2336
                return false
2337
        }
17✔
2338
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
20✔
2339
                return false
3✔
2340
        }
3✔
2341
        if update.TimeLockDelta != prev.TimeLockDelta {
2342
                return false
2343
        }
20✔
2344
        if update.HtlcMinimumMsat != prev.MinHTLC {
3✔
2345
                return false
3✔
2346
        }
32✔
2347
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
15✔
2348
                return false
15✔
2349
        }
8✔
2350
        if update.HtlcMaximumMsat != prev.MaxHTLC {
3✔
2351
                return false
3✔
2352
        }
5✔
2353
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
×
2354
                return false
×
2355
        }
5✔
2356
        return true
×
2357
}
×
2358

5✔
2359
// latestHeight returns the gossiper's latest height known of the chain.
×
2360
func (d *AuthenticatedGossiper) latestHeight() uint32 {
×
2361
        d.Lock()
5✔
2362
        defer d.Unlock()
×
2363
        return d.bestHeight
×
2364
}
8✔
2365

3✔
2366
// handleNodeAnnouncement processes a new node announcement.
3✔
2367
func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
5✔
2368
        nodeAnn *lnwire.NodeAnnouncement,
2369
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
2370

2371
        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
3✔
2372

3✔
2373
        log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
3✔
2374
                "node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
3✔
2375

3✔
2376
        // We'll quickly ask the router if it already has a newer update for
2377
        // this node so we can skip validating signatures if not required.
2378
        if d.cfg.Graph.IsStaleNode(nodeAnn.NodeID, timestamp) {
2379
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
2380
                nMsg.err <- nil
27✔
2381
                return nil, true
27✔
2382
        }
27✔
2383

27✔
2384
        if err := d.addNode(nodeAnn, ops...); err != nil {
27✔
2385
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
27✔
2386
                        err)
27✔
2387

27✔
2388
                if !graph.IsError(
27✔
2389
                        err,
38✔
2390
                        graph.ErrOutdated,
11✔
2391
                        graph.ErrIgnored,
11✔
2392
                        graph.ErrVBarrierShuttingDown,
11✔
2393
                ) {
11✔
2394

2395
                        log.Error(err)
22✔
2396
                }
3✔
2397

3✔
2398
                nMsg.err <- err
3✔
2399
                return nil, false
3✔
2400
        }
3✔
2401

3✔
2402
        // In order to ensure we don't leak unadvertised nodes, we'll make a
3✔
2403
        // quick check to ensure this node intends to publicly advertise itself
3✔
2404
        // to the network.
3✔
2405
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
×
2406
        if err != nil {
×
2407
                log.Errorf("Unable to determine if node %x is advertised: %v",
×
2408
                        nodeAnn.NodeID, err)
2409
                nMsg.err <- err
3✔
2410
                return nil, false
3✔
2411
        }
2412

2413
        var announcements []networkMsg
2414

2415
        // If it does, we'll add their announcement to our batch so that it can
2416
        // be broadcast to the rest of our peers.
19✔
2417
        if isPublic {
19✔
2418
                announcements = append(announcements, networkMsg{
×
2419
                        peer:     nMsg.peer,
×
2420
                        isRemote: nMsg.isRemote,
×
2421
                        source:   nMsg.source,
×
2422
                        msg:      nodeAnn,
×
2423
                })
2424
        } else {
19✔
2425
                log.Tracef("Skipping broadcasting node announcement for %x "+
19✔
2426
                        "due to being unadvertised", nodeAnn.NodeID)
19✔
2427
        }
19✔
2428

25✔
2429
        nMsg.err <- nil
6✔
2430
        // TODO(roasbeef): get rid of the above
6✔
2431

6✔
2432
        log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
6✔
2433
                "node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
6✔
2434

6✔
2435
        return announcements, true
22✔
2436
}
16✔
2437

16✔
2438
// handleChanAnnouncement processes a new channel announcement.
16✔
2439
func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
2440
        ann *lnwire.ChannelAnnouncement1,
19✔
2441
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
19✔
2442

19✔
2443
        scid := ann.ShortChannelID
19✔
2444

19✔
2445
        log.Debugf("Processing ChannelAnnouncement1: peer=%v, short_chan_id=%v",
19✔
2446
                nMsg.peer, scid.ToUint64())
19✔
2447

2448
        // We'll ignore any channel announcements that target any chain other
2449
        // than the set of chains we know of.
2450
        if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) {
2451
                err := fmt.Errorf("ignoring ChannelAnnouncement1 from chain=%v"+
2452
                        ", gossiper on chain=%v", ann.ChainHash,
233✔
2453
                        d.cfg.ChainHash)
233✔
2454
                log.Errorf(err.Error())
233✔
2455

233✔
2456
                key := newRejectCacheKey(
233✔
2457
                        scid.ToUint64(),
233✔
2458
                        sourceToPub(nMsg.source),
233✔
2459
                )
233✔
2460
                _, _ = d.recentRejects.Put(key, &cachedReject{})
233✔
2461

233✔
2462
                nMsg.err <- err
×
2463
                return nil, false
×
2464
        }
×
2465

×
2466
        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
×
2467
        // reject the announcement. Since the router accepts alias SCIDs,
×
2468
        // not erroring out would be a DoS vector.
×
2469
        if nMsg.isRemote && d.cfg.IsAlias(scid) {
×
2470
                err := fmt.Errorf("ignoring remote alias channel=%v", scid)
×
2471
                log.Errorf(err.Error())
×
2472

×
2473
                key := newRejectCacheKey(
×
2474
                        scid.ToUint64(),
×
2475
                        sourceToPub(nMsg.source),
×
2476
                )
2477
                _, _ = d.recentRejects.Put(key, &cachedReject{})
2478

2479
                nMsg.err <- err
2480
                return nil, false
233✔
2481
        }
×
2482

×
2483
        // If the advertised inclusionary block is beyond our knowledge of the
×
2484
        // chain tip, then we'll ignore it for now.
×
2485
        d.Lock()
×
2486
        if nMsg.isRemote && d.isPremature(scid, 0, nMsg) {
×
2487
                log.Warnf("Announcement for chan_id=(%v), is premature: "+
×
2488
                        "advertises height %v, only height %v is known",
×
2489
                        scid.ToUint64(), scid.BlockHeight, d.bestHeight)
×
2490
                d.Unlock()
×
2491
                nMsg.err <- nil
×
2492
                return nil, false
×
2493
        }
2494
        d.Unlock()
2495

2496
        // At this point, we'll now ask the router if this is a zombie/known
233✔
2497
        // edge. If so we can skip all the processing below.
234✔
2498
        if d.cfg.Graph.IsKnownEdge(scid) {
1✔
2499
                nMsg.err <- nil
1✔
2500
                return nil, true
1✔
2501
        }
1✔
2502

1✔
2503
        // Check if the channel is already closed in which case we can ignore
1✔
2504
        // it.
1✔
2505
        closed, err := d.cfg.ScidCloser.IsClosedScid(scid)
232✔
2506
        if err != nil {
232✔
2507
                log.Errorf("failed to check if scid %v is closed: %v", scid,
232✔
2508
                        err)
232✔
2509
                nMsg.err <- err
236✔
2510

4✔
2511
                return nil, false
4✔
2512
        }
4✔
2513

2514
        if closed {
2515
                err = fmt.Errorf("ignoring closed channel %v", scid)
2516
                log.Error(err)
231✔
2517

231✔
2518
                // If this is an announcement from us, we'll just ignore it.
×
2519
                if !nMsg.isRemote {
×
2520
                        nMsg.err <- err
×
2521
                        return nil, false
×
2522
                }
×
2523

×
2524
                // Increment the peer's ban score if they are sending closed
2525
                // channel announcements.
232✔
2526
                d.banman.incrementBanScore(nMsg.peer.PubKey())
1✔
2527

1✔
2528
                // If the peer is banned and not a channel peer, we'll
1✔
2529
                // disconnect them.
1✔
2530
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
1✔
2531
                if dcErr != nil {
×
2532
                        log.Errorf("failed to check if we should disconnect "+
×
2533
                                "peer: %v", dcErr)
×
2534
                        nMsg.err <- dcErr
2535

2536
                        return nil, false
2537
                }
1✔
2538

1✔
2539
                if shouldDc {
1✔
2540
                        nMsg.peer.Disconnect(ErrPeerBanned)
1✔
2541
                }
1✔
2542

1✔
2543
                nMsg.err <- err
×
2544

×
2545
                return nil, false
×
2546
        }
×
2547

×
2548
        // If this is a remote channel announcement, then we'll validate all
×
2549
        // the signatures within the proof as it should be well formed.
2550
        var proof *models.ChannelAuthProof
1✔
2551
        if nMsg.isRemote {
×
2552
                err := netann.ValidateChannelAnn(ann, d.fetchPKScript)
×
2553
                if err != nil {
2554
                        err := fmt.Errorf("unable to validate announcement: "+
1✔
2555
                                "%v", err)
1✔
2556

1✔
2557
                        key := newRejectCacheKey(
2558
                                scid.ToUint64(),
2559
                                sourceToPub(nMsg.source),
2560
                        )
2561
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
230✔
2562

446✔
2563
                        log.Error(err)
216✔
2564
                        nMsg.err <- err
216✔
2565
                        return nil, false
×
2566
                }
×
2567

×
2568
                // If the proof checks out, then we'll save the proof itself to
×
2569
                // the database so we can fetch it later when gossiping with
×
2570
                // other nodes.
×
2571
                proof = &models.ChannelAuthProof{
×
2572
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
×
2573
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
×
2574
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
×
2575
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
×
2576
                }
×
2577
        }
×
2578

2579
        // With the proof validated (if necessary), we can now store it within
2580
        // the database for our path finding and syncing needs.
2581
        var featureBuf bytes.Buffer
2582
        if err := ann.Features.Encode(&featureBuf); err != nil {
216✔
2583
                log.Errorf("unable to encode features: %v", err)
216✔
2584
                nMsg.err <- err
216✔
2585
                return nil, false
216✔
2586
        }
216✔
2587

216✔
2588
        edge := &models.ChannelEdgeInfo{
2589
                ChannelID:        scid.ToUint64(),
2590
                ChainHash:        ann.ChainHash,
2591
                NodeKey1Bytes:    ann.NodeID1,
2592
                NodeKey2Bytes:    ann.NodeID2,
230✔
2593
                BitcoinKey1Bytes: ann.BitcoinKey1,
230✔
2594
                BitcoinKey2Bytes: ann.BitcoinKey2,
×
2595
                AuthProof:        proof,
×
2596
                Features:         featureBuf.Bytes(),
×
2597
                ExtraOpaqueData:  ann.ExtraOpaqueData,
×
2598
        }
2599

230✔
2600
        // If there were any optional message fields provided, we'll include
230✔
2601
        // them in its serialized disk representation now.
230✔
2602
        if nMsg.optionalMsgFields != nil {
230✔
2603
                if nMsg.optionalMsgFields.capacity != nil {
230✔
2604
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
230✔
2605
                }
230✔
2606
                if nMsg.optionalMsgFields.channelPoint != nil {
230✔
2607
                        cp := *nMsg.optionalMsgFields.channelPoint
230✔
2608
                        edge.ChannelPoint = cp
230✔
2609
                }
230✔
2610

230✔
2611
                // Optional tapscript root for custom channels.
230✔
2612
                edge.TapscriptRoot = nMsg.optionalMsgFields.tapscriptRoot
230✔
2613
        }
247✔
2614

21✔
2615
        log.Debugf("Adding edge for short_chan_id: %v", scid.ToUint64())
4✔
2616

4✔
2617
        // We will add the edge to the channel router. If the nodes present in
24✔
2618
        // this channel are not present in the database, a partial node will be
7✔
2619
        // added to represent each node while we wait for a node announcement.
7✔
2620
        //
7✔
2621
        // Before we add the edge to the database, we obtain the mutex for this
2622
        // channel ID. We do this to ensure no other goroutine has read the
2623
        // database and is now making decisions based on this DB state, before
17✔
2624
        // it writes to the DB.
2625
        d.channelMtx.Lock(scid.ToUint64())
2626
        err = d.cfg.Graph.AddEdge(edge, ops...)
230✔
2627
        if err != nil {
230✔
2628
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
230✔
2629
                        scid.ToUint64(), err)
230✔
2630

230✔
2631
                defer d.channelMtx.Unlock(scid.ToUint64())
230✔
2632

230✔
2633
                // If the edge was rejected due to already being known, then it
230✔
2634
                // may be the case that this new message has a fresh channel
230✔
2635
                // proof, so we'll check.
230✔
2636
                switch {
230✔
2637
                case graph.IsError(err, graph.ErrIgnored):
230✔
2638
                        // Attempt to process the rejected message to see if we
435✔
2639
                        // get any new announcements.
205✔
2640
                        anns, rErr := d.processRejectedEdge(ann, proof)
205✔
2641
                        if rErr != nil {
205✔
2642
                                key := newRejectCacheKey(
205✔
2643
                                        scid.ToUint64(),
205✔
2644
                                        sourceToPub(nMsg.source),
205✔
2645
                                )
205✔
2646
                                cr := &cachedReject{}
205✔
2647
                                _, _ = d.recentRejects.Put(key, cr)
205✔
2648

3✔
2649
                                nMsg.err <- rErr
3✔
2650
                                return nil, false
3✔
2651
                        }
3✔
2652

3✔
2653
                        log.Debugf("Extracted %v announcements from rejected "+
×
2654
                                "msgs", len(anns))
×
2655

×
2656
                        // If while processing this rejected edge, we realized
×
2657
                        // there's a set of announcements we could extract,
×
2658
                        // then we'll return those directly.
×
2659
                        //
×
2660
                        // NOTE: since this is an ErrIgnored, we can return
×
2661
                        // true here to signal "allow" to its dependants.
×
2662
                        nMsg.err <- nil
×
2663

2664
                        return anns, true
3✔
2665

3✔
2666
                case graph.IsError(
3✔
2667
                        err, graph.ErrNoFundingTransaction,
3✔
2668
                        graph.ErrInvalidFundingOutput,
3✔
2669
                ):
3✔
2670
                        key := newRejectCacheKey(
3✔
2671
                                scid.ToUint64(),
3✔
2672
                                sourceToPub(nMsg.source),
3✔
2673
                        )
3✔
2674
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
3✔
2675

3✔
2676
                        // Increment the peer's ban score. We check isRemote
2677
                        // so we don't actually ban the peer in case of a local
2678
                        // bug.
2679
                        if nMsg.isRemote {
2680
                                d.banman.incrementBanScore(nMsg.peer.PubKey())
200✔
2681
                        }
200✔
2682

200✔
2683
                case graph.IsError(err, graph.ErrChannelSpent):
200✔
2684
                        key := newRejectCacheKey(
200✔
2685
                                scid.ToUint64(),
200✔
2686
                                sourceToPub(nMsg.source),
200✔
2687
                        )
200✔
2688
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
200✔
2689

200✔
2690
                        // Since this channel has already been closed, we'll
400✔
2691
                        // add it to the graph's closed channel index such that
200✔
2692
                        // we won't attempt to do expensive validation checks
200✔
2693
                        // on it again.
2694
                        // TODO: Populate the ScidCloser by using closed
1✔
2695
                        // channel notifications.
1✔
2696
                        dbErr := d.cfg.ScidCloser.PutClosedScid(scid)
1✔
2697
                        if dbErr != nil {
1✔
2698
                                log.Errorf("failed to mark scid(%v) as "+
1✔
2699
                                        "closed: %v", scid, dbErr)
1✔
2700

1✔
2701
                                nMsg.err <- dbErr
1✔
2702

1✔
2703
                                return nil, false
1✔
2704
                        }
1✔
2705

1✔
2706
                        // Increment the peer's ban score. We check isRemote
1✔
2707
                        // so we don't accidentally ban ourselves in case of a
1✔
2708
                        // bug.
1✔
2709
                        if nMsg.isRemote {
×
2710
                                d.banman.incrementBanScore(nMsg.peer.PubKey())
×
2711
                        }
×
2712

×
2713
                default:
×
2714
                        // Otherwise, this is just a regular rejected edge.
×
2715
                        key := newRejectCacheKey(
×
2716
                                scid.ToUint64(),
2717
                                sourceToPub(nMsg.source),
2718
                        )
2719
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
2720
                }
2✔
2721

1✔
2722
                if !nMsg.isRemote {
1✔
2723
                        log.Errorf("failed to add edge for local channel: %v",
2724
                                err)
1✔
2725
                        nMsg.err <- err
1✔
2726

1✔
2727
                        return nil, false
1✔
2728
                }
1✔
2729

1✔
2730
                shouldDc, dcErr := d.ShouldDisconnect(nMsg.peer.IdentityKey())
1✔
2731
                if dcErr != nil {
2732
                        log.Errorf("failed to check if we should disconnect "+
2733
                                "peer: %v", dcErr)
202✔
2734
                        nMsg.err <- dcErr
×
2735

×
2736
                        return nil, false
×
2737
                }
×
2738

×
2739
                if shouldDc {
×
2740
                        nMsg.peer.Disconnect(ErrPeerBanned)
2741
                }
202✔
2742

202✔
2743
                nMsg.err <- err
×
2744

×
2745
                return nil, false
×
2746
        }
×
2747

×
2748
        // If err is nil, release the lock immediately.
×
2749
        d.channelMtx.Unlock(scid.ToUint64())
2750

203✔
2751
        log.Debugf("Finish adding edge for short_chan_id: %v", scid.ToUint64())
1✔
2752

1✔
2753
        // If we earlier received any ChannelUpdates for this channel, we can
2754
        // now process them, as the channel is added to the graph.
202✔
2755
        var channelUpdates []*processedNetworkMsg
202✔
2756

202✔
2757
        earlyChanUpdates, err := d.prematureChannelUpdates.Get(scid.ToUint64())
2758
        if err == nil {
2759
                // There was actually an entry in the map, so we'll accumulate
2760
                // it. We don't worry about deletion, since it'll eventually
28✔
2761
                // fall out anyway.
28✔
2762
                chanMsgs := earlyChanUpdates
28✔
2763
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
28✔
2764
        }
28✔
2765

28✔
2766
        // Launch a new goroutine to handle each ChannelUpdate, this is to
28✔
2767
        // ensure we don't block here, as we can handle only one announcement
28✔
2768
        // at a time.
28✔
2769
        for _, cu := range channelUpdates {
33✔
2770
                // Skip if already processed.
5✔
2771
                if cu.processed {
5✔
2772
                        continue
5✔
2773
                }
5✔
2774

5✔
2775
                // Mark the ChannelUpdate as processed. This ensures that a
5✔
2776
                // subsequent announcement in the option-scid-alias case does
2777
                // not re-use an old ChannelUpdate.
2778
                cu.processed = true
2779

2780
                d.wg.Add(1)
33✔
2781
                go func(updMsg *networkMsg) {
5✔
2782
                        defer d.wg.Done()
5✔
UNCOV
2783

×
2784
                        switch msg := updMsg.msg.(type) {
2785
                        // Reprocess the message, making sure we return an
2786
                        // error to the original caller in case the gossiper
2787
                        // shuts down.
2788
                        case *lnwire.ChannelUpdate1:
2789
                                log.Debugf("Reprocessing ChannelUpdate for "+
5✔
2790
                                        "shortChanID=%v", scid.ToUint64())
5✔
2791

5✔
2792
                                select {
10✔
2793
                                case d.networkMsgs <- updMsg:
5✔
2794
                                case <-d.quit:
5✔
2795
                                        updMsg.err <- ErrGossiperShuttingDown
5✔
2796
                                }
2797

2798
                        // We don't expect any other message type than
2799
                        // ChannelUpdate to be in this cache.
5✔
2800
                        default:
5✔
2801
                                log.Errorf("Unsupported message type found "+
5✔
2802
                                        "among ChannelUpdates: %T", msg)
5✔
2803
                        }
5✔
2804
                }(cu.msg)
5✔
2805
        }
×
2806

×
2807
        // Channel announcement was successfully processed and now it might be
2808
        // broadcast to other connected nodes if it was an announcement with
2809
        // proof (remote).
2810
        var announcements []networkMsg
2811

×
2812
        if proof != nil {
×
2813
                announcements = append(announcements, networkMsg{
×
2814
                        peer:     nMsg.peer,
2815
                        isRemote: nMsg.isRemote,
2816
                        source:   nMsg.source,
2817
                        msg:      ann,
2818
                })
2819
        }
2820

2821
        nMsg.err <- nil
28✔
2822

28✔
2823
        log.Debugf("Processed ChannelAnnouncement1: peer=%v, short_chan_id=%v",
42✔
2824
                nMsg.peer, scid.ToUint64())
14✔
2825

14✔
2826
        return announcements, true
14✔
2827
}
14✔
2828

14✔
2829
// handleChanUpdate processes a new channel update.
14✔
2830
func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
14✔
2831
        upd *lnwire.ChannelUpdate1,
2832
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
28✔
2833

28✔
2834
        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
28✔
2835
                nMsg.peer, upd.ShortChannelID.ToUint64())
28✔
2836

28✔
2837
        // We'll ignore any channel updates that target any chain other than
28✔
2838
        // the set of chains we know of.
2839
        if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
2840
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
2841
                        "gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash)
2842
                log.Errorf(err.Error())
2843

58✔
2844
                key := newRejectCacheKey(
58✔
2845
                        upd.ShortChannelID.ToUint64(),
58✔
2846
                        sourceToPub(nMsg.source),
58✔
2847
                )
58✔
2848
                _, _ = d.recentRejects.Put(key, &cachedReject{})
58✔
2849

58✔
2850
                nMsg.err <- err
58✔
2851
                return nil, false
×
2852
        }
×
2853

×
2854
        blockHeight := upd.ShortChannelID.BlockHeight
×
2855
        shortChanID := upd.ShortChannelID.ToUint64()
×
2856

×
2857
        // If the advertised inclusionary block is beyond our knowledge of the
×
2858
        // chain tip, then we'll put the announcement in limbo to be fully
×
2859
        // verified once we advance forward in the chain. If the update has an
×
2860
        // alias SCID, we'll skip the isPremature check. This is necessary
×
2861
        // since aliases start at block height 16_000_000.
×
2862
        d.Lock()
×
2863
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
×
2864
                d.isPremature(upd.ShortChannelID, 0, nMsg) {
2865

58✔
2866
                log.Warnf("Update announcement for short_chan_id(%v), is "+
58✔
2867
                        "premature: advertises height %v, only height %v is "+
58✔
2868
                        "known", shortChanID, blockHeight, d.bestHeight)
58✔
2869
                d.Unlock()
58✔
2870
                nMsg.err <- nil
58✔
2871
                return nil, false
58✔
2872
        }
58✔
2873
        d.Unlock()
58✔
2874

58✔
2875
        // Before we perform any of the expensive checks below, we'll check
58✔
UNCOV
2876
        // whether this update is stale or is for a zombie channel in order to
×
UNCOV
2877
        // quickly reject it.
×
UNCOV
2878
        timestamp := time.Unix(int64(upd.Timestamp), 0)
×
UNCOV
2879

×
UNCOV
2880
        // Fetch the SCID we should be using to lock the channelMtx and make
×
UNCOV
2881
        // graph queries with.
×
UNCOV
2882
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
×
UNCOV
2883
        if err != nil {
×
2884
                // Fallback and set the graphScid to the peer-provided SCID.
58✔
2885
                // This will occur for non-option-scid-alias channels and for
58✔
2886
                // public option-scid-alias channels after 6 confirmations.
58✔
2887
                // Once public option-scid-alias channels have 6 confs, we'll
58✔
2888
                // ignore ChannelUpdates with one of their aliases.
58✔
2889
                graphScid = upd.ShortChannelID
58✔
2890
        }
58✔
2891

58✔
2892
        if d.cfg.Graph.IsStaleEdgePolicy(
58✔
2893
                graphScid, timestamp, upd.ChannelFlags,
58✔
2894
        ) {
116✔
2895

58✔
2896
                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
58✔
2897
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
58✔
2898
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
58✔
2899
                )
58✔
2900

58✔
2901
                nMsg.err <- nil
58✔
2902
                return nil, true
2903
        }
58✔
2904

58✔
2905
        // Check that the ChanUpdate is not too far into the future, this could
63✔
2906
        // reveal some faulty implementation therefore we log an error.
5✔
2907
        if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
5✔
2908
                log.Errorf("Skewed timestamp (%v) for edge policy of "+
5✔
2909
                        "short_chan_id(%v), timestamp too far in the future: "+
5✔
2910
                        "peer=%v, msg=%s, is_remote=%v", timestamp.Unix(),
5✔
2911
                        shortChanID, nMsg.peer, nMsg.msg.MsgType(),
5✔
2912
                        nMsg.isRemote,
5✔
2913
                )
5✔
2914

5✔
2915
                nMsg.err <- fmt.Errorf("skewed timestamp of edge policy, "+
2916
                        "timestamp too far in the future: %v", timestamp.Unix())
2917

2918
                return nil, false
56✔
2919
        }
×
2920

×
2921
        // Get the node pub key as far since we don't have it in the channel
×
2922
        // update announcement message. We'll need this to properly verify the
×
2923
        // message's signature.
×
2924
        //
×
2925
        // We make sure to obtain the mutex for this channel ID before we
×
2926
        // access the database. This ensures the state we read from the
×
2927
        // database has not changed between this point and when we call
×
2928
        // UpdateEdge() later.
×
2929
        d.channelMtx.Lock(graphScid.ToUint64())
×
2930
        defer d.channelMtx.Unlock(graphScid.ToUint64())
×
2931

2932
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
2933
        switch {
2934
        // No error, break.
2935
        case err == nil:
2936
                break
2937

2938
        case errors.Is(err, graphdb.ErrZombieEdge):
2939
                err = d.processZombieUpdate(chanInfo, graphScid, upd)
2940
                if err != nil {
56✔
2941
                        log.Debug(err)
56✔
2942
                        nMsg.err <- err
56✔
2943
                        return nil, false
56✔
2944
                }
56✔
2945

2946
                // We'll fallthrough to ensure we stash the update until we
52✔
2947
                // receive its corresponding ChannelAnnouncement. This is
52✔
2948
                // needed to ensure the edge exists in the graph before
2949
                // applying the update.
3✔
2950
                fallthrough
3✔
2951
        case errors.Is(err, graphdb.ErrGraphNotFound):
5✔
2952
                fallthrough
2✔
2953
        case errors.Is(err, graphdb.ErrGraphNoEdgesFound):
2✔
2954
                fallthrough
2✔
2955
        case errors.Is(err, graphdb.ErrEdgeNotFound):
2✔
2956
                // If the edge corresponding to this ChannelUpdate was not
2957
                // found in the graph, this might be a channel in the process
2958
                // of being opened, and we haven't processed our own
2959
                // ChannelAnnouncement yet, hence it is not not found in the
2960
                // graph. This usually gets resolved after the channel proofs
2961
                // are exchanged and the channel is broadcasted to the rest of
1✔
2962
                // the network, but in case this is a private channel this
1✔
2963
                // won't ever happen. This can also happen in the case of a
1✔
2964
                // zombie channel with a fresh update for which we don't have a
1✔
2965
                // ChannelAnnouncement for since we reject them. Because of
1✔
2966
                // this, we temporarily add it to a map, and reprocess it after
5✔
2967
                // our own ChannelAnnouncement has been processed.
5✔
2968
                //
5✔
2969
                // The shortChanID may be an alias, but it is fine to use here
5✔
2970
                // since we don't have an edge in the graph and if the peer is
5✔
2971
                // not buggy, we should be able to use it once the gossiper
5✔
2972
                // receives the local announcement.
5✔
2973
                pMsg := &processedNetworkMsg{msg: nMsg}
5✔
2974

5✔
2975
                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
5✔
2976
                switch {
5✔
2977
                // Nothing in the cache yet, we can just directly insert this
5✔
2978
                // element.
5✔
2979
                case err == cache.ErrElementNotFound:
5✔
2980
                        _, _ = d.prematureChannelUpdates.Put(
5✔
2981
                                shortChanID, &cachedNetworkMsg{
5✔
2982
                                        msgs: []*processedNetworkMsg{pMsg},
5✔
2983
                                })
5✔
2984

5✔
2985
                // There's already something in the cache, so we'll combine the
5✔
2986
                // set of messages into a single value.
5✔
2987
                default:
5✔
2988
                        msgs := earlyMsgs.msgs
2989
                        msgs = append(msgs, pMsg)
2990
                        _, _ = d.prematureChannelUpdates.Put(
5✔
2991
                                shortChanID, &cachedNetworkMsg{
5✔
2992
                                        msgs: msgs,
5✔
2993
                                })
5✔
2994
                }
5✔
2995

2996
                log.Debugf("Got ChannelUpdate for edge not found in graph"+
2997
                        "(shortChanID=%v), saving for reprocessing later",
2998
                        shortChanID)
3✔
2999

3✔
3000
                // NOTE: We don't return anything on the error channel for this
3✔
3001
                // message, as we expect that will be done when this
3✔
3002
                // ChannelUpdate is later reprocessed.
3✔
3003
                return nil, false
3✔
3004

3✔
3005
        default:
3006
                err := fmt.Errorf("unable to validate channel update "+
3007
                        "short_chan_id=%v: %v", shortChanID, err)
5✔
3008
                log.Error(err)
5✔
3009
                nMsg.err <- err
5✔
3010

5✔
3011
                key := newRejectCacheKey(
5✔
3012
                        upd.ShortChannelID.ToUint64(),
5✔
3013
                        sourceToPub(nMsg.source),
5✔
3014
                )
5✔
3015
                _, _ = d.recentRejects.Put(key, &cachedReject{})
3016

×
3017
                return nil, false
×
3018
        }
×
3019

×
3020
        // The least-significant bit in the flag on the channel update
×
3021
        // announcement tells us "which" side of the channels directed edge is
×
3022
        // being updated.
×
3023
        var (
×
3024
                pubKey       *btcec.PublicKey
×
3025
                edgeToUpdate *models.ChannelEdgePolicy
×
3026
        )
×
3027
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
×
3028
        switch direction {
×
3029
        case 0:
3030
                pubKey, _ = chanInfo.NodeKey1()
3031
                edgeToUpdate = e1
3032
        case 1:
3033
                pubKey, _ = chanInfo.NodeKey2()
3034
                edgeToUpdate = e2
52✔
3035
        }
52✔
3036

52✔
3037
        log.Debugf("Validating ChannelUpdate: channel=%v, from node=%x, has "+
52✔
3038
                "edge=%v", chanInfo.ChannelID, pubKey.SerializeCompressed(),
52✔
3039
                edgeToUpdate != nil)
52✔
3040

37✔
3041
        // Validate the channel announcement with the expected public key and
37✔
3042
        // channel capacity. In the case of an invalid channel update, we'll
37✔
3043
        // return an error to the caller and exit early.
18✔
3044
        err = netann.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
18✔
3045
        if err != nil {
18✔
3046
                rErr := fmt.Errorf("unable to validate channel update "+
3047
                        "announcement for short_chan_id=%v: %v",
3048
                        spew.Sdump(upd.ShortChannelID), err)
52✔
3049

52✔
3050
                log.Error(rErr)
52✔
3051
                nMsg.err <- rErr
52✔
3052
                return nil, false
52✔
3053
        }
52✔
3054

52✔
3055
        // If we have a previous version of the edge being updated, we'll want
52✔
3056
        // to rate limit its updates to prevent spam throughout the network.
56✔
3057
        if nMsg.isRemote && edgeToUpdate != nil {
4✔
3058
                // If it's a keep-alive update, we'll only propagate one if
4✔
3059
                // it's been a day since the previous. This follows our own
4✔
3060
                // heuristic of sending keep-alive updates after the same
4✔
3061
                // duration (see retransmitStaleAnns).
4✔
3062
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
4✔
3063
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
4✔
3064
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
4✔
3065
                                log.Debugf("Ignoring keep alive update not "+
3066
                                        "within %v period for channel %v",
3067
                                        d.cfg.RebroadcastInterval, shortChanID)
3068
                                nMsg.err <- nil
65✔
3069
                                return nil, false
17✔
3070
                        }
17✔
3071
                } else {
17✔
3072
                        // If it's not, we'll allow an update per minute with a
17✔
3073
                        // maximum burst of 10. If we haven't seen an update
17✔
3074
                        // for this channel before, we'll need to initialize a
22✔
3075
                        // rate limiter for each direction.
9✔
3076
                        //
4✔
3077
                        // Since the edge exists in the graph, we'll create a
4✔
3078
                        // rate limiter for chanInfo.ChannelID rather then the
4✔
3079
                        // SCID the peer sent. This is because there may be
4✔
3080
                        // multiple aliases for a channel and we may otherwise
4✔
3081
                        // rate-limit only a single alias of the channel,
4✔
3082
                        // instead of the whole channel.
15✔
3083
                        baseScid := chanInfo.ChannelID
15✔
3084
                        d.Lock()
15✔
3085
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
15✔
3086
                        if !ok {
15✔
3087
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
15✔
3088
                                b := d.cfg.MaxChannelUpdateBurst
15✔
3089
                                rls = [2]*rate.Limiter{
15✔
3090
                                        rate.NewLimiter(r, b),
15✔
3091
                                        rate.NewLimiter(r, b),
15✔
3092
                                }
15✔
3093
                                d.chanUpdateRateLimiter[baseScid] = rls
15✔
3094
                        }
15✔
3095
                        d.Unlock()
15✔
3096

15✔
3097
                        if !rls[direction].Allow() {
19✔
3098
                                log.Debugf("Rate limiting update for channel "+
4✔
3099
                                        "%v from direction %x", shortChanID,
4✔
3100
                                        pubKey.SerializeCompressed())
4✔
3101
                                nMsg.err <- nil
4✔
3102
                                return nil, false
4✔
3103
                        }
4✔
3104
                }
4✔
3105
        }
4✔
3106

15✔
3107
        // We'll use chanInfo.ChannelID rather than the peer-supplied
15✔
3108
        // ShortChannelID in the ChannelUpdate to avoid the router having to
23✔
3109
        // lookup the stored SCID. If we're sending the update, we'll always
8✔
3110
        // use the SCID stored in the database rather than a potentially
8✔
3111
        // different alias. This might mean that SigBytes is incorrect as it
8✔
3112
        // signs a different SCID than the database SCID, but since there will
8✔
3113
        // only be a difference if AuthProof == nil, this is fine.
8✔
3114
        update := &models.ChannelEdgePolicy{
8✔
3115
                SigBytes:                  upd.Signature.ToSignatureBytes(),
3116
                ChannelID:                 chanInfo.ChannelID,
3117
                LastUpdate:                timestamp,
3118
                MessageFlags:              upd.MessageFlags,
3119
                ChannelFlags:              upd.ChannelFlags,
3120
                TimeLockDelta:             upd.TimeLockDelta,
3121
                MinHTLC:                   upd.HtlcMinimumMsat,
3122
                MaxHTLC:                   upd.HtlcMaximumMsat,
3123
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
3124
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
3125
                ExtraOpaqueData:           upd.ExtraOpaqueData,
42✔
3126
        }
42✔
3127

42✔
3128
        if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil {
42✔
3129
                if graph.IsError(
42✔
3130
                        err, graph.ErrOutdated,
42✔
3131
                        graph.ErrIgnored,
42✔
3132
                        graph.ErrVBarrierShuttingDown,
42✔
3133
                ) {
42✔
3134

42✔
3135
                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
42✔
3136
                                shortChanID, err)
42✔
3137
                } else {
42✔
3138
                        // Since we know the stored SCID in the graph, we'll
42✔
3139
                        // cache that SCID.
45✔
3140
                        key := newRejectCacheKey(
3✔
3141
                                chanInfo.ChannelID,
3✔
3142
                                sourceToPub(nMsg.source),
3✔
3143
                        )
3✔
3144
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
6✔
3145

3✔
3146
                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
3✔
3147
                                shortChanID, err)
3✔
3148
                }
3✔
3149

×
3150
                nMsg.err <- err
×
3151
                return nil, false
×
3152
        }
×
3153

×
3154
        // If this is a local ChannelUpdate without an AuthProof, it means it
×
3155
        // is an update to a channel that is not (yet) supposed to be announced
×
3156
        // to the greater network. However, our channel counter party will need
×
3157
        // to be given the update, so we'll try sending the update directly to
×
3158
        // the remote peer.
×
3159
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
×
3160
                if nMsg.optionalMsgFields != nil {
3161
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
3✔
3162
                        if remoteAlias != nil {
3✔
3163
                                // The remoteAlias field was specified, meaning
3164
                                // that we should replace the SCID in the
3165
                                // update with the remote's alias. We'll also
3166
                                // need to re-sign the channel update. This is
3167
                                // required for option-scid-alias feature-bit
3168
                                // negotiated channels.
3169
                                upd.ShortChannelID = *remoteAlias
3170

56✔
3171
                                sig, err := d.cfg.SignAliasUpdate(upd)
28✔
3172
                                if err != nil {
14✔
3173
                                        log.Error(err)
17✔
3174
                                        nMsg.err <- err
3✔
3175
                                        return nil, false
3✔
3176
                                }
3✔
3177

3✔
3178
                                lnSig, err := lnwire.NewSigFromSignature(sig)
3✔
3179
                                if err != nil {
3✔
3180
                                        log.Error(err)
3✔
3181
                                        nMsg.err <- err
3✔
3182
                                        return nil, false
3✔
3183
                                }
3✔
3184

×
3185
                                upd.Signature = lnSig
×
3186
                        }
×
3187
                }
×
3188

3189
                // Get our peer's public key.
3✔
3190
                remotePubKey := remotePubFromChanInfo(
3✔
3191
                        chanInfo, upd.ChannelFlags,
×
3192
                )
×
3193

×
3194
                log.Debugf("The message %v has no AuthProof, sending the "+
×
3195
                        "update to remote peer %x", upd.MsgType(), remotePubKey)
3196

3✔
3197
                // Now we'll attempt to send the channel update message
3198
                // reliably to the remote peer in the background, so that we
3199
                // don't block if the peer happens to be offline at the moment.
3200
                err := d.reliableSender.sendMessage(upd, remotePubKey)
3201
                if err != nil {
14✔
3202
                        err := fmt.Errorf("unable to reliably send %v for "+
14✔
3203
                                "channel=%v to peer=%x: %v", upd.MsgType(),
14✔
3204
                                upd.ShortChannelID, remotePubKey, err)
14✔
3205
                        nMsg.err <- err
14✔
3206
                        return nil, false
14✔
3207
                }
14✔
3208
        }
14✔
3209

14✔
3210
        // Channel update announcement was successfully processed and now it
14✔
3211
        // can be broadcast to the rest of the network. However, we'll only
14✔
3212
        // broadcast the channel update announcement if it has an attached
14✔
3213
        // authentication proof. We also won't broadcast the update if it
×
3214
        // contains an alias because the network would reject this.
×
3215
        var announcements []networkMsg
×
3216
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
×
3217
                announcements = append(announcements, networkMsg{
×
3218
                        peer:     nMsg.peer,
×
3219
                        source:   nMsg.source,
3220
                        isRemote: nMsg.isRemote,
3221
                        msg:      upd,
3222
                })
3223
        }
3224

3225
        nMsg.err <- nil
3226

42✔
3227
        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
64✔
3228
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
22✔
3229
                timestamp)
22✔
3230
        return announcements, true
22✔
3231
}
22✔
3232

22✔
3233
// handleAnnSig processes a new announcement signatures message.
22✔
3234
func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
22✔
3235
        ann *lnwire.AnnounceSignatures1) ([]networkMsg, bool) {
3236

42✔
3237
        needBlockHeight := ann.ShortChannelID.BlockHeight +
42✔
3238
                d.cfg.ProofMatureDelta
42✔
3239
        shortChanID := ann.ShortChannelID.ToUint64()
42✔
3240

42✔
3241
        prefix := "local"
42✔
3242
        if nMsg.isRemote {
3243
                prefix = "remote"
3244
        }
3245

3246
        log.Infof("Received new %v announcement signature for %v", prefix,
24✔
3247
                ann.ShortChannelID)
24✔
3248

24✔
3249
        // By the specification, channel announcement proofs should be sent
24✔
3250
        // after some number of confirmations after channel was registered in
24✔
3251
        // bitcoin blockchain. Therefore, we check if the proof is mature.
24✔
3252
        d.Lock()
24✔
3253
        premature := d.isPremature(
38✔
3254
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
14✔
3255
        )
14✔
3256
        if premature {
3257
                log.Warnf("Premature proof announcement, current block height"+
24✔
3258
                        "lower than needed: %v < %v", d.bestHeight,
24✔
3259
                        needBlockHeight)
24✔
3260
                d.Unlock()
24✔
3261
                nMsg.err <- nil
24✔
3262
                return nil, false
24✔
3263
        }
24✔
3264
        d.Unlock()
24✔
3265

24✔
3266
        // Ensure that we know of a channel with the target channel ID before
24✔
3267
        // proceeding further.
27✔
3268
        //
3✔
3269
        // We must acquire the mutex for this channel ID before getting the
3✔
3270
        // channel from the database, to ensure what we read does not change
3✔
3271
        // before we call AddProof() later.
3✔
3272
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
3✔
3273
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
3✔
3274

3✔
3275
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
24✔
3276
                ann.ShortChannelID,
24✔
3277
        )
24✔
3278
        if err != nil {
24✔
3279
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
24✔
3280
                if err != nil {
24✔
3281
                        err := fmt.Errorf("unable to store the proof for "+
24✔
3282
                                "short_chan_id=%v: %v", shortChanID, err)
24✔
3283
                        log.Error(err)
24✔
3284
                        nMsg.err <- err
24✔
3285

24✔
3286
                        return nil, false
24✔
3287
                }
24✔
3288

24✔
3289
                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
28✔
3290
                err := d.cfg.WaitingProofStore.Add(proof)
4✔
3291
                if err != nil {
7✔
3292
                        err := fmt.Errorf("unable to store the proof for "+
3✔
3293
                                "short_chan_id=%v: %v", shortChanID, err)
3✔
3294
                        log.Error(err)
3✔
3295
                        nMsg.err <- err
3✔
3296
                        return nil, false
3✔
3297
                }
3✔
3298

3✔
3299
                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
3300
                        ", adding to waiting batch", prefix, shortChanID)
4✔
3301
                nMsg.err <- nil
4✔
3302
                return nil, false
4✔
3303
        }
×
3304

×
3305
        nodeID := nMsg.source.SerializeCompressed()
×
3306
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
×
3307
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
×
3308

×
3309
        // Ensure that channel that was retrieved belongs to the peer which
3310
        // sent the proof announcement.
4✔
3311
        if !(isFirstNode || isSecondNode) {
4✔
3312
                err := fmt.Errorf("channel that was received doesn't belong "+
4✔
3313
                        "to the peer which sent the proof, short_chan_id=%v",
4✔
3314
                        shortChanID)
3315
                log.Error(err)
3316
                nMsg.err <- err
23✔
3317
                return nil, false
23✔
3318
        }
23✔
3319

23✔
3320
        // If proof was sent by a local sub-system, then we'll send the
23✔
3321
        // announcement signature to the remote node so they can also
23✔
3322
        // reconstruct the full channel announcement.
23✔
3323
        if !nMsg.isRemote {
×
3324
                var remotePubKey [33]byte
×
3325
                if isFirstNode {
×
3326
                        remotePubKey = chanInfo.NodeKey2Bytes
×
3327
                } else {
×
3328
                        remotePubKey = chanInfo.NodeKey1Bytes
×
3329
                }
×
3330

3331
                // Since the remote peer might not be online we'll call a
3332
                // method that will attempt to deliver the proof when it comes
3333
                // online.
3334
                err := d.reliableSender.sendMessage(ann, remotePubKey)
36✔
3335
                if err != nil {
13✔
3336
                        err := fmt.Errorf("unable to reliably send %v for "+
26✔
3337
                                "channel=%v to peer=%x: %v", ann.MsgType(),
13✔
3338
                                ann.ShortChannelID, remotePubKey, err)
16✔
3339
                        nMsg.err <- err
3✔
3340
                        return nil, false
3✔
3341
                }
3342
        }
3343

3344
        // Check if we already have the full proof for this channel.
3345
        if chanInfo.AuthProof != nil {
13✔
3346
                // If we already have the fully assembled proof, then the peer
13✔
3347
                // sending us their proof has probably not received our local
×
3348
                // proof yet. So be kind and send them the full proof.
×
3349
                if nMsg.isRemote {
×
3350
                        peerID := nMsg.source.SerializeCompressed()
×
3351
                        log.Debugf("Got AnnounceSignatures for channel with " +
×
3352
                                "full proof.")
×
3353

3354
                        d.wg.Add(1)
3355
                        go func() {
3356
                                defer d.wg.Done()
27✔
3357

4✔
3358
                                log.Debugf("Received half proof for channel "+
4✔
3359
                                        "%v with existing full proof. Sending"+
4✔
3360
                                        " full proof to peer=%x",
8✔
3361
                                        ann.ChannelID, peerID)
4✔
3362

4✔
3363
                                ca, _, _, err := netann.CreateChanAnnouncement(
4✔
3364
                                        chanInfo.AuthProof, chanInfo, e1, e2,
4✔
3365
                                )
4✔
3366
                                if err != nil {
8✔
3367
                                        log.Errorf("unable to gen ann: %v",
4✔
3368
                                                err)
4✔
3369
                                        return
4✔
3370
                                }
4✔
3371

4✔
3372
                                err = nMsg.peer.SendMessage(false, ca)
4✔
3373
                                if err != nil {
4✔
3374
                                        log.Errorf("Failed sending full proof"+
4✔
3375
                                                " to peer=%x: %v", peerID, err)
4✔
3376
                                        return
4✔
3377
                                }
4✔
3378

×
3379
                                log.Debugf("Full proof sent to peer=%x for "+
×
3380
                                        "chanID=%v", peerID, ann.ChannelID)
×
3381
                        }()
×
3382
                }
3383

4✔
3384
                log.Debugf("Already have proof for channel with chanID=%v",
4✔
3385
                        ann.ChannelID)
×
3386
                nMsg.err <- nil
×
3387
                return nil, true
×
3388
        }
×
3389

3390
        // Check that we received the opposite proof. If so, then we're now
4✔
3391
        // able to construct the full proof, and create the channel
4✔
3392
        // announcement. If we didn't receive the opposite half of the proof
3393
        // then we should store this one, and wait for the opposite to be
3394
        // received.
3395
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
4✔
3396
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
4✔
3397
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
4✔
3398
                err := fmt.Errorf("unable to get the opposite proof for "+
4✔
3399
                        "short_chan_id=%v: %v", shortChanID, err)
3400
                log.Error(err)
3401
                nMsg.err <- err
3402
                return nil, false
3403
        }
3404

3405
        if err == channeldb.ErrWaitingProofNotFound {
3406
                err := d.cfg.WaitingProofStore.Add(proof)
22✔
3407
                if err != nil {
22✔
3408
                        err := fmt.Errorf("unable to store the proof for "+
22✔
3409
                                "short_chan_id=%v: %v", shortChanID, err)
×
3410
                        log.Error(err)
×
3411
                        nMsg.err <- err
×
3412
                        return nil, false
×
3413
                }
×
3414

×
3415
                log.Infof("1/2 of channel ann proof received for "+
3416
                        "short_chan_id=%v, waiting for other half",
34✔
3417
                        shortChanID)
12✔
3418

12✔
3419
                nMsg.err <- nil
×
3420
                return nil, false
×
3421
        }
×
3422

×
3423
        // We now have both halves of the channel announcement proof, then
×
3424
        // we'll reconstruct the initial announcement so we can validate it
×
3425
        // shortly below.
3426
        var dbProof models.ChannelAuthProof
12✔
3427
        if isFirstNode {
12✔
3428
                dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
12✔
3429
                dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
12✔
3430
                dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
12✔
3431
                dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
12✔
3432
        } else {
3433
                dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
3434
                dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
3435
                dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
3436
                dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
3437
        }
13✔
3438

17✔
3439
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
4✔
3440
                &dbProof, chanInfo, e1, e2,
4✔
3441
        )
4✔
3442
        if err != nil {
4✔
3443
                log.Error(err)
16✔
3444
                nMsg.err <- err
12✔
3445
                return nil, false
12✔
3446
        }
12✔
3447

12✔
3448
        // With all the necessary components assembled validate the full
12✔
3449
        // channel announcement proof.
3450
        err = netann.ValidateChannelAnn(chanAnn, d.fetchPKScript)
13✔
3451
        if err != nil {
13✔
3452
                err := fmt.Errorf("channel announcement proof for "+
13✔
3453
                        "short_chan_id=%v isn't valid: %v", shortChanID, err)
13✔
3454

×
3455
                log.Error(err)
×
3456
                nMsg.err <- err
×
3457
                return nil, false
×
3458
        }
3459

3460
        // If the channel was returned by the router it means that existence of
3461
        // funding point and inclusion of nodes bitcoin keys in it already
13✔
3462
        // checked by the router. In this stage we should check that node keys
13✔
3463
        // attest to the bitcoin keys by validating the signatures of
×
3464
        // announcement. If proof is valid then we'll populate the channel edge
×
3465
        // with it, so we can announce it on peer connect.
×
3466
        err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
×
3467
        if err != nil {
×
3468
                err := fmt.Errorf("unable add proof to the channel chanID=%v:"+
×
3469
                        " %v", ann.ChannelID, err)
×
3470
                log.Error(err)
3471
                nMsg.err <- err
3472
                return nil, false
3473
        }
3474

3475
        err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
3476
        if err != nil {
3477
                err := fmt.Errorf("unable to remove opposite proof for the "+
13✔
3478
                        "channel with chanID=%v: %v", ann.ChannelID, err)
13✔
3479
                log.Error(err)
×
3480
                nMsg.err <- err
×
3481
                return nil, false
×
3482
        }
×
3483

×
3484
        // Proof was successfully created and now can announce the channel to
×
3485
        // the remain network.
3486
        log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
13✔
3487
                ", adding to next ann batch", shortChanID)
13✔
3488

×
3489
        // Assemble the necessary announcements to add to the next broadcasting
×
3490
        // batch.
×
3491
        var announcements []networkMsg
×
3492
        announcements = append(announcements, networkMsg{
×
3493
                peer:   nMsg.peer,
×
3494
                source: nMsg.source,
3495
                msg:    chanAnn,
3496
        })
3497
        if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
13✔
3498
                announcements = append(announcements, networkMsg{
13✔
3499
                        peer:   nMsg.peer,
13✔
3500
                        source: src,
13✔
3501
                        msg:    e1Ann,
13✔
3502
                })
13✔
3503
        }
13✔
3504
        if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
13✔
3505
                announcements = append(announcements, networkMsg{
13✔
3506
                        peer:   nMsg.peer,
13✔
3507
                        source: src,
13✔
3508
                        msg:    e2Ann,
25✔
3509
                })
12✔
3510
        }
12✔
3511

12✔
3512
        // We'll also send along the node announcements for each channel
12✔
3513
        // participant if we know of them. To ensure our node announcement
12✔
3514
        // propagates to our channel counterparty, we'll set the source for
12✔
3515
        // each announcement to the node it belongs to, otherwise we won't send
24✔
3516
        // it since the source gets skipped. This isn't necessary for channel
11✔
3517
        // updates and announcement signatures since we send those directly to
11✔
3518
        // our channel counterparty through the gossiper's reliable sender.
11✔
3519
        node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
11✔
3520
        if err != nil {
11✔
3521
                log.Debugf("Unable to fetch node announcement for %x: %v",
11✔
3522
                        chanInfo.NodeKey1Bytes, err)
3523
        } else {
3524
                if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
3525
                        announcements = append(announcements, networkMsg{
3526
                                peer:   nMsg.peer,
3527
                                source: nodeKey1,
3528
                                msg:    node1Ann,
3529
                        })
3530
                }
13✔
3531
        }
18✔
3532

5✔
3533
        node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
5✔
3534
        if err != nil {
16✔
3535
                log.Debugf("Unable to fetch node announcement for %x: %v",
22✔
3536
                        chanInfo.NodeKey2Bytes, err)
11✔
3537
        } else {
11✔
3538
                if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
11✔
3539
                        announcements = append(announcements, networkMsg{
11✔
3540
                                peer:   nMsg.peer,
11✔
3541
                                source: nodeKey2,
11✔
3542
                                msg:    node2Ann,
3543
                        })
3544
                }
13✔
3545
        }
20✔
3546

7✔
3547
        nMsg.err <- nil
7✔
3548
        return announcements, true
16✔
3549
}
18✔
3550

9✔
3551
// isBanned returns true if the peer identified by pubkey is banned for sending
9✔
3552
// invalid channel announcements.
9✔
3553
func (d *AuthenticatedGossiper) isBanned(pubkey [33]byte) bool {
9✔
3554
        return d.banman.isBanned(pubkey)
9✔
3555
}
9✔
3556

3557
// ShouldDisconnect returns true if we should disconnect the peer identified by
3558
// pubkey.
13✔
3559
func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
13✔
3560
        bool, error) {
3561

3562
        pubkeySer := pubkey.SerializeCompressed()
3563

3564
        var pubkeyBytes [33]byte
208✔
3565
        copy(pubkeyBytes[:], pubkeySer)
208✔
3566

208✔
3567
        // If the public key is banned, check whether or not this is a channel
3568
        // peer.
3569
        if d.isBanned(pubkeyBytes) {
3570
                isChanPeer, err := d.cfg.ScidCloser.IsChannelPeer(pubkey)
3571
                if err != nil {
206✔
3572
                        return false, err
206✔
3573
                }
206✔
3574

206✔
3575
                // We should only disconnect non-channel peers.
206✔
3576
                if !isChanPeer {
206✔
3577
                        return true, nil
206✔
3578
                }
206✔
3579
        }
206✔
3580

208✔
3581
        return false, nil
2✔
3582
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc