
lightningnetwork / lnd / 16911773184

12 Aug 2025 02:21PM UTC coverage: 57.471% (-9.4%) from 66.9%
Pull Request #10103: Rate limit outgoing gossip bandwidth by peer
Merge d64a1234d into f3e1f2f35

57 of 77 new or added lines in 5 files covered. (74.03%)

28294 existing lines in 457 files now uncovered.

99110 of 172451 relevant lines covered (57.47%)

1.78 hits per line

Source File

/discovery/syncer.go (73.14% covered)
1
package discovery
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "math"
8
        "math/rand"
9
        "sort"
10
        "sync"
11
        "sync/atomic"
12
        "time"
13

14
        "github.com/btcsuite/btcd/chaincfg/chainhash"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/graph"
17
        graphdb "github.com/lightningnetwork/lnd/graph/db"
18
        "github.com/lightningnetwork/lnd/lnpeer"
19
        "github.com/lightningnetwork/lnd/lnwire"
20
        "golang.org/x/time/rate"
21
)
22

23
// SyncerType encapsulates the different types of syncing mechanisms for a
24
// gossip syncer.
25
type SyncerType uint8
26

27
const (
28
        // ActiveSync denotes that a gossip syncer:
29
        //
30
        // 1. Should not attempt to synchronize with the remote peer for
31
        //    missing channels.
32
        // 2. Should respond to queries from the remote peer.
33
        // 3. Should receive new updates from the remote peer.
34
        //
35
        // They are started in a chansSynced state in order to accomplish their
36
        // responsibilities above.
37
        ActiveSync SyncerType = iota
38

39
        // PassiveSync denotes that a gossip syncer:
40
        //
41
        // 1. Should not attempt to synchronize with the remote peer for
42
        //    missing channels.
43
        // 2. Should respond to queries from the remote peer.
44
        // 3. Should not receive new updates from the remote peer.
45
        //
46
        // They are started in a chansSynced state in order to accomplish their
47
        // responsibilities above.
48
        PassiveSync
49

50
        // PinnedSync denotes an ActiveSync that doesn't count towards the
51
        // default active syncer limits and is always active throughout the
52
        // duration of the peer's connection. Each pinned syncer will begin by
53
        // performing a historical sync to ensure we are well synchronized with
54
        // their routing table.
55
        PinnedSync
56
)
57

58
const (
59
        // defaultTimestampQueueSize is the size of the timestamp range queue
60
        // used when no per-syncer size is configured.
61
        defaultTimestampQueueSize = 1
62
)
63

64
// String returns a human readable string describing the target SyncerType.
65
func (t SyncerType) String() string {
3✔
66
        switch t {
3✔
67
        case ActiveSync:
3✔
68
                return "ActiveSync"
3✔
69
        case PassiveSync:
3✔
70
                return "PassiveSync"
3✔
71
        case PinnedSync:
3✔
72
                return "PinnedSync"
3✔
73
        default:
×
74
                return fmt.Sprintf("unknown sync type %d", t)
×
75
        }
76
}
77

78
// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange
79
// allowing new gossip messages to be received from the peer.
80
func (t SyncerType) IsActiveSync() bool {
3✔
81
        switch t {
3✔
82
        case ActiveSync, PinnedSync:
3✔
83
                return true
3✔
84
        default:
3✔
85
                return false
3✔
86
        }
87
}
88

89
// syncerState is an enum that represents the current state of the GossipSyncer.
90
// As the syncer is a state machine, we'll gate our actions based off of the
91
// current state and the next incoming message.
92
type syncerState uint32
93

94
const (
95
        // syncingChans is the default state of the GossipSyncer. We start in
96
        // this state when a new peer first connects and we don't yet know if
97
        // we're fully synchronized.
98
        syncingChans syncerState = iota
99

100
        // waitingQueryRangeReply is the second main phase of the GossipSyncer.
101
        // We enter this state after we send out our first QueryChannelRange
102
        // query. We'll stay in this state until the remote party has sent
103
        // ReplyChannelRange messages that cover our query entirely. After
104
        // this state, we'll transition to queryNewChannels in order to
105
        // request the channels they told us about that we don't yet know
106
        // of.
107
        waitingQueryRangeReply
108

109
        // queryNewChannels is the third main phase of the GossipSyncer.  In
110
        // this phase we'll send out all of our QueryShortChanIDs messages in
111
        // response to the new channels that we don't yet know about.
112
        queryNewChannels
113

114
        // waitingQueryChanReply is the fourth main phase of the GossipSyncer.
115
        // We enter this phase once we've sent off a query chunk to the remote
116
        // peer.  We'll stay in this phase until we receive a
117
        // ReplyShortChanIDsEnd message which indicates that the remote party
118
        // has responded to all of our requests.
119
        waitingQueryChanReply
120

121
        // chansSynced is the terminal stage of the GossipSyncer. Once we enter
122
        // this phase, we'll send out our update horizon, which filters out the
123
        // set of channel updates that we're interested in. In this state,
124
        // we'll be able to accept any outgoing messages from the
125
        // AuthenticatedGossiper, and decide if we should forward them to our
126
        // target peer based on its update horizon.
127
        chansSynced
128

129
        // syncerIdle is a state in which the gossip syncer can handle external
130
        // requests to transition or perform historical syncs. It is used as the
131
        // initial state for pinned syncers, as well as a fallthrough case for
132
        // chansSynced allowing fully synced peers to facilitate requests.
133
        syncerIdle
134
)
135
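// For reference, the usual transition order driven by the handlers below is:
// syncingChans -> waitingQueryRangeReply -> queryNewChannels <->
// waitingQueryChanReply -> chansSynced, which then falls through to
// syncerIdle so that fully synced peers keep serving transition and
// historical-sync requests.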

136
// String returns a human readable string describing the target syncerState.
137
func (s syncerState) String() string {
3✔
138
        switch s {
3✔
139
        case syncingChans:
3✔
140
                return "syncingChans"
3✔
141

142
        case waitingQueryRangeReply:
3✔
143
                return "waitingQueryRangeReply"
3✔
144

145
        case queryNewChannels:
3✔
146
                return "queryNewChannels"
3✔
147

148
        case waitingQueryChanReply:
3✔
149
                return "waitingQueryChanReply"
3✔
150

151
        case chansSynced:
3✔
152
                return "chansSynced"
3✔
153

154
        case syncerIdle:
3✔
155
                return "syncerIdle"
3✔
156

157
        default:
×
158
                return "UNKNOWN STATE"
×
159
        }
160
}
161

162
const (
163
        // maxQueryChanRangeReplies specifies the default limit of replies to
164
        // process for a single QueryChannelRange request.
165
        maxQueryChanRangeReplies = 500
166

167
        // maxQueryChanRangeRepliesZlibFactor specifies the factor applied to
168
        // the maximum number of replies allowed for zlib encoded replies.
169
        maxQueryChanRangeRepliesZlibFactor = 4
170

171
        // chanRangeQueryBuffer is the number of blocks back that we'll go when
172
        // asking the remote peer for any channels they know of beyond
173
        // our highest known channel ID.
174
        chanRangeQueryBuffer = 144
175

176
        // syncTransitionTimeout is the default timeout in which we'll wait up
177
        // to when attempting to perform a sync transition.
178
        syncTransitionTimeout = 5 * time.Second
179

180
        // requestBatchSize is the maximum number of channels we will query the
181
        // remote peer for in a QueryShortChanIDs message.
182
        requestBatchSize = 500
183

184
        // syncerBufferSize is the size of the syncer's buffers.
185
        syncerBufferSize = 50
186
)
187

188
var (
189
        // encodingTypeToChunkSize maps an encoding type, to the max number of
190
        // short chan ID's using the encoding type that we can fit into a
191
        // single message safely.
192
        encodingTypeToChunkSize = map[lnwire.QueryEncoding]int32{
193
                lnwire.EncodingSortedPlain: 8000,
194
        }
195

196
        // ErrGossipSyncerExiting signals that the syncer has been killed.
197
        ErrGossipSyncerExiting = errors.New("gossip syncer exiting")
198

199
        // ErrSyncTransitionTimeout is an error returned when we've timed out
200
        // attempting to perform a sync transition.
201
        ErrSyncTransitionTimeout = errors.New("timed out attempting to " +
202
                "transition sync type")
203

204
        // zeroTimestamp is the timestamp we'll use when we want to indicate to
205
        // peers that we do not want to receive any new graph updates.
206
        zeroTimestamp time.Time
207
)
208

209
// syncTransitionReq encapsulates a request for a gossip syncer sync transition.
210
type syncTransitionReq struct {
211
        newSyncType SyncerType
212
        errChan     chan error
213
}
214

215
// historicalSyncReq encapsulates a request for a gossip syncer to perform a
216
// historical sync.
217
type historicalSyncReq struct {
218
        // doneChan is a channel that serves as a signal and is closed to ensure
219
        // the historical sync is attempted by the time we return to the caller.
220
        doneChan chan struct{}
221
}
222

223
// gossipSyncerCfg is a struct that packages all the information a GossipSyncer
224
// needs to carry out its duties.
225
type gossipSyncerCfg struct {
226
        // chainHash is the chain that this syncer is responsible for.
227
        chainHash chainhash.Hash
228

229
        // peerPub is the public key of the peer we're syncing with, serialized
230
        // in compressed format.
231
        peerPub [33]byte
232

233
        // channelSeries is the primary interface that we'll use to generate
234
        // our queries and respond to the queries of the remote peer.
235
        channelSeries ChannelGraphTimeSeries
236

237
        // encodingType is the current encoding type we're aware of. Requests
238
        // with different encoding types will be rejected.
239
        encodingType lnwire.QueryEncoding
240

241
        // chunkSize is the max number of short chan IDs using the syncer's
242
        // encoding type that we can fit into a single message safely.
243
        chunkSize int32
244

245
        // batchSize is the max number of channels the syncer will query from
246
        // the remote node in a single QueryShortChanIDs request.
247
        batchSize int32
248

249
        // sendMsg sends a variadic number of messages to the remote peer.
250
        // The boolean indicates whether this method should block
251
        // while waiting for sends to be written to the wire.
252
        sendMsg func(context.Context, bool, ...lnwire.Message) error
253

254
        // noSyncChannels will prevent the GossipSyncer from spawning a
255
        // channelGraphSyncer, meaning we will not try to reconcile unknown
256
        // channels with the remote peer.
257
        noSyncChannels bool
258

259
        // noReplyQueries will prevent the GossipSyncer from spawning a
260
        // replyHandler, meaning we will not reply to queries from our remote
261
        // peer.
262
        noReplyQueries bool
263

264
        // noTimestampQueryOption will prevent the GossipSyncer from querying
265
        // timestamps of announcement messages from the peer, and it will
266
        // prevent it from responding to timestamp queries.
267
        noTimestampQueryOption bool
268

269
        // ignoreHistoricalFilters will prevent syncers from replying with
270
        // historical data when the remote peer sets a gossip_timestamp_range.
271
        // This prevents ranges with old start times from causing us to dump the
272
        // graph on connect.
273
        ignoreHistoricalFilters bool
274

275
        // bestHeight returns the latest height known of the chain.
276
        bestHeight func() uint32
277

278
        // markGraphSynced updates the SyncManager's perception of whether we
279
        // have completed at least one historical sync.
280
        markGraphSynced func()
281

282
        // maxQueryChanRangeReplies is the maximum number of replies we'll allow
283
        // for a single QueryChannelRange request.
284
        maxQueryChanRangeReplies uint32
285

286
        // isStillZombieChannel takes the timestamps of the latest channel
287
        // updates for a channel and returns true if the channel should be
288
        // considered a zombie based on these timestamps.
289
        isStillZombieChannel func(time.Time, time.Time) bool
290

291
        // timestampQueueSize is the size of the timestamp range queue. If not
292
        // set, defaults to the defaultTimestampQueueSize constant.
293
        timestampQueueSize int
294

295
        // msgBytesPerSecond is the allotted bandwidth rate, expressed in
296
        // bytes/second that this gossip syncer can consume. Once we exceed this
297
        // rate, message sending will block until we're below the rate.
298
        msgBytesPerSecond uint64
299
}
300

301
// GossipSyncer is a struct that handles synchronizing the channel graph state
302
// with a remote peer. The GossipSyncer implements a state machine that will
303
// progressively ensure we're synchronized with the channel state of the remote
304
// node. Once both nodes have been synchronized, we'll use an update filter to
305
// filter out which messages should be sent to a remote peer based on their
306
// update horizon. If the update horizon isn't specified, then we won't send
307
// them any channel updates at all.
308
type GossipSyncer struct {
309
        started sync.Once
310
        stopped sync.Once
311

312
        // state is the current state of the GossipSyncer.
313
        //
314
        // NOTE: This variable MUST be used atomically.
315
        state uint32
316

317
        // syncType denotes the SyncerType the gossip syncer is currently
318
        // exercising.
319
        //
320
        // NOTE: This variable MUST be used atomically.
321
        syncType uint32
322

323
        // remoteUpdateHorizon is the update horizon of the remote peer. We'll
324
        // use this to properly filter out any messages.
325
        remoteUpdateHorizon *lnwire.GossipTimestampRange
326

327
        // localUpdateHorizon is our local update horizon, we'll use this to
328
        // determine if we've already sent out our update.
329
        localUpdateHorizon *lnwire.GossipTimestampRange
330

331
        // syncTransitions is a channel through which new sync type transition
332
        // requests will be sent through. These requests should only be handled
333
        // when the gossip syncer is in a chansSynced state to ensure its state
334
        // machine behaves as expected.
335
        syncTransitionReqs chan *syncTransitionReq
336

337
        // historicalSyncReqs is a channel that serves as a signal for the
338
        // gossip syncer to perform a historical sync. These can only be done
339
        // once the gossip syncer is in a chansSynced state to ensure its state
340
        // machine behaves as expected.
341
        historicalSyncReqs chan *historicalSyncReq
342

343
        // genHistoricalChanRangeQuery when true signals to the gossip syncer
344
        // that it should request the remote peer for all of its known channel
345
        // IDs starting from the genesis block of the chain. This can only
346
        // happen if the gossip syncer receives a request to attempt a
347
        // historical sync. It can be unset if the syncer ever transitions from
348
        // PassiveSync to ActiveSync.
349
        genHistoricalChanRangeQuery bool
350

351
        // gossipMsgs is a channel that all responses to our queries from the
352
        // target peer will be sent over, these will be read by the
353
        // channelGraphSyncer.
354
        gossipMsgs chan lnwire.Message
355

356
        // queryMsgs is a channel that all queries from the remote peer will be
357
        // received over, these will be read by the replyHandler.
358
        queryMsgs chan lnwire.Message
359

360
        // curQueryRangeMsg keeps track of the latest QueryChannelRange message
361
        // we've sent to a peer to ensure we've consumed all expected replies.
362
        // This field is primarily used within the waitingQueryChanReply state.
363
        curQueryRangeMsg *lnwire.QueryChannelRange
364

365
        // prevReplyChannelRange keeps track of the previous ReplyChannelRange
366
        // message we've received from a peer to ensure they've fully replied to
367
        // our query by ensuring they covered our requested block range. This
368
        // field is primarily used within the waitingQueryChanReply state.
369
        prevReplyChannelRange *lnwire.ReplyChannelRange
370

371
        // bufferedChanRangeReplies is used in the waitingQueryChanReply to
372
        // buffer all the chunked response to our query.
373
        bufferedChanRangeReplies []graphdb.ChannelUpdateInfo
374

375
        // numChanRangeRepliesRcvd is used to track the number of replies
376
        // received as part of a QueryChannelRange. This field is primarily used
377
        // within the waitingQueryChanReply state.
378
        numChanRangeRepliesRcvd uint32
379

380
        // newChansToQuery is used to pass the set of channels we should query
381
        // for from the waitingQueryChanReply state to the queryNewChannels
382
        // state.
383
        newChansToQuery []lnwire.ShortChannelID
384

385
        cfg gossipSyncerCfg
386

387
        // syncedSignal is a channel that, if set, will be closed when the
388
        // GossipSyncer reaches its terminal chansSynced state.
389
        syncedSignal chan struct{}
390

391
        // syncerSema is used to more finely control the syncer's ability to
392
        // respond to gossip timestamp range messages.
393
        syncerSema chan struct{}
394

395
        // timestampRangeQueue is a buffered channel for queuing timestamp range
396
        // messages that need to be processed asynchronously. This prevents the
397
        // gossiper from blocking when ApplyGossipFilter is called.
398
        timestampRangeQueue chan *lnwire.GossipTimestampRange
399

400
        // isSendingBacklog is an atomic flag that indicates whether a goroutine
401
        // is currently sending the backlog of messages. This ensures only one
402
        // goroutine is active at a time.
403
        isSendingBacklog atomic.Bool
404

405
        sync.Mutex
406

407
        // cg is a helper that encapsulates a wait group and quit channel and
408
        // allows contexts that either block or cancel on those depending on
409
        // the use case.
410
        cg *fn.ContextGuard
411

412
        // rateLimiter dictates the frequency with which we will reply to gossip
413
        // queries to this peer.
414
        rateLimiter *rate.Limiter
415
}
416

417
// newGossipSyncer returns a new instance of the GossipSyncer populated using
418
// the passed config.
419
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
3✔
420
        // Use the configured queue size if set, otherwise use the default.
3✔
421
        queueSize := cfg.timestampQueueSize
3✔
422
        if queueSize == 0 {
6✔
423
                queueSize = defaultTimestampQueueSize
3✔
424
        }
3✔
425

426
        bytesPerSecond := cfg.msgBytesPerSecond
3✔
427
        if bytesPerSecond == 0 {
3✔
NEW
428
                bytesPerSecond = DefaultPeerMsgBytesPerSecond
×
NEW
429
        }
×
430
        bytesBurst := 2 * bytesPerSecond
3✔
431

3✔
432
        // We'll use this rate limiter to limit this single peer.
3✔
433
        rateLimiter := rate.NewLimiter(
3✔
434
                rate.Limit(bytesPerSecond), int(bytesBurst),
3✔
435
        )
3✔
436

3✔
437
        return &GossipSyncer{
3✔
438
                cfg:                cfg,
3✔
439
                syncTransitionReqs: make(chan *syncTransitionReq),
3✔
440
                historicalSyncReqs: make(chan *historicalSyncReq),
3✔
441
                gossipMsgs:         make(chan lnwire.Message, syncerBufferSize),
3✔
442
                queryMsgs:          make(chan lnwire.Message, syncerBufferSize),
3✔
443
                timestampRangeQueue: make(
3✔
444
                        chan *lnwire.GossipTimestampRange, queueSize,
3✔
445
                ),
3✔
446
                syncerSema:  sema,
3✔
447
                cg:          fn.NewContextGuard(),
3✔
448
                rateLimiter: rateLimiter,
3✔
449
        }
3✔
450
}
451
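The limiter built above is a token bucket from golang.org/x/time/rate: it refills at msgBytesPerSecond (falling back to DefaultPeerMsgBytesPerSecond when unset) and allows a burst of twice that. The snippet below is a minimal sketch, not lnd's actual send path, of how such a limiter paces outgoing bytes; paceSend and serializedSize are assumed names for illustration only.

func paceSend(ctx context.Context, limiter *rate.Limiter,
        msgs ...lnwire.Message) error {

        for _, msg := range msgs {
                // serializedSize is a hypothetical helper; any measure of the
                // message's encoded wire size would do here.
                size := serializedSize(msg)

                // Block until `size` tokens are available in the bucket,
                // respecting context cancellation.
                if err := limiter.WaitN(ctx, size); err != nil {
                        return err
                }

                // ... hand the message off to the peer's write queue ...
        }

        return nil
}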

452
// Start starts the GossipSyncer and any goroutines that it needs to carry out
453
// its duties.
454
func (g *GossipSyncer) Start() {
3✔
455
        g.started.Do(func() {
6✔
456
                log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])
3✔
457

3✔
458
                ctx, _ := g.cg.Create(context.Background())
3✔
459

3✔
460
                // TODO(conner): only spawn channelGraphSyncer if remote
3✔
461
                // supports gossip queries, and only spawn replyHandler if we
3✔
462
                // advertise support
3✔
463
                if !g.cfg.noSyncChannels {
6✔
464
                        g.cg.WgAdd(1)
3✔
465
                        go g.channelGraphSyncer(ctx)
3✔
466
                }
3✔
467
                if !g.cfg.noReplyQueries {
6✔
468
                        g.cg.WgAdd(1)
3✔
469
                        go g.replyHandler(ctx)
3✔
470
                }
3✔
471

472
                // Start the timestamp range queue processor to handle gossip
473
                // filter applications asynchronously.
474
                if !g.cfg.noTimestampQueryOption {
6✔
475
                        g.cg.WgAdd(1)
3✔
476
                        go g.processTimestampRangeQueue(ctx)
3✔
477
                }
3✔
478
        })
479
}
480

481
// Stop signals the GossipSyncer for a graceful exit, then waits until it has
482
// exited.
483
func (g *GossipSyncer) Stop() {
3✔
484
        g.stopped.Do(func() {
6✔
485
                log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
3✔
486
                defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])
3✔
487

3✔
488
                g.cg.Quit()
3✔
489
        })
3✔
490
}
491
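For orientation, a rough sketch of how a syncer instance is driven over the life of a peer connection. The right-hand values (chainHash, peerPub, series, sendToPeer, bestHeight, markGraphSynced, sema) are placeholders; in lnd this wiring is normally performed by the SyncManager.

syncer := newGossipSyncer(gossipSyncerCfg{
        chainHash:                chainHash,
        peerPub:                  peerPub,
        channelSeries:            series,
        encodingType:             lnwire.EncodingSortedPlain,
        chunkSize:                encodingTypeToChunkSize[lnwire.EncodingSortedPlain],
        batchSize:                requestBatchSize,
        sendMsg:                  sendToPeer,
        bestHeight:               bestHeight,
        markGraphSynced:          markGraphSynced,
        maxQueryChanRangeReplies: maxQueryChanRangeReplies,
        msgBytesPerSecond:        0, // 0 falls back to DefaultPeerMsgBytesPerSecond.
}, sema)

syncer.Start()
defer syncer.Stop()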

492
// handleSyncingChans handles the state syncingChans for the GossipSyncer. When
493
// in this state, we will send a QueryChannelRange msg to our peer and advance
494
// the syncer's state to waitingQueryRangeReply.
495
func (g *GossipSyncer) handleSyncingChans(ctx context.Context) {
3✔
496
        // Prepare the query msg.
3✔
497
        queryRangeMsg, err := g.genChanRangeQuery(
3✔
498
                ctx, g.genHistoricalChanRangeQuery,
3✔
499
        )
3✔
500
        if err != nil {
3✔
501
                log.Errorf("Unable to gen chan range query: %v", err)
×
502
                return
×
503
        }
×
504

505
        // Acquire a lock so the following state transition is atomic.
506
        //
507
        // NOTE: We must lock the following steps as it's possible we get an
508
        // immediate response (ReplyChannelRange) after sending the query msg.
509
        // The response is handled in ProcessQueryMsg, which requires the
510
        // current state to be waitingQueryRangeReply.
511
        g.Lock()
3✔
512
        defer g.Unlock()
3✔
513

3✔
514
        // Send the msg to the remote peer, which is non-blocking as
3✔
515
        // `sendToPeer` only queues the msg in Brontide.
3✔
516
        err = g.sendToPeer(ctx, queryRangeMsg)
3✔
517
        if err != nil {
3✔
518
                log.Errorf("Unable to send chan range query: %v", err)
×
519
                return
×
520
        }
×
521

522
        // With the message sent successfully, we'll transition into the next
523
        // state where we wait for their reply.
524
        g.setSyncState(waitingQueryRangeReply)
3✔
525
}
526

527
// channelGraphSyncer is the main goroutine responsible for ensuring that we
528
// properly sync our channel graph state with the remote peer, and that we only
529
// send them messages which actually pass their defined update horizon.
530
func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
3✔
531
        defer g.cg.WgDone()
3✔
532

3✔
533
        for {
6✔
534
                state := g.syncState()
3✔
535
                syncType := g.SyncType()
3✔
536

3✔
537
                log.Debugf("GossipSyncer(%x): state=%v, type=%v",
3✔
538
                        g.cfg.peerPub[:], state, syncType)
3✔
539

3✔
540
                switch state {
3✔
541
                // When we're in this state, we're trying to synchronize our
542
                // view of the network with the remote peer. We'll kick off
543
                // this sync by asking them for the set of channels they
544
                // understand, as we'll as responding to any other queries by
545
                // them.
546
                case syncingChans:
3✔
547
                        g.handleSyncingChans(ctx)
3✔
548

549
                // In this state, we've sent out our initial channel range
550
                // query and are waiting for the final response from the remote
551
                // peer before we perform a diff to see with channels they know
552
                // of that we don't.
553
                case waitingQueryRangeReply:
3✔
554
                        // We'll wait to either process a new message from the
3✔
555
                        // remote party, or exit due to the gossiper exiting,
3✔
556
                        // or us being signalled to do so.
3✔
557
                        select {
3✔
558
                        case msg := <-g.gossipMsgs:
3✔
559
                                // The remote peer is sending a response to our
3✔
560
                                // initial query, we'll collate this response,
3✔
561
                                // and see if it's the final one in the series.
3✔
562
                                // If so, we can then transition to querying
3✔
563
                                // for the new channels.
3✔
564
                                queryReply, ok := msg.(*lnwire.ReplyChannelRange)
3✔
565
                                if ok {
6✔
566
                                        err := g.processChanRangeReply(
3✔
567
                                                ctx, queryReply,
3✔
568
                                        )
3✔
569
                                        if err != nil {
3✔
570
                                                log.Errorf("Unable to "+
×
571
                                                        "process chan range "+
×
572
                                                        "query: %v", err)
×
573
                                                return
×
574
                                        }
×
575
                                        continue
3✔
576
                                }
577

578
                                log.Warnf("Unexpected message: %T in state=%v",
×
579
                                        msg, state)
×
580

UNCOV
581
                        case <-g.cg.Done():
×
UNCOV
582
                                return
×
583

UNCOV
584
                        case <-ctx.Done():
×
UNCOV
585
                                return
×
586
                        }
587

588
                // We'll enter this state once we've discovered which channels
589
                // the remote party knows of that we don't yet know of
590
                // ourselves.
591
                case queryNewChannels:
3✔
592
                        // First, we'll attempt to continue our channel
3✔
593
                        // synchronization by continuing to send off another
3✔
594
                        // query chunk.
3✔
595
                        done := g.synchronizeChanIDs(ctx)
3✔
596

3✔
597
                        // If this wasn't our last query, then we'll need to
3✔
598
                        // transition to our waiting state.
3✔
599
                        if !done {
6✔
600
                                continue
3✔
601
                        }
602

603
                        // If we're fully synchronized, then we can transition
604
                        // to our terminal state.
605
                        g.setSyncState(chansSynced)
3✔
606

3✔
607
                        // Ensure that the sync manager becomes aware that the
3✔
608
                        // historical sync completed so synced_to_graph is
3✔
609
                        // updated over rpc.
3✔
610
                        g.cfg.markGraphSynced()
3✔
611

612
                // In this state, we've just sent off a new query for channels
613
                // that we don't yet know of. We'll remain in this state until
614
                // the remote party signals they've responded to our query in
615
                // totality.
616
                case waitingQueryChanReply:
3✔
617
                        // Once we've sent off our query, we'll wait for either
3✔
618
                        // an ending reply, or just another query from the
3✔
619
                        // remote peer.
3✔
620
                        select {
3✔
621
                        case msg := <-g.gossipMsgs:
3✔
622
                                // If this is the final reply to one of our
3✔
623
                                // queries, then we'll loop back into our query
3✔
624
                                // state to send off the remaining query chunks.
3✔
625
                                _, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
3✔
626
                                if ok {
6✔
627
                                        g.setSyncState(queryNewChannels)
3✔
628
                                        continue
3✔
629
                                }
630

631
                                log.Warnf("Unexpected message: %T in state=%v",
×
632
                                        msg, state)
×
633

634
                        case <-g.cg.Done():
×
635
                                return
×
636

637
                        case <-ctx.Done():
×
638
                                return
×
639
                        }
640

641
                // This is our final terminal state where we'll only reply to
642
                // any further queries by the remote peer.
643
                case chansSynced:
3✔
644
                        g.Lock()
3✔
645
                        if g.syncedSignal != nil {
6✔
646
                                close(g.syncedSignal)
3✔
647
                                g.syncedSignal = nil
3✔
648
                        }
3✔
649
                        g.Unlock()
3✔
650

3✔
651
                        // If we haven't yet sent out our update horizon, and
3✔
652
                        // we want to receive real-time channel updates, we'll
3✔
653
                        // do so now.
3✔
654
                        if g.localUpdateHorizon == nil &&
3✔
655
                                syncType.IsActiveSync() {
6✔
656

3✔
657
                                err := g.sendGossipTimestampRange(
3✔
658
                                        ctx, time.Now(), math.MaxUint32,
3✔
659
                                )
3✔
660
                                if err != nil {
3✔
661
                                        log.Errorf("Unable to send update "+
×
662
                                                "horizon to %x: %v",
×
663
                                                g.cfg.peerPub, err)
×
664
                                }
×
665
                        }
666
                        // With our horizon set, we'll simply reply to any new
667
                        // messages or process any state transitions and exit if
668
                        // needed.
669
                        fallthrough
3✔
670

671
                // Pinned peers will begin in this state, since they will
672
                // immediately receive a request to perform a historical sync.
673
                // Otherwise, we fall through after ending in chansSynced to
674
                // facilitate new requests.
675
                case syncerIdle:
3✔
676
                        select {
3✔
677
                        case req := <-g.syncTransitionReqs:
3✔
678
                                req.errChan <- g.handleSyncTransition(ctx, req)
3✔
679

680
                        case req := <-g.historicalSyncReqs:
3✔
681
                                g.handleHistoricalSync(req)
3✔
682

UNCOV
683
                        case <-g.cg.Done():
×
UNCOV
684
                                return
×
685

686
                        case <-ctx.Done():
3✔
687
                                return
3✔
688
                        }
689
                }
690
        }
691
}
692

693
// replyHandler is an event loop whose sole purpose is to reply to the remote
694
// peers queries. Our replyHandler will respond to messages generated by their
695
// channelGraphSyncer, and vice versa. Each party's channelGraphSyncer drives
696
// the other's replyHandler, allowing the replyHandler to operate independently
697
// from the state machine maintained on the same node.
698
//
699
// NOTE: This method MUST be run as a goroutine.
700
func (g *GossipSyncer) replyHandler(ctx context.Context) {
3✔
701
        defer g.cg.WgDone()
3✔
702

3✔
703
        for {
6✔
704
                select {
3✔
705
                case msg := <-g.queryMsgs:
3✔
706
                        err := g.replyPeerQueries(ctx, msg)
3✔
707
                        switch {
3✔
708
                        case err == ErrGossipSyncerExiting:
×
709
                                return
×
710

711
                        case err == lnpeer.ErrPeerExiting:
×
712
                                return
×
713

714
                        case err != nil:
×
715
                                log.Errorf("Unable to reply to peer "+
×
716
                                        "query: %v", err)
×
717
                        }
718

UNCOV
719
                case <-g.cg.Done():
×
UNCOV
720
                        return
×
721

722
                case <-ctx.Done():
3✔
723
                        return
3✔
724
                }
725
        }
726
}
727

728
// processTimestampRangeQueue handles timestamp range messages from the queue
729
// asynchronously. This prevents blocking the gossiper when rate limiting is
730
// active and multiple peers are trying to apply gossip filters.
731
func (g *GossipSyncer) processTimestampRangeQueue(ctx context.Context) {
3✔
732
        defer g.cg.WgDone()
3✔
733

3✔
734
        for {
6✔
735
                select {
3✔
736
                case msg := <-g.timestampRangeQueue:
3✔
737
                        // Process the timestamp range message. If we hit an
3✔
738
                        // error, log it but continue processing to avoid
3✔
739
                        // blocking the queue.
3✔
740
                        err := g.ApplyGossipFilter(ctx, msg)
3✔
741
                        switch {
3✔
742
                        case errors.Is(err, ErrGossipSyncerExiting):
×
743
                                return
×
744

745
                        case errors.Is(err, lnpeer.ErrPeerExiting):
×
746
                                return
×
747

748
                        case err != nil:
×
749
                                log.Errorf("Unable to apply gossip filter: %v",
×
750
                                        err)
×
751
                        }
752

753
                case <-g.cg.Done():
×
754
                        return
×
755

756
                case <-ctx.Done():
3✔
757
                        return
3✔
758
                }
759
        }
760
}
761

762
// QueueTimestampRange attempts to queue a timestamp range message for
763
// asynchronous processing. If the queue is full, it returns false to indicate
764
// the message was dropped.
765
func (g *GossipSyncer) QueueTimestampRange(
766
        msg *lnwire.GossipTimestampRange) bool {
3✔
767

3✔
768
        // If timestamp queries are disabled, don't queue the message.
3✔
769
        if g.cfg.noTimestampQueryOption {
3✔
770
                return false
×
771
        }
×
772

773
        select {
3✔
774
        case g.timestampRangeQueue <- msg:
3✔
775
                return true
3✔
776

777
        // Queue is full, drop the message to prevent blocking.
UNCOV
778
        default:
×
UNCOV
779
                log.Warnf("Timestamp range queue full for peer %x, "+
×
UNCOV
780
                        "dropping message", g.cfg.peerPub[:])
×
UNCOV
781
                return false
×
782
        }
783
}
784
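A small sketch of the intended call pattern from the gossiper's side (syncer, rangeMsg and peerPub are placeholders): because the enqueue is non-blocking, a full queue drops the filter rather than stalling the caller.

// Hand the peer's gossip_timestamp_range to the syncer's queue instead of
// applying it inline.
if !syncer.QueueTimestampRange(rangeMsg) {
        // The queue (defaultTimestampQueueSize slots unless overridden via
        // timestampQueueSize) was full, so this filter was dropped.
        log.Debugf("Dropped gossip filter from peer %x", peerPub)
}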

785
// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
786
// syncer and sends it to the remote peer.
787
func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
788
        firstTimestamp time.Time, timestampRange uint32) error {
3✔
789

3✔
790
        endTimestamp := firstTimestamp.Add(
3✔
791
                time.Duration(timestampRange) * time.Second,
3✔
792
        )
3✔
793

3✔
794
        log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)",
3✔
795
                g.cfg.peerPub[:], firstTimestamp, endTimestamp)
3✔
796

3✔
797
        localUpdateHorizon := &lnwire.GossipTimestampRange{
3✔
798
                ChainHash:      g.cfg.chainHash,
3✔
799
                FirstTimestamp: uint32(firstTimestamp.Unix()),
3✔
800
                TimestampRange: timestampRange,
3✔
801
        }
3✔
802

3✔
803
        if err := g.sendToPeer(ctx, localUpdateHorizon); err != nil {
3✔
804
                return err
×
805
        }
×
806

807
        if firstTimestamp.Equal(zeroTimestamp) && timestampRange == 0 {
3✔
UNCOV
808
                g.localUpdateHorizon = nil
×
809
        } else {
3✔
810
                g.localUpdateHorizon = localUpdateHorizon
3✔
811
        }
3✔
812

813
        return nil
3✔
814
}
815
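The two horizon settings used by this syncer illustrate the method's meaning: a horizon starting now with the maximum range (as sent from the chansSynced case above) asks the peer for all future updates, while a zero start and zero range clears the local horizon per the zeroTimestamp check at the end of this function, telling the peer to send nothing.

// Receive every update from now on:
if err := g.sendGossipTimestampRange(ctx, time.Now(), math.MaxUint32); err != nil {
        log.Errorf("Unable to set update horizon: %v", err)
}

// Receive no updates at all and clear g.localUpdateHorizon:
if err := g.sendGossipTimestampRange(ctx, zeroTimestamp, 0); err != nil {
        log.Errorf("Unable to clear update horizon: %v", err)
}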

816
// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
817
// the remote peer for its known set of channel IDs within a particular block
818
// range. This method will be called continually until the entire range has
819
// been queried for with a response received. We'll chunk our requests as
820
// required to ensure they fit into a single message. We may re-enter this
821
// state in the case that chunking is required.
822
func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
3✔
823
        // If we're in this state yet there are no more new channels to query
3✔
824
        // for, then we'll transition to our final synced state and return true
3✔
825
        // to signal that we're fully synchronized.
3✔
826
        if len(g.newChansToQuery) == 0 {
6✔
827
                log.Infof("GossipSyncer(%x): no more chans to query",
3✔
828
                        g.cfg.peerPub[:])
3✔
829

3✔
830
                return true
3✔
831
        }
3✔
832

833
        // Otherwise, we'll issue our next chunked query to receive replies
834
        // for.
835
        var queryChunk []lnwire.ShortChannelID
3✔
836

3✔
837
        // If the number of channels to query for is less than the chunk size,
3✔
838
        // then we can issue a single query.
3✔
839
        if int32(len(g.newChansToQuery)) < g.cfg.batchSize {
6✔
840
                queryChunk = g.newChansToQuery
3✔
841
                g.newChansToQuery = nil
3✔
842

3✔
843
        } else {
3✔
UNCOV
844
                // Otherwise, we'll need to only query for the next chunk.
×
UNCOV
845
                // We'll slice into our query chunk, then slide our main
×
UNCOV
846
                // pointer down by the chunk size.
×
UNCOV
847
                queryChunk = g.newChansToQuery[:g.cfg.batchSize]
×
UNCOV
848
                g.newChansToQuery = g.newChansToQuery[g.cfg.batchSize:]
×
UNCOV
849
        }
×
850

851
        log.Infof("GossipSyncer(%x): querying for %v new channels",
3✔
852
                g.cfg.peerPub[:], len(queryChunk))
3✔
853

3✔
854
        // Change the state before sending the query msg.
3✔
855
        g.setSyncState(waitingQueryChanReply)
3✔
856

3✔
857
        // With our chunk obtained, we'll send over our next query, then return
3✔
858
        // false indicating that we're not yet fully synced.
3✔
859
        err := g.sendToPeer(ctx, &lnwire.QueryShortChanIDs{
3✔
860
                ChainHash:    g.cfg.chainHash,
3✔
861
                EncodingType: lnwire.EncodingSortedPlain,
3✔
862
                ShortChanIDs: queryChunk,
3✔
863
        })
3✔
864
        if err != nil {
3✔
865
                log.Errorf("Unable to sync chan IDs: %v", err)
×
866
        }
×
867

868
        return false
3✔
869
}
870
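As a worked example of the chunking above: with batchSize set to requestBatchSize (500) and 1,200 newly discovered channels, the syncer sends three QueryShortChanIDs messages of 500, 500 and 200 IDs, re-entering this state after each ReplyShortChanIDsEnd, and only reports itself done on the following pass once newChansToQuery is empty.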

871
// isLegacyReplyChannelRange determines whether a ReplyChannelRange message is
872
// considered legacy. There was a point where lnd used to include the same query
873
// over multiple replies, rather than including the portion of the query the
874
// reply is handling. We'll use this as a way of detecting whether we are
875
// communicating with a legacy node so we can properly sync with them.
876
func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
877
        reply *lnwire.ReplyChannelRange) bool {
3✔
878

3✔
879
        return (reply.ChainHash == query.ChainHash &&
3✔
880
                reply.FirstBlockHeight == query.FirstBlockHeight &&
3✔
881
                reply.NumBlocks == query.NumBlocks)
3✔
882
}
3✔
883

884
// processChanRangeReply is called each time the GossipSyncer receives a new
885
// reply to the initial range query to discover new channels that it didn't
886
// previously know of.
887
func (g *GossipSyncer) processChanRangeReply(_ context.Context,
888
        msg *lnwire.ReplyChannelRange) error {
3✔
889

3✔
890
        // isStale returns whether the timestamp is too far into the past.
3✔
891
        isStale := func(timestamp time.Time) bool {
6✔
892
                return time.Since(timestamp) > graph.DefaultChannelPruneExpiry
3✔
893
        }
3✔
894

895
        // isSkewed returns whether the timestamp is too far into the future.
896
        isSkewed := func(timestamp time.Time) bool {
6✔
897
                return time.Until(timestamp) > graph.DefaultChannelPruneExpiry
3✔
898
        }
3✔
899

900
        // If we're not communicating with a legacy node, we'll apply some
901
        // further constraints on their reply to ensure it satisfies our query.
902
        if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
3✔
UNCOV
903
                // The first block should be within our original request.
×
UNCOV
904
                if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight {
×
905
                        return fmt.Errorf("reply includes channels for height "+
×
906
                                "%v prior to query %v", msg.FirstBlockHeight,
×
907
                                g.curQueryRangeMsg.FirstBlockHeight)
×
908
                }
×
909

910
                // The last block should also be in range. We don't need to check the
911
                // intermediate ones because they should already be in sorted
912
                // order.
UNCOV
913
                replyLastHeight := msg.LastBlockHeight()
×
UNCOV
914
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
×
UNCOV
915
                if replyLastHeight > queryLastHeight {
×
916
                        return fmt.Errorf("reply includes channels for height "+
×
917
                                "%v after query %v", replyLastHeight,
×
918
                                queryLastHeight)
×
919
                }
×
920

921
                // If we've previously received a reply for this query, look at
922
                // its last block to ensure the current reply properly follows
923
                // it.
UNCOV
924
                if g.prevReplyChannelRange != nil {
×
UNCOV
925
                        prevReply := g.prevReplyChannelRange
×
UNCOV
926
                        prevReplyLastHeight := prevReply.LastBlockHeight()
×
UNCOV
927

×
UNCOV
928
                        // The current reply can either start from the previous
×
UNCOV
929
                        // reply's last block, if there are still more channels
×
UNCOV
930
                        // for the same block, or the block after.
×
UNCOV
931
                        if msg.FirstBlockHeight != prevReplyLastHeight &&
×
UNCOV
932
                                msg.FirstBlockHeight != prevReplyLastHeight+1 {
×
933

×
934
                                return fmt.Errorf("first block of reply %v "+
×
935
                                        "does not continue from last block of "+
×
936
                                        "previous %v", msg.FirstBlockHeight,
×
937
                                        prevReplyLastHeight)
×
938
                        }
×
939
                }
940
        }
941

942
        g.prevReplyChannelRange = msg
3✔
943

3✔
944
        for i, scid := range msg.ShortChanIDs {
6✔
945
                info := graphdb.NewChannelUpdateInfo(
3✔
946
                        scid, time.Time{}, time.Time{},
3✔
947
                )
3✔
948

3✔
949
                if len(msg.Timestamps) != 0 {
6✔
950
                        t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0)
3✔
951
                        info.Node1UpdateTimestamp = t1
3✔
952

3✔
953
                        t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0)
3✔
954
                        info.Node2UpdateTimestamp = t2
3✔
955

3✔
956
                        // Sort out all channels with outdated or skewed
3✔
957
                        // timestamps. Both timestamps need to be out of
3✔
958
                        // boundaries for us to skip the channel and not query
3✔
959
                        // it later on.
3✔
960
                        switch {
3✔
961
                        case isStale(info.Node1UpdateTimestamp) &&
UNCOV
962
                                isStale(info.Node2UpdateTimestamp):
×
UNCOV
963

×
UNCOV
964
                                continue
×
965

966
                        case isSkewed(info.Node1UpdateTimestamp) &&
UNCOV
967
                                isSkewed(info.Node2UpdateTimestamp):
×
UNCOV
968

×
UNCOV
969
                                continue
×
970

971
                        case isStale(info.Node1UpdateTimestamp) &&
UNCOV
972
                                isSkewed(info.Node2UpdateTimestamp):
×
UNCOV
973

×
UNCOV
974
                                continue
×
975

976
                        case isStale(info.Node2UpdateTimestamp) &&
UNCOV
977
                                isSkewed(info.Node1UpdateTimestamp):
×
UNCOV
978

×
UNCOV
979
                                continue
×
980
                        }
981
                }
982

983
                g.bufferedChanRangeReplies = append(
3✔
984
                        g.bufferedChanRangeReplies, info,
3✔
985
                )
3✔
986
        }
987

988
        switch g.cfg.encodingType {
3✔
989
        case lnwire.EncodingSortedPlain:
3✔
990
                g.numChanRangeRepliesRcvd++
3✔
991
        case lnwire.EncodingSortedZlib:
×
992
                g.numChanRangeRepliesRcvd += maxQueryChanRangeRepliesZlibFactor
×
993
        default:
×
994
                return fmt.Errorf("unhandled encoding type %v", g.cfg.encodingType)
×
995
        }
996

997
        log.Infof("GossipSyncer(%x): buffering chan range reply of size=%v",
3✔
998
                g.cfg.peerPub[:], len(msg.ShortChanIDs))
3✔
999

3✔
1000
        // If this isn't the last response and we can continue to receive more,
3✔
1001
        // then we can exit as we've already buffered the latest portion of the
3✔
1002
        // streaming reply.
3✔
1003
        maxReplies := g.cfg.maxQueryChanRangeReplies
3✔
1004
        switch {
3✔
1005
        // If we're communicating with a legacy node, we'll need to look at the
1006
        // complete field.
1007
        case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg):
3✔
1008
                if msg.Complete == 0 && g.numChanRangeRepliesRcvd < maxReplies {
3✔
UNCOV
1009
                        return nil
×
UNCOV
1010
                }
×
1011

1012
        // Otherwise, we'll look at the reply's height range.
UNCOV
1013
        default:
×
UNCOV
1014
                replyLastHeight := msg.LastBlockHeight()
×
UNCOV
1015
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
×
UNCOV
1016

×
UNCOV
1017
                // TODO(wilmer): This might require some padding if the remote
×
UNCOV
1018
                // node is not aware of the last height we sent them, i.e., is
×
UNCOV
1019
                // behind a few blocks from us.
×
UNCOV
1020
                if replyLastHeight < queryLastHeight &&
×
UNCOV
1021
                        g.numChanRangeRepliesRcvd < maxReplies {
×
UNCOV
1022

×
UNCOV
1023
                        return nil
×
UNCOV
1024
                }
×
1025
        }
1026

1027
        log.Infof("GossipSyncer(%x): filtering through %v chans",
3✔
1028
                g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))
3✔
1029

3✔
1030
        // Otherwise, this is the final response, so we'll now check to see
3✔
1031
        // which channels they know of that we don't.
3✔
1032
        newChans, err := g.cfg.channelSeries.FilterKnownChanIDs(
3✔
1033
                g.cfg.chainHash, g.bufferedChanRangeReplies,
3✔
1034
                g.cfg.isStillZombieChannel,
3✔
1035
        )
3✔
1036
        if err != nil {
3✔
1037
                return fmt.Errorf("unable to filter chan ids: %w", err)
×
1038
        }
×
1039

1040
        // As we've received the entirety of the reply, we no longer need to
1041
        // hold on to the set of buffered replies or the original query that
1042
        // prompted the replies, so we'll let that be garbage collected now.
1043
        g.curQueryRangeMsg = nil
3✔
1044
        g.prevReplyChannelRange = nil
3✔
1045
        g.bufferedChanRangeReplies = nil
3✔
1046
        g.numChanRangeRepliesRcvd = 0
3✔
1047

3✔
1048
        // If there aren't any channels that we don't know of, then we can
3✔
1049
        // switch straight to our terminal state.
3✔
1050
        if len(newChans) == 0 {
6✔
1051
                log.Infof("GossipSyncer(%x): remote peer has no new chans",
3✔
1052
                        g.cfg.peerPub[:])
3✔
1053

3✔
1054
                g.setSyncState(chansSynced)
3✔
1055

3✔
1056
                // Ensure that the sync manager becomes aware that the
3✔
1057
                // historical sync completed so synced_to_graph is updated over
3✔
1058
                // rpc.
3✔
1059
                g.cfg.markGraphSynced()
3✔
1060
                return nil
3✔
1061
        }
3✔
1062

1063
        // Otherwise, we'll set the set of channels that we need to query for
1064
        // the next state, and also transition our state.
1065
        g.newChansToQuery = newChans
3✔
1066
        g.setSyncState(queryNewChannels)
3✔
1067

3✔
1068
        log.Infof("GossipSyncer(%x): starting query for %v new chans",
3✔
1069
                g.cfg.peerPub[:], len(newChans))
3✔
1070

3✔
1071
        return nil
3✔
1072
}
1073

1074
// genChanRangeQuery generates the initial message we'll send to the remote
1075
// party when we're kicking off the channel graph synchronization upon
1076
// connection. The historicalQuery boolean can be used to generate a query from
1077
// the genesis block of the chain.
1078
func (g *GossipSyncer) genChanRangeQuery(ctx context.Context,
1079
        historicalQuery bool) (*lnwire.QueryChannelRange, error) {
3✔
1080

3✔
1081
        // First, we'll query our channel graph time series for its highest
3✔
1082
        // known channel ID.
3✔
1083
        newestChan, err := g.cfg.channelSeries.HighestChanID(
3✔
1084
                ctx, g.cfg.chainHash,
3✔
1085
        )
3✔
1086
        if err != nil {
3✔
1087
                return nil, err
×
1088
        }
×
1089

1090
        // Once we have the chan ID of the newest, we'll obtain the block height
1091
        // of the channel, then subtract our default horizon to ensure we don't
1092
        // miss any channels. By default, we go back 1 day from the newest
1093
        // channel, unless we're attempting a historical sync, where we'll
1094
        // actually start from the genesis block instead.
1095
        var startHeight uint32
3✔
1096
        switch {
3✔
1097
        case historicalQuery:
3✔
1098
                fallthrough
3✔
1099
        case newestChan.BlockHeight <= chanRangeQueryBuffer:
3✔
1100
                startHeight = 0
3✔
UNCOV
1101
        default:
×
UNCOV
1102
                startHeight = newestChan.BlockHeight - chanRangeQueryBuffer
×
1103
        }
1104

1105
        // Determine the number of blocks to request based on our best height.
1106
        // We'll take into account any potential underflows and explicitly set
1107
        // numBlocks to its minimum value of 1 if so.
1108
        bestHeight := g.cfg.bestHeight()
3✔
1109
        numBlocks := bestHeight - startHeight
3✔
1110
        if int64(numBlocks) < 1 {
3✔
1111
                numBlocks = 1
×
1112
        }
×
1113

1114
        log.Infof("GossipSyncer(%x): requesting new chans from height=%v "+
3✔
1115
                "and %v blocks after", g.cfg.peerPub[:], startHeight, numBlocks)
3✔
1116

3✔
1117
        // Finally, we'll craft the channel range query, using our starting
3✔
1118
        // height, then asking for all known channels to the foreseeable end of
3✔
1119
        // the main chain.
3✔
1120
        query := &lnwire.QueryChannelRange{
3✔
1121
                ChainHash:        g.cfg.chainHash,
3✔
1122
                FirstBlockHeight: startHeight,
3✔
1123
                NumBlocks:        numBlocks,
3✔
1124
        }
3✔
1125

3✔
1126
        if !g.cfg.noTimestampQueryOption {
6✔
1127
                query.QueryOptions = lnwire.NewTimestampQueryOption()
3✔
1128
        }
3✔
1129

1130
        g.curQueryRangeMsg = query
3✔
1131

3✔
1132
        return query, nil
3✔
1133
}
1134
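For illustration of the height arithmetic above (the numbers are hypothetical): with the newest known channel confirmed at height 800,000 and no historical sync requested, startHeight = 800,000 - 144 = 799,856; if bestHeight() returns 800,010, the query covers numBlocks = 800,010 - 799,856 = 154 blocks. A historical sync, or a newest channel at or below height 144, instead starts from height 0.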

1135
// replyPeerQueries is called in response to any query by the remote peer.
1136
// We'll examine our state and send back our best response.
1137
func (g *GossipSyncer) replyPeerQueries(ctx context.Context,
1138
        msg lnwire.Message) error {
3✔
1139

3✔
1140
        switch msg := msg.(type) {
3✔
1141

1142
        // In this state, we'll also handle any incoming channel range queries
1143
        // from the remote peer as they're trying to sync their state as well.
1144
        case *lnwire.QueryChannelRange:
3✔
1145
                return g.replyChanRangeQuery(ctx, msg)
3✔
1146

1147
        // If the remote peer skips straight to requesting new channels that
1148
        // they don't know of, then we'll ensure that we also handle this case.
1149
        case *lnwire.QueryShortChanIDs:
3✔
1150
                return g.replyShortChanIDs(ctx, msg)
3✔
1151

1152
        default:
×
1153
                return fmt.Errorf("unknown message: %T", msg)
×
1154
        }
1155
}
1156

1157
// replyChanRangeQuery will be dispatched in response to a channel range query
// by the remote node. We'll query the channel time series for channels that
// meet the channel range, then chunk our responses to the remote node. We also
// ensure that our final fragment carries the "complete" bit to indicate the
// end of our streaming response.
func (g *GossipSyncer) replyChanRangeQuery(ctx context.Context,
        query *lnwire.QueryChannelRange) error {

        // Before responding, we'll check to ensure that the remote peer is
        // querying for the same chain that we're on. If not, we'll send back a
        // response with a complete value of zero to indicate we're on a
        // different chain.
        if g.cfg.chainHash != query.ChainHash {
                log.Warnf("Remote peer requested QueryChannelRange for "+
                        "chain=%v, we're on chain=%v", query.ChainHash,
                        g.cfg.chainHash)

                return g.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
                        ChainHash:        query.ChainHash,
                        FirstBlockHeight: query.FirstBlockHeight,
                        NumBlocks:        query.NumBlocks,
                        Complete:         0,
                        EncodingType:     g.cfg.encodingType,
                        ShortChanIDs:     nil,
                })
        }

        log.Infof("GossipSyncer(%x): filtering chan range: start_height=%v, "+
                "num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
                query.NumBlocks)

        // Check if the query asked for timestamps. We will only serve
        // timestamps if this has not been disabled with
        // noTimestampQueryOption.
        withTimestamps := query.WithTimestamps() &&
                !g.cfg.noTimestampQueryOption

        // Next, we'll consult the time series to obtain the set of known
        // channel ID's that match their query.
        startBlock := query.FirstBlockHeight
        endBlock := query.LastBlockHeight()
        channelRanges, err := g.cfg.channelSeries.FilterChannelRange(
                query.ChainHash, startBlock, endBlock, withTimestamps,
        )
        if err != nil {
                return err
        }

        // TODO(roasbeef): means can't send max uint above?
        //  * or make internal 64

        // We'll send our response in a streaming manner, chunk-by-chunk. We do
        // this as there's a transport message size limit which we'll need to
        // adhere to. We also need to make sure all of our replies cover the
        // expected range of the query.
        sendReplyForChunk := func(channelChunk []graphdb.ChannelUpdateInfo,
                firstHeight, lastHeight uint32, finalChunk bool) error {

                // The number of blocks contained in the current chunk (the
                // total span) is the difference between the last channel ID and
                // the first in the range. We add one as even if all channels
                // returned are in the same block, we need to count that.
                numBlocks := lastHeight - firstHeight + 1
                complete := uint8(0)
                if finalChunk {
                        complete = 1
                }

                var timestamps lnwire.Timestamps
                if withTimestamps {
                        timestamps = make(lnwire.Timestamps, len(channelChunk))
                }

                scids := make([]lnwire.ShortChannelID, len(channelChunk))
                for i, info := range channelChunk {
                        scids[i] = info.ShortChannelID

                        if !withTimestamps {
                                continue
                        }

                        timestamps[i].Timestamp1 = uint32(
                                info.Node1UpdateTimestamp.Unix(),
                        )

                        timestamps[i].Timestamp2 = uint32(
                                info.Node2UpdateTimestamp.Unix(),
                        )
                }

                return g.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
                        ChainHash:        query.ChainHash,
                        NumBlocks:        numBlocks,
                        FirstBlockHeight: firstHeight,
                        Complete:         complete,
                        EncodingType:     g.cfg.encodingType,
                        ShortChanIDs:     scids,
                        Timestamps:       timestamps,
                })
        }

        var (
                firstHeight  = query.FirstBlockHeight
                lastHeight   uint32
                channelChunk []graphdb.ChannelUpdateInfo
        )

        // chunkSize is the maximum number of SCIDs that we can safely put in a
        // single message. If we also need to include timestamps though, then
        // this number is halved since encoding two timestamps takes the same
        // number of bytes as encoding an SCID.
        chunkSize := g.cfg.chunkSize
        if withTimestamps {
                chunkSize /= 2
        }

        for _, channelRange := range channelRanges {
                channels := channelRange.Channels
                numChannels := int32(len(channels))
                numLeftToAdd := chunkSize - int32(len(channelChunk))

                // Include the current block in the ongoing chunk if it can fit
                // and move on to the next block.
                if numChannels <= numLeftToAdd {
                        channelChunk = append(channelChunk, channels...)
                        continue
                }

                // Otherwise, we need to send our existing channel chunk as is
                // as its own reply and start a new one for the current block.
                // We'll mark the end of our current chunk as the height before
                // the current block to ensure the whole query range is replied
                // to.
                log.Infof("GossipSyncer(%x): sending range chunk of size=%v",
                        g.cfg.peerPub[:], len(channelChunk))

                lastHeight = channelRange.Height - 1
                err := sendReplyForChunk(
                        channelChunk, firstHeight, lastHeight, false,
                )
                if err != nil {
                        return err
                }

                // With the reply constructed, we'll start tallying channels for
                // our next one keeping in mind our chunk size. This may result
                // in channels for this block being left out from the reply, but
                // this isn't an issue since we'll randomly shuffle them and we
                // assume a historical gossip sync is performed at a later time.
                firstHeight = channelRange.Height
                finalChunkSize := numChannels
                exceedsChunkSize := numChannels > chunkSize
                if exceedsChunkSize {
                        rand.Shuffle(len(channels), func(i, j int) {
                                channels[i], channels[j] = channels[j], channels[i]
                        })
                        finalChunkSize = chunkSize
                }
                channelChunk = channels[:finalChunkSize]

                // Sort the chunk once again if we had to shuffle it.
                if exceedsChunkSize {
                        sort.Slice(channelChunk, func(i, j int) bool {
                                id1 := channelChunk[i].ShortChannelID.ToUint64()
                                id2 := channelChunk[j].ShortChannelID.ToUint64()

                                return id1 < id2
                        })
                }
        }

        // Send the remaining chunk as the final reply.
        log.Infof("GossipSyncer(%x): sending final chan range chunk, size=%v",
                g.cfg.peerPub[:], len(channelChunk))

        return sendReplyForChunk(
                channelChunk, firstHeight, query.LastBlockHeight(), true,
        )
}

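// Illustrative sketch (hypothetical helper, not part of the original file):
// how the effective per-message SCID budget used above is derived. Encoding a
// pair of timestamps takes as many bytes as one SCID, so when timestamps are
// requested the chunk size is halved.
func exampleEffectiveChunkSize(baseChunkSize int32, withTimestamps bool) int32 {
        if withTimestamps {
                return baseChunkSize / 2
        }
        return baseChunkSize
}
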
// replyShortChanIDs will be dispatched in response to a query by the remote
// node for information concerning a set of short channel ID's. Our response
// will be sent in a streaming chunked manner to ensure that we remain below
// the current transport level message size.
func (g *GossipSyncer) replyShortChanIDs(ctx context.Context,
        query *lnwire.QueryShortChanIDs) error {

        // Before responding, we'll check to ensure that the remote peer is
        // querying for the same chain that we're on. If not, we'll send back a
        // response with a complete value of zero to indicate we're on a
        // different chain.
        if g.cfg.chainHash != query.ChainHash {
                log.Warnf("Remote peer requested QueryShortChanIDs for "+
                        "chain=%v, we're on chain=%v", query.ChainHash,
                        g.cfg.chainHash)

                return g.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
                        ChainHash: query.ChainHash,
                        Complete:  0,
                })
        }

        if len(query.ShortChanIDs) == 0 {
                log.Infof("GossipSyncer(%x): ignoring query for blank short chan ID's",
                        g.cfg.peerPub[:])
                return nil
        }

        log.Infof("GossipSyncer(%x): fetching chan anns for %v chans",
                g.cfg.peerPub[:], len(query.ShortChanIDs))

        // Now that we know we're on the same chain, we'll query the channel
        // time series for the set of messages that we know of which satisfy
        // the requirement of being a chan ann, chan update, or a node ann
        // related to the set of queried channels.
        replyMsgs, err := g.cfg.channelSeries.FetchChanAnns(
                query.ChainHash, query.ShortChanIDs,
        )
        if err != nil {
                return fmt.Errorf("unable to fetch chan anns for %v..., %w",
                        query.ShortChanIDs[0].ToUint64(), err)
        }

        // Reply with any messages related to those channel ID's; we'll write
        // each one individually and synchronously to throttle the sends and
        // perform buffering of responses in the syncer as opposed to the peer.
        for _, msg := range replyMsgs {
                err := g.sendToPeerSync(ctx, msg)
                if err != nil {
                        return err
                }
        }

        // Regardless of whether we had any messages to reply with, send over
        // the sentinel message to signal that the stream has terminated.
        return g.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
                ChainHash: query.ChainHash,
                Complete:  1,
        })
}

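// Note (illustrative, not part of the original file): the reply stream above
// always terminates with a ReplyShortChanIDsEnd sentinel, carrying Complete=1
// once all requested announcements and updates have been written, or
// Complete=0 immediately when the query references a different chain.
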
// ApplyGossipFilter applies a gossiper filter sent by the remote node to the
// state machine. Once applied, we'll ensure that we don't forward any messages
// to the peer that aren't within the time range of the filter.
func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
        filter *lnwire.GossipTimestampRange) error {

        g.Lock()

        g.remoteUpdateHorizon = filter

        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
        endTime := startTime.Add(
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
        )

        g.Unlock()

        // If requested, don't reply with historical gossip data when the remote
        // peer sets their gossip timestamp range.
        if g.cfg.ignoreHistoricalFilters {
                return nil
        }

        // Check if a goroutine is already sending the backlog. If so, return
        // early without attempting to acquire the semaphore.
        if g.isSendingBacklog.Load() {
                log.Debugf("GossipSyncer(%x): skipping ApplyGossipFilter, "+
                        "backlog send already in progress", g.cfg.peerPub[:])
                return nil
        }

        select {
        case <-g.syncerSema:
        case <-g.cg.Done():
                return ErrGossipSyncerExiting
        case <-ctx.Done():
                return ctx.Err()
        }

        // We don't put this in a defer because if the goroutine is launched,
        // it needs to be called when the goroutine is stopped.
        returnSema := func() {
                g.syncerSema <- struct{}{}
        }

        // Now that the remote peer has applied their filter, we'll query the
        // database for all the messages that are beyond this filter.
        newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon(
                g.cfg.chainHash, startTime, endTime,
        )
        if err != nil {
                returnSema()
                return err
        }

        log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
                "start=%v, end=%v, backlog_size=%v", g.cfg.peerPub[:],
                startTime, endTime, len(newUpdatestoSend))

        // If we don't have any to send, then we can return early.
        if len(newUpdatestoSend) == 0 {
                returnSema()
                return nil
        }

        // Set the atomic flag to indicate we're starting to send the backlog.
        // If the swap fails, it means another goroutine is already active, so
        // we return early.
        if !g.isSendingBacklog.CompareAndSwap(false, true) {
                returnSema()
                log.Debugf("GossipSyncer(%x): another goroutine already "+
                        "sending backlog, skipping", g.cfg.peerPub[:])

                return nil
        }

        // We'll conclude by launching a goroutine to send out any updates.
        g.cg.WgAdd(1)
        go func() {
                defer g.cg.WgDone()
                defer returnSema()
                defer g.isSendingBacklog.Store(false)

                for _, msg := range newUpdatestoSend {
                        err := g.sendToPeerSync(ctx, msg)
                        switch {
                        case err == ErrGossipSyncerExiting:
                                return

                        case err == lnpeer.ErrPeerExiting:
                                return

                        case err != nil:
                                log.Errorf("Unable to send message for "+
                                        "peer catch up: %v", err)
                        }
                }
        }()

        return nil
}

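// Illustrative sketch (hypothetical helper, not part of the original file):
// the backlog send above is guarded twice. A cheap atomic Load skips the work
// early, and a CompareAndSwap after the semaphore is acquired guarantees that
// at most one goroutine streams the horizon backlog per syncer at a time.
func exampleTryStartBacklog(isSending *atomic.Bool) bool {
        // Only the caller that flips the flag from false to true may proceed.
        return isSending.CompareAndSwap(false, true)
}
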
// FilterGossipMsgs takes a set of gossip messages, and only sends them to the
// peer if they fall within the bounds of its set gossip filter. If the peer
// doesn't have a gossip filter set, then no messages will be forwarded.
func (g *GossipSyncer) FilterGossipMsgs(ctx context.Context,
        msgs ...msgWithSenders) {

        // If the peer doesn't have an update horizon set, then we won't send
        // it any new update messages.
        if g.remoteUpdateHorizon == nil {
                log.Tracef("GossipSyncer(%x): skipped due to nil "+
                        "remoteUpdateHorizon", g.cfg.peerPub[:])
                return
        }

        // If we've been signaled to exit, or are exiting, then we'll stop
        // short.
        select {
        case <-g.cg.Done():
                return
        case <-ctx.Done():
                return
        default:
        }

        // TODO(roasbeef): need to ensure that peer still online...send msg to
        // gossiper on peer termination to signal peer disconnect?

        var err error

        // Before we filter out the messages, we'll construct an index over the
        // set of channel announcements and channel updates. This will allow us
        // to quickly check if we should forward a chan ann, based on the known
        // channel updates for a channel.
        chanUpdateIndex := make(
                map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate1,
        )
        for _, msg := range msgs {
                chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate1)
                if !ok {
                        continue
                }

                chanUpdateIndex[chanUpdate.ShortChannelID] = append(
                        chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate,
                )
        }

        // We'll construct a helper function that we'll use below to determine
        // if a given message passes the gossip msg filter.
        g.Lock()
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
        endTime := startTime.Add(
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
        )
        g.Unlock()

        passesFilter := func(timeStamp uint32) bool {
                t := time.Unix(int64(timeStamp), 0)
                return t.Equal(startTime) ||
                        (t.After(startTime) && t.Before(endTime))
        }

        msgsToSend := make([]lnwire.Message, 0, len(msgs))
        for _, msg := range msgs {
                // If the target peer is the peer that sent us this message,
                // then we'll exit early as we don't need to filter this
                // message.
                if _, ok := msg.senders[g.cfg.peerPub]; ok {
                        continue
                }

                switch msg := msg.msg.(type) {

                // For each channel announcement message, we'll only send this
                // message if the channel updates for the channel are within
                // our time range.
                case *lnwire.ChannelAnnouncement1:
                        // First, we'll check if the channel updates are in
                        // this message batch.
                        chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID]
                        if !ok {
                                // If not, we'll attempt to query the database
                                // to see if we know of the updates.
                                chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates(
                                        g.cfg.chainHash, msg.ShortChannelID,
                                )
                                if err != nil {
                                        log.Warnf("no channel updates found for "+
                                                "short_chan_id=%v",
                                                msg.ShortChannelID)
                                        continue
                                }
                        }

                        for _, chanUpdate := range chanUpdates {
                                if passesFilter(chanUpdate.Timestamp) {
                                        msgsToSend = append(msgsToSend, msg)
                                        break
                                }
                        }

                        if len(chanUpdates) == 0 {
                                msgsToSend = append(msgsToSend, msg)
                        }

                // For each channel update, we'll only send it if the timestamp
                // is within our time range.
                case *lnwire.ChannelUpdate1:
                        if passesFilter(msg.Timestamp) {
                                msgsToSend = append(msgsToSend, msg)
                        }

                // Similarly, we only send node announcements if the update
                // timestamp is within our set gossip filter time range.
                case *lnwire.NodeAnnouncement:
                        if passesFilter(msg.Timestamp) {
                                msgsToSend = append(msgsToSend, msg)
                        }
                }
        }

        log.Tracef("GossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
                g.cfg.peerPub[:], len(msgs), len(msgsToSend))

        if len(msgsToSend) == 0 {
                return
        }

        if err = g.sendToPeer(ctx, msgsToSend...); err != nil {
                log.Errorf("unable to send gossip msgs: %v", err)
        }
}

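// Illustrative sketch (hypothetical helper, not part of the original file):
// the filter above accepts a timestamp t when startTime <= t < endTime, i.e.
// the half-open interval defined by the peer's FirstTimestamp and
// TimestampRange.
func examplePassesFilter(timestamp uint32, startTime, endTime time.Time) bool {
        t := time.Unix(int64(timestamp), 0)
        return t.Equal(startTime) || (t.After(startTime) && t.Before(endTime))
}
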
// ProcessQueryMsg is used by outside callers to pass new channel time series
// queries to the internal processing goroutine.
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) error {
        var msgChan chan lnwire.Message
        switch msg.(type) {
        case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs:
                msgChan = g.queryMsgs

        // Reply messages should only be expected in states where we're waiting
        // for a reply.
        case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
                g.Lock()
                syncState := g.syncState()
                g.Unlock()

                if syncState != waitingQueryRangeReply &&
                        syncState != waitingQueryChanReply {

                        return fmt.Errorf("unexpected msg %T received in "+
                                "state %v", msg, syncState)
                }
                msgChan = g.gossipMsgs

        default:
                msgChan = g.gossipMsgs
        }

        select {
        case msgChan <- msg:
        case <-peerQuit:
        case <-g.cg.Done():
        }

        return nil
}

// setSyncState sets the gossip syncer's state to the given state.
func (g *GossipSyncer) setSyncState(state syncerState) {
        atomic.StoreUint32(&g.state, uint32(state))
}

// syncState returns the current syncerState of the target GossipSyncer.
func (g *GossipSyncer) syncState() syncerState {
        return syncerState(atomic.LoadUint32(&g.state))
}

// ResetSyncedSignal returns a channel that will be closed in order to serve as
// a signal for when the GossipSyncer has reached its chansSynced state.
func (g *GossipSyncer) ResetSyncedSignal() chan struct{} {
        g.Lock()
        defer g.Unlock()

        syncedSignal := make(chan struct{})

        syncState := syncerState(atomic.LoadUint32(&g.state))
        if syncState == chansSynced {
                close(syncedSignal)
                return syncedSignal
        }

        g.syncedSignal = syncedSignal
        return g.syncedSignal
}

// ProcessSyncTransition sends a request to the gossip syncer to transition its
// sync type to a new one.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
        errChan := make(chan error, 1)
        select {
        case g.syncTransitionReqs <- &syncTransitionReq{
                newSyncType: newSyncType,
                errChan:     errChan,
        }:
        case <-time.After(syncTransitionTimeout):
                return ErrSyncTransitionTimeout
        case <-g.cg.Done():
                return ErrGossipSyncerExiting
        }

        select {
        case err := <-errChan:
                return err
        case <-g.cg.Done():
                return ErrGossipSyncerExiting
        }
}

// handleSyncTransition handles a new sync type transition request.
//
// NOTE: The gossip syncer might have another sync state as a result of this
// transition.
func (g *GossipSyncer) handleSyncTransition(ctx context.Context,
        req *syncTransitionReq) error {

        // Return early from any NOP sync transitions.
        syncType := g.SyncType()
        if syncType == req.newSyncType {
                return nil
        }

        log.Debugf("GossipSyncer(%x): transitioning from %v to %v",
                g.cfg.peerPub, syncType, req.newSyncType)

        var (
                firstTimestamp time.Time
                timestampRange uint32
        )

        switch req.newSyncType {
        // If an active sync has been requested, then we should resume receiving
        // new graph updates from the remote peer.
        case ActiveSync, PinnedSync:
                firstTimestamp = time.Now()
                timestampRange = math.MaxUint32

        // If a PassiveSync transition has been requested, then we should no
        // longer receive any new updates from the remote peer. We can do this
        // by setting our update horizon to a range in the past ensuring no
        // graph updates match the timestamp range.
        case PassiveSync:
                firstTimestamp = zeroTimestamp
                timestampRange = 0

        default:
                return fmt.Errorf("unhandled sync transition %v",
                        req.newSyncType)
        }

        err := g.sendGossipTimestampRange(ctx, firstTimestamp, timestampRange)
        if err != nil {
                return fmt.Errorf("unable to send local update horizon: %w",
                        err)
        }

        g.setSyncType(req.newSyncType)

        return nil
}

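// Illustrative sketch (hypothetical helper, not part of the original file):
// the update horizon sent for each sync type. Active and pinned syncs open the
// horizon from "now" onwards, while a passive sync uses an empty range so that
// no future updates match.
func exampleHorizonFor(t SyncerType) (time.Time, uint32) {
        if t == PassiveSync {
                return zeroTimestamp, 0
        }
        return time.Now(), math.MaxUint32
}
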
// setSyncType sets the gossip syncer's sync type to the given type.
func (g *GossipSyncer) setSyncType(syncType SyncerType) {
        atomic.StoreUint32(&g.syncType, uint32(syncType))
}

// SyncType returns the current SyncerType of the target GossipSyncer.
func (g *GossipSyncer) SyncType() SyncerType {
        return SyncerType(atomic.LoadUint32(&g.syncType))
}

// historicalSync sends a request to the gossip syncer to perform a historical
// sync.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) historicalSync() error {
        done := make(chan struct{})

        select {
        case g.historicalSyncReqs <- &historicalSyncReq{
                doneChan: done,
        }:
        case <-time.After(syncTransitionTimeout):
                return ErrSyncTransitionTimeout
        case <-g.cg.Done():
                return ErrGossiperShuttingDown
        }

        select {
        case <-done:
                return nil
        case <-g.cg.Done():
                return ErrGossiperShuttingDown
        }
}

// handleHistoricalSync handles a request to the gossip syncer to perform a
// historical sync.
func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
        // We'll go back to our initial syncingChans state in order to request
        // the remote peer to give us all of the channel IDs they know of
        // starting from the genesis block.
        g.genHistoricalChanRangeQuery = true
        g.setSyncState(syncingChans)
        close(req.doneChan)
}

// sendToPeer sends a variadic number of messages to the remote peer. This
// method should not block while waiting for sends to be written to the wire.
func (g *GossipSyncer) sendToPeer(ctx context.Context,
        msgs ...lnwire.Message) error {

        for _, msg := range msgs {
                err := maybeRateLimitMsg(
                        ctx, g.rateLimiter, g.cfg.peerPub, msg, g.cg.Done(),
                )
                if err != nil {
                        return err
                }

                err = g.cfg.sendMsg(ctx, false, msg)
                if err != nil {
                        return err
                }
        }

        return nil
}

// sendToPeerSync sends a variadic number of messages to the remote peer,
// blocking until all messages have been sent successfully or a write error is
// encountered.
func (g *GossipSyncer) sendToPeerSync(ctx context.Context,
        msgs ...lnwire.Message) error {

        for _, msg := range msgs {
                err := maybeRateLimitMsg(
                        ctx, g.rateLimiter, g.cfg.peerPub, msg, g.cg.Done(),
                )
                if err != nil {
                        return err
                }

                err = g.cfg.sendMsg(ctx, true, msg)
                if err != nil {
                        return err
                }
        }

        return nil
}
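
// Illustrative sketch (hypothetical helper, not part of the original file):
// both send paths above run every message through maybeRateLimitMsg before it
// is handed to the peer, throttling outgoing gossip bandwidth per peer. A
// minimal token-bucket wait built on golang.org/x/time/rate (already imported
// as "rate") could look like this, assuming the serialized message size in
// bytes is used as the token cost.
func exampleWaitForBandwidth(ctx context.Context, limiter *rate.Limiter,
        msgBytes int) error {

        // WaitN blocks until msgBytes tokens are available, or returns an
        // error if the context is cancelled or msgBytes exceeds the burst.
        return limiter.WaitN(ctx, msgBytes)
}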