• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 10207481183

01 Aug 2024 11:52PM UTC coverage: 58.679% (+0.09%) from 58.591%
10207481183

push

github

web-flow
Merge pull request #8836 from hieblmi/payment-failure-reason-cancel

routing: add payment failure reason `FailureReasonCancel`

7 of 30 new or added lines in 5 files covered. (23.33%)

1662 existing lines in 21 files now uncovered.

125454 of 213798 relevant lines covered (58.68%)

28679.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.0
/discovery/gossiper.go
1
package discovery
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
13
        "github.com/btcsuite/btcd/btcutil"
14
        "github.com/btcsuite/btcd/chaincfg/chainhash"
15
        "github.com/btcsuite/btcd/wire"
16
        "github.com/davecgh/go-spew/spew"
17
        "github.com/lightninglabs/neutrino/cache"
18
        "github.com/lightninglabs/neutrino/cache/lru"
19
        "github.com/lightningnetwork/lnd/batch"
20
        "github.com/lightningnetwork/lnd/chainntnfs"
21
        "github.com/lightningnetwork/lnd/channeldb"
22
        "github.com/lightningnetwork/lnd/channeldb/models"
23
        "github.com/lightningnetwork/lnd/graph"
24
        "github.com/lightningnetwork/lnd/keychain"
25
        "github.com/lightningnetwork/lnd/kvdb"
26
        "github.com/lightningnetwork/lnd/lnpeer"
27
        "github.com/lightningnetwork/lnd/lnutils"
28
        "github.com/lightningnetwork/lnd/lnwallet"
29
        "github.com/lightningnetwork/lnd/lnwire"
30
        "github.com/lightningnetwork/lnd/multimutex"
31
        "github.com/lightningnetwork/lnd/netann"
32
        "github.com/lightningnetwork/lnd/routing/route"
33
        "github.com/lightningnetwork/lnd/ticker"
34
        "golang.org/x/time/rate"
35
)
36

37
const (
38
        // DefaultMaxChannelUpdateBurst is the default maximum number of updates
39
        // for a specific channel and direction that we'll accept over an
40
        // interval.
41
        DefaultMaxChannelUpdateBurst = 10
42

43
        // DefaultChannelUpdateInterval is the default interval we'll use to
44
        // determine how often we should allow a new update for a specific
45
        // channel and direction.
46
        DefaultChannelUpdateInterval = time.Minute
47

48
        // maxPrematureUpdates tracks the max amount of premature channel
49
        // updates that we'll hold onto.
50
        maxPrematureUpdates = 100
51

52
        // maxFutureMessages tracks the max amount of future messages that
53
        // we'll hold onto.
54
        maxFutureMessages = 1000
55

56
        // DefaultSubBatchDelay is the default delay we'll use when
57
        // broadcasting the next announcement batch.
58
        DefaultSubBatchDelay = 5 * time.Second
59

60
        // maxRejectedUpdates tracks the max amount of rejected channel updates
61
        // we'll maintain. This is the global size across all peers. We'll
62
        // allocate ~3 MB max to the cache.
63
        maxRejectedUpdates = 10_000
64
)
65

66
var (
67
        // ErrGossiperShuttingDown is an error that is returned if the gossiper
68
        // is in the process of being shut down.
69
        ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
70

71
        // ErrGossipSyncerNotFound signals that we were unable to find an active
72
        // gossip syncer corresponding to a gossip query message received from
73
        // the remote peer.
74
        ErrGossipSyncerNotFound = errors.New("gossip syncer not found")
75

76
        // emptyPubkey is used to compare compressed pubkeys against an empty
77
        // byte array.
78
        emptyPubkey [33]byte
79
)
80

81
// optionalMsgFields is a set of optional message fields that external callers
82
// can provide that serve useful when processing a specific network
83
// announcement.
84
type optionalMsgFields struct {
85
        capacity     *btcutil.Amount
86
        channelPoint *wire.OutPoint
87
        remoteAlias  *lnwire.ShortChannelID
88
}
89

90
// apply applies the optional fields within the functional options.
91
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
51✔
92
        for _, optionalMsgField := range optionalMsgFields {
60✔
93
                optionalMsgField(f)
9✔
94
        }
9✔
95
}
96

97
// OptionalMsgField is a functional option parameter that can be used to provide
98
// external information that is not included within a network message but serves
99
// useful when processing it.
100
type OptionalMsgField func(*optionalMsgFields)
101

102
// ChannelCapacity is an optional field that lets the gossiper know of the
103
// capacity of a channel.
104
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
31✔
105
        return func(f *optionalMsgFields) {
36✔
106
                f.capacity = &capacity
5✔
107
        }
5✔
108
}
109

110
// ChannelPoint is an optional field that lets the gossiper know of the outpoint
111
// of a channel.
112
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
34✔
113
        return func(f *optionalMsgFields) {
42✔
114
                f.channelPoint = &op
8✔
115
        }
8✔
116
}
117

118
// RemoteAlias is an optional field that lets the gossiper know that a locally
119
// sent channel update is actually an update for the peer that should replace
120
// the ShortChannelID field with the remote's alias. This is only used for
121
// channels with peers where the option-scid-alias feature bit was negotiated.
122
// The channel update will be added to the graph under the original SCID, but
123
// will be modified and re-signed with this alias.
124
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
30✔
125
        return func(f *optionalMsgFields) {
34✔
126
                f.remoteAlias = alias
4✔
127
        }
4✔
128
}
129

130
// networkMsg couples a routing related wire message with the peer that
131
// originally sent it.
132
type networkMsg struct {
133
        peer              lnpeer.Peer
134
        source            *btcec.PublicKey
135
        msg               lnwire.Message
136
        optionalMsgFields *optionalMsgFields
137

138
        isRemote bool
139

140
        err chan error
141
}
142

143
// chanPolicyUpdateRequest is a request that is sent to the server when a caller
144
// wishes to update a particular set of channels. New ChannelUpdate messages
145
// will be crafted to be sent out during the next broadcast epoch and the fee
146
// updates committed to the lower layer.
147
type chanPolicyUpdateRequest struct {
148
        edgesToUpdate []EdgeWithInfo
149
        errChan       chan error
150
}
151

152
// PinnedSyncers is a set of node pubkeys for which we will maintain an active
153
// syncer at all times.
154
type PinnedSyncers map[route.Vertex]struct{}
155

156
// Config defines the configuration for the service. ALL elements within the
157
// configuration MUST be non-nil for the service to carry out its duties.
158
type Config struct {
159
        // ChainHash is a hash that indicates which resident chain of the
160
        // AuthenticatedGossiper. Any announcements that don't match this
161
        // chain hash will be ignored.
162
        //
163
        // TODO(roasbeef): eventually make into map so can de-multiplex
164
        // incoming announcements
165
        //   * also need to do same for Notifier
166
        ChainHash chainhash.Hash
167

168
        // Graph is the subsystem which is responsible for managing the
169
        // topology of lightning network. After incoming channel, node, channel
170
        // updates announcements are validated they are sent to the router in
171
        // order to be included in the LN graph.
172
        Graph graph.ChannelGraphSource
173

174
        // ChanSeries is an interfaces that provides access to a time series
175
        // view of the current known channel graph. Each GossipSyncer enabled
176
        // peer will utilize this in order to create and respond to channel
177
        // graph time series queries.
178
        ChanSeries ChannelGraphTimeSeries
179

180
        // Notifier is used for receiving notifications of incoming blocks.
181
        // With each new incoming block found we process previously premature
182
        // announcements.
183
        //
184
        // TODO(roasbeef): could possibly just replace this with an epoch
185
        // channel.
186
        Notifier chainntnfs.ChainNotifier
187

188
        // Broadcast broadcasts a particular set of announcements to all peers
189
        // that the daemon is connected to. If supplied, the exclude parameter
190
        // indicates that the target peer should be excluded from the
191
        // broadcast.
192
        Broadcast func(skips map[route.Vertex]struct{},
193
                msg ...lnwire.Message) error
194

195
        // NotifyWhenOnline is a function that allows the gossiper to be
196
        // notified when a certain peer comes online, allowing it to
197
        // retry sending a peer message.
198
        //
199
        // NOTE: The peerChan channel must be buffered.
200
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
201

202
        // NotifyWhenOffline is a function that allows the gossiper to be
203
        // notified when a certain peer disconnects, allowing it to request a
204
        // notification for when it reconnects.
205
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
206

207
        // FetchSelfAnnouncement retrieves our current node announcement, for
208
        // use when determining whether we should update our peers about our
209
        // presence in the network.
210
        FetchSelfAnnouncement func() lnwire.NodeAnnouncement
211

212
        // UpdateSelfAnnouncement produces a new announcement for our node with
213
        // an updated timestamp which can be broadcast to our peers.
214
        UpdateSelfAnnouncement func() (lnwire.NodeAnnouncement, error)
215

216
        // ProofMatureDelta the number of confirmations which is needed before
217
        // exchange the channel announcement proofs.
218
        ProofMatureDelta uint32
219

220
        // TrickleDelay the period of trickle timer which flushes to the
221
        // network the pending batch of new announcements we've received since
222
        // the last trickle tick.
223
        TrickleDelay time.Duration
224

225
        // RetransmitTicker is a ticker that ticks with a period which
226
        // indicates that we should check if we need re-broadcast any of our
227
        // personal channels.
228
        RetransmitTicker ticker.Ticker
229

230
        // RebroadcastInterval is the maximum time we wait between sending out
231
        // channel updates for our active channels and our own node
232
        // announcement. We do this to ensure our active presence on the
233
        // network is known, and we are not being considered a zombie node or
234
        // having zombie channels.
235
        RebroadcastInterval time.Duration
236

237
        // WaitingProofStore is a persistent storage of partial channel proof
238
        // announcement messages. We use it to buffer half of the material
239
        // needed to reconstruct a full authenticated channel announcement.
240
        // Once we receive the other half the channel proof, we'll be able to
241
        // properly validate it and re-broadcast it out to the network.
242
        //
243
        // TODO(wilmer): make interface to prevent channeldb dependency.
244
        WaitingProofStore *channeldb.WaitingProofStore
245

246
        // MessageStore is a persistent storage of gossip messages which we will
247
        // use to determine which messages need to be resent for a given peer.
248
        MessageStore GossipMessageStore
249

250
        // AnnSigner is an instance of the MessageSigner interface which will
251
        // be used to manually sign any outgoing channel updates. The signer
252
        // implementation should be backed by the public key of the backing
253
        // Lightning node.
254
        //
255
        // TODO(roasbeef): extract ann crafting + sign from fundingMgr into
256
        // here?
257
        AnnSigner lnwallet.MessageSigner
258

259
        // NumActiveSyncers is the number of peers for which we should have
260
        // active syncers with. After reaching NumActiveSyncers, any future
261
        // gossip syncers will be passive.
262
        NumActiveSyncers int
263

264
        // NoTimestampQueries will prevent the GossipSyncer from querying
265
        // timestamps of announcement messages from the peer and from replying
266
        // to timestamp queries.
267
        NoTimestampQueries bool
268

269
        // RotateTicker is a ticker responsible for notifying the SyncManager
270
        // when it should rotate its active syncers. A single active syncer with
271
        // a chansSynced state will be exchanged for a passive syncer in order
272
        // to ensure we don't keep syncing with the same peers.
273
        RotateTicker ticker.Ticker
274

275
        // HistoricalSyncTicker is a ticker responsible for notifying the
276
        // syncManager when it should attempt a historical sync with a gossip
277
        // sync peer.
278
        HistoricalSyncTicker ticker.Ticker
279

280
        // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
281
        // syncManager when it should attempt to start the next pending
282
        // activeSyncer due to the current one not completing its state machine
283
        // within the timeout.
284
        ActiveSyncerTimeoutTicker ticker.Ticker
285

286
        // MinimumBatchSize is minimum size of a sub batch of announcement
287
        // messages.
288
        MinimumBatchSize int
289

290
        // SubBatchDelay is the delay between sending sub batches of
291
        // gossip messages.
292
        SubBatchDelay time.Duration
293

294
        // IgnoreHistoricalFilters will prevent syncers from replying with
295
        // historical data when the remote peer sets a gossip_timestamp_range.
296
        // This prevents ranges with old start times from causing us to dump the
297
        // graph on connect.
298
        IgnoreHistoricalFilters bool
299

300
        // PinnedSyncers is a set of peers that will always transition to
301
        // ActiveSync upon connection. These peers will never transition to
302
        // PassiveSync.
303
        PinnedSyncers PinnedSyncers
304

305
        // MaxChannelUpdateBurst specifies the maximum number of updates for a
306
        // specific channel and direction that we'll accept over an interval.
307
        MaxChannelUpdateBurst int
308

309
        // ChannelUpdateInterval specifies the interval we'll use to determine
310
        // how often we should allow a new update for a specific channel and
311
        // direction.
312
        ChannelUpdateInterval time.Duration
313

314
        // IsAlias returns true if a given ShortChannelID is an alias for
315
        // option_scid_alias channels.
316
        IsAlias func(scid lnwire.ShortChannelID) bool
317

318
        // SignAliasUpdate is used to re-sign a channel update using the
319
        // remote's alias if the option-scid-alias feature bit was negotiated.
320
        SignAliasUpdate func(u *lnwire.ChannelUpdate) (*ecdsa.Signature,
321
                error)
322

323
        // FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
324
        // This is used for channels that have negotiated the option-scid-alias
325
        // feature bit.
326
        FindBaseByAlias func(alias lnwire.ShortChannelID) (
327
                lnwire.ShortChannelID, error)
328

329
        // GetAlias allows the gossiper to look up the peer's alias for a given
330
        // ChannelID. This is used to sign updates for them if the channel has
331
        // no AuthProof and the option-scid-alias feature bit was negotiated.
332
        GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)
333

334
        // FindChannel allows the gossiper to find a channel that we're party
335
        // to without iterating over the entire set of open channels.
336
        FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
337
                *channeldb.OpenChannel, error)
338

339
        // IsStillZombieChannel takes the timestamps of the latest channel
340
        // updates for a channel and returns true if the channel should be
341
        // considered a zombie based on these timestamps.
342
        IsStillZombieChannel func(time.Time, time.Time) bool
343
}
344

345
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
346
// used to let the caller of the lru.Cache know if a message has already been
347
// processed or not.
348
type processedNetworkMsg struct {
349
        processed bool
350
        msg       *networkMsg
351
}
352

353
// cachedNetworkMsg is a wrapper around a network message that can be used with
354
// *lru.Cache.
355
type cachedNetworkMsg struct {
356
        msgs []*processedNetworkMsg
357
}
358

359
// Size returns the "size" of an entry. We return the number of items as we
360
// just want to limit the total amount of entries rather than do accurate size
361
// accounting.
362
func (c *cachedNetworkMsg) Size() (uint64, error) {
6✔
363
        return uint64(len(c.msgs)), nil
6✔
364
}
6✔
365

366
// rejectCacheKey is the cache key that we'll use to track announcements we've
367
// recently rejected.
368
type rejectCacheKey struct {
369
        pubkey [33]byte
370
        chanID uint64
371
}
372

373
// newRejectCacheKey returns a new cache key for the reject cache.
374
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
62✔
375
        k := rejectCacheKey{
62✔
376
                chanID: cid,
62✔
377
                pubkey: pub,
62✔
378
        }
62✔
379

62✔
380
        return k
62✔
381
}
62✔
382

383
// sourceToPub returns a serialized-compressed public key for use in the reject
384
// cache.
385
func sourceToPub(pk *btcec.PublicKey) [33]byte {
76✔
386
        var pub [33]byte
76✔
387
        copy(pub[:], pk.SerializeCompressed())
76✔
388
        return pub
76✔
389
}
76✔
390

391
// cachedReject is the empty value used to track the value for rejects.
392
type cachedReject struct {
393
}
394

395
// Size returns the "size" of an entry. We return 1 as we just want to limit
396
// the total size.
397
func (c *cachedReject) Size() (uint64, error) {
1✔
398
        return 1, nil
1✔
399
}
1✔
400

401
// AuthenticatedGossiper is a subsystem which is responsible for receiving
402
// announcements, validating them and applying the changes to router, syncing
403
// lightning network with newly connected nodes, broadcasting announcements
404
// after validation, negotiating the channel announcement proofs exchange and
405
// handling the premature announcements. All outgoing announcements are
406
// expected to be properly signed as dictated in BOLT#7, additionally, all
407
// incoming message are expected to be well formed and signed. Invalid messages
408
// will be rejected by this struct.
409
type AuthenticatedGossiper struct {
410
        // Parameters which are needed to properly handle the start and stop of
411
        // the service.
412
        started sync.Once
413
        stopped sync.Once
414

415
        // bestHeight is the height of the block at the tip of the main chain
416
        // as we know it. Accesses *MUST* be done with the gossiper's lock
417
        // held.
418
        bestHeight uint32
419

420
        quit chan struct{}
421
        wg   sync.WaitGroup
422

423
        // cfg is a copy of the configuration struct that the gossiper service
424
        // was initialized with.
425
        cfg *Config
426

427
        // blockEpochs encapsulates a stream of block epochs that are sent at
428
        // every new block height.
429
        blockEpochs *chainntnfs.BlockEpochEvent
430

431
        // prematureChannelUpdates is a map of ChannelUpdates we have received
432
        // that wasn't associated with any channel we know about.  We store
433
        // them temporarily, such that we can reprocess them when a
434
        // ChannelAnnouncement for the channel is received.
435
        prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]
436

437
        // networkMsgs is a channel that carries new network broadcasted
438
        // message from outside the gossiper service to be processed by the
439
        // networkHandler.
440
        networkMsgs chan *networkMsg
441

442
        // futureMsgs is a list of premature network messages that have a block
443
        // height specified in the future. We will save them and resend it to
444
        // the chan networkMsgs once the block height has reached. The cached
445
        // map format is,
446
        //   {msgID1: msg1, msgID2: msg2, ...}
447
        futureMsgs *futureMsgCache
448

449
        // chanPolicyUpdates is a channel that requests to update the
450
        // forwarding policy of a set of channels is sent over.
451
        chanPolicyUpdates chan *chanPolicyUpdateRequest
452

453
        // selfKey is the identity public key of the backing Lightning node.
454
        selfKey *btcec.PublicKey
455

456
        // selfKeyLoc is the locator for the identity public key of the backing
457
        // Lightning node.
458
        selfKeyLoc keychain.KeyLocator
459

460
        // channelMtx is used to restrict the database access to one
461
        // goroutine per channel ID. This is done to ensure that when
462
        // the gossiper is handling an announcement, the db state stays
463
        // consistent between when the DB is first read until it's written.
464
        channelMtx *multimutex.Mutex[uint64]
465

466
        recentRejects *lru.Cache[rejectCacheKey, *cachedReject]
467

468
        // syncMgr is a subsystem responsible for managing the gossip syncers
469
        // for peers currently connected. When a new peer is connected, the
470
        // manager will create its accompanying gossip syncer and determine
471
        // whether it should have an activeSync or passiveSync sync type based
472
        // on how many other gossip syncers are currently active. Any activeSync
473
        // gossip syncers are started in a round-robin manner to ensure we're
474
        // not syncing with multiple peers at the same time.
475
        syncMgr *SyncManager
476

477
        // reliableSender is a subsystem responsible for handling reliable
478
        // message send requests to peers. This should only be used for channels
479
        // that are unadvertised at the time of handling the message since if it
480
        // is advertised, then peers should be able to get the message from the
481
        // network.
482
        reliableSender *reliableSender
483

484
        // chanUpdateRateLimiter contains rate limiters for each direction of
485
        // a channel update we've processed. We'll use these to determine
486
        // whether we should accept a new update for a specific channel and
487
        // direction.
488
        //
489
        // NOTE: This map must be synchronized with the main
490
        // AuthenticatedGossiper lock.
491
        chanUpdateRateLimiter map[uint64][2]*rate.Limiter
492

493
        sync.Mutex
494
}
495

496
// New creates a new AuthenticatedGossiper instance, initialized with the
497
// passed configuration parameters.
498
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
29✔
499
        gossiper := &AuthenticatedGossiper{
29✔
500
                selfKey:           selfKeyDesc.PubKey,
29✔
501
                selfKeyLoc:        selfKeyDesc.KeyLocator,
29✔
502
                cfg:               &cfg,
29✔
503
                networkMsgs:       make(chan *networkMsg),
29✔
504
                futureMsgs:        newFutureMsgCache(maxFutureMessages),
29✔
505
                quit:              make(chan struct{}),
29✔
506
                chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
29✔
507
                prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: lll
29✔
508
                        maxPrematureUpdates,
29✔
509
                ),
29✔
510
                channelMtx: multimutex.NewMutex[uint64](),
29✔
511
                recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
29✔
512
                        maxRejectedUpdates,
29✔
513
                ),
29✔
514
                chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
29✔
515
        }
29✔
516

29✔
517
        gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
29✔
518
                ChainHash:               cfg.ChainHash,
29✔
519
                ChanSeries:              cfg.ChanSeries,
29✔
520
                RotateTicker:            cfg.RotateTicker,
29✔
521
                HistoricalSyncTicker:    cfg.HistoricalSyncTicker,
29✔
522
                NumActiveSyncers:        cfg.NumActiveSyncers,
29✔
523
                NoTimestampQueries:      cfg.NoTimestampQueries,
29✔
524
                IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
29✔
525
                BestHeight:              gossiper.latestHeight,
29✔
526
                PinnedSyncers:           cfg.PinnedSyncers,
29✔
527
                IsStillZombieChannel:    cfg.IsStillZombieChannel,
29✔
528
        })
29✔
529

29✔
530
        gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
29✔
531
                NotifyWhenOnline:  cfg.NotifyWhenOnline,
29✔
532
                NotifyWhenOffline: cfg.NotifyWhenOffline,
29✔
533
                MessageStore:      cfg.MessageStore,
29✔
534
                IsMsgStale:        gossiper.isMsgStale,
29✔
535
        })
29✔
536

29✔
537
        return gossiper
29✔
538
}
29✔
539

540
// EdgeWithInfo contains the information that is required to update an edge.
541
type EdgeWithInfo struct {
542
        // Info describes the channel.
543
        Info *models.ChannelEdgeInfo
544

545
        // Edge describes the policy in one direction of the channel.
546
        Edge *models.ChannelEdgePolicy
547
}
548

549
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
550
// specified edge updates. Updates are done in two stages: first, the
551
// AuthenticatedGossiper ensures the update has been committed by dependent
552
// sub-systems, then it signs and broadcasts new updates to the network. A
553
// mapping between outpoints and updated channel policies is returned, which is
554
// used to update the forwarding policies of the underlying links.
555
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
556
        edgesToUpdate []EdgeWithInfo) error {
5✔
557

5✔
558
        errChan := make(chan error, 1)
5✔
559
        policyUpdate := &chanPolicyUpdateRequest{
5✔
560
                edgesToUpdate: edgesToUpdate,
5✔
561
                errChan:       errChan,
5✔
562
        }
5✔
563

5✔
564
        select {
5✔
565
        case d.chanPolicyUpdates <- policyUpdate:
5✔
566
                err := <-errChan
5✔
567
                return err
5✔
568
        case <-d.quit:
×
569
                return fmt.Errorf("AuthenticatedGossiper shutting down")
×
570
        }
571
}
572

573
// Start spawns network messages handler goroutine and registers on new block
574
// notifications in order to properly handle the premature announcements.
575
func (d *AuthenticatedGossiper) Start() error {
29✔
576
        var err error
29✔
577
        d.started.Do(func() {
58✔
578
                log.Info("Authenticated Gossiper starting")
29✔
579
                err = d.start()
29✔
580
        })
29✔
581
        return err
29✔
582
}
583

584
func (d *AuthenticatedGossiper) start() error {
29✔
585
        // First we register for new notifications of newly discovered blocks.
29✔
586
        // We do this immediately so we'll later be able to consume any/all
29✔
587
        // blocks which were discovered.
29✔
588
        blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
29✔
589
        if err != nil {
29✔
590
                return err
×
591
        }
×
592
        d.blockEpochs = blockEpochs
29✔
593

29✔
594
        height, err := d.cfg.Graph.CurrentBlockHeight()
29✔
595
        if err != nil {
29✔
596
                return err
×
597
        }
×
598
        d.bestHeight = height
29✔
599

29✔
600
        // Start the reliable sender. In case we had any pending messages ready
29✔
601
        // to be sent when the gossiper was last shut down, we must continue on
29✔
602
        // our quest to deliver them to their respective peers.
29✔
603
        if err := d.reliableSender.Start(); err != nil {
29✔
604
                return err
×
605
        }
×
606

607
        d.syncMgr.Start()
29✔
608

29✔
609
        // Start receiving blocks in its dedicated goroutine.
29✔
610
        d.wg.Add(2)
29✔
611
        go d.syncBlockHeight()
29✔
612
        go d.networkHandler()
29✔
613

29✔
614
        return nil
29✔
615
}
616

617
// syncBlockHeight syncs the best block height for the gossiper by reading
618
// blockEpochs.
619
//
620
// NOTE: must be run as a goroutine.
621
func (d *AuthenticatedGossiper) syncBlockHeight() {
29✔
622
        defer d.wg.Done()
29✔
623

29✔
624
        for {
58✔
625
                select {
29✔
626
                // A new block has arrived, so we can re-process the previously
627
                // premature announcements.
628
                case newBlock, ok := <-d.blockEpochs.Epochs:
4✔
629
                        // If the channel has been closed, then this indicates
4✔
630
                        // the daemon is shutting down, so we exit ourselves.
4✔
631
                        if !ok {
8✔
632
                                return
4✔
633
                        }
4✔
634

635
                        // Once a new block arrives, we update our running
636
                        // track of the height of the chain tip.
637
                        d.Lock()
4✔
638
                        blockHeight := uint32(newBlock.Height)
4✔
639
                        d.bestHeight = blockHeight
4✔
640
                        d.Unlock()
4✔
641

4✔
642
                        log.Debugf("New block: height=%d, hash=%s", blockHeight,
4✔
643
                                newBlock.Hash)
4✔
644

4✔
645
                        // Resend future messages, if any.
4✔
646
                        d.resendFutureMessages(blockHeight)
4✔
647

648
                case <-d.quit:
25✔
649
                        return
25✔
650
                }
651
        }
652
}
653

654
// futureMsgCache embeds a `lru.Cache` with a message counter that's served as
655
// the unique ID when saving the message.
656
type futureMsgCache struct {
657
        *lru.Cache[uint64, *cachedFutureMsg]
658

659
        // msgID is a monotonically increased integer.
660
        msgID atomic.Uint64
661
}
662

663
// nextMsgID returns a unique message ID.
664
func (f *futureMsgCache) nextMsgID() uint64 {
4✔
665
        return f.msgID.Add(1)
4✔
666
}
4✔
667

668
// newFutureMsgCache creates a new future message cache with the underlying lru
669
// cache being initialized with the specified capacity.
670
func newFutureMsgCache(capacity uint64) *futureMsgCache {
30✔
671
        // Create a new cache.
30✔
672
        cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)
30✔
673

30✔
674
        return &futureMsgCache{
30✔
675
                Cache: cache,
30✔
676
        }
30✔
677
}
30✔
678

679
// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
680
type cachedFutureMsg struct {
681
        // msg is the network message.
682
        msg *networkMsg
683

684
        // height is the block height.
685
        height uint32
686
}
687

688
// Size returns the size of the message.
689
func (c *cachedFutureMsg) Size() (uint64, error) {
5✔
690
        // Return a constant 1.
5✔
691
        return 1, nil
5✔
692
}
5✔
693

694
// resendFutureMessages takes a block height, resends all the future messages
695
// found below and equal to that height and deletes those messages found in the
696
// gossiper's futureMsgs.
697
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
4✔
698
        var (
4✔
699
                // msgs are the target messages.
4✔
700
                msgs []*networkMsg
4✔
701

4✔
702
                // keys are the target messages' caching keys.
4✔
703
                keys []uint64
4✔
704
        )
4✔
705

4✔
706
        // filterMsgs is the visitor used when iterating the future cache.
4✔
707
        filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
5✔
708
                if cmsg.height <= height {
2✔
709
                        msgs = append(msgs, cmsg.msg)
1✔
710
                        keys = append(keys, k)
1✔
711
                }
1✔
712

713
                return true
1✔
714
        }
715

716
        // Filter out the target messages.
717
        d.futureMsgs.Range(filterMsgs)
4✔
718

4✔
719
        // Return early if no messages found.
4✔
720
        if len(msgs) == 0 {
8✔
721
                return
4✔
722
        }
4✔
723

724
        // Remove the filtered messages.
725
        for _, key := range keys {
2✔
726
                d.futureMsgs.Delete(key)
1✔
727
        }
1✔
728

729
        log.Debugf("Resending %d network messages at height %d",
1✔
730
                len(msgs), height)
1✔
731

1✔
732
        for _, msg := range msgs {
2✔
733
                select {
1✔
734
                case d.networkMsgs <- msg:
1✔
735
                case <-d.quit:
×
736
                        msg.err <- ErrGossiperShuttingDown
×
737
                }
738
        }
739
}
740

741
// Stop signals any active goroutines for a graceful closure.
742
func (d *AuthenticatedGossiper) Stop() error {
30✔
743
        d.stopped.Do(func() {
59✔
744
                log.Info("Authenticated gossiper shutting down...")
29✔
745
                defer log.Debug("Authenticated gossiper shutdown complete")
29✔
746

29✔
747
                d.stop()
29✔
748
        })
29✔
749
        return nil
30✔
750
}
751

752
func (d *AuthenticatedGossiper) stop() {
29✔
753
        log.Debug("Authenticated Gossiper is stopping")
29✔
754
        defer log.Debug("Authenticated Gossiper stopped")
29✔
755

29✔
756
        // `blockEpochs` is only initialized in the start routine so we make
29✔
757
        // sure we don't panic here in the case where the `Stop` method is
29✔
758
        // called when the `Start` method does not complete.
29✔
759
        if d.blockEpochs != nil {
58✔
760
                d.blockEpochs.Cancel()
29✔
761
        }
29✔
762

763
        d.syncMgr.Stop()
29✔
764

29✔
765
        close(d.quit)
29✔
766
        d.wg.Wait()
29✔
767

29✔
768
        // We'll stop our reliable sender after all of the gossiper's goroutines
29✔
769
        // have exited to ensure nothing can cause it to continue executing.
29✔
770
        d.reliableSender.Stop()
29✔
771
}
772

773
// TODO(roasbeef): need method to get current gossip timestamp?
774
//  * using mtx, check time rotate forward is needed?
775

776
// ProcessRemoteAnnouncement sends a new remote announcement message along with
777
// the peer that sent the routing message. The announcement will be processed
778
// then added to a queue for batched trickled announcement to all connected
779
// peers.  Remote channel announcements should contain the announcement proof
780
// and be fully validated.
781
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
782
        peer lnpeer.Peer) chan error {
86✔
783

86✔
784
        errChan := make(chan error, 1)
86✔
785

86✔
786
        // For messages in the known set of channel series queries, we'll
86✔
787
        // dispatch the message directly to the GossipSyncer, and skip the main
86✔
788
        // processing loop.
86✔
789
        switch m := msg.(type) {
86✔
790
        case *lnwire.QueryShortChanIDs,
791
                *lnwire.QueryChannelRange,
792
                *lnwire.ReplyChannelRange,
793
                *lnwire.ReplyShortChanIDsEnd:
4✔
794

4✔
795
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
4✔
796
                if !ok {
4✔
797
                        log.Warnf("Gossip syncer for peer=%x not found",
×
UNCOV
798
                                peer.PubKey())
×
UNCOV
799

×
UNCOV
800
                        errChan <- ErrGossipSyncerNotFound
×
UNCOV
801
                        return errChan
×
UNCOV
802
                }
×
803

804
                // If we've found the message target, then we'll dispatch the
805
                // message directly to it.
806
                syncer.ProcessQueryMsg(m, peer.QuitSignal())
4✔
807

4✔
808
                errChan <- nil
4✔
809
                return errChan
4✔
810

811
        // If a peer is updating its current update horizon, then we'll dispatch
812
        // that directly to the proper GossipSyncer.
813
        case *lnwire.GossipTimestampRange:
4✔
814
                syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
4✔
815
                if !ok {
4✔
816
                        log.Warnf("Gossip syncer for peer=%x not found",
×
UNCOV
817
                                peer.PubKey())
×
UNCOV
818

×
UNCOV
819
                        errChan <- ErrGossipSyncerNotFound
×
UNCOV
820
                        return errChan
×
821
                }
×
822

823
                // If we've found the message target, then we'll dispatch the
824
                // message directly to it.
825
                if err := syncer.ApplyGossipFilter(m); err != nil {
4✔
826
                        log.Warnf("Unable to apply gossip filter for peer=%x: "+
×
UNCOV
827
                                "%v", peer.PubKey(), err)
×
UNCOV
828

×
UNCOV
829
                        errChan <- err
×
UNCOV
830
                        return errChan
×
UNCOV
831
                }
×
832

833
                errChan <- nil
4✔
834
                return errChan
4✔
835

836
        // To avoid inserting edges in the graph for our own channels that we
837
        // have already closed, we ignore such channel announcements coming
838
        // from the remote.
839
        case *lnwire.ChannelAnnouncement:
21✔
840
                ownKey := d.selfKey.SerializeCompressed()
21✔
841
                ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement " +
21✔
842
                        "for own channel")
21✔
843

21✔
844
                if bytes.Equal(m.NodeID1[:], ownKey) ||
21✔
845
                        bytes.Equal(m.NodeID2[:], ownKey) {
27✔
846

6✔
847
                        log.Warn(ownErr)
6✔
848
                        errChan <- ownErr
6✔
849
                        return errChan
6✔
850
                }
6✔
851
        }
852

853
        nMsg := &networkMsg{
84✔
854
                msg:      msg,
84✔
855
                isRemote: true,
84✔
856
                peer:     peer,
84✔
857
                source:   peer.IdentityKey(),
84✔
858
                err:      errChan,
84✔
859
        }
84✔
860

84✔
861
        select {
84✔
862
        case d.networkMsgs <- nMsg:
84✔
863

864
        // If the peer that sent us this error is quitting, then we don't need
865
        // to send back an error and can return immediately.
UNCOV
866
        case <-peer.QuitSignal():
×
UNCOV
867
                return nil
×
UNCOV
868
        case <-d.quit:
×
UNCOV
869
                nMsg.err <- ErrGossiperShuttingDown
×
870
        }
871

872
        return nMsg.err
84✔
873
}
874

875
// ProcessLocalAnnouncement sends a new remote announcement message along with
876
// the peer that sent the routing message. The announcement will be processed
877
// then added to a queue for batched trickled announcement to all connected
878
// peers.  Local channel announcements don't contain the announcement proof and
879
// will not be fully validated. Once the channel proofs are received, the
880
// entire channel announcement and update messages will be re-constructed and
881
// broadcast to the rest of the network.
882
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
883
        optionalFields ...OptionalMsgField) chan error {
51✔
884

51✔
885
        optionalMsgFields := &optionalMsgFields{}
51✔
886
        optionalMsgFields.apply(optionalFields...)
51✔
887

51✔
888
        nMsg := &networkMsg{
51✔
889
                msg:               msg,
51✔
890
                optionalMsgFields: optionalMsgFields,
51✔
891
                isRemote:          false,
51✔
892
                source:            d.selfKey,
51✔
893
                err:               make(chan error, 1),
51✔
894
        }
51✔
895

51✔
896
        select {
51✔
897
        case d.networkMsgs <- nMsg:
51✔
UNCOV
898
        case <-d.quit:
×
UNCOV
899
                nMsg.err <- ErrGossiperShuttingDown
×
900
        }
901

902
        return nMsg.err
51✔
903
}
904

905
// channelUpdateID is a unique identifier for ChannelUpdate messages, as
906
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
907
// tuple.
908
type channelUpdateID struct {
909
        // channelID represents the set of data which is needed to
910
        // retrieve all necessary data to validate the channel existence.
911
        channelID lnwire.ShortChannelID
912

913
        // Flags least-significant bit must be set to 0 if the creating node
914
        // corresponds to the first node in the previously sent channel
915
        // announcement and 1 otherwise.
916
        flags lnwire.ChanUpdateChanFlags
917
}
918

919
// msgWithSenders is a wrapper struct around a message, and the set of peers
920
// that originally sent us this message. Using this struct, we can ensure that
921
// we don't re-send a message to the peer that sent it to us in the first
922
// place.
923
type msgWithSenders struct {
924
        // msg is the wire message itself.
925
        msg lnwire.Message
926

927
        // isLocal is true if this was a message that originated locally. We'll
928
        // use this to bypass our normal checks to ensure we prioritize sending
929
        // out our own updates.
930
        isLocal bool
931

932
        // sender is the set of peers that sent us this message.
933
        senders map[route.Vertex]struct{}
934
}
935

936
// mergeSyncerMap is used to merge the set of senders of a particular message
937
// with peers that we have an active GossipSyncer with. We do this to ensure
938
// that we don't broadcast messages to any peers that we have active gossip
939
// syncers for.
940
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
28✔
941
        for peerPub := range syncers {
32✔
942
                m.senders[peerPub] = struct{}{}
4✔
943
        }
4✔
944
}
945

946
// deDupedAnnouncements de-duplicates announcements that have been added to the
947
// batch. Internally, announcements are stored in three maps
948
// (one each for channel announcements, channel updates, and node
949
// announcements). These maps keep track of unique announcements and ensure no
950
// announcements are duplicated. We keep the three message types separate, such
951
// that we can send channel announcements first, then channel updates, and
952
// finally node announcements when it's time to broadcast them.
953
type deDupedAnnouncements struct {
954
        // channelAnnouncements are identified by the short channel id field.
955
        channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
956

957
        // channelUpdates are identified by the channel update id field.
958
        channelUpdates map[channelUpdateID]msgWithSenders
959

960
        // nodeAnnouncements are identified by the Vertex field.
961
        nodeAnnouncements map[route.Vertex]msgWithSenders
962

963
        sync.Mutex
964
}
965

966
// Reset operates on deDupedAnnouncements to reset the storage of
967
// announcements.
968
func (d *deDupedAnnouncements) Reset() {
31✔
969
        d.Lock()
31✔
970
        defer d.Unlock()
31✔
971

31✔
972
        d.reset()
31✔
973
}
31✔
974

975
// reset is the private version of the Reset method. We have this so we can
976
// call this method within method that are already holding the lock.
977
func (d *deDupedAnnouncements) reset() {
316✔
978
        // Storage of each type of announcement (channel announcements, channel
316✔
979
        // updates, node announcements) is set to an empty map where the
316✔
980
        // appropriate key points to the corresponding lnwire.Message.
316✔
981
        d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
316✔
982
        d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
316✔
983
        d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
316✔
984
}
316✔
985

986
// addMsg adds a new message to the current batch. If the message is already
987
// present in the current batch, then this new instance replaces the latter,
988
// and the set of senders is updated to reflect which node sent us this
989
// message.
990
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
90✔
991
        log.Tracef("Adding network message: %v to batch", message.msg.MsgType())
90✔
992

90✔
993
        // Depending on the message type (channel announcement, channel update,
90✔
994
        // or node announcement), the message is added to the corresponding map
90✔
995
        // in deDupedAnnouncements. Because each identifying key can have at
90✔
996
        // most one value, the announcements are de-duplicated, with newer ones
90✔
997
        // replacing older ones.
90✔
998
        switch msg := message.msg.(type) {
90✔
999

1000
        // Channel announcements are identified by the short channel id field.
1001
        case *lnwire.ChannelAnnouncement:
26✔
1002
                deDupKey := msg.ShortChannelID
26✔
1003
                sender := route.NewVertex(message.source)
26✔
1004

26✔
1005
                mws, ok := d.channelAnnouncements[deDupKey]
26✔
1006
                if !ok {
51✔
1007
                        mws = msgWithSenders{
25✔
1008
                                msg:     msg,
25✔
1009
                                isLocal: !message.isRemote,
25✔
1010
                                senders: make(map[route.Vertex]struct{}),
25✔
1011
                        }
25✔
1012
                        mws.senders[sender] = struct{}{}
25✔
1013

25✔
1014
                        d.channelAnnouncements[deDupKey] = mws
25✔
1015

25✔
1016
                        return
25✔
1017
                }
25✔
1018

1019
                mws.msg = msg
1✔
1020
                mws.senders[sender] = struct{}{}
1✔
1021
                d.channelAnnouncements[deDupKey] = mws
1✔
1022

1023
        // Channel updates are identified by the (short channel id,
1024
        // channelflags) tuple.
1025
        case *lnwire.ChannelUpdate:
46✔
1026
                sender := route.NewVertex(message.source)
46✔
1027
                deDupKey := channelUpdateID{
46✔
1028
                        msg.ShortChannelID,
46✔
1029
                        msg.ChannelFlags,
46✔
1030
                }
46✔
1031

46✔
1032
                oldTimestamp := uint32(0)
46✔
1033
                mws, ok := d.channelUpdates[deDupKey]
46✔
1034
                if ok {
49✔
1035
                        // If we already have seen this message, record its
3✔
1036
                        // timestamp.
3✔
1037
                        oldTimestamp = mws.msg.(*lnwire.ChannelUpdate).Timestamp
3✔
1038
                }
3✔
1039

1040
                // If we already had this message with a strictly newer
1041
                // timestamp, then we'll just discard the message we got.
1042
                if oldTimestamp > msg.Timestamp {
47✔
1043
                        log.Debugf("Ignored outdated network message: "+
1✔
1044
                                "peer=%v, msg=%s", message.peer, msg.MsgType())
1✔
1045
                        return
1✔
1046
                }
1✔
1047

1048
                // If the message we just got is newer than what we previously
1049
                // have seen, or this is the first time we see it, then we'll
1050
                // add it to our map of announcements.
1051
                if oldTimestamp < msg.Timestamp {
89✔
1052
                        mws = msgWithSenders{
44✔
1053
                                msg:     msg,
44✔
1054
                                isLocal: !message.isRemote,
44✔
1055
                                senders: make(map[route.Vertex]struct{}),
44✔
1056
                        }
44✔
1057

44✔
1058
                        // We'll mark the sender of the message in the
44✔
1059
                        // senders map.
44✔
1060
                        mws.senders[sender] = struct{}{}
44✔
1061

44✔
1062
                        d.channelUpdates[deDupKey] = mws
44✔
1063

44✔
1064
                        return
44✔
1065
                }
44✔
1066

1067
                // Lastly, if we had seen this exact message from before, with
1068
                // the same timestamp, we'll add the sender to the map of
1069
                // senders, such that we can skip sending this message back in
1070
                // the next batch.
1071
                mws.msg = msg
1✔
1072
                mws.senders[sender] = struct{}{}
1✔
1073
                d.channelUpdates[deDupKey] = mws
1✔
1074

1075
        // Node announcements are identified by the Vertex field.  Use the
1076
        // NodeID to create the corresponding Vertex.
1077
        case *lnwire.NodeAnnouncement:
26✔
1078
                sender := route.NewVertex(message.source)
26✔
1079
                deDupKey := route.Vertex(msg.NodeID)
26✔
1080

26✔
1081
                // We do the same for node announcements as we did for channel
26✔
1082
                // updates, as they also carry a timestamp.
26✔
1083
                oldTimestamp := uint32(0)
26✔
1084
                mws, ok := d.nodeAnnouncements[deDupKey]
26✔
1085
                if ok {
35✔
1086
                        oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
9✔
1087
                }
9✔
1088

1089
                // Discard the message if it's old.
1090
                if oldTimestamp > msg.Timestamp {
30✔
1091
                        return
4✔
1092
                }
4✔
1093

1094
                // Replace if it's newer.
1095
                if oldTimestamp < msg.Timestamp {
48✔
1096
                        mws = msgWithSenders{
22✔
1097
                                msg:     msg,
22✔
1098
                                isLocal: !message.isRemote,
22✔
1099
                                senders: make(map[route.Vertex]struct{}),
22✔
1100
                        }
22✔
1101

22✔
1102
                        mws.senders[sender] = struct{}{}
22✔
1103

22✔
1104
                        d.nodeAnnouncements[deDupKey] = mws
22✔
1105

22✔
1106
                        return
22✔
1107
                }
22✔
1108

1109
                // Add to senders map if it's the same as we had.
1110
                mws.msg = msg
8✔
1111
                mws.senders[sender] = struct{}{}
8✔
1112
                d.nodeAnnouncements[deDupKey] = mws
8✔
1113
        }
1114
}
1115

1116
// AddMsgs is a helper method to add multiple messages to the announcement
1117
// batch.
1118
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
58✔
1119
        d.Lock()
58✔
1120
        defer d.Unlock()
58✔
1121

58✔
1122
        for _, msg := range msgs {
148✔
1123
                d.addMsg(msg)
90✔
1124
        }
90✔
1125
}
1126

1127
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
1128
// to broadcast next into messages that are locally sourced and those that are
1129
// sourced remotely.
1130
type msgsToBroadcast struct {
1131
        // localMsgs is the set of messages we created locally.
1132
        localMsgs []msgWithSenders
1133

1134
        // remoteMsgs is the set of messages that we received from a remote
1135
        // party.
1136
        remoteMsgs []msgWithSenders
1137
}
1138

1139
// addMsg adds a new message to the appropriate sub-slice.
1140
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
75✔
1141
        if msg.isLocal {
126✔
1142
                m.localMsgs = append(m.localMsgs, msg)
51✔
1143
        } else {
79✔
1144
                m.remoteMsgs = append(m.remoteMsgs, msg)
28✔
1145
        }
28✔
1146
}
1147

1148
// isEmpty returns true if the batch is empty.
1149
func (m *msgsToBroadcast) isEmpty() bool {
288✔
1150
        return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
288✔
1151
}
288✔
1152

1153
// length returns the length of the combined message set.
1154
func (m *msgsToBroadcast) length() int {
1✔
1155
        return len(m.localMsgs) + len(m.remoteMsgs)
1✔
1156
}
1✔
1157

1158
// Emit returns the set of de-duplicated announcements to be sent out during
1159
// the next announcement epoch, in the order of channel announcements, channel
1160
// updates, and node announcements. Each message emitted, contains the set of
1161
// peers that sent us the message. This way, we can ensure that we don't waste
1162
// bandwidth by re-sending a message to the peer that sent it to us in the
1163
// first place. Additionally, the set of stored messages are reset.
1164
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
289✔
1165
        d.Lock()
289✔
1166
        defer d.Unlock()
289✔
1167

289✔
1168
        // Get the total number of announcements.
289✔
1169
        numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
289✔
1170
                len(d.nodeAnnouncements)
289✔
1171

289✔
1172
        // Create an empty array of lnwire.Messages with a length equal to
289✔
1173
        // the total number of announcements.
289✔
1174
        msgs := msgsToBroadcast{
289✔
1175
                localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
289✔
1176
                remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
289✔
1177
        }
289✔
1178

289✔
1179
        // Add the channel announcements to the array first.
289✔
1180
        for _, message := range d.channelAnnouncements {
311✔
1181
                msgs.addMsg(message)
22✔
1182
        }
22✔
1183

1184
        // Then add the channel updates.
1185
        for _, message := range d.channelUpdates {
329✔
1186
                msgs.addMsg(message)
40✔
1187
        }
40✔
1188

1189
        // Finally add the node announcements.
1190
        for _, message := range d.nodeAnnouncements {
310✔
1191
                msgs.addMsg(message)
21✔
1192
        }
21✔
1193

1194
        d.reset()
289✔
1195

289✔
1196
        // Return the array of lnwire.messages.
289✔
1197
        return msgs
289✔
1198
}
1199

1200
// calculateSubBatchSize is a helper function that calculates the size to break
1201
// down the batchSize into.
1202
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
1203
        minimumBatchSize, batchSize int) int {
17✔
1204
        if subBatchDelay > totalDelay {
19✔
1205
                return batchSize
2✔
1206
        }
2✔
1207

1208
        subBatchSize := (batchSize*int(subBatchDelay) +
15✔
1209
                int(totalDelay) - 1) / int(totalDelay)
15✔
1210

15✔
1211
        if subBatchSize < minimumBatchSize {
20✔
1212
                return minimumBatchSize
5✔
1213
        }
5✔
1214

1215
        return subBatchSize
10✔
1216
}
1217

1218
// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
1219
// this variable so the function can be mocked in our test.
1220
var batchSizeCalculator = calculateSubBatchSize
1221

1222
// splitAnnouncementBatches takes an exiting list of announcements and
1223
// decomposes it into sub batches controlled by the `subBatchSize`.
1224
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
1225
        announcementBatch []msgWithSenders) [][]msgWithSenders {
73✔
1226

73✔
1227
        subBatchSize := batchSizeCalculator(
73✔
1228
                d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
73✔
1229
                d.cfg.MinimumBatchSize, len(announcementBatch),
73✔
1230
        )
73✔
1231

73✔
1232
        var splitAnnouncementBatch [][]msgWithSenders
73✔
1233

73✔
1234
        for subBatchSize < len(announcementBatch) {
198✔
1235
                // For slicing with minimal allocation
125✔
1236
                // https://github.com/golang/go/wiki/SliceTricks
125✔
1237
                announcementBatch, splitAnnouncementBatch =
125✔
1238
                        announcementBatch[subBatchSize:],
125✔
1239
                        append(splitAnnouncementBatch,
125✔
1240
                                announcementBatch[0:subBatchSize:subBatchSize])
125✔
1241
        }
125✔
1242
        splitAnnouncementBatch = append(
73✔
1243
                splitAnnouncementBatch, announcementBatch,
73✔
1244
        )
73✔
1245

73✔
1246
        return splitAnnouncementBatch
73✔
1247
}
1248

1249
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
1250
// split size, and then sends out all items to the set of target peers. Locally
1251
// generated announcements are always sent before remotely generated
1252
// announcements.
1253
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
1254
        annBatch msgsToBroadcast) {
35✔
1255

35✔
1256
        // delayNextBatch is a helper closure that blocks for `SubBatchDelay`
35✔
1257
        // duration to delay the sending of next announcement batch.
35✔
1258
        delayNextBatch := func() {
101✔
1259
                select {
66✔
1260
                case <-time.After(d.cfg.SubBatchDelay):
49✔
1261
                case <-d.quit:
17✔
1262
                        return
17✔
1263
                }
1264
        }
1265

1266
        // Fetch the local and remote announcements.
1267
        localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
35✔
1268
        remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)
35✔
1269

35✔
1270
        d.wg.Add(1)
35✔
1271
        go func() {
70✔
1272
                defer d.wg.Done()
35✔
1273

35✔
1274
                log.Debugf("Broadcasting %v new local announcements in %d "+
35✔
1275
                        "sub batches", len(annBatch.localMsgs),
35✔
1276
                        len(localBatches))
35✔
1277

35✔
1278
                // Send out the local announcements first.
35✔
1279
                for _, annBatch := range localBatches {
70✔
1280
                        d.sendLocalBatch(annBatch)
35✔
1281
                        delayNextBatch()
35✔
1282
                }
35✔
1283

1284
                log.Debugf("Broadcasting %v new remote announcements in %d "+
35✔
1285
                        "sub batches", len(annBatch.remoteMsgs),
35✔
1286
                        len(remoteBatches))
35✔
1287

35✔
1288
                // Now send the remote announcements.
35✔
1289
                for _, annBatch := range remoteBatches {
70✔
1290
                        d.sendRemoteBatch(annBatch)
35✔
1291
                        delayNextBatch()
35✔
1292
                }
35✔
1293
        }()
1294
}
1295

1296
// sendLocalBatch broadcasts a list of locally generated announcements to our
1297
// peers. For local announcements, we skip the filter and dedup logic and just
1298
// send the announcements out to all our coonnected peers.
1299
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
35✔
1300
        msgsToSend := lnutils.Map(
35✔
1301
                annBatch, func(m msgWithSenders) lnwire.Message {
82✔
1302
                        return m.msg
47✔
1303
                },
47✔
1304
        )
1305

1306
        err := d.cfg.Broadcast(nil, msgsToSend...)
35✔
1307
        if err != nil {
35✔
UNCOV
1308
                log.Errorf("Unable to send local batch announcements: %v", err)
×
UNCOV
1309
        }
×
1310
}
1311

1312
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
1313
// peers.
1314
func (d *AuthenticatedGossiper) sendRemoteBatch(annBatch []msgWithSenders) {
35✔
1315
        syncerPeers := d.syncMgr.GossipSyncers()
35✔
1316

35✔
1317
        // We'll first attempt to filter out this new message for all peers
35✔
1318
        // that have active gossip syncers active.
35✔
1319
        for pub, syncer := range syncerPeers {
39✔
1320
                log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
4✔
1321
                syncer.FilterGossipMsgs(annBatch...)
4✔
1322
        }
4✔
1323

1324
        for _, msgChunk := range annBatch {
63✔
1325
                msgChunk := msgChunk
28✔
1326

28✔
1327
                // With the syncers taken care of, we'll merge the sender map
28✔
1328
                // with the set of syncers, so we don't send out duplicate
28✔
1329
                // messages.
28✔
1330
                msgChunk.mergeSyncerMap(syncerPeers)
28✔
1331

28✔
1332
                err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
28✔
1333
                if err != nil {
28✔
UNCOV
1334
                        log.Errorf("Unable to send batch "+
×
UNCOV
1335
                                "announcements: %v", err)
×
UNCOV
1336
                        continue
×
1337
                }
1338
        }
1339
}
1340

1341
// networkHandler is the primary goroutine that drives this service. The roles
1342
// of this goroutine includes answering queries related to the state of the
1343
// network, syncing up newly connected peers, and also periodically
1344
// broadcasting our latest topology state to all connected peers.
1345
//
1346
// NOTE: This MUST be run as a goroutine.
1347
func (d *AuthenticatedGossiper) networkHandler() {
29✔
1348
        defer d.wg.Done()
29✔
1349

29✔
1350
        // Initialize empty deDupedAnnouncements to store announcement batch.
29✔
1351
        announcements := deDupedAnnouncements{}
29✔
1352
        announcements.Reset()
29✔
1353

29✔
1354
        d.cfg.RetransmitTicker.Resume()
29✔
1355
        defer d.cfg.RetransmitTicker.Stop()
29✔
1356

29✔
1357
        trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
29✔
1358
        defer trickleTimer.Stop()
29✔
1359

29✔
1360
        // To start, we'll first check to see if there are any stale channel or
29✔
1361
        // node announcements that we need to re-transmit.
29✔
1362
        if err := d.retransmitStaleAnns(time.Now()); err != nil {
29✔
UNCOV
1363
                log.Errorf("Unable to rebroadcast stale announcements: %v", err)
×
UNCOV
1364
        }
×
1365

1366
        // We'll use this validation to ensure that we process jobs in their
1367
        // dependency order during parallel validation.
1368
        validationBarrier := graph.NewValidationBarrier(1000, d.quit)
29✔
1369

29✔
1370
        for {
473✔
1371
                select {
444✔
1372
                // A new policy update has arrived. We'll commit it to the
1373
                // sub-systems below us, then craft, sign, and broadcast a new
1374
                // ChannelUpdate for the set of affected clients.
1375
                case policyUpdate := <-d.chanPolicyUpdates:
5✔
1376
                        log.Tracef("Received channel %d policy update requests",
5✔
1377
                                len(policyUpdate.edgesToUpdate))
5✔
1378

5✔
1379
                        // First, we'll now create new fully signed updates for
5✔
1380
                        // the affected channels and also update the underlying
5✔
1381
                        // graph with the new state.
5✔
1382
                        newChanUpdates, err := d.processChanPolicyUpdate(
5✔
1383
                                policyUpdate.edgesToUpdate,
5✔
1384
                        )
5✔
1385
                        policyUpdate.errChan <- err
5✔
1386
                        if err != nil {
5✔
UNCOV
1387
                                log.Errorf("Unable to craft policy updates: %v",
×
UNCOV
1388
                                        err)
×
UNCOV
1389
                                continue
×
1390
                        }
1391

1392
                        // Finally, with the updates committed, we'll now add
1393
                        // them to the announcement batch to be flushed at the
1394
                        // start of the next epoch.
1395
                        announcements.AddMsgs(newChanUpdates...)
5✔
1396

1397
                case announcement := <-d.networkMsgs:
133✔
1398
                        log.Tracef("Received network message: "+
133✔
1399
                                "peer=%v, msg=%s, is_remote=%v",
133✔
1400
                                announcement.peer, announcement.msg.MsgType(),
133✔
1401
                                announcement.isRemote)
133✔
1402

133✔
1403
                        switch announcement.msg.(type) {
133✔
1404
                        // Channel announcement signatures are amongst the only
1405
                        // messages that we'll process serially.
1406
                        case *lnwire.AnnounceSignatures:
25✔
1407
                                emittedAnnouncements, _ := d.processNetworkAnnouncement(
25✔
1408
                                        announcement,
25✔
1409
                                )
25✔
1410
                                log.Debugf("Processed network message %s, "+
25✔
1411
                                        "returned len(announcements)=%v",
25✔
1412
                                        announcement.msg.MsgType(),
25✔
1413
                                        len(emittedAnnouncements))
25✔
1414

25✔
1415
                                if emittedAnnouncements != nil {
39✔
1416
                                        announcements.AddMsgs(
14✔
1417
                                                emittedAnnouncements...,
14✔
1418
                                        )
14✔
1419
                                }
14✔
1420
                                continue
25✔
1421
                        }
1422

1423
                        // If this message was recently rejected, then we won't
1424
                        // attempt to re-process it.
1425
                        if announcement.isRemote && d.isRecentlyRejectedMsg(
112✔
1426
                                announcement.msg,
112✔
1427
                                sourceToPub(announcement.source),
112✔
1428
                        ) {
113✔
1429

1✔
1430
                                announcement.err <- fmt.Errorf("recently " +
1✔
1431
                                        "rejected")
1✔
1432
                                continue
1✔
1433
                        }
1434

1435
                        // We'll set up any dependent, and wait until a free
1436
                        // slot for this job opens up, this allow us to not
1437
                        // have thousands of goroutines active.
1438
                        validationBarrier.InitJobDependencies(announcement.msg)
111✔
1439

111✔
1440
                        d.wg.Add(1)
111✔
1441
                        go d.handleNetworkMessages(
111✔
1442
                                announcement, &announcements, validationBarrier,
111✔
1443
                        )
111✔
1444

1445
                // The trickle timer has ticked, which indicates we should
1446
                // flush to the network the pending batch of new announcements
1447
                // we've received since the last trickle tick.
1448
                case <-trickleTimer.C:
288✔
1449
                        // Emit the current batch of announcements from
288✔
1450
                        // deDupedAnnouncements.
288✔
1451
                        announcementBatch := announcements.Emit()
288✔
1452

288✔
1453
                        // If the current announcements batch is nil, then we
288✔
1454
                        // have no further work here.
288✔
1455
                        if announcementBatch.isEmpty() {
545✔
1456
                                continue
257✔
1457
                        }
1458

1459
                        // At this point, we have the set of local and remote
1460
                        // announcements we want to send out. We'll do the
1461
                        // batching as normal for both, but for local
1462
                        // announcements, we'll blast them out w/o regard for
1463
                        // our peer's policies so we ensure they propagate
1464
                        // properly.
1465
                        d.splitAndSendAnnBatch(announcementBatch)
35✔
1466

1467
                // The retransmission timer has ticked which indicates that we
1468
                // should check if we need to prune or re-broadcast any of our
1469
                // personal channels or node announcement. This addresses the
1470
                // case of "zombie" channels and channel advertisements that
1471
                // have been dropped, or not properly propagated through the
1472
                // network.
1473
                case tick := <-d.cfg.RetransmitTicker.Ticks():
1✔
1474
                        if err := d.retransmitStaleAnns(tick); err != nil {
1✔
UNCOV
1475
                                log.Errorf("unable to rebroadcast stale "+
×
UNCOV
1476
                                        "announcements: %v", err)
×
UNCOV
1477
                        }
×
1478

1479
                // The gossiper has been signalled to exit, to we exit our
1480
                // main loop so the wait group can be decremented.
1481
                case <-d.quit:
29✔
1482
                        return
29✔
1483
                }
1484
        }
1485
}
1486

1487
// handleNetworkMessages is responsible for waiting for dependencies for a
1488
// given network message and processing the message. Once processed, it will
1489
// signal its dependants and add the new announcements to the announce batch.
1490
//
1491
// NOTE: must be run as a goroutine.
1492
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
1493
        deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) {
111✔
1494

111✔
1495
        defer d.wg.Done()
111✔
1496
        defer vb.CompleteJob()
111✔
1497

111✔
1498
        // We should only broadcast this message forward if it originated from
111✔
1499
        // us or it wasn't received as part of our initial historical sync.
111✔
1500
        shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
111✔
1501

111✔
1502
        // If this message has an existing dependency, then we'll wait until
111✔
1503
        // that has been fully validated before we proceed.
111✔
1504
        err := vb.WaitForDependants(nMsg.msg)
111✔
1505
        if err != nil {
111✔
1506
                log.Debugf("Validating network message %s got err: %v",
×
1507
                        nMsg.msg.MsgType(), err)
×
1508

×
1509
                if !graph.IsError(
×
1510
                        err,
×
1511
                        graph.ErrVBarrierShuttingDown,
×
1512
                        graph.ErrParentValidationFailed,
×
1513
                ) {
×
1514

×
1515
                        log.Warnf("unexpected error during validation "+
×
UNCOV
1516
                                "barrier shutdown: %v", err)
×
UNCOV
1517
                }
×
UNCOV
1518
                nMsg.err <- err
×
UNCOV
1519

×
UNCOV
1520
                return
×
1521
        }
1522

1523
        // Process the network announcement to determine if this is either a
1524
        // new announcement from our PoV or an edges to a prior vertex/edge we
1525
        // previously proceeded.
1526
        newAnns, allow := d.processNetworkAnnouncement(nMsg)
111✔
1527

111✔
1528
        log.Tracef("Processed network message %s, returned "+
111✔
1529
                "len(announcements)=%v, allowDependents=%v",
111✔
1530
                nMsg.msg.MsgType(), len(newAnns), allow)
111✔
1531

111✔
1532
        // If this message had any dependencies, then we can now signal them to
111✔
1533
        // continue.
111✔
1534
        vb.SignalDependants(nMsg.msg, allow)
111✔
1535

111✔
1536
        // If the announcement was accepted, then add the emitted announcements
111✔
1537
        // to our announce batch to be broadcast once the trickle timer ticks
111✔
1538
        // gain.
111✔
1539
        if newAnns != nil && shouldBroadcast {
147✔
1540
                // TODO(roasbeef): exclude peer that sent.
36✔
1541
                deDuped.AddMsgs(newAnns...)
36✔
1542
        } else if newAnns != nil {
120✔
1543
                log.Trace("Skipping broadcast of announcements received " +
5✔
1544
                        "during initial graph sync")
5✔
1545
        }
5✔
1546
}
1547

1548
// TODO(roasbeef): d/c peers that send updates not on our chain
1549

1550
// InitSyncState is called by outside sub-systems when a connection is
1551
// established to a new peer that understands how to perform channel range
1552
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
1553
// needed to handle new queries.
1554
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
4✔
1555
        d.syncMgr.InitSyncState(syncPeer)
4✔
1556
}
4✔
1557

1558
// PruneSyncState is called by outside sub-systems once a peer that we were
1559
// previously connected to has been disconnected. In this case we can stop the
1560
// existing GossipSyncer assigned to the peer and free up resources.
1561
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
4✔
1562
        d.syncMgr.PruneSyncState(peer)
4✔
1563
}
4✔
1564

1565
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
1566
// false otherwise, This avoids expensive reprocessing of the message.
1567
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
1568
        peerPub [33]byte) bool {
75✔
1569

75✔
1570
        var scid uint64
75✔
1571
        switch m := msg.(type) {
75✔
1572
        case *lnwire.ChannelUpdate:
46✔
1573
                scid = m.ShortChannelID.ToUint64()
46✔
1574

1575
        case *lnwire.ChannelAnnouncement:
19✔
1576
                scid = m.ShortChannelID.ToUint64()
19✔
1577

1578
        default:
18✔
1579
                return false
18✔
1580
        }
1581

1582
        _, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
61✔
1583
        return err != cache.ErrElementNotFound
61✔
1584
}
1585

1586
// retransmitStaleAnns examines all outgoing channels that the source node is
1587
// known to maintain to check to see if any of them are "stale". A channel is
1588
// stale iff, the last timestamp of its rebroadcast is older than the
1589
// RebroadcastInterval. We also check if a refreshed node announcement should
1590
// be resent.
1591
func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
30✔
1592
        // Iterate over all of our channels and check if any of them fall
30✔
1593
        // within the prune interval or re-broadcast interval.
30✔
1594
        type updateTuple struct {
30✔
1595
                info *models.ChannelEdgeInfo
30✔
1596
                edge *models.ChannelEdgePolicy
30✔
1597
        }
30✔
1598

30✔
1599
        var (
30✔
1600
                havePublicChannels bool
30✔
1601
                edgesToUpdate      []updateTuple
30✔
1602
        )
30✔
1603
        err := d.cfg.Graph.ForAllOutgoingChannels(func(
30✔
1604
                _ kvdb.RTx,
30✔
1605
                info *models.ChannelEdgeInfo,
30✔
1606
                edge *models.ChannelEdgePolicy) error {
36✔
1607

6✔
1608
                // If there's no auth proof attached to this edge, it means
6✔
1609
                // that it is a private channel not meant to be announced to
6✔
1610
                // the greater network, so avoid sending channel updates for
6✔
1611
                // this channel to not leak its
6✔
1612
                // existence.
6✔
1613
                if info.AuthProof == nil {
11✔
1614
                        log.Debugf("Skipping retransmission of channel "+
5✔
1615
                                "without AuthProof: %v", info.ChannelID)
5✔
1616
                        return nil
5✔
1617
                }
5✔
1618

1619
                // We make a note that we have at least one public channel. We
1620
                // use this to determine whether we should send a node
1621
                // announcement below.
1622
                havePublicChannels = true
5✔
1623

5✔
1624
                // If this edge has a ChannelUpdate that was created before the
5✔
1625
                // introduction of the MaxHTLC field, then we'll update this
5✔
1626
                // edge to propagate this information in the network.
5✔
1627
                if !edge.MessageFlags.HasMaxHtlc() {
5✔
1628
                        // We'll make sure we support the new max_htlc field if
×
1629
                        // not already present.
×
1630
                        edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
×
1631
                        edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
×
1632

×
1633
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
×
UNCOV
1634
                                info: info,
×
UNCOV
1635
                                edge: edge,
×
UNCOV
1636
                        })
×
UNCOV
1637
                        return nil
×
UNCOV
1638
                }
×
1639

1640
                timeElapsed := now.Sub(edge.LastUpdate)
5✔
1641

5✔
1642
                // If it's been longer than RebroadcastInterval since we've
5✔
1643
                // re-broadcasted the channel, add the channel to the set of
5✔
1644
                // edges we need to update.
5✔
1645
                if timeElapsed >= d.cfg.RebroadcastInterval {
6✔
1646
                        edgesToUpdate = append(edgesToUpdate, updateTuple{
1✔
1647
                                info: info,
1✔
1648
                                edge: edge,
1✔
1649
                        })
1✔
1650
                }
1✔
1651

1652
                return nil
5✔
1653
        })
1654
        if err != nil && err != channeldb.ErrGraphNoEdgesFound {
30✔
UNCOV
1655
                return fmt.Errorf("unable to retrieve outgoing channels: %w",
×
UNCOV
1656
                        err)
×
UNCOV
1657
        }
×
1658

1659
        var signedUpdates []lnwire.Message
30✔
1660
        for _, chanToUpdate := range edgesToUpdate {
31✔
1661
                // Re-sign and update the channel on disk and retrieve our
1✔
1662
                // ChannelUpdate to broadcast.
1✔
1663
                chanAnn, chanUpdate, err := d.updateChannel(
1✔
1664
                        chanToUpdate.info, chanToUpdate.edge,
1✔
1665
                )
1✔
1666
                if err != nil {
1✔
UNCOV
1667
                        return fmt.Errorf("unable to update channel: %w", err)
×
UNCOV
1668
                }
×
1669

1670
                // If we have a valid announcement to transmit, then we'll send
1671
                // that along with the update.
1672
                if chanAnn != nil {
2✔
1673
                        signedUpdates = append(signedUpdates, chanAnn)
1✔
1674
                }
1✔
1675

1676
                signedUpdates = append(signedUpdates, chanUpdate)
1✔
1677
        }
1678

1679
        // If we don't have any public channels, we return as we don't want to
1680
        // broadcast anything that would reveal our existence.
1681
        if !havePublicChannels {
59✔
1682
                return nil
29✔
1683
        }
29✔
1684

1685
        // We'll also check that our NodeAnnouncement is not too old.
1686
        currentNodeAnn := d.cfg.FetchSelfAnnouncement()
5✔
1687
        timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
5✔
1688
        timeElapsed := now.Sub(timestamp)
5✔
1689

5✔
1690
        // If it's been a full day since we've re-broadcasted the
5✔
1691
        // node announcement, refresh it and resend it.
5✔
1692
        nodeAnnStr := ""
5✔
1693
        if timeElapsed >= d.cfg.RebroadcastInterval {
6✔
1694
                newNodeAnn, err := d.cfg.UpdateSelfAnnouncement()
1✔
1695
                if err != nil {
1✔
UNCOV
1696
                        return fmt.Errorf("unable to get refreshed node "+
×
UNCOV
1697
                                "announcement: %v", err)
×
UNCOV
1698
                }
×
1699

1700
                signedUpdates = append(signedUpdates, &newNodeAnn)
1✔
1701
                nodeAnnStr = " and our refreshed node announcement"
1✔
1702

1✔
1703
                // Before broadcasting the refreshed node announcement, add it
1✔
1704
                // to our own graph.
1✔
1705
                if err := d.addNode(&newNodeAnn); err != nil {
2✔
1706
                        log.Errorf("Unable to add refreshed node announcement "+
1✔
1707
                                "to graph: %v", err)
1✔
1708
                }
1✔
1709
        }
1710

1711
        // If we don't have any updates to re-broadcast, then we'll exit
1712
        // early.
1713
        if len(signedUpdates) == 0 {
9✔
1714
                return nil
4✔
1715
        }
4✔
1716

1717
        log.Infof("Retransmitting %v outgoing channels%v",
1✔
1718
                len(edgesToUpdate), nodeAnnStr)
1✔
1719

1✔
1720
        // With all the wire announcements properly crafted, we'll broadcast
1✔
1721
        // our known outgoing channels to all our immediate peers.
1✔
1722
        if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
1✔
UNCOV
1723
                return fmt.Errorf("unable to re-broadcast channels: %w", err)
×
UNCOV
1724
        }
×
1725

1726
        return nil
1✔
1727
}
1728

1729
// processChanPolicyUpdate generates a new set of channel updates for the
1730
// provided list of edges and updates the backing ChannelGraphSource.
1731
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
1732
        edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
5✔
1733

5✔
1734
        var chanUpdates []networkMsg
5✔
1735
        for _, edgeInfo := range edgesToUpdate {
12✔
1736
                // Now that we've collected all the channels we need to update,
7✔
1737
                // we'll re-sign and update the backing ChannelGraphSource, and
7✔
1738
                // retrieve our ChannelUpdate to broadcast.
7✔
1739
                _, chanUpdate, err := d.updateChannel(
7✔
1740
                        edgeInfo.Info, edgeInfo.Edge,
7✔
1741
                )
7✔
1742
                if err != nil {
7✔
UNCOV
1743
                        return nil, err
×
UNCOV
1744
                }
×
1745

1746
                // We'll avoid broadcasting any updates for private channels to
1747
                // avoid directly giving away their existence. Instead, we'll
1748
                // send the update directly to the remote party.
1749
                if edgeInfo.Info.AuthProof == nil {
12✔
1750
                        // If AuthProof is nil and an alias was found for this
5✔
1751
                        // ChannelID (meaning the option-scid-alias feature was
5✔
1752
                        // negotiated), we'll replace the ShortChannelID in the
5✔
1753
                        // update with the peer's alias. We do this after
5✔
1754
                        // updateChannel so that the alias isn't persisted to
5✔
1755
                        // the database.
5✔
1756
                        chanID := lnwire.NewChanIDFromOutPoint(
5✔
1757
                                edgeInfo.Info.ChannelPoint,
5✔
1758
                        )
5✔
1759

5✔
1760
                        var defaultAlias lnwire.ShortChannelID
5✔
1761
                        foundAlias, _ := d.cfg.GetAlias(chanID)
5✔
1762
                        if foundAlias != defaultAlias {
9✔
1763
                                chanUpdate.ShortChannelID = foundAlias
4✔
1764

4✔
1765
                                sig, err := d.cfg.SignAliasUpdate(chanUpdate)
4✔
1766
                                if err != nil {
4✔
UNCOV
1767
                                        log.Errorf("Unable to sign alias "+
×
UNCOV
1768
                                                "update: %v", err)
×
1769
                                        continue
×
1770
                                }
1771

1772
                                lnSig, err := lnwire.NewSigFromSignature(sig)
4✔
1773
                                if err != nil {
4✔
UNCOV
1774
                                        log.Errorf("Unable to create sig: %v",
×
UNCOV
1775
                                                err)
×
UNCOV
1776
                                        continue
×
1777
                                }
1778

1779
                                chanUpdate.Signature = lnSig
4✔
1780
                        }
1781

1782
                        remotePubKey := remotePubFromChanInfo(
5✔
1783
                                edgeInfo.Info, chanUpdate.ChannelFlags,
5✔
1784
                        )
5✔
1785
                        err := d.reliableSender.sendMessage(
5✔
1786
                                chanUpdate, remotePubKey,
5✔
1787
                        )
5✔
1788
                        if err != nil {
5✔
1789
                                log.Errorf("Unable to reliably send %v for "+
×
UNCOV
1790
                                        "channel=%v to peer=%x: %v",
×
UNCOV
1791
                                        chanUpdate.MsgType(),
×
UNCOV
1792
                                        chanUpdate.ShortChannelID,
×
UNCOV
1793
                                        remotePubKey, err)
×
UNCOV
1794
                        }
×
1795
                        continue
5✔
1796
                }
1797

1798
                // We set ourselves as the source of this message to indicate
1799
                // that we shouldn't skip any peers when sending this message.
1800
                chanUpdates = append(chanUpdates, networkMsg{
6✔
1801
                        source:   d.selfKey,
6✔
1802
                        isRemote: false,
6✔
1803
                        msg:      chanUpdate,
6✔
1804
                })
6✔
1805
        }
1806

1807
        return chanUpdates, nil
5✔
1808
}
1809

1810
// remotePubFromChanInfo returns the public key of the remote peer given a
1811
// ChannelEdgeInfo that describe a channel we have with them.
1812
func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
1813
        chanFlags lnwire.ChanUpdateChanFlags) [33]byte {
16✔
1814

16✔
1815
        var remotePubKey [33]byte
16✔
1816
        switch {
16✔
1817
        case chanFlags&lnwire.ChanUpdateDirection == 0:
16✔
1818
                remotePubKey = chanInfo.NodeKey2Bytes
16✔
1819
        case chanFlags&lnwire.ChanUpdateDirection == 1:
4✔
1820
                remotePubKey = chanInfo.NodeKey1Bytes
4✔
1821
        }
1822

1823
        return remotePubKey
16✔
1824
}
1825

1826
// processRejectedEdge examines a rejected edge to see if we can extract any
1827
// new announcements from it.  An edge will get rejected if we already added
1828
// the same edge without AuthProof to the graph. If the received announcement
1829
// contains a proof, we can add this proof to our edge.  We can end up in this
1830
// situation in the case where we create a channel, but for some reason fail
1831
// to receive the remote peer's proof, while the remote peer is able to fully
1832
// assemble the proof and craft the ChannelAnnouncement.
1833
func (d *AuthenticatedGossiper) processRejectedEdge(
1834
        chanAnnMsg *lnwire.ChannelAnnouncement,
1835
        proof *models.ChannelAuthProof) ([]networkMsg, error) {
4✔
1836

4✔
1837
        // First, we'll fetch the state of the channel as we know if from the
4✔
1838
        // database.
4✔
1839
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
4✔
1840
                chanAnnMsg.ShortChannelID,
4✔
1841
        )
4✔
1842
        if err != nil {
4✔
UNCOV
1843
                return nil, err
×
UNCOV
1844
        }
×
1845

1846
        // The edge is in the graph, and has a proof attached, then we'll just
1847
        // reject it as normal.
1848
        if chanInfo.AuthProof != nil {
8✔
1849
                return nil, nil
4✔
1850
        }
4✔
1851

1852
        // Otherwise, this means that the edge is within the graph, but it
1853
        // doesn't yet have a proper proof attached. If we did not receive
1854
        // the proof such that we now can add it, there's nothing more we
1855
        // can do.
UNCOV
1856
        if proof == nil {
×
1857
                return nil, nil
×
1858
        }
×
1859

1860
        // We'll then create then validate the new fully assembled
1861
        // announcement.
1862
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
×
1863
                proof, chanInfo, e1, e2,
×
1864
        )
×
1865
        if err != nil {
×
1866
                return nil, err
×
1867
        }
×
1868
        err = graph.ValidateChannelAnn(chanAnn)
×
1869
        if err != nil {
×
1870
                err := fmt.Errorf("assembled channel announcement proof "+
×
UNCOV
1871
                        "for shortChanID=%v isn't valid: %v",
×
UNCOV
1872
                        chanAnnMsg.ShortChannelID, err)
×
UNCOV
1873
                log.Error(err)
×
1874
                return nil, err
×
1875
        }
×
1876

1877
        // If everything checks out, then we'll add the fully assembled proof
1878
        // to the database.
1879
        err = d.cfg.Graph.AddProof(chanAnnMsg.ShortChannelID, proof)
×
1880
        if err != nil {
×
UNCOV
1881
                err := fmt.Errorf("unable add proof to shortChanID=%v: %w",
×
UNCOV
1882
                        chanAnnMsg.ShortChannelID, err)
×
UNCOV
1883
                log.Error(err)
×
UNCOV
1884
                return nil, err
×
1885
        }
×
1886

1887
        // As we now have a complete channel announcement for this channel,
1888
        // we'll construct the announcement so they can be broadcast out to all
1889
        // our peers.
1890
        announcements := make([]networkMsg, 0, 3)
×
1891
        announcements = append(announcements, networkMsg{
×
1892
                source: d.selfKey,
×
1893
                msg:    chanAnn,
×
1894
        })
×
1895
        if e1Ann != nil {
×
1896
                announcements = append(announcements, networkMsg{
×
1897
                        source: d.selfKey,
×
1898
                        msg:    e1Ann,
×
1899
                })
×
1900
        }
×
1901
        if e2Ann != nil {
×
1902
                announcements = append(announcements, networkMsg{
×
UNCOV
1903
                        source: d.selfKey,
×
1904
                        msg:    e2Ann,
×
UNCOV
1905
                })
×
UNCOV
1906

×
UNCOV
1907
        }
×
1908

UNCOV
1909
        return announcements, nil
×
1910
}
1911

1912
// addNode processes the given node announcement, and adds it to our channel
1913
// graph.
1914
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
1915
        op ...batch.SchedulerOption) error {
21✔
1916

21✔
1917
        if err := graph.ValidateNodeAnn(msg); err != nil {
22✔
1918
                return fmt.Errorf("unable to validate node announcement: %w",
1✔
1919
                        err)
1✔
1920
        }
1✔
1921

1922
        timestamp := time.Unix(int64(msg.Timestamp), 0)
20✔
1923
        features := lnwire.NewFeatureVector(msg.Features, lnwire.Features)
20✔
1924
        node := &channeldb.LightningNode{
20✔
1925
                HaveNodeAnnouncement: true,
20✔
1926
                LastUpdate:           timestamp,
20✔
1927
                Addresses:            msg.Addresses,
20✔
1928
                PubKeyBytes:          msg.NodeID,
20✔
1929
                Alias:                msg.Alias.String(),
20✔
1930
                AuthSigBytes:         msg.Signature.ToSignatureBytes(),
20✔
1931
                Features:             features,
20✔
1932
                Color:                msg.RGBColor,
20✔
1933
                ExtraOpaqueData:      msg.ExtraOpaqueData,
20✔
1934
        }
20✔
1935

20✔
1936
        return d.cfg.Graph.AddNode(node, op...)
20✔
1937
}
1938

1939
// isPremature decides whether a given network message has a block height+delta
1940
// value specified in the future. If so, the message will be added to the
1941
// future message map and be processed when the block height as reached.
1942
//
1943
// NOTE: must be used inside a lock.
1944
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
1945
        delta uint32, msg *networkMsg) bool {
81✔
1946
        // TODO(roasbeef) make height delta 6
81✔
1947
        //  * or configurable
81✔
1948

81✔
1949
        msgHeight := chanID.BlockHeight + delta
81✔
1950

81✔
1951
        // The message height is smaller or equal to our best known height,
81✔
1952
        // thus the message is mature.
81✔
1953
        if msgHeight <= d.bestHeight {
161✔
1954
                return false
80✔
1955
        }
80✔
1956

1957
        // Add the premature message to our future messages which will be
1958
        // resent once the block height has reached.
1959
        //
1960
        // Copy the networkMsgs since the old message's err chan will be
1961
        // consumed.
1962
        copied := &networkMsg{
2✔
1963
                peer:              msg.peer,
2✔
1964
                source:            msg.source,
2✔
1965
                msg:               msg.msg,
2✔
1966
                optionalMsgFields: msg.optionalMsgFields,
2✔
1967
                isRemote:          msg.isRemote,
2✔
1968
                err:               make(chan error, 1),
2✔
1969
        }
2✔
1970

2✔
1971
        // Create the cached message.
2✔
1972
        cachedMsg := &cachedFutureMsg{
2✔
1973
                msg:    copied,
2✔
1974
                height: msgHeight,
2✔
1975
        }
2✔
1976

2✔
1977
        // Increment the msg ID and add it to the cache.
2✔
1978
        nextMsgID := d.futureMsgs.nextMsgID()
2✔
1979
        _, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
2✔
1980
        if err != nil {
2✔
UNCOV
1981
                log.Errorf("Adding future message got error: %v", err)
×
UNCOV
1982
        }
×
1983

1984
        log.Debugf("Network message: %v added to future messages for "+
2✔
1985
                "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
2✔
1986
                msgHeight, d.bestHeight)
2✔
1987

2✔
1988
        return true
2✔
1989
}
1990

1991
// processNetworkAnnouncement processes a new network relate authenticated
1992
// channel or node announcement or announcements proofs. If the announcement
1993
// didn't affect the internal state due to either being out of date, invalid,
1994
// or redundant, then nil is returned. Otherwise, the set of announcements will
1995
// be returned which should be broadcasted to the rest of the network. The
1996
// boolean returned indicates whether any dependents of the announcement should
1997
// attempt to be processed as well.
1998
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
1999
        nMsg *networkMsg) ([]networkMsg, bool) {
132✔
2000

132✔
2001
        // If this is a remote update, we set the scheduler option to lazily
132✔
2002
        // add it to the graph.
132✔
2003
        var schedulerOp []batch.SchedulerOption
132✔
2004
        if nMsg.isRemote {
217✔
2005
                schedulerOp = append(schedulerOp, batch.LazyAdd())
85✔
2006
        }
85✔
2007

2008
        switch msg := nMsg.msg.(type) {
132✔
2009
        // A new node announcement has arrived which either presents new
2010
        // information about a node in one of the channels we know about, or a
2011
        // updating previously advertised information.
2012
        case *lnwire.NodeAnnouncement:
28✔
2013
                return d.handleNodeAnnouncement(nMsg, msg, schedulerOp)
28✔
2014

2015
        // A new channel announcement has arrived, this indicates the
2016
        // *creation* of a new channel within the network. This only advertises
2017
        // the existence of a channel and not yet the routing policies in
2018
        // either direction of the channel.
2019
        case *lnwire.ChannelAnnouncement:
32✔
2020
                return d.handleChanAnnouncement(nMsg, msg, schedulerOp)
32✔
2021

2022
        // A new authenticated channel edge update has arrived. This indicates
2023
        // that the directional information for an already known channel has
2024
        // been updated.
2025
        case *lnwire.ChannelUpdate:
59✔
2026
                return d.handleChanUpdate(nMsg, msg, schedulerOp)
59✔
2027

2028
        // A new signature announcement has been received. This indicates
2029
        // willingness of nodes involved in the funding of a channel to
2030
        // announce this new channel to the rest of the world.
2031
        case *lnwire.AnnounceSignatures:
25✔
2032
                return d.handleAnnSig(nMsg, msg)
25✔
2033

UNCOV
2034
        default:
×
UNCOV
2035
                err := errors.New("wrong type of the announcement")
×
UNCOV
2036
                nMsg.err <- err
×
UNCOV
2037
                return nil, false
×
2038
        }
2039
}
2040

2041
// processZombieUpdate determines whether the provided channel update should
2042
// resurrect a given zombie edge.
2043
//
2044
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
2045
// should be inspected.
2046
func (d *AuthenticatedGossiper) processZombieUpdate(
2047
        chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
2048
        msg *lnwire.ChannelUpdate) error {
7✔
2049

7✔
2050
        // The least-significant bit in the flag on the channel update tells us
7✔
2051
        // which edge is being updated.
7✔
2052
        isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0
7✔
2053

7✔
2054
        // Since we've deemed the update as not stale above, before marking it
7✔
2055
        // live, we'll make sure it has been signed by the correct party. If we
7✔
2056
        // have both pubkeys, either party can resurrect the channel. If we've
7✔
2057
        // already marked this with the stricter, single-sided resurrection we
7✔
2058
        // will only have the pubkey of the node with the oldest timestamp.
7✔
2059
        var pubKey *btcec.PublicKey
7✔
2060
        switch {
7✔
2061
        case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
2✔
2062
                pubKey, _ = chanInfo.NodeKey1()
2✔
2063
        case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
4✔
2064
                pubKey, _ = chanInfo.NodeKey2()
4✔
2065
        }
2066
        if pubKey == nil {
8✔
2067
                return fmt.Errorf("incorrect pubkey to resurrect zombie "+
1✔
2068
                        "with chan_id=%v", msg.ShortChannelID)
1✔
2069
        }
1✔
2070

2071
        err := graph.VerifyChannelUpdateSignature(msg, pubKey)
6✔
2072
        if err != nil {
7✔
2073
                return fmt.Errorf("unable to verify channel "+
1✔
2074
                        "update signature: %v", err)
1✔
2075
        }
1✔
2076

2077
        // With the signature valid, we'll proceed to mark the
2078
        // edge as live and wait for the channel announcement to
2079
        // come through again.
2080
        err = d.cfg.Graph.MarkEdgeLive(scid)
5✔
2081
        switch {
5✔
UNCOV
2082
        case errors.Is(err, channeldb.ErrZombieEdgeNotFound):
×
2083
                log.Errorf("edge with chan_id=%v was not found in the "+
×
2084
                        "zombie index: %v", err)
×
2085

×
2086
                return nil
×
2087

UNCOV
2088
        case err != nil:
×
UNCOV
2089
                return fmt.Errorf("unable to remove edge with "+
×
UNCOV
2090
                        "chan_id=%v from zombie index: %v",
×
UNCOV
2091
                        msg.ShortChannelID, err)
×
2092

2093
        default:
5✔
2094
        }
2095

2096
        log.Debugf("Removed edge with chan_id=%v from zombie "+
5✔
2097
                "index", msg.ShortChannelID)
5✔
2098

5✔
2099
        return nil
5✔
2100
}
2101

2102
// fetchNodeAnn fetches the latest signed node announcement from our point of
2103
// view for the node with the given public key.
2104
func (d *AuthenticatedGossiper) fetchNodeAnn(
2105
        pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {
24✔
2106

24✔
2107
        node, err := d.cfg.Graph.FetchLightningNode(pubKey)
24✔
2108
        if err != nil {
30✔
2109
                return nil, err
6✔
2110
        }
6✔
2111

2112
        return node.NodeAnnouncement(true)
18✔
2113
}
2114

2115
// isMsgStale determines whether a message retrieved from the backing
2116
// MessageStore is seen as stale by the current graph.
2117
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
17✔
2118
        switch msg := msg.(type) {
17✔
2119
        case *lnwire.AnnounceSignatures:
6✔
2120
                chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
6✔
2121
                        msg.ShortChannelID,
6✔
2122
                )
6✔
2123

6✔
2124
                // If the channel cannot be found, it is most likely a leftover
6✔
2125
                // message for a channel that was closed, so we can consider it
6✔
2126
                // stale.
6✔
2127
                if errors.Is(err, channeldb.ErrEdgeNotFound) {
10✔
2128
                        return true
4✔
2129
                }
4✔
2130
                if err != nil {
6✔
UNCOV
2131
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
UNCOV
2132
                                "%v", chanInfo.ChannelID, err)
×
UNCOV
2133
                        return false
×
UNCOV
2134
                }
×
2135

2136
                // If the proof exists in the graph, then we have successfully
2137
                // received the remote proof and assembled the full proof, so we
2138
                // can safely delete the local proof from the database.
2139
                return chanInfo.AuthProof != nil
6✔
2140

2141
        case *lnwire.ChannelUpdate:
15✔
2142
                _, p1, p2, err := d.cfg.Graph.GetChannelByID(msg.ShortChannelID)
15✔
2143

15✔
2144
                // If the channel cannot be found, it is most likely a leftover
15✔
2145
                // message for a channel that was closed, so we can consider it
15✔
2146
                // stale.
15✔
2147
                if errors.Is(err, channeldb.ErrEdgeNotFound) {
19✔
2148
                        return true
4✔
2149
                }
4✔
2150
                if err != nil {
15✔
UNCOV
2151
                        log.Debugf("Unable to retrieve channel=%v from graph: "+
×
UNCOV
2152
                                "%v", msg.ShortChannelID, err)
×
UNCOV
2153
                        return false
×
UNCOV
2154
                }
×
2155

2156
                // Otherwise, we'll retrieve the correct policy that we
2157
                // currently have stored within our graph to check if this
2158
                // message is stale by comparing its timestamp.
2159
                var p *models.ChannelEdgePolicy
15✔
2160
                if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
30✔
2161
                        p = p1
15✔
2162
                } else {
19✔
2163
                        p = p2
4✔
2164
                }
4✔
2165

2166
                // If the policy is still unknown, then we can consider this
2167
                // policy fresh.
2168
                if p == nil {
15✔
UNCOV
2169
                        return false
×
2170
                }
×
2171

2172
                timestamp := time.Unix(int64(msg.Timestamp), 0)
15✔
2173
                return p.LastUpdate.After(timestamp)
15✔
2174

UNCOV
2175
        default:
×
UNCOV
2176
                // We'll make sure to not mark any unsupported messages as stale
×
UNCOV
2177
                // to ensure they are not removed.
×
UNCOV
2178
                return false
×
2179
        }
2180
}
2181

2182
// updateChannel creates a new fully signed update for the channel, and updates
2183
// the underlying graph with the new state.
2184
func (d *AuthenticatedGossiper) updateChannel(info *models.ChannelEdgeInfo,
2185
        edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement,
2186
        *lnwire.ChannelUpdate, error) {
8✔
2187

8✔
2188
        // Parse the unsigned edge into a channel update.
8✔
2189
        chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)
8✔
2190

8✔
2191
        // We'll generate a new signature over a digest of the channel
8✔
2192
        // announcement itself and update the timestamp to ensure it propagate.
8✔
2193
        err := netann.SignChannelUpdate(
8✔
2194
                d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
8✔
2195
                netann.ChanUpdSetTimestamp,
8✔
2196
        )
8✔
2197
        if err != nil {
8✔
UNCOV
2198
                return nil, nil, err
×
UNCOV
2199
        }
×
2200

2201
        // Next, we'll set the new signature in place, and update the reference
2202
        // in the backing slice.
2203
        edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
8✔
2204
        edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()
8✔
2205

8✔
2206
        // To ensure that our signature is valid, we'll verify it ourself
8✔
2207
        // before committing it to the slice returned.
8✔
2208
        err = graph.ValidateChannelUpdateAnn(
8✔
2209
                d.selfKey, info.Capacity, chanUpdate,
8✔
2210
        )
8✔
2211
        if err != nil {
8✔
UNCOV
2212
                return nil, nil, fmt.Errorf("generated invalid channel "+
×
2213
                        "update sig: %v", err)
×
2214
        }
×
2215

2216
        // Finally, we'll write the new edge policy to disk.
2217
        if err := d.cfg.Graph.UpdateEdge(edge); err != nil {
8✔
UNCOV
2218
                return nil, nil, err
×
UNCOV
2219
        }
×
2220

2221
        // We'll also create the original channel announcement so the two can
2222
        // be broadcast along side each other (if necessary), but only if we
2223
        // have a full channel announcement for this channel.
2224
        var chanAnn *lnwire.ChannelAnnouncement
8✔
2225
        if info.AuthProof != nil {
15✔
2226
                chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
7✔
2227
                chanAnn = &lnwire.ChannelAnnouncement{
7✔
2228
                        ShortChannelID:  chanID,
7✔
2229
                        NodeID1:         info.NodeKey1Bytes,
7✔
2230
                        NodeID2:         info.NodeKey2Bytes,
7✔
2231
                        ChainHash:       info.ChainHash,
7✔
2232
                        BitcoinKey1:     info.BitcoinKey1Bytes,
7✔
2233
                        Features:        lnwire.NewRawFeatureVector(),
7✔
2234
                        BitcoinKey2:     info.BitcoinKey2Bytes,
7✔
2235
                        ExtraOpaqueData: edge.ExtraOpaqueData,
7✔
2236
                }
7✔
2237
                chanAnn.NodeSig1, err = lnwire.NewSigFromECDSARawSignature(
7✔
2238
                        info.AuthProof.NodeSig1Bytes,
7✔
2239
                )
7✔
2240
                if err != nil {
7✔
UNCOV
2241
                        return nil, nil, err
×
2242
                }
×
2243
                chanAnn.NodeSig2, err = lnwire.NewSigFromECDSARawSignature(
7✔
2244
                        info.AuthProof.NodeSig2Bytes,
7✔
2245
                )
7✔
2246
                if err != nil {
7✔
UNCOV
2247
                        return nil, nil, err
×
2248
                }
×
2249
                chanAnn.BitcoinSig1, err = lnwire.NewSigFromECDSARawSignature(
7✔
2250
                        info.AuthProof.BitcoinSig1Bytes,
7✔
2251
                )
7✔
2252
                if err != nil {
7✔
UNCOV
2253
                        return nil, nil, err
×
2254
                }
×
2255
                chanAnn.BitcoinSig2, err = lnwire.NewSigFromECDSARawSignature(
7✔
2256
                        info.AuthProof.BitcoinSig2Bytes,
7✔
2257
                )
7✔
2258
                if err != nil {
7✔
UNCOV
2259
                        return nil, nil, err
×
UNCOV
2260
                }
×
2261
        }
2262

2263
        return chanAnn, chanUpdate, err
8✔
2264
}
2265

2266
// SyncManager returns the gossiper's SyncManager instance.
2267
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
4✔
2268
        return d.syncMgr
4✔
2269
}
4✔
2270

2271
// IsKeepAliveUpdate determines whether this channel update is considered a
2272
// keep-alive update based on the previous channel update processed for the same
2273
// direction.
2274
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate,
2275
        prev *models.ChannelEdgePolicy) bool {
18✔
2276

18✔
2277
        // Both updates should be from the same direction.
18✔
2278
        if update.ChannelFlags&lnwire.ChanUpdateDirection !=
18✔
2279
                prev.ChannelFlags&lnwire.ChanUpdateDirection {
18✔
UNCOV
2280

×
UNCOV
2281
                return false
×
UNCOV
2282
        }
×
2283

2284
        // The timestamp should always increase for a keep-alive update.
2285
        timestamp := time.Unix(int64(update.Timestamp), 0)
18✔
2286
        if !timestamp.After(prev.LastUpdate) {
22✔
2287
                return false
4✔
2288
        }
4✔
2289

2290
        // None of the remaining fields should change for a keep-alive update.
2291
        if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
22✔
2292
                return false
4✔
2293
        }
4✔
2294
        if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
34✔
2295
                return false
16✔
2296
        }
16✔
2297
        if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
10✔
2298
                return false
4✔
2299
        }
4✔
2300
        if update.TimeLockDelta != prev.TimeLockDelta {
6✔
UNCOV
2301
                return false
×
2302
        }
×
2303
        if update.HtlcMinimumMsat != prev.MinHTLC {
6✔
UNCOV
2304
                return false
×
2305
        }
×
2306
        if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
6✔
UNCOV
2307
                return false
×
UNCOV
2308
        }
×
2309
        if update.HtlcMaximumMsat != prev.MaxHTLC {
6✔
UNCOV
2310
                return false
×
UNCOV
2311
        }
×
2312
        if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
10✔
2313
                return false
4✔
2314
        }
4✔
2315
        return true
6✔
2316
}
2317

2318
// latestHeight returns the gossiper's latest height known of the chain.
2319
func (d *AuthenticatedGossiper) latestHeight() uint32 {
4✔
2320
        d.Lock()
4✔
2321
        defer d.Unlock()
4✔
2322
        return d.bestHeight
4✔
2323
}
4✔
2324

2325
// handleNodeAnnouncement processes a new node announcement.
2326
func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
2327
        nodeAnn *lnwire.NodeAnnouncement,
2328
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
28✔
2329

28✔
2330
        timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
28✔
2331

28✔
2332
        log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
28✔
2333
                "node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
28✔
2334

28✔
2335
        // We'll quickly ask the router if it already has a newer update for
28✔
2336
        // this node so we can skip validating signatures if not required.
28✔
2337
        if d.cfg.Graph.IsStaleNode(nodeAnn.NodeID, timestamp) {
40✔
2338
                log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
12✔
2339
                nMsg.err <- nil
12✔
2340
                return nil, true
12✔
2341
        }
12✔
2342

2343
        if err := d.addNode(nodeAnn, ops...); err != nil {
24✔
2344
                log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
4✔
2345
                        err)
4✔
2346

4✔
2347
                if !graph.IsError(
4✔
2348
                        err,
4✔
2349
                        graph.ErrOutdated,
4✔
2350
                        graph.ErrIgnored,
4✔
2351
                        graph.ErrVBarrierShuttingDown,
4✔
2352
                ) {
4✔
UNCOV
2353

×
UNCOV
2354
                        log.Error(err)
×
UNCOV
2355
                }
×
2356

2357
                nMsg.err <- err
4✔
2358
                return nil, false
4✔
2359
        }
2360

2361
        // In order to ensure we don't leak unadvertised nodes, we'll make a
2362
        // quick check to ensure this node intends to publicly advertise itself
2363
        // to the network.
2364
        isPublic, err := d.cfg.Graph.IsPublicNode(nodeAnn.NodeID)
20✔
2365
        if err != nil {
20✔
UNCOV
2366
                log.Errorf("Unable to determine if node %x is advertised: %v",
×
UNCOV
2367
                        nodeAnn.NodeID, err)
×
UNCOV
2368
                nMsg.err <- err
×
UNCOV
2369
                return nil, false
×
UNCOV
2370
        }
×
2371

2372
        var announcements []networkMsg
20✔
2373

20✔
2374
        // If it does, we'll add their announcement to our batch so that it can
20✔
2375
        // be broadcast to the rest of our peers.
20✔
2376
        if isPublic {
27✔
2377
                announcements = append(announcements, networkMsg{
7✔
2378
                        peer:     nMsg.peer,
7✔
2379
                        isRemote: nMsg.isRemote,
7✔
2380
                        source:   nMsg.source,
7✔
2381
                        msg:      nodeAnn,
7✔
2382
                })
7✔
2383
        } else {
24✔
2384
                log.Tracef("Skipping broadcasting node announcement for %x "+
17✔
2385
                        "due to being unadvertised", nodeAnn.NodeID)
17✔
2386
        }
17✔
2387

2388
        nMsg.err <- nil
20✔
2389
        // TODO(roasbeef): get rid of the above
20✔
2390

20✔
2391
        log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
20✔
2392
                "node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
20✔
2393

20✔
2394
        return announcements, true
20✔
2395
}
2396

2397
// handleChanAnnouncement processes a new channel announcement.
2398
func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
2399
        ann *lnwire.ChannelAnnouncement,
2400
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
32✔
2401

32✔
2402
        log.Debugf("Processing ChannelAnnouncement: peer=%v, short_chan_id=%v",
32✔
2403
                nMsg.peer, ann.ShortChannelID.ToUint64())
32✔
2404

32✔
2405
        // We'll ignore any channel announcements that target any chain other
32✔
2406
        // than the set of chains we know of.
32✔
2407
        if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) {
32✔
2408
                err := fmt.Errorf("ignoring ChannelAnnouncement from chain=%v"+
×
2409
                        ", gossiper on chain=%v", ann.ChainHash,
×
2410
                        d.cfg.ChainHash)
×
2411
                log.Errorf(err.Error())
×
2412

×
2413
                key := newRejectCacheKey(
×
2414
                        ann.ShortChannelID.ToUint64(),
×
2415
                        sourceToPub(nMsg.source),
×
2416
                )
×
UNCOV
2417
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
2418

×
UNCOV
2419
                nMsg.err <- err
×
UNCOV
2420
                return nil, false
×
UNCOV
2421
        }
×
2422

2423
        // If this is a remote ChannelAnnouncement with an alias SCID, we'll
2424
        // reject the announcement. Since the router accepts alias SCIDs,
2425
        // not erroring out would be a DoS vector.
2426
        if nMsg.isRemote && d.cfg.IsAlias(ann.ShortChannelID) {
32✔
2427
                err := fmt.Errorf("ignoring remote alias channel=%v",
×
2428
                        ann.ShortChannelID)
×
2429
                log.Errorf(err.Error())
×
2430

×
2431
                key := newRejectCacheKey(
×
2432
                        ann.ShortChannelID.ToUint64(),
×
2433
                        sourceToPub(nMsg.source),
×
2434
                )
×
UNCOV
2435
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
2436

×
UNCOV
2437
                nMsg.err <- err
×
UNCOV
2438
                return nil, false
×
UNCOV
2439
        }
×
2440

2441
        // If the advertised inclusionary block is beyond our knowledge of the
2442
        // chain tip, then we'll ignore it for now.
2443
        d.Lock()
32✔
2444
        if nMsg.isRemote && d.isPremature(ann.ShortChannelID, 0, nMsg) {
33✔
2445
                log.Warnf("Announcement for chan_id=(%v), is premature: "+
1✔
2446
                        "advertises height %v, only height %v is known",
1✔
2447
                        ann.ShortChannelID.ToUint64(),
1✔
2448
                        ann.ShortChannelID.BlockHeight, d.bestHeight)
1✔
2449
                d.Unlock()
1✔
2450
                nMsg.err <- nil
1✔
2451
                return nil, false
1✔
2452
        }
1✔
2453
        d.Unlock()
31✔
2454

31✔
2455
        // At this point, we'll now ask the router if this is a zombie/known
31✔
2456
        // edge. If so we can skip all the processing below.
31✔
2457
        if d.cfg.Graph.IsKnownEdge(ann.ShortChannelID) {
36✔
2458
                nMsg.err <- nil
5✔
2459
                return nil, true
5✔
2460
        }
5✔
2461

2462
        // If this is a remote channel announcement, then we'll validate all
2463
        // the signatures within the proof as it should be well formed.
2464
        var proof *models.ChannelAuthProof
30✔
2465
        if nMsg.isRemote {
46✔
2466
                if err := graph.ValidateChannelAnn(ann); err != nil {
16✔
2467
                        err := fmt.Errorf("unable to validate announcement: "+
×
2468
                                "%v", err)
×
2469

×
2470
                        key := newRejectCacheKey(
×
2471
                                ann.ShortChannelID.ToUint64(),
×
2472
                                sourceToPub(nMsg.source),
×
2473
                        )
×
2474
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
2475

×
UNCOV
2476
                        log.Error(err)
×
UNCOV
2477
                        nMsg.err <- err
×
UNCOV
2478
                        return nil, false
×
UNCOV
2479
                }
×
2480

2481
                // If the proof checks out, then we'll save the proof itself to
2482
                // the database so we can fetch it later when gossiping with
2483
                // other nodes.
2484
                proof = &models.ChannelAuthProof{
16✔
2485
                        NodeSig1Bytes:    ann.NodeSig1.ToSignatureBytes(),
16✔
2486
                        NodeSig2Bytes:    ann.NodeSig2.ToSignatureBytes(),
16✔
2487
                        BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
16✔
2488
                        BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
16✔
2489
                }
16✔
2490
        }
2491

2492
        // With the proof validated (if necessary), we can now store it within
2493
        // the database for our path finding and syncing needs.
2494
        var featureBuf bytes.Buffer
30✔
2495
        if err := ann.Features.Encode(&featureBuf); err != nil {
30✔
UNCOV
2496
                log.Errorf("unable to encode features: %v", err)
×
UNCOV
2497
                nMsg.err <- err
×
UNCOV
2498
                return nil, false
×
UNCOV
2499
        }
×
2500

2501
        edge := &models.ChannelEdgeInfo{
30✔
2502
                ChannelID:        ann.ShortChannelID.ToUint64(),
30✔
2503
                ChainHash:        ann.ChainHash,
30✔
2504
                NodeKey1Bytes:    ann.NodeID1,
30✔
2505
                NodeKey2Bytes:    ann.NodeID2,
30✔
2506
                BitcoinKey1Bytes: ann.BitcoinKey1,
30✔
2507
                BitcoinKey2Bytes: ann.BitcoinKey2,
30✔
2508
                AuthProof:        proof,
30✔
2509
                Features:         featureBuf.Bytes(),
30✔
2510
                ExtraOpaqueData:  ann.ExtraOpaqueData,
30✔
2511
        }
30✔
2512

30✔
2513
        // If there were any optional message fields provided, we'll include
30✔
2514
        // them in its serialized disk representation now.
30✔
2515
        if nMsg.optionalMsgFields != nil {
48✔
2516
                if nMsg.optionalMsgFields.capacity != nil {
23✔
2517
                        edge.Capacity = *nMsg.optionalMsgFields.capacity
5✔
2518
                }
5✔
2519
                if nMsg.optionalMsgFields.channelPoint != nil {
26✔
2520
                        cp := *nMsg.optionalMsgFields.channelPoint
8✔
2521
                        edge.ChannelPoint = cp
8✔
2522
                }
8✔
2523
        }
2524

2525
        log.Debugf("Adding edge for short_chan_id: %v",
30✔
2526
                ann.ShortChannelID.ToUint64())
30✔
2527

30✔
2528
        // We will add the edge to the channel router. If the nodes present in
30✔
2529
        // this channel are not present in the database, a partial node will be
30✔
2530
        // added to represent each node while we wait for a node announcement.
30✔
2531
        //
30✔
2532
        // Before we add the edge to the database, we obtain the mutex for this
30✔
2533
        // channel ID. We do this to ensure no other goroutine has read the
30✔
2534
        // database and is now making decisions based on this DB state, before
30✔
2535
        // it writes to the DB.
30✔
2536
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
30✔
2537
        err := d.cfg.Graph.AddEdge(edge, ops...)
30✔
2538
        if err != nil {
35✔
2539
                log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
5✔
2540
                        ann.ShortChannelID.ToUint64(), err)
5✔
2541

5✔
2542
                defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
5✔
2543

5✔
2544
                // If the edge was rejected due to already being known, then it
5✔
2545
                // may be the case that this new message has a fresh channel
5✔
2546
                // proof, so we'll check.
5✔
2547
                if graph.IsError(err, graph.ErrIgnored) {
9✔
2548
                        // Attempt to process the rejected message to see if we
4✔
2549
                        // get any new announcements.
4✔
2550
                        anns, rErr := d.processRejectedEdge(ann, proof)
4✔
2551
                        if rErr != nil {
4✔
2552
                                key := newRejectCacheKey(
×
2553
                                        ann.ShortChannelID.ToUint64(),
×
2554
                                        sourceToPub(nMsg.source),
×
2555
                                )
×
2556
                                cr := &cachedReject{}
×
UNCOV
2557
                                _, _ = d.recentRejects.Put(key, cr)
×
UNCOV
2558

×
UNCOV
2559
                                nMsg.err <- rErr
×
UNCOV
2560
                                return nil, false
×
UNCOV
2561
                        }
×
2562

2563
                        log.Debugf("Extracted %v announcements from rejected "+
4✔
2564
                                "msgs", len(anns))
4✔
2565

4✔
2566
                        // If while processing this rejected edge, we realized
4✔
2567
                        // there's a set of announcements we could extract,
4✔
2568
                        // then we'll return those directly.
4✔
2569
                        //
4✔
2570
                        // NOTE: since this is an ErrIgnored, we can return
4✔
2571
                        // true here to signal "allow" to its dependants.
4✔
2572
                        nMsg.err <- nil
4✔
2573

4✔
2574
                        return anns, true
4✔
2575
                } else {
1✔
2576
                        // Otherwise, this is just a regular rejected edge.
1✔
2577
                        key := newRejectCacheKey(
1✔
2578
                                ann.ShortChannelID.ToUint64(),
1✔
2579
                                sourceToPub(nMsg.source),
1✔
2580
                        )
1✔
2581
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
1✔
2582
                }
1✔
2583

2584
                nMsg.err <- err
1✔
2585
                return nil, false
1✔
2586
        }
2587

2588
        // If err is nil, release the lock immediately.
2589
        d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
29✔
2590

29✔
2591
        log.Debugf("Finish adding edge for short_chan_id: %v",
29✔
2592
                ann.ShortChannelID.ToUint64())
29✔
2593

29✔
2594
        // If we earlier received any ChannelUpdates for this channel, we can
29✔
2595
        // now process them, as the channel is added to the graph.
29✔
2596
        shortChanID := ann.ShortChannelID.ToUint64()
29✔
2597
        var channelUpdates []*processedNetworkMsg
29✔
2598

29✔
2599
        earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID)
29✔
2600
        if err == nil {
35✔
2601
                // There was actually an entry in the map, so we'll accumulate
6✔
2602
                // it. We don't worry about deletion, since it'll eventually
6✔
2603
                // fall out anyway.
6✔
2604
                chanMsgs := earlyChanUpdates
6✔
2605
                channelUpdates = append(channelUpdates, chanMsgs.msgs...)
6✔
2606
        }
6✔
2607

2608
        // Launch a new goroutine to handle each ChannelUpdate, this is to
2609
        // ensure we don't block here, as we can handle only one announcement
2610
        // at a time.
2611
        for _, cu := range channelUpdates {
35✔
2612
                // Skip if already processed.
6✔
2613
                if cu.processed {
8✔
2614
                        continue
2✔
2615
                }
2616

2617
                // Mark the ChannelUpdate as processed. This ensures that a
2618
                // subsequent announcement in the option-scid-alias case does
2619
                // not re-use an old ChannelUpdate.
2620
                cu.processed = true
6✔
2621

6✔
2622
                d.wg.Add(1)
6✔
2623
                go func(updMsg *networkMsg) {
12✔
2624
                        defer d.wg.Done()
6✔
2625

6✔
2626
                        switch msg := updMsg.msg.(type) {
6✔
2627
                        // Reprocess the message, making sure we return an
2628
                        // error to the original caller in case the gossiper
2629
                        // shuts down.
2630
                        case *lnwire.ChannelUpdate:
6✔
2631
                                log.Debugf("Reprocessing ChannelUpdate for "+
6✔
2632
                                        "shortChanID=%v",
6✔
2633
                                        msg.ShortChannelID.ToUint64())
6✔
2634

6✔
2635
                                select {
6✔
2636
                                case d.networkMsgs <- updMsg:
6✔
UNCOV
2637
                                case <-d.quit:
×
2638
                                        updMsg.err <- ErrGossiperShuttingDown
×
2639
                                }
2640

2641
                        // We don't expect any other message type than
2642
                        // ChannelUpdate to be in this cache.
UNCOV
2643
                        default:
×
UNCOV
2644
                                log.Errorf("Unsupported message type found "+
×
UNCOV
2645
                                        "among ChannelUpdates: %T", msg)
×
2646
                        }
2647
                }(cu.msg)
2648
        }
2649

2650
        // Channel announcement was successfully processed and now it might be
2651
        // broadcast to other connected nodes if it was an announcement with
2652
        // proof (remote).
2653
        var announcements []networkMsg
29✔
2654

29✔
2655
        if proof != nil {
44✔
2656
                announcements = append(announcements, networkMsg{
15✔
2657
                        peer:     nMsg.peer,
15✔
2658
                        isRemote: nMsg.isRemote,
15✔
2659
                        source:   nMsg.source,
15✔
2660
                        msg:      ann,
15✔
2661
                })
15✔
2662
        }
15✔
2663

2664
        nMsg.err <- nil
29✔
2665

29✔
2666
        log.Debugf("Processed ChannelAnnouncement: peer=%v, short_chan_id=%v",
29✔
2667
                nMsg.peer, ann.ShortChannelID.ToUint64())
29✔
2668

29✔
2669
        return announcements, true
29✔
2670
}
2671

2672
// handleChanUpdate processes a new channel update.
2673
func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
2674
        upd *lnwire.ChannelUpdate,
2675
        ops []batch.SchedulerOption) ([]networkMsg, bool) {
59✔
2676

59✔
2677
        log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
59✔
2678
                nMsg.peer, upd.ShortChannelID.ToUint64())
59✔
2679

59✔
2680
        // We'll ignore any channel updates that target any chain other than
59✔
2681
        // the set of chains we know of.
59✔
2682
        if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
59✔
2683
                err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
×
2684
                        "gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash)
×
2685
                log.Errorf(err.Error())
×
2686

×
2687
                key := newRejectCacheKey(
×
2688
                        upd.ShortChannelID.ToUint64(),
×
2689
                        sourceToPub(nMsg.source),
×
2690
                )
×
UNCOV
2691
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
2692

×
UNCOV
2693
                nMsg.err <- err
×
UNCOV
2694
                return nil, false
×
UNCOV
2695
        }
×
2696

2697
        blockHeight := upd.ShortChannelID.BlockHeight
59✔
2698
        shortChanID := upd.ShortChannelID.ToUint64()
59✔
2699

59✔
2700
        // If the advertised inclusionary block is beyond our knowledge of the
59✔
2701
        // chain tip, then we'll put the announcement in limbo to be fully
59✔
2702
        // verified once we advance forward in the chain. If the update has an
59✔
2703
        // alias SCID, we'll skip the isPremature check. This is necessary
59✔
2704
        // since aliases start at block height 16_000_000.
59✔
2705
        d.Lock()
59✔
2706
        if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
59✔
2707
                d.isPremature(upd.ShortChannelID, 0, nMsg) {
60✔
2708

1✔
2709
                log.Warnf("Update announcement for short_chan_id(%v), is "+
1✔
2710
                        "premature: advertises height %v, only height %v is "+
1✔
2711
                        "known", shortChanID, blockHeight, d.bestHeight)
1✔
2712
                d.Unlock()
1✔
2713
                nMsg.err <- nil
1✔
2714
                return nil, false
1✔
2715
        }
1✔
2716
        d.Unlock()
59✔
2717

59✔
2718
        // Before we perform any of the expensive checks below, we'll check
59✔
2719
        // whether this update is stale or is for a zombie channel in order to
59✔
2720
        // quickly reject it.
59✔
2721
        timestamp := time.Unix(int64(upd.Timestamp), 0)
59✔
2722

59✔
2723
        // Fetch the SCID we should be using to lock the channelMtx and make
59✔
2724
        // graph queries with.
59✔
2725
        graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
59✔
2726
        if err != nil {
118✔
2727
                // Fallback and set the graphScid to the peer-provided SCID.
59✔
2728
                // This will occur for non-option-scid-alias channels and for
59✔
2729
                // public option-scid-alias channels after 6 confirmations.
59✔
2730
                // Once public option-scid-alias channels have 6 confs, we'll
59✔
2731
                // ignore ChannelUpdates with one of their aliases.
59✔
2732
                graphScid = upd.ShortChannelID
59✔
2733
        }
59✔
2734

2735
        if d.cfg.Graph.IsStaleEdgePolicy(
59✔
2736
                graphScid, timestamp, upd.ChannelFlags,
59✔
2737
        ) {
65✔
2738

6✔
2739
                log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
6✔
2740
                        "peer=%v, msg=%s, is_remote=%v", shortChanID,
6✔
2741
                        nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
6✔
2742
                )
6✔
2743

6✔
2744
                nMsg.err <- nil
6✔
2745
                return nil, true
6✔
2746
        }
6✔
2747

2748
        // Get the node pub key as far since we don't have it in the channel
2749
        // update announcement message. We'll need this to properly verify the
2750
        // message's signature.
2751
        //
2752
        // We make sure to obtain the mutex for this channel ID before we
2753
        // access the database. This ensures the state we read from the
2754
        // database has not changed between this point and when we call
2755
        // UpdateEdge() later.
2756
        d.channelMtx.Lock(graphScid.ToUint64())
57✔
2757
        defer d.channelMtx.Unlock(graphScid.ToUint64())
57✔
2758

57✔
2759
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(graphScid)
57✔
2760
        switch {
57✔
2761
        // No error, break.
2762
        case err == nil:
53✔
2763
                break
53✔
2764

2765
        case errors.Is(err, channeldb.ErrZombieEdge):
7✔
2766
                err = d.processZombieUpdate(chanInfo, graphScid, upd)
7✔
2767
                if err != nil {
9✔
2768
                        log.Debug(err)
2✔
2769
                        nMsg.err <- err
2✔
2770
                        return nil, false
2✔
2771
                }
2✔
2772

2773
                // We'll fallthrough to ensure we stash the update until we
2774
                // receive its corresponding ChannelAnnouncement. This is
2775
                // needed to ensure the edge exists in the graph before
2776
                // applying the update.
2777
                fallthrough
5✔
2778
        case errors.Is(err, channeldb.ErrGraphNotFound):
5✔
2779
                fallthrough
5✔
2780
        case errors.Is(err, channeldb.ErrGraphNoEdgesFound):
5✔
2781
                fallthrough
5✔
2782
        case errors.Is(err, channeldb.ErrEdgeNotFound):
6✔
2783
                // If the edge corresponding to this ChannelUpdate was not
6✔
2784
                // found in the graph, this might be a channel in the process
6✔
2785
                // of being opened, and we haven't processed our own
6✔
2786
                // ChannelAnnouncement yet, hence it is not not found in the
6✔
2787
                // graph. This usually gets resolved after the channel proofs
6✔
2788
                // are exchanged and the channel is broadcasted to the rest of
6✔
2789
                // the network, but in case this is a private channel this
6✔
2790
                // won't ever happen. This can also happen in the case of a
6✔
2791
                // zombie channel with a fresh update for which we don't have a
6✔
2792
                // ChannelAnnouncement for since we reject them. Because of
6✔
2793
                // this, we temporarily add it to a map, and reprocess it after
6✔
2794
                // our own ChannelAnnouncement has been processed.
6✔
2795
                //
6✔
2796
                // The shortChanID may be an alias, but it is fine to use here
6✔
2797
                // since we don't have an edge in the graph and if the peer is
6✔
2798
                // not buggy, we should be able to use it once the gossiper
6✔
2799
                // receives the local announcement.
6✔
2800
                pMsg := &processedNetworkMsg{msg: nMsg}
6✔
2801

6✔
2802
                earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
6✔
2803
                switch {
6✔
2804
                // Nothing in the cache yet, we can just directly insert this
2805
                // element.
2806
                case err == cache.ErrElementNotFound:
6✔
2807
                        _, _ = d.prematureChannelUpdates.Put(
6✔
2808
                                shortChanID, &cachedNetworkMsg{
6✔
2809
                                        msgs: []*processedNetworkMsg{pMsg},
6✔
2810
                                })
6✔
2811

2812
                // There's already something in the cache, so we'll combine the
2813
                // set of messages into a single value.
2814
                default:
4✔
2815
                        msgs := earlyMsgs.msgs
4✔
2816
                        msgs = append(msgs, pMsg)
4✔
2817
                        _, _ = d.prematureChannelUpdates.Put(
4✔
2818
                                shortChanID, &cachedNetworkMsg{
4✔
2819
                                        msgs: msgs,
4✔
2820
                                })
4✔
2821
                }
2822

2823
                log.Debugf("Got ChannelUpdate for edge not found in graph"+
6✔
2824
                        "(shortChanID=%v), saving for reprocessing later",
6✔
2825
                        shortChanID)
6✔
2826

6✔
2827
                // NOTE: We don't return anything on the error channel for this
6✔
2828
                // message, as we expect that will be done when this
6✔
2829
                // ChannelUpdate is later reprocessed.
6✔
2830
                return nil, false
6✔
2831

2832
        default:
×
2833
                err := fmt.Errorf("unable to validate channel update "+
×
2834
                        "short_chan_id=%v: %v", shortChanID, err)
×
2835
                log.Error(err)
×
2836
                nMsg.err <- err
×
2837

×
2838
                key := newRejectCacheKey(
×
2839
                        upd.ShortChannelID.ToUint64(),
×
UNCOV
2840
                        sourceToPub(nMsg.source),
×
UNCOV
2841
                )
×
UNCOV
2842
                _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
2843

×
UNCOV
2844
                return nil, false
×
2845
        }
2846

2847
        // The least-significant bit in the flag on the channel update
2848
        // announcement tells us "which" side of the channels directed edge is
2849
        // being updated.
2850
        var (
53✔
2851
                pubKey       *btcec.PublicKey
53✔
2852
                edgeToUpdate *models.ChannelEdgePolicy
53✔
2853
        )
53✔
2854
        direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
53✔
2855
        switch direction {
53✔
2856
        case 0:
38✔
2857
                pubKey, _ = chanInfo.NodeKey1()
38✔
2858
                edgeToUpdate = e1
38✔
2859
        case 1:
19✔
2860
                pubKey, _ = chanInfo.NodeKey2()
19✔
2861
                edgeToUpdate = e2
19✔
2862
        }
2863

2864
        log.Debugf("Validating ChannelUpdate: channel=%v, from node=%x, has "+
53✔
2865
                "edge=%v", chanInfo.ChannelID, pubKey.SerializeCompressed(),
53✔
2866
                edgeToUpdate != nil)
53✔
2867

53✔
2868
        // Validate the channel announcement with the expected public key and
53✔
2869
        // channel capacity. In the case of an invalid channel update, we'll
53✔
2870
        // return an error to the caller and exit early.
53✔
2871
        err = graph.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
53✔
2872
        if err != nil {
57✔
2873
                rErr := fmt.Errorf("unable to validate channel update "+
4✔
2874
                        "announcement for short_chan_id=%v: %v",
4✔
2875
                        spew.Sdump(upd.ShortChannelID), err)
4✔
2876

4✔
2877
                log.Error(rErr)
4✔
2878
                nMsg.err <- rErr
4✔
2879
                return nil, false
4✔
2880
        }
4✔
2881

2882
        // If we have a previous version of the edge being updated, we'll want
2883
        // to rate limit its updates to prevent spam throughout the network.
2884
        if nMsg.isRemote && edgeToUpdate != nil {
67✔
2885
                // If it's a keep-alive update, we'll only propagate one if
18✔
2886
                // it's been a day since the previous. This follows our own
18✔
2887
                // heuristic of sending keep-alive updates after the same
18✔
2888
                // duration (see retransmitStaleAnns).
18✔
2889
                timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
18✔
2890
                if IsKeepAliveUpdate(upd, edgeToUpdate) {
24✔
2891
                        if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
11✔
2892
                                log.Debugf("Ignoring keep alive update not "+
5✔
2893
                                        "within %v period for channel %v",
5✔
2894
                                        d.cfg.RebroadcastInterval, shortChanID)
5✔
2895
                                nMsg.err <- nil
5✔
2896
                                return nil, false
5✔
2897
                        }
5✔
2898
                } else {
16✔
2899
                        // If it's not, we'll allow an update per minute with a
16✔
2900
                        // maximum burst of 10. If we haven't seen an update
16✔
2901
                        // for this channel before, we'll need to initialize a
16✔
2902
                        // rate limiter for each direction.
16✔
2903
                        //
16✔
2904
                        // Since the edge exists in the graph, we'll create a
16✔
2905
                        // rate limiter for chanInfo.ChannelID rather then the
16✔
2906
                        // SCID the peer sent. This is because there may be
16✔
2907
                        // multiple aliases for a channel and we may otherwise
16✔
2908
                        // rate-limit only a single alias of the channel,
16✔
2909
                        // instead of the whole channel.
16✔
2910
                        baseScid := chanInfo.ChannelID
16✔
2911
                        d.Lock()
16✔
2912
                        rls, ok := d.chanUpdateRateLimiter[baseScid]
16✔
2913
                        if !ok {
21✔
2914
                                r := rate.Every(d.cfg.ChannelUpdateInterval)
5✔
2915
                                b := d.cfg.MaxChannelUpdateBurst
5✔
2916
                                rls = [2]*rate.Limiter{
5✔
2917
                                        rate.NewLimiter(r, b),
5✔
2918
                                        rate.NewLimiter(r, b),
5✔
2919
                                }
5✔
2920
                                d.chanUpdateRateLimiter[baseScid] = rls
5✔
2921
                        }
5✔
2922
                        d.Unlock()
16✔
2923

16✔
2924
                        if !rls[direction].Allow() {
25✔
2925
                                log.Debugf("Rate limiting update for channel "+
9✔
2926
                                        "%v from direction %x", shortChanID,
9✔
2927
                                        pubKey.SerializeCompressed())
9✔
2928
                                nMsg.err <- nil
9✔
2929
                                return nil, false
9✔
2930
                        }
9✔
2931
                }
2932
        }
2933

2934
        // We'll use chanInfo.ChannelID rather than the peer-supplied
2935
        // ShortChannelID in the ChannelUpdate to avoid the router having to
2936
        // lookup the stored SCID. If we're sending the update, we'll always
2937
        // use the SCID stored in the database rather than a potentially
2938
        // different alias. This might mean that SigBytes is incorrect as it
2939
        // signs a different SCID than the database SCID, but since there will
2940
        // only be a difference if AuthProof == nil, this is fine.
2941
        update := &models.ChannelEdgePolicy{
43✔
2942
                SigBytes:                  upd.Signature.ToSignatureBytes(),
43✔
2943
                ChannelID:                 chanInfo.ChannelID,
43✔
2944
                LastUpdate:                timestamp,
43✔
2945
                MessageFlags:              upd.MessageFlags,
43✔
2946
                ChannelFlags:              upd.ChannelFlags,
43✔
2947
                TimeLockDelta:             upd.TimeLockDelta,
43✔
2948
                MinHTLC:                   upd.HtlcMinimumMsat,
43✔
2949
                MaxHTLC:                   upd.HtlcMaximumMsat,
43✔
2950
                FeeBaseMSat:               lnwire.MilliSatoshi(upd.BaseFee),
43✔
2951
                FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
43✔
2952
                ExtraOpaqueData:           upd.ExtraOpaqueData,
43✔
2953
        }
43✔
2954

43✔
2955
        if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil {
47✔
2956
                if graph.IsError(
4✔
2957
                        err, graph.ErrOutdated,
4✔
2958
                        graph.ErrIgnored,
4✔
2959
                        graph.ErrVBarrierShuttingDown,
4✔
2960
                ) {
8✔
2961

4✔
2962
                        log.Debugf("Update edge for short_chan_id(%v) got: %v",
4✔
2963
                                shortChanID, err)
4✔
2964
                } else {
4✔
2965
                        // Since we know the stored SCID in the graph, we'll
×
2966
                        // cache that SCID.
×
2967
                        key := newRejectCacheKey(
×
2968
                                chanInfo.ChannelID,
×
2969
                                sourceToPub(nMsg.source),
×
2970
                        )
×
UNCOV
2971
                        _, _ = d.recentRejects.Put(key, &cachedReject{})
×
UNCOV
2972

×
UNCOV
2973
                        log.Errorf("Update edge for short_chan_id(%v) got: %v",
×
UNCOV
2974
                                shortChanID, err)
×
UNCOV
2975
                }
×
2976

2977
                nMsg.err <- err
4✔
2978
                return nil, false
4✔
2979
        }
2980

2981
        // If this is a local ChannelUpdate without an AuthProof, it means it
2982
        // is an update to a channel that is not (yet) supposed to be announced
2983
        // to the greater network. However, our channel counter party will need
2984
        // to be given the update, so we'll try sending the update directly to
2985
        // the remote peer.
2986
        if !nMsg.isRemote && chanInfo.AuthProof == nil {
58✔
2987
                if nMsg.optionalMsgFields != nil {
30✔
2988
                        remoteAlias := nMsg.optionalMsgFields.remoteAlias
15✔
2989
                        if remoteAlias != nil {
19✔
2990
                                // The remoteAlias field was specified, meaning
4✔
2991
                                // that we should replace the SCID in the
4✔
2992
                                // update with the remote's alias. We'll also
4✔
2993
                                // need to re-sign the channel update. This is
4✔
2994
                                // required for option-scid-alias feature-bit
4✔
2995
                                // negotiated channels.
4✔
2996
                                upd.ShortChannelID = *remoteAlias
4✔
2997

4✔
2998
                                sig, err := d.cfg.SignAliasUpdate(upd)
4✔
2999
                                if err != nil {
4✔
UNCOV
3000
                                        log.Error(err)
×
UNCOV
3001
                                        nMsg.err <- err
×
3002
                                        return nil, false
×
3003
                                }
×
3004

3005
                                lnSig, err := lnwire.NewSigFromSignature(sig)
4✔
3006
                                if err != nil {
4✔
UNCOV
3007
                                        log.Error(err)
×
UNCOV
3008
                                        nMsg.err <- err
×
UNCOV
3009
                                        return nil, false
×
UNCOV
3010
                                }
×
3011

3012
                                upd.Signature = lnSig
4✔
3013
                        }
3014
                }
3015

3016
                // Get our peer's public key.
3017
                remotePubKey := remotePubFromChanInfo(
15✔
3018
                        chanInfo, upd.ChannelFlags,
15✔
3019
                )
15✔
3020

15✔
3021
                log.Debugf("The message %v has no AuthProof, sending the "+
15✔
3022
                        "update to remote peer %x", upd.MsgType(), remotePubKey)
15✔
3023

15✔
3024
                // Now we'll attempt to send the channel update message
15✔
3025
                // reliably to the remote peer in the background, so that we
15✔
3026
                // don't block if the peer happens to be offline at the moment.
15✔
3027
                err := d.reliableSender.sendMessage(upd, remotePubKey)
15✔
3028
                if err != nil {
15✔
3029
                        err := fmt.Errorf("unable to reliably send %v for "+
×
UNCOV
3030
                                "channel=%v to peer=%x: %v", upd.MsgType(),
×
UNCOV
3031
                                upd.ShortChannelID, remotePubKey, err)
×
UNCOV
3032
                        nMsg.err <- err
×
UNCOV
3033
                        return nil, false
×
UNCOV
3034
                }
×
3035
        }
3036

3037
        // Channel update announcement was successfully processed and now it
3038
        // can be broadcast to the rest of the network. However, we'll only
3039
        // broadcast the channel update announcement if it has an attached
3040
        // authentication proof. We also won't broadcast the update if it
3041
        // contains an alias because the network would reject this.
3042
        var announcements []networkMsg
43✔
3043
        if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
66✔
3044
                announcements = append(announcements, networkMsg{
23✔
3045
                        peer:     nMsg.peer,
23✔
3046
                        source:   nMsg.source,
23✔
3047
                        isRemote: nMsg.isRemote,
23✔
3048
                        msg:      upd,
23✔
3049
                })
23✔
3050
        }
23✔
3051

3052
        nMsg.err <- nil
43✔
3053

43✔
3054
        log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
43✔
3055
                "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
43✔
3056
                timestamp)
43✔
3057
        return announcements, true
43✔
3058
}
3059

3060
// handleAnnSig processes a new announcement signatures message.
3061
func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
3062
        ann *lnwire.AnnounceSignatures) ([]networkMsg, bool) {
25✔
3063

25✔
3064
        needBlockHeight := ann.ShortChannelID.BlockHeight +
25✔
3065
                d.cfg.ProofMatureDelta
25✔
3066
        shortChanID := ann.ShortChannelID.ToUint64()
25✔
3067

25✔
3068
        prefix := "local"
25✔
3069
        if nMsg.isRemote {
40✔
3070
                prefix = "remote"
15✔
3071
        }
15✔
3072

3073
        log.Infof("Received new %v announcement signature for %v", prefix,
25✔
3074
                ann.ShortChannelID)
25✔
3075

25✔
3076
        // By the specification, channel announcement proofs should be sent
25✔
3077
        // after some number of confirmations after channel was registered in
25✔
3078
        // bitcoin blockchain. Therefore, we check if the proof is mature.
25✔
3079
        d.Lock()
25✔
3080
        premature := d.isPremature(
25✔
3081
                ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
25✔
3082
        )
25✔
3083
        if premature {
26✔
3084
                log.Warnf("Premature proof announcement, current block height"+
1✔
3085
                        "lower than needed: %v < %v", d.bestHeight,
1✔
3086
                        needBlockHeight)
1✔
3087
                d.Unlock()
1✔
3088
                nMsg.err <- nil
1✔
3089
                return nil, false
1✔
3090
        }
1✔
3091
        d.Unlock()
25✔
3092

25✔
3093
        // Ensure that we know of a channel with the target channel ID before
25✔
3094
        // proceeding further.
25✔
3095
        //
25✔
3096
        // We must acquire the mutex for this channel ID before getting the
25✔
3097
        // channel from the database, to ensure what we read does not change
25✔
3098
        // before we call AddProof() later.
25✔
3099
        d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
25✔
3100
        defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
25✔
3101

25✔
3102
        chanInfo, e1, e2, err := d.cfg.Graph.GetChannelByID(
25✔
3103
                ann.ShortChannelID,
25✔
3104
        )
25✔
3105
        if err != nil {
30✔
3106
                _, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
5✔
3107
                if err != nil {
9✔
3108
                        err := fmt.Errorf("unable to store the proof for "+
4✔
3109
                                "short_chan_id=%v: %v", shortChanID, err)
4✔
3110
                        log.Error(err)
4✔
3111
                        nMsg.err <- err
4✔
3112

4✔
3113
                        return nil, false
4✔
3114
                }
4✔
3115

3116
                proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
5✔
3117
                err := d.cfg.WaitingProofStore.Add(proof)
5✔
3118
                if err != nil {
5✔
3119
                        err := fmt.Errorf("unable to store the proof for "+
×
UNCOV
3120
                                "short_chan_id=%v: %v", shortChanID, err)
×
UNCOV
3121
                        log.Error(err)
×
UNCOV
3122
                        nMsg.err <- err
×
UNCOV
3123
                        return nil, false
×
UNCOV
3124
                }
×
3125

3126
                log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
5✔
3127
                        ", adding to waiting batch", prefix, shortChanID)
5✔
3128
                nMsg.err <- nil
5✔
3129
                return nil, false
5✔
3130
        }
3131

3132
        nodeID := nMsg.source.SerializeCompressed()
24✔
3133
        isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
24✔
3134
        isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
24✔
3135

24✔
3136
        // Ensure that channel that was retrieved belongs to the peer which
24✔
3137
        // sent the proof announcement.
24✔
3138
        if !(isFirstNode || isSecondNode) {
24✔
3139
                err := fmt.Errorf("channel that was received doesn't belong "+
×
3140
                        "to the peer which sent the proof, short_chan_id=%v",
×
UNCOV
3141
                        shortChanID)
×
UNCOV
3142
                log.Error(err)
×
UNCOV
3143
                nMsg.err <- err
×
UNCOV
3144
                return nil, false
×
UNCOV
3145
        }
×
3146

3147
        // If proof was sent by a local sub-system, then we'll send the
3148
        // announcement signature to the remote node so they can also
3149
        // reconstruct the full channel announcement.
3150
        if !nMsg.isRemote {
38✔
3151
                var remotePubKey [33]byte
14✔
3152
                if isFirstNode {
28✔
3153
                        remotePubKey = chanInfo.NodeKey2Bytes
14✔
3154
                } else {
18✔
3155
                        remotePubKey = chanInfo.NodeKey1Bytes
4✔
3156
                }
4✔
3157

3158
                // Since the remote peer might not be online we'll call a
3159
                // method that will attempt to deliver the proof when it comes
3160
                // online.
3161
                err := d.reliableSender.sendMessage(ann, remotePubKey)
14✔
3162
                if err != nil {
14✔
3163
                        err := fmt.Errorf("unable to reliably send %v for "+
×
UNCOV
3164
                                "channel=%v to peer=%x: %v", ann.MsgType(),
×
UNCOV
3165
                                ann.ShortChannelID, remotePubKey, err)
×
UNCOV
3166
                        nMsg.err <- err
×
UNCOV
3167
                        return nil, false
×
UNCOV
3168
                }
×
3169
        }
3170

3171
        // Check if we already have the full proof for this channel.
3172
        if chanInfo.AuthProof != nil {
29✔
3173
                // If we already have the fully assembled proof, then the peer
5✔
3174
                // sending us their proof has probably not received our local
5✔
3175
                // proof yet. So be kind and send them the full proof.
5✔
3176
                if nMsg.isRemote {
10✔
3177
                        peerID := nMsg.source.SerializeCompressed()
5✔
3178
                        log.Debugf("Got AnnounceSignatures for channel with " +
5✔
3179
                                "full proof.")
5✔
3180

5✔
3181
                        d.wg.Add(1)
5✔
3182
                        go func() {
10✔
3183
                                defer d.wg.Done()
5✔
3184

5✔
3185
                                log.Debugf("Received half proof for channel "+
5✔
3186
                                        "%v with existing full proof. Sending"+
5✔
3187
                                        " full proof to peer=%x",
5✔
3188
                                        ann.ChannelID, peerID)
5✔
3189

5✔
3190
                                ca, _, _, err := netann.CreateChanAnnouncement(
5✔
3191
                                        chanInfo.AuthProof, chanInfo, e1, e2,
5✔
3192
                                )
5✔
3193
                                if err != nil {
5✔
UNCOV
3194
                                        log.Errorf("unable to gen ann: %v",
×
UNCOV
3195
                                                err)
×
3196
                                        return
×
3197
                                }
×
3198

3199
                                err = nMsg.peer.SendMessage(false, ca)
5✔
3200
                                if err != nil {
5✔
UNCOV
3201
                                        log.Errorf("Failed sending full proof"+
×
UNCOV
3202
                                                " to peer=%x: %v", peerID, err)
×
UNCOV
3203
                                        return
×
UNCOV
3204
                                }
×
3205

3206
                                log.Debugf("Full proof sent to peer=%x for "+
5✔
3207
                                        "chanID=%v", peerID, ann.ChannelID)
5✔
3208
                        }()
3209
                }
3210

3211
                log.Debugf("Already have proof for channel with chanID=%v",
5✔
3212
                        ann.ChannelID)
5✔
3213
                nMsg.err <- nil
5✔
3214
                return nil, true
5✔
3215
        }
3216

3217
        // Check that we received the opposite proof. If so, then we're now
3218
        // able to construct the full proof, and create the channel
3219
        // announcement. If we didn't receive the opposite half of the proof
3220
        // then we should store this one, and wait for the opposite to be
3221
        // received.
3222
        proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
23✔
3223
        oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
23✔
3224
        if err != nil && err != channeldb.ErrWaitingProofNotFound {
23✔
3225
                err := fmt.Errorf("unable to get the opposite proof for "+
×
UNCOV
3226
                        "short_chan_id=%v: %v", shortChanID, err)
×
UNCOV
3227
                log.Error(err)
×
UNCOV
3228
                nMsg.err <- err
×
UNCOV
3229
                return nil, false
×
3230
        }
×
3231

3232
        if err == channeldb.ErrWaitingProofNotFound {
36✔
3233
                err := d.cfg.WaitingProofStore.Add(proof)
13✔
3234
                if err != nil {
13✔
3235
                        err := fmt.Errorf("unable to store the proof for "+
×
UNCOV
3236
                                "short_chan_id=%v: %v", shortChanID, err)
×
UNCOV
3237
                        log.Error(err)
×
UNCOV
3238
                        nMsg.err <- err
×
UNCOV
3239
                        return nil, false
×
UNCOV
3240
                }
×
3241

3242
                log.Infof("1/2 of channel ann proof received for "+
13✔
3243
                        "short_chan_id=%v, waiting for other half",
13✔
3244
                        shortChanID)
13✔
3245

13✔
3246
                nMsg.err <- nil
13✔
3247
                return nil, false
13✔
3248
        }
3249

3250
        // We now have both halves of the channel announcement proof, then
3251
        // we'll reconstruct the initial announcement so we can validate it
3252
        // shortly below.
3253
        var dbProof models.ChannelAuthProof
14✔
3254
        if isFirstNode {
19✔
3255
                dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
5✔
3256
                dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
5✔
3257
                dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
5✔
3258
                dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
5✔
3259
        } else {
18✔
3260
                dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
13✔
3261
                dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
13✔
3262
                dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
13✔
3263
                dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
13✔
3264
        }
13✔
3265

3266
        chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
14✔
3267
                &dbProof, chanInfo, e1, e2,
14✔
3268
        )
14✔
3269
        if err != nil {
14✔
UNCOV
3270
                log.Error(err)
×
UNCOV
3271
                nMsg.err <- err
×
UNCOV
3272
                return nil, false
×
3273
        }
×
3274

3275
        // With all the necessary components assembled validate the full
3276
        // channel announcement proof.
3277
        if err := graph.ValidateChannelAnn(chanAnn); err != nil {
14✔
3278
                err := fmt.Errorf("channel announcement proof for "+
×
3279
                        "short_chan_id=%v isn't valid: %v", shortChanID, err)
×
UNCOV
3280

×
UNCOV
3281
                log.Error(err)
×
UNCOV
3282
                nMsg.err <- err
×
UNCOV
3283
                return nil, false
×
UNCOV
3284
        }
×
3285

3286
        // If the channel was returned by the router it means that existence of
3287
        // funding point and inclusion of nodes bitcoin keys in it already
3288
        // checked by the router. In this stage we should check that node keys
3289
        // attest to the bitcoin keys by validating the signatures of
3290
        // announcement. If proof is valid then we'll populate the channel edge
3291
        // with it, so we can announce it on peer connect.
3292
        err = d.cfg.Graph.AddProof(ann.ShortChannelID, &dbProof)
14✔
3293
        if err != nil {
14✔
3294
                err := fmt.Errorf("unable add proof to the channel chanID=%v:"+
×
UNCOV
3295
                        " %v", ann.ChannelID, err)
×
UNCOV
3296
                log.Error(err)
×
UNCOV
3297
                nMsg.err <- err
×
3298
                return nil, false
×
3299
        }
×
3300

3301
        err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
14✔
3302
        if err != nil {
14✔
3303
                err := fmt.Errorf("unable to remove opposite proof for the "+
×
UNCOV
3304
                        "channel with chanID=%v: %v", ann.ChannelID, err)
×
UNCOV
3305
                log.Error(err)
×
UNCOV
3306
                nMsg.err <- err
×
UNCOV
3307
                return nil, false
×
UNCOV
3308
        }
×
3309

3310
        // Proof was successfully created and now can announce the channel to
3311
        // the remain network.
3312
        log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
14✔
3313
                ", adding to next ann batch", shortChanID)
14✔
3314

14✔
3315
        // Assemble the necessary announcements to add to the next broadcasting
14✔
3316
        // batch.
14✔
3317
        var announcements []networkMsg
14✔
3318
        announcements = append(announcements, networkMsg{
14✔
3319
                peer:   nMsg.peer,
14✔
3320
                source: nMsg.source,
14✔
3321
                msg:    chanAnn,
14✔
3322
        })
14✔
3323
        if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
27✔
3324
                announcements = append(announcements, networkMsg{
13✔
3325
                        peer:   nMsg.peer,
13✔
3326
                        source: src,
13✔
3327
                        msg:    e1Ann,
13✔
3328
                })
13✔
3329
        }
13✔
3330
        if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
26✔
3331
                announcements = append(announcements, networkMsg{
12✔
3332
                        peer:   nMsg.peer,
12✔
3333
                        source: src,
12✔
3334
                        msg:    e2Ann,
12✔
3335
                })
12✔
3336
        }
12✔
3337

3338
        // We'll also send along the node announcements for each channel
3339
        // participant if we know of them. To ensure our node announcement
3340
        // propagates to our channel counterparty, we'll set the source for
3341
        // each announcement to the node it belongs to, otherwise we won't send
3342
        // it since the source gets skipped. This isn't necessary for channel
3343
        // updates and announcement signatures since we send those directly to
3344
        // our channel counterparty through the gossiper's reliable sender.
3345
        node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
14✔
3346
        if err != nil {
20✔
3347
                log.Debugf("Unable to fetch node announcement for %x: %v",
6✔
3348
                        chanInfo.NodeKey1Bytes, err)
6✔
3349
        } else {
18✔
3350
                if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
24✔
3351
                        announcements = append(announcements, networkMsg{
12✔
3352
                                peer:   nMsg.peer,
12✔
3353
                                source: nodeKey1,
12✔
3354
                                msg:    node1Ann,
12✔
3355
                        })
12✔
3356
                }
12✔
3357
        }
3358

3359
        node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
14✔
3360
        if err != nil {
22✔
3361
                log.Debugf("Unable to fetch node announcement for %x: %v",
8✔
3362
                        chanInfo.NodeKey2Bytes, err)
8✔
3363
        } else {
18✔
3364
                if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
20✔
3365
                        announcements = append(announcements, networkMsg{
10✔
3366
                                peer:   nMsg.peer,
10✔
3367
                                source: nodeKey2,
10✔
3368
                                msg:    node2Ann,
10✔
3369
                        })
10✔
3370
                }
10✔
3371
        }
3372

3373
        nMsg.err <- nil
14✔
3374
        return announcements, true
14✔
3375
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc