• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 18016273007

25 Sep 2025 05:55PM UTC coverage: 54.653% (-12.0%) from 66.622%
18016273007

Pull #10248

github

web-flow
Merge 128443298 into b09b20c69
Pull Request #10248: Enforce TLV when creating a Route

25 of 30 new or added lines in 4 files covered. (83.33%)

23906 existing lines in 281 files now uncovered.

109536 of 200421 relevant lines covered (54.65%)

21816.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.34
/discovery/syncer.go
1
package discovery
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "math"
8
        "math/rand"
9
        "sort"
10
        "sync"
11
        "sync/atomic"
12
        "time"
13

14
        "github.com/btcsuite/btcd/chaincfg/chainhash"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/graph"
17
        graphdb "github.com/lightningnetwork/lnd/graph/db"
18
        "github.com/lightningnetwork/lnd/lnpeer"
19
        "github.com/lightningnetwork/lnd/lnwire"
20
        "golang.org/x/time/rate"
21
)
22

23
// SyncerType encapsulates the different types of syncing mechanisms for a
24
// gossip syncer.
25
type SyncerType uint8
26

27
const (
28
        // ActiveSync denotes that a gossip syncer:
29
        //
30
        // 1. Should not attempt to synchronize with the remote peer for
31
        //    missing channels.
32
        // 2. Should respond to queries from the remote peer.
33
        // 3. Should receive new updates from the remote peer.
34
        //
35
        // They are started in a chansSynced state in order to accomplish their
36
        // responsibilities above.
37
        ActiveSync SyncerType = iota
38

39
        // PassiveSync denotes that a gossip syncer:
40
        //
41
        // 1. Should not attempt to synchronize with the remote peer for
42
        //    missing channels.
43
        // 2. Should respond to queries from the remote peer.
44
        // 3. Should not receive new updates from the remote peer.
45
        //
46
        // They are started in a chansSynced state in order to accomplish their
47
        // responsibilities above.
48
        PassiveSync
49

50
        // PinnedSync denotes an ActiveSync that doesn't count towards the
51
        // default active syncer limits and is always active throughout the
52
        // duration of the peer's connection. Each pinned syncer will begin by
53
        // performing a historical sync to ensure we are well synchronized with
54
        // their routing table.
55
        PinnedSync
56
)
57

58
const (
59
        // defaultTimestampQueueSize is the size of the timestamp range queue
60
        // used.
61
        defaultTimestampQueueSize = 1
62
)
63

64
// String returns a human readable string describing the target SyncerType.
UNCOV
65
func (t SyncerType) String() string {
×
UNCOV
66
        switch t {
×
UNCOV
67
        case ActiveSync:
×
UNCOV
68
                return "ActiveSync"
×
UNCOV
69
        case PassiveSync:
×
UNCOV
70
                return "PassiveSync"
×
UNCOV
71
        case PinnedSync:
×
UNCOV
72
                return "PinnedSync"
×
73
        default:
×
74
                return fmt.Sprintf("unknown sync type %d", t)
×
75
        }
76
}
77

78
// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange
79
// allowing new gossip messages to be received from the peer.
80
func (t SyncerType) IsActiveSync() bool {
44✔
81
        switch t {
44✔
82
        case ActiveSync, PinnedSync:
14✔
83
                return true
14✔
84
        default:
30✔
85
                return false
30✔
86
        }
87
}
88

89
// syncerState is an enum that represents the current state of the GossipSyncer.
90
// As the syncer is a state machine, we'll gate our actions based off of the
91
// current state and the next incoming message.
92
type syncerState uint32
93

94
const (
95
        // syncingChans is the default state of the GossipSyncer. We start in
96
        // this state when a new peer first connects and we don't yet know if
97
        // we're fully synchronized.
98
        syncingChans syncerState = iota
99

100
        // waitingQueryRangeReply is the second main phase of the GossipSyncer.
101
        // We enter this state after we send out our first QueryChannelRange
102
        // reply. We'll stay in this state until the remote party sends us a
103
        // ReplyShortChanIDsEnd message that indicates they've responded to our
104
        // query entirely. After this state, we'll transition to
105
        // waitingQueryChanReply after we send out requests for all the new
106
        // chan ID's to us.
107
        waitingQueryRangeReply
108

109
        // queryNewChannels is the third main phase of the GossipSyncer.  In
110
        // this phase we'll send out all of our QueryShortChanIDs messages in
111
        // response to the new channels that we don't yet know about.
112
        queryNewChannels
113

114
        // waitingQueryChanReply is the fourth main phase of the GossipSyncer.
115
        // We enter this phase once we've sent off a query chink to the remote
116
        // peer.  We'll stay in this phase until we receive a
117
        // ReplyShortChanIDsEnd message which indicates that the remote party
118
        // has responded to all of our requests.
119
        waitingQueryChanReply
120

121
        // chansSynced is the terminal stage of the GossipSyncer. Once we enter
122
        // this phase, we'll send out our update horizon, which filters out the
123
        // set of channel updates that we're interested in. In this state,
124
        // we'll be able to accept any outgoing messages from the
125
        // AuthenticatedGossiper, and decide if we should forward them to our
126
        // target peer based on its update horizon.
127
        chansSynced
128

129
        // syncerIdle is a state in which the gossip syncer can handle external
130
        // requests to transition or perform historical syncs. It is used as the
131
        // initial state for pinned syncers, as well as a fallthrough case for
132
        // chansSynced allowing fully synced peers to facilitate requests.
133
        syncerIdle
134
)
135

136
// String returns a human readable string describing the target syncerState.
137
func (s syncerState) String() string {
1✔
138
        switch s {
1✔
UNCOV
139
        case syncingChans:
×
UNCOV
140
                return "syncingChans"
×
141

UNCOV
142
        case waitingQueryRangeReply:
×
UNCOV
143
                return "waitingQueryRangeReply"
×
144

UNCOV
145
        case queryNewChannels:
×
UNCOV
146
                return "queryNewChannels"
×
147

UNCOV
148
        case waitingQueryChanReply:
×
UNCOV
149
                return "waitingQueryChanReply"
×
150

151
        case chansSynced:
1✔
152
                return "chansSynced"
1✔
153

UNCOV
154
        case syncerIdle:
×
UNCOV
155
                return "syncerIdle"
×
156

157
        default:
×
158
                return "UNKNOWN STATE"
×
159
        }
160
}
161

162
const (
163
        // maxQueryChanRangeReplies specifies the default limit of replies to
164
        // process for a single QueryChannelRange request.
165
        maxQueryChanRangeReplies = 500
166

167
        // maxQueryChanRangeRepliesZlibFactor specifies the factor applied to
168
        // the maximum number of replies allowed for zlib encoded replies.
169
        maxQueryChanRangeRepliesZlibFactor = 4
170

171
        // chanRangeQueryBuffer is the number of blocks back that we'll go when
172
        // asking the remote peer for their any channels they know of beyond
173
        // our highest known channel ID.
174
        chanRangeQueryBuffer = 144
175

176
        // syncTransitionTimeout is the default timeout in which we'll wait up
177
        // to when attempting to perform a sync transition.
178
        syncTransitionTimeout = 5 * time.Second
179

180
        // requestBatchSize is the maximum number of channels we will query the
181
        // remote peer for in a QueryShortChanIDs message.
182
        requestBatchSize = 500
183

184
        // syncerBufferSize is the size of the syncer's buffers.
185
        syncerBufferSize = 50
186
)
187

188
var (
189
        // encodingTypeToChunkSize maps an encoding type, to the max number of
190
        // short chan ID's using the encoding type that we can fit into a
191
        // single message safely.
192
        encodingTypeToChunkSize = map[lnwire.QueryEncoding]int32{
193
                lnwire.EncodingSortedPlain: 8000,
194
        }
195

196
        // ErrGossipSyncerExiting signals that the syncer has been killed.
197
        ErrGossipSyncerExiting = errors.New("gossip syncer exiting")
198

199
        // ErrSyncTransitionTimeout is an error returned when we've timed out
200
        // attempting to perform a sync transition.
201
        ErrSyncTransitionTimeout = errors.New("timed out attempting to " +
202
                "transition sync type")
203

204
        // zeroTimestamp is the timestamp we'll use when we want to indicate to
205
        // peers that we do not want to receive any new graph updates.
206
        zeroTimestamp time.Time
207
)
208

209
// syncTransitionReq encapsulates a request for a gossip syncer sync transition.
210
type syncTransitionReq struct {
211
        newSyncType SyncerType
212
        errChan     chan error
213
}
214

215
// historicalSyncReq encapsulates a request for a gossip syncer to perform a
216
// historical sync.
217
type historicalSyncReq struct {
218
        // doneChan is a channel that serves as a signal and is closed to ensure
219
        // the historical sync is attempted by the time we return to the caller.
220
        doneChan chan struct{}
221
}
222

223
// gossipSyncerCfg is a struct that packages all the information a GossipSyncer
224
// needs to carry out its duties.
225
type gossipSyncerCfg struct {
226
        // chainHash is the chain that this syncer is responsible for.
227
        chainHash chainhash.Hash
228

229
        // peerPub is the public key of the peer we're syncing with, serialized
230
        // in compressed format.
231
        peerPub [33]byte
232

233
        // channelSeries is the primary interface that we'll use to generate
234
        // our queries and respond to the queries of the remote peer.
235
        channelSeries ChannelGraphTimeSeries
236

237
        // encodingType is the current encoding type we're aware of. Requests
238
        // with different encoding types will be rejected.
239
        encodingType lnwire.QueryEncoding
240

241
        // chunkSize is the max number of short chan IDs using the syncer's
242
        // encoding type that we can fit into a single message safely.
243
        chunkSize int32
244

245
        // batchSize is the max number of channels the syncer will query from
246
        // the remote node in a single QueryShortChanIDs request.
247
        batchSize int32
248

249
        // sendMsg sends a variadic number of messages to the remote peer.
250
        // The boolean indicates whether this method should be blocked or not
251
        // while waiting for sends to be written to the wire.
252
        sendMsg func(context.Context, bool, ...lnwire.Message) error
253

254
        // noSyncChannels will prevent the GossipSyncer from spawning a
255
        // channelGraphSyncer, meaning we will not try to reconcile unknown
256
        // channels with the remote peer.
257
        noSyncChannels bool
258

259
        // noReplyQueries will prevent the GossipSyncer from spawning a
260
        // replyHandler, meaning we will not reply to queries from our remote
261
        // peer.
262
        noReplyQueries bool
263

264
        // noTimestampQueryOption will prevent the GossipSyncer from querying
265
        // timestamps of announcement messages from the peer, and it will
266
        // prevent it from responding to timestamp queries.
267
        noTimestampQueryOption bool
268

269
        // ignoreHistoricalFilters will prevent syncers from replying with
270
        // historical data when the remote peer sets a gossip_timestamp_range.
271
        // This prevents ranges with old start times from causing us to dump the
272
        // graph on connect.
273
        ignoreHistoricalFilters bool
274

275
        // bestHeight returns the latest height known of the chain.
276
        bestHeight func() uint32
277

278
        // markGraphSynced updates the SyncManager's perception of whether we
279
        // have completed at least one historical sync.
280
        markGraphSynced func()
281

282
        // maxQueryChanRangeReplies is the maximum number of replies we'll allow
283
        // for a single QueryChannelRange request.
284
        maxQueryChanRangeReplies uint32
285

286
        // isStillZombieChannel takes the timestamps of the latest channel
287
        // updates for a channel and returns true if the channel should be
288
        // considered a zombie based on these timestamps.
289
        isStillZombieChannel func(time.Time, time.Time) bool
290

291
        // timestampQueueSize is the size of the timestamp range queue. If not
292
        // set, defaults to the global timestampQueueSize constant.
293
        timestampQueueSize int
294

295
        // msgBytesPerSecond is the allotted bandwidth rate, expressed in
296
        // bytes/second that this gossip syncer can consume. Once we exceed this
297
        // rate, message sending will block until we're below the rate.
298
        msgBytesPerSecond uint64
299
}
300

301
// GossipSyncer is a struct that handles synchronizing the channel graph state
302
// with a remote peer. The GossipSyncer implements a state machine that will
303
// progressively ensure we're synchronized with the channel state of the remote
304
// node. Once both nodes have been synchronized, we'll use an update filter to
305
// filter out which messages should be sent to a remote peer based on their
306
// update horizon. If the update horizon isn't specified, then we won't send
307
// them any channel updates at all.
308
type GossipSyncer struct {
309
        started sync.Once
310
        stopped sync.Once
311

312
        // state is the current state of the GossipSyncer.
313
        //
314
        // NOTE: This variable MUST be used atomically.
315
        state uint32
316

317
        // syncType denotes the SyncerType the gossip syncer is currently
318
        // exercising.
319
        //
320
        // NOTE: This variable MUST be used atomically.
321
        syncType uint32
322

323
        // remoteUpdateHorizon is the update horizon of the remote peer. We'll
324
        // use this to properly filter out any messages.
325
        remoteUpdateHorizon *lnwire.GossipTimestampRange
326

327
        // localUpdateHorizon is our local update horizon, we'll use this to
328
        // determine if we've already sent out our update.
329
        localUpdateHorizon *lnwire.GossipTimestampRange
330

331
        // syncTransitions is a channel through which new sync type transition
332
        // requests will be sent through. These requests should only be handled
333
        // when the gossip syncer is in a chansSynced state to ensure its state
334
        // machine behaves as expected.
335
        syncTransitionReqs chan *syncTransitionReq
336

337
        // historicalSyncReqs is a channel that serves as a signal for the
338
        // gossip syncer to perform a historical sync. These can only be done
339
        // once the gossip syncer is in a chansSynced state to ensure its state
340
        // machine behaves as expected.
341
        historicalSyncReqs chan *historicalSyncReq
342

343
        // genHistoricalChanRangeQuery when true signals to the gossip syncer
344
        // that it should request the remote peer for all of its known channel
345
        // IDs starting from the genesis block of the chain. This can only
346
        // happen if the gossip syncer receives a request to attempt a
347
        // historical sync. It can be unset if the syncer ever transitions from
348
        // PassiveSync to ActiveSync.
349
        genHistoricalChanRangeQuery bool
350

351
        // gossipMsgs is a channel that all responses to our queries from the
352
        // target peer will be sent over, these will be read by the
353
        // channelGraphSyncer.
354
        gossipMsgs chan lnwire.Message
355

356
        // queryMsgs is a channel that all queries from the remote peer will be
357
        // received over, these will be read by the replyHandler.
358
        queryMsgs chan lnwire.Message
359

360
        // curQueryRangeMsg keeps track of the latest QueryChannelRange message
361
        // we've sent to a peer to ensure we've consumed all expected replies.
362
        // This field is primarily used within the waitingQueryChanReply state.
363
        curQueryRangeMsg *lnwire.QueryChannelRange
364

365
        // prevReplyChannelRange keeps track of the previous ReplyChannelRange
366
        // message we've received from a peer to ensure they've fully replied to
367
        // our query by ensuring they covered our requested block range. This
368
        // field is primarily used within the waitingQueryChanReply state.
369
        prevReplyChannelRange *lnwire.ReplyChannelRange
370

371
        // bufferedChanRangeReplies is used in the waitingQueryChanReply to
372
        // buffer all the chunked response to our query.
373
        bufferedChanRangeReplies []graphdb.ChannelUpdateInfo
374

375
        // numChanRangeRepliesRcvd is used to track the number of replies
376
        // received as part of a QueryChannelRange. This field is primarily used
377
        // within the waitingQueryChanReply state.
378
        numChanRangeRepliesRcvd uint32
379

380
        // newChansToQuery is used to pass the set of channels we should query
381
        // for from the waitingQueryChanReply state to the queryNewChannels
382
        // state.
383
        newChansToQuery []lnwire.ShortChannelID
384

385
        cfg gossipSyncerCfg
386

387
        // syncedSignal is a channel that, if set, will be closed when the
388
        // GossipSyncer reaches its terminal chansSynced state.
389
        syncedSignal chan struct{}
390

391
        // syncerSema is used to more finely control the syncer's ability to
392
        // respond to gossip timestamp range messages.
393
        syncerSema chan struct{}
394

395
        // timestampRangeQueue is a buffered channel for queuing timestamp range
396
        // messages that need to be processed asynchronously. This prevents the
397
        // gossiper from blocking when ApplyGossipFilter is called.
398
        timestampRangeQueue chan *lnwire.GossipTimestampRange
399

400
        // isSendingBacklog is an atomic flag that indicates whether a goroutine
401
        // is currently sending the backlog of messages. This ensures only one
402
        // goroutine is active at a time.
403
        isSendingBacklog atomic.Bool
404

405
        sync.Mutex
406

407
        // cg is a helper that encapsulates a wait group and quit channel and
408
        // allows contexts that either block or cancel on those depending on
409
        // the use case.
410
        cg *fn.ContextGuard
411

412
        // rateLimiter dictates the frequency with which we will reply to gossip
413
        // queries to this peer.
414
        rateLimiter *rate.Limiter
415
}
416

417
// newGossipSyncer returns a new instance of the GossipSyncer populated using
418
// the passed config.
419
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
155✔
420
        // Use the configured queue size if set, otherwise use the default.
155✔
421
        queueSize := cfg.timestampQueueSize
155✔
422
        if queueSize == 0 {
180✔
423
                queueSize = defaultTimestampQueueSize
25✔
424
        }
25✔
425

426
        bytesPerSecond := cfg.msgBytesPerSecond
155✔
427
        if bytesPerSecond == 0 {
310✔
428
                bytesPerSecond = DefaultPeerMsgBytesPerSecond
155✔
429
        }
155✔
430
        bytesBurst := 2 * bytesPerSecond
155✔
431

155✔
432
        // We'll use this rate limiter to limit this single peer.
155✔
433
        rateLimiter := rate.NewLimiter(
155✔
434
                rate.Limit(bytesPerSecond), int(bytesBurst),
155✔
435
        )
155✔
436

155✔
437
        return &GossipSyncer{
155✔
438
                cfg:                cfg,
155✔
439
                syncTransitionReqs: make(chan *syncTransitionReq),
155✔
440
                historicalSyncReqs: make(chan *historicalSyncReq),
155✔
441
                gossipMsgs:         make(chan lnwire.Message, syncerBufferSize),
155✔
442
                queryMsgs:          make(chan lnwire.Message, syncerBufferSize),
155✔
443
                timestampRangeQueue: make(
155✔
444
                        chan *lnwire.GossipTimestampRange, queueSize,
155✔
445
                ),
155✔
446
                syncerSema:  sema,
155✔
447
                cg:          fn.NewContextGuard(),
155✔
448
                rateLimiter: rateLimiter,
155✔
449
        }
155✔
450
}
451

452
// Start starts the GossipSyncer and any goroutines that it needs to carry out
453
// its duties.
454
func (g *GossipSyncer) Start() {
91✔
455
        g.started.Do(func() {
182✔
456
                log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])
91✔
457

91✔
458
                ctx, _ := g.cg.Create(context.Background())
91✔
459

91✔
460
                // TODO(conner): only spawn channelGraphSyncer if remote
91✔
461
                // supports gossip queries, and only spawn replyHandler if we
91✔
462
                // advertise support
91✔
463
                if !g.cfg.noSyncChannels {
181✔
464
                        g.cg.WgAdd(1)
90✔
465
                        go g.channelGraphSyncer(ctx)
90✔
466
                }
90✔
467
                if !g.cfg.noReplyQueries {
181✔
468
                        g.cg.WgAdd(1)
90✔
469
                        go g.replyHandler(ctx)
90✔
470
                }
90✔
471

472
                // Start the timestamp range queue processor to handle gossip
473
                // filter applications asynchronously.
474
                if !g.cfg.noTimestampQueryOption {
173✔
475
                        g.cg.WgAdd(1)
82✔
476
                        go g.processTimestampRangeQueue(ctx)
82✔
477
                }
82✔
478
        })
479
}
480

481
// Stop signals the GossipSyncer for a graceful exit, then waits until it has
482
// exited.
483
func (g *GossipSyncer) Stop() {
88✔
484
        g.stopped.Do(func() {
176✔
485
                log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
88✔
486
                defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])
88✔
487

88✔
488
                g.cg.Quit()
88✔
489
        })
88✔
490
}
491

492
// handleSyncingChans handles the state syncingChans for the GossipSyncer. When
493
// in this state, we will send a QueryChannelRange msg to our peer and advance
494
// the syncer's state to waitingQueryRangeReply.
495
func (g *GossipSyncer) handleSyncingChans(ctx context.Context) {
76✔
496
        // Prepare the query msg.
76✔
497
        queryRangeMsg, err := g.genChanRangeQuery(
76✔
498
                ctx, g.genHistoricalChanRangeQuery,
76✔
499
        )
76✔
500
        if err != nil {
76✔
501
                log.Errorf("Unable to gen chan range query: %v", err)
×
502
                return
×
503
        }
×
504

505
        // Acquire a lock so the following state transition is atomic.
506
        //
507
        // NOTE: We must lock the following steps as it's possible we get an
508
        // immediate response (ReplyChannelRange) after sending the query msg.
509
        // The response is handled in ProcessQueryMsg, which requires the
510
        // current state to be waitingQueryRangeReply.
511
        g.Lock()
76✔
512
        defer g.Unlock()
76✔
513

76✔
514
        // Send the msg to the remote peer, which is non-blocking as
76✔
515
        // `sendToPeer` only queues the msg in Brontide.
76✔
516
        err = g.sendToPeer(ctx, queryRangeMsg)
76✔
517
        if err != nil {
76✔
518
                log.Errorf("Unable to send chan range query: %v", err)
×
519
                return
×
520
        }
×
521

522
        // With the message sent successfully, we'll transition into the next
523
        // state where we wait for their reply.
524
        g.setSyncState(waitingQueryRangeReply)
76✔
525
}
526

527
// channelGraphSyncer is the main goroutine responsible for ensuring that we
528
// properly channel graph state with the remote peer, and also that we only
529
// send them messages which actually pass their defined update horizon.
530
func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
90✔
531
        defer g.cg.WgDone()
90✔
532

90✔
533
        for {
408✔
534
                state := g.syncState()
318✔
535
                syncType := g.SyncType()
318✔
536

318✔
537
                log.Debugf("GossipSyncer(%x): state=%v, type=%v",
318✔
538
                        g.cfg.peerPub[:], state, syncType)
318✔
539

318✔
540
                switch state {
318✔
541
                // When we're in this state, we're trying to synchronize our
542
                // view of the network with the remote peer. We'll kick off
543
                // this sync by asking them for the set of channels they
544
                // understand, as we'll as responding to any other queries by
545
                // them.
546
                case syncingChans:
76✔
547
                        g.handleSyncingChans(ctx)
76✔
548

549
                // In this state, we've sent out our initial channel range
550
                // query and are waiting for the final response from the remote
551
                // peer before we perform a diff to see with channels they know
552
                // of that we don't.
553
                case waitingQueryRangeReply:
178✔
554
                        // We'll wait to either process a new message from the
178✔
555
                        // remote party, or exit due to the gossiper exiting,
178✔
556
                        // or us being signalled to do so.
178✔
557
                        select {
178✔
558
                        case msg := <-g.gossipMsgs:
117✔
559
                                // The remote peer is sending a response to our
117✔
560
                                // initial query, we'll collate this response,
117✔
561
                                // and see if it's the final one in the series.
117✔
562
                                // If so, we can then transition to querying
117✔
563
                                // for the new channels.
117✔
564
                                queryReply, ok := msg.(*lnwire.ReplyChannelRange)
117✔
565
                                if ok {
234✔
566
                                        err := g.processChanRangeReply(
117✔
567
                                                ctx, queryReply,
117✔
568
                                        )
117✔
569
                                        if err != nil {
117✔
570
                                                log.Errorf("Unable to "+
×
571
                                                        "process chan range "+
×
572
                                                        "query: %v", err)
×
573
                                                return
×
574
                                        }
×
575
                                        continue
117✔
576
                                }
577

578
                                log.Warnf("Unexpected message: %T in state=%v",
×
579
                                        msg, state)
×
580

UNCOV
581
                        case <-g.cg.Done():
×
UNCOV
582
                                return
×
583

584
                        case <-ctx.Done():
61✔
585
                                return
61✔
586
                        }
587

588
                // We'll enter this state once we've discovered which channels
589
                // the remote party knows of that we don't yet know of
590
                // ourselves.
591
                case queryNewChannels:
3✔
592
                        // First, we'll attempt to continue our channel
3✔
593
                        // synchronization by continuing to send off another
3✔
594
                        // query chunk.
3✔
595
                        done := g.synchronizeChanIDs(ctx)
3✔
596

3✔
597
                        // If this wasn't our last query, then we'll need to
3✔
598
                        // transition to our waiting state.
3✔
599
                        if !done {
5✔
600
                                continue
2✔
601
                        }
602

603
                        // If we're fully synchronized, then we can transition
604
                        // to our terminal state.
605
                        g.setSyncState(chansSynced)
1✔
606

1✔
607
                        // Ensure that the sync manager becomes aware that the
1✔
608
                        // historical sync completed so synced_to_graph is
1✔
609
                        // updated over rpc.
1✔
610
                        g.cfg.markGraphSynced()
1✔
611

612
                // In this state, we've just sent off a new query for channels
613
                // that we don't yet know of. We'll remain in this state until
614
                // the remote party signals they've responded to our query in
615
                // totality.
616
                case waitingQueryChanReply:
2✔
617
                        // Once we've sent off our query, we'll wait for either
2✔
618
                        // an ending reply, or just another query from the
2✔
619
                        // remote peer.
2✔
620
                        select {
2✔
621
                        case msg := <-g.gossipMsgs:
2✔
622
                                // If this is the final reply to one of our
2✔
623
                                // queries, then we'll loop back into our query
2✔
624
                                // state to send of the remaining query chunks.
2✔
625
                                _, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
2✔
626
                                if ok {
4✔
627
                                        g.setSyncState(queryNewChannels)
2✔
628
                                        continue
2✔
629
                                }
630

631
                                log.Warnf("Unexpected message: %T in state=%v",
×
632
                                        msg, state)
×
633

634
                        case <-g.cg.Done():
×
635
                                return
×
636

637
                        case <-ctx.Done():
×
638
                                return
×
639
                        }
640

641
                // This is our final terminal state where we'll only reply to
642
                // any further queries by the remote peer.
643
                case chansSynced:
56✔
644
                        g.Lock()
56✔
645
                        if g.syncedSignal != nil {
64✔
646
                                close(g.syncedSignal)
8✔
647
                                g.syncedSignal = nil
8✔
648
                        }
8✔
649
                        g.Unlock()
56✔
650

56✔
651
                        // If we haven't yet sent out our update horizon, and
56✔
652
                        // we want to receive real-time channel updates, we'll
56✔
653
                        // do so now.
56✔
654
                        if g.localUpdateHorizon == nil &&
56✔
655
                                syncType.IsActiveSync() {
70✔
656

14✔
657
                                err := g.sendGossipTimestampRange(
14✔
658
                                        ctx, time.Now(), math.MaxUint32,
14✔
659
                                )
14✔
660
                                if err != nil {
14✔
661
                                        log.Errorf("Unable to send update "+
×
662
                                                "horizon to %x: %v",
×
663
                                                g.cfg.peerPub, err)
×
664
                                }
×
665
                        }
666
                        // With our horizon set, we'll simply reply to any new
667
                        // messages or process any state transitions and exit if
668
                        // needed.
669
                        fallthrough
56✔
670

671
                // Pinned peers will begin in this state, since they will
672
                // immediately receive a request to perform a historical sync.
673
                // Otherwise, we fall through after ending in chansSynced to
674
                // facilitate new requests.
675
                case syncerIdle:
59✔
676
                        select {
59✔
677
                        case req := <-g.syncTransitionReqs:
14✔
678
                                req.errChan <- g.handleSyncTransition(ctx, req)
14✔
679

680
                        case req := <-g.historicalSyncReqs:
16✔
681
                                g.handleHistoricalSync(req)
16✔
682

683
                        case <-g.cg.Done():
1✔
684
                                return
1✔
685

686
                        case <-ctx.Done():
25✔
687
                                return
25✔
688
                        }
689
                }
690
        }
691
}
692

693
// replyHandler is an event loop whose sole purpose is to reply to the remote
694
// peers queries. Our replyHandler will respond to messages generated by their
695
// channelGraphSyncer, and vice versa. Each party's channelGraphSyncer drives
696
// the other's replyHandler, allowing the replyHandler to operate independently
697
// from the state machine maintained on the same node.
698
//
699
// NOTE: This method MUST be run as a goroutine.
700
func (g *GossipSyncer) replyHandler(ctx context.Context) {
90✔
701
        defer g.cg.WgDone()
90✔
702

90✔
703
        for {
185✔
704
                select {
95✔
705
                case msg := <-g.queryMsgs:
5✔
706
                        err := g.replyPeerQueries(ctx, msg)
5✔
707
                        switch {
5✔
708
                        case err == ErrGossipSyncerExiting:
×
709
                                return
×
710

711
                        case err == lnpeer.ErrPeerExiting:
×
712
                                return
×
713

714
                        case err != nil:
×
715
                                log.Errorf("Unable to reply to peer "+
×
716
                                        "query: %v", err)
×
717
                        }
718

719
                case <-g.cg.Done():
1✔
720
                        return
1✔
721

722
                case <-ctx.Done():
86✔
723
                        return
86✔
724
                }
725
        }
726
}
727

728
// processTimestampRangeQueue handles timestamp range messages from the queue
729
// asynchronously. This prevents blocking the gossiper when rate limiting is
730
// active and multiple peers are trying to apply gossip filters.
731
func (g *GossipSyncer) processTimestampRangeQueue(ctx context.Context) {
82✔
732
        defer g.cg.WgDone()
82✔
733

82✔
734
        for {
169✔
735
                select {
87✔
736
                case msg := <-g.timestampRangeQueue:
59✔
737
                        // Process the timestamp range message. If we hit an
59✔
738
                        // error, log it but continue processing to avoid
59✔
739
                        // blocking the queue.
59✔
740
                        err := g.ApplyGossipFilter(ctx, msg)
59✔
741
                        switch {
59✔
742
                        case errors.Is(err, ErrGossipSyncerExiting):
1✔
743
                                return
1✔
744

745
                        case errors.Is(err, lnpeer.ErrPeerExiting):
×
746
                                return
×
747

748
                        case err != nil:
×
749
                                log.Errorf("Unable to apply gossip filter: %v",
×
750
                                        err)
×
751
                        }
752

753
                case <-g.cg.Done():
×
754
                        return
×
755

756
                case <-ctx.Done():
25✔
757
                        return
25✔
758
                }
759
        }
760
}
761

762
// QueueTimestampRange attempts to queue a timestamp range message for
763
// asynchronous processing. If the queue is full, it returns false to indicate
764
// the message was dropped.
765
func (g *GossipSyncer) QueueTimestampRange(
766
        msg *lnwire.GossipTimestampRange) bool {
1,888✔
767

1,888✔
768
        // If timestamp queries are disabled, don't queue the message.
1,888✔
769
        if g.cfg.noTimestampQueryOption {
1,888✔
770
                return false
×
771
        }
×
772

773
        select {
1,888✔
774
        case g.timestampRangeQueue <- msg:
771✔
775
                return true
771✔
776

777
        // Queue is full, drop the message to prevent blocking.
778
        default:
1,117✔
779
                log.Warnf("Timestamp range queue full for peer %x, "+
1,117✔
780
                        "dropping message", g.cfg.peerPub[:])
1,117✔
781
                return false
1,117✔
782
        }
783
}
784

785
// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
786
// syncer and sends it to the remote peer.
787
func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
788
        firstTimestamp time.Time, timestampRange uint32) error {
28✔
789

28✔
790
        endTimestamp := firstTimestamp.Add(
28✔
791
                time.Duration(timestampRange) * time.Second,
28✔
792
        )
28✔
793

28✔
794
        log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)",
28✔
795
                g.cfg.peerPub[:], firstTimestamp, endTimestamp)
28✔
796

28✔
797
        localUpdateHorizon := &lnwire.GossipTimestampRange{
28✔
798
                ChainHash:      g.cfg.chainHash,
28✔
799
                FirstTimestamp: uint32(firstTimestamp.Unix()),
28✔
800
                TimestampRange: timestampRange,
28✔
801
        }
28✔
802

28✔
803
        if err := g.sendToPeer(ctx, localUpdateHorizon); err != nil {
28✔
804
                return err
×
805
        }
×
806

807
        if firstTimestamp.Equal(zeroTimestamp) && timestampRange == 0 {
30✔
808
                g.localUpdateHorizon = nil
2✔
809
        } else {
28✔
810
                g.localUpdateHorizon = localUpdateHorizon
26✔
811
        }
26✔
812

813
        return nil
28✔
814
}
815

816
// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
817
// the remote peer for its known set of channel IDs within a particular block
818
// range. This method will be called continually until the entire range has
819
// been queried for with a response received. We'll chunk our requests as
820
// required to ensure they fit into a single message. We may re-renter this
821
// state in the case that chunking is required.
822
func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
6✔
823
        // If we're in this state yet there are no more new channels to query
6✔
824
        // for, then we'll transition to our final synced state and return true
6✔
825
        // to signal that we're fully synchronized.
6✔
826
        if len(g.newChansToQuery) == 0 {
7✔
827
                log.Infof("GossipSyncer(%x): no more chans to query",
1✔
828
                        g.cfg.peerPub[:])
1✔
829

1✔
830
                return true
1✔
831
        }
1✔
832

833
        // Otherwise, we'll issue our next chunked query to receive replies
834
        // for.
835
        var queryChunk []lnwire.ShortChannelID
5✔
836

5✔
837
        // If the number of channels to query for is less than the chunk size,
5✔
838
        // then we can issue a single query.
5✔
839
        if int32(len(g.newChansToQuery)) < g.cfg.batchSize {
7✔
840
                queryChunk = g.newChansToQuery
2✔
841
                g.newChansToQuery = nil
2✔
842

2✔
843
        } else {
5✔
844
                // Otherwise, we'll need to only query for the next chunk.
3✔
845
                // We'll slice into our query chunk, then slide down our main
3✔
846
                // pointer down by the chunk size.
3✔
847
                queryChunk = g.newChansToQuery[:g.cfg.batchSize]
3✔
848
                g.newChansToQuery = g.newChansToQuery[g.cfg.batchSize:]
3✔
849
        }
3✔
850

851
        log.Infof("GossipSyncer(%x): querying for %v new channels",
5✔
852
                g.cfg.peerPub[:], len(queryChunk))
5✔
853

5✔
854
        // Change the state before sending the query msg.
5✔
855
        g.setSyncState(waitingQueryChanReply)
5✔
856

5✔
857
        // With our chunk obtained, we'll send over our next query, then return
5✔
858
        // false indicating that we're net yet fully synced.
5✔
859
        err := g.sendToPeer(ctx, &lnwire.QueryShortChanIDs{
5✔
860
                ChainHash:    g.cfg.chainHash,
5✔
861
                EncodingType: lnwire.EncodingSortedPlain,
5✔
862
                ShortChanIDs: queryChunk,
5✔
863
        })
5✔
864
        if err != nil {
5✔
865
                log.Errorf("Unable to sync chan IDs: %v", err)
×
866
        }
×
867

868
        return false
5✔
869
}
870

871
// isLegacyReplyChannelRange determines where a ReplyChannelRange message is
872
// considered legacy. There was a point where lnd used to include the same query
873
// over multiple replies, rather than including the portion of the query the
874
// reply is handling. We'll use this as a way of detecting whether we are
875
// communicating with a legacy node so we can properly sync with them.
876
func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
877
        reply *lnwire.ReplyChannelRange) bool {
250✔
878

250✔
879
        return (reply.ChainHash == query.ChainHash &&
250✔
880
                reply.FirstBlockHeight == query.FirstBlockHeight &&
250✔
881
                reply.NumBlocks == query.NumBlocks)
250✔
882
}
250✔
883

884
// processChanRangeReply is called each time the GossipSyncer receives a new
885
// reply to the initial range query to discover new channels that it didn't
886
// previously know of.
887
func (g *GossipSyncer) processChanRangeReply(_ context.Context,
888
        msg *lnwire.ReplyChannelRange) error {
125✔
889

125✔
890
        // isStale returns whether the timestamp is too far into the past.
125✔
891
        isStale := func(timestamp time.Time) bool {
155✔
892
                return time.Since(timestamp) > graph.DefaultChannelPruneExpiry
30✔
893
        }
30✔
894

895
        // isSkewed returns whether the timestamp is too far into the future.
896
        isSkewed := func(timestamp time.Time) bool {
145✔
897
                return time.Until(timestamp) > graph.DefaultChannelPruneExpiry
20✔
898
        }
20✔
899

900
        // If we're not communicating with a legacy node, we'll apply some
901
        // further constraints on their reply to ensure it satisfies our query.
902
        if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
235✔
903
                // The first block should be within our original request.
110✔
904
                if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight {
110✔
905
                        return fmt.Errorf("reply includes channels for height "+
×
906
                                "%v prior to query %v", msg.FirstBlockHeight,
×
907
                                g.curQueryRangeMsg.FirstBlockHeight)
×
908
                }
×
909

910
                // The last block should also be. We don't need to check the
911
                // intermediate ones because they should already be in sorted
912
                // order.
913
                replyLastHeight := msg.LastBlockHeight()
110✔
914
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
110✔
915
                if replyLastHeight > queryLastHeight {
110✔
916
                        return fmt.Errorf("reply includes channels for height "+
×
917
                                "%v after query %v", replyLastHeight,
×
918
                                queryLastHeight)
×
919
                }
×
920

921
                // If we've previously received a reply for this query, look at
922
                // its last block to ensure the current reply properly follows
923
                // it.
924
                if g.prevReplyChannelRange != nil {
215✔
925
                        prevReply := g.prevReplyChannelRange
105✔
926
                        prevReplyLastHeight := prevReply.LastBlockHeight()
105✔
927

105✔
928
                        // The current reply can either start from the previous
105✔
929
                        // reply's last block, if there are still more channels
105✔
930
                        // for the same block, or the block after.
105✔
931
                        if msg.FirstBlockHeight != prevReplyLastHeight &&
105✔
932
                                msg.FirstBlockHeight != prevReplyLastHeight+1 {
105✔
933

×
934
                                return fmt.Errorf("first block of reply %v "+
×
935
                                        "does not continue from last block of "+
×
936
                                        "previous %v", msg.FirstBlockHeight,
×
937
                                        prevReplyLastHeight)
×
938
                        }
×
939
                }
940
        }
941

942
        g.prevReplyChannelRange = msg
125✔
943

125✔
944
        for i, scid := range msg.ShortChanIDs {
252✔
945
                info := graphdb.NewChannelUpdateInfo(
127✔
946
                        scid, time.Time{}, time.Time{},
127✔
947
                )
127✔
948

127✔
949
                if len(msg.Timestamps) != 0 {
139✔
950
                        t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0)
12✔
951
                        info.Node1UpdateTimestamp = t1
12✔
952

12✔
953
                        t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0)
12✔
954
                        info.Node2UpdateTimestamp = t2
12✔
955

12✔
956
                        // Sort out all channels with outdated or skewed
12✔
957
                        // timestamps. Both timestamps need to be out of
12✔
958
                        // boundaries for us to skip the channel and not query
12✔
959
                        // it later on.
12✔
960
                        switch {
12✔
961
                        case isStale(info.Node1UpdateTimestamp) &&
962
                                isStale(info.Node2UpdateTimestamp):
2✔
963

2✔
964
                                continue
2✔
965

966
                        case isSkewed(info.Node1UpdateTimestamp) &&
967
                                isSkewed(info.Node2UpdateTimestamp):
2✔
968

2✔
969
                                continue
2✔
970

971
                        case isStale(info.Node1UpdateTimestamp) &&
972
                                isSkewed(info.Node2UpdateTimestamp):
2✔
973

2✔
974
                                continue
2✔
975

976
                        case isStale(info.Node2UpdateTimestamp) &&
977
                                isSkewed(info.Node1UpdateTimestamp):
2✔
978

2✔
979
                                continue
2✔
980
                        }
981
                }
982

983
                g.bufferedChanRangeReplies = append(
119✔
984
                        g.bufferedChanRangeReplies, info,
119✔
985
                )
119✔
986
        }
987

988
        switch g.cfg.encodingType {
125✔
989
        case lnwire.EncodingSortedPlain:
125✔
990
                g.numChanRangeRepliesRcvd++
125✔
991
        case lnwire.EncodingSortedZlib:
×
992
                g.numChanRangeRepliesRcvd += maxQueryChanRangeRepliesZlibFactor
×
993
        default:
×
994
                return fmt.Errorf("unhandled encoding type %v", g.cfg.encodingType)
×
995
        }
996

997
        log.Infof("GossipSyncer(%x): buffering chan range reply of size=%v",
125✔
998
                g.cfg.peerPub[:], len(msg.ShortChanIDs))
125✔
999

125✔
1000
        // If this isn't the last response and we can continue to receive more,
125✔
1001
        // then we can exit as we've already buffered the latest portion of the
125✔
1002
        // streaming reply.
125✔
1003
        maxReplies := g.cfg.maxQueryChanRangeReplies
125✔
1004
        switch {
125✔
1005
        // If we're communicating with a legacy node, we'll need to look at the
1006
        // complete field.
1007
        case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg):
15✔
1008
                if msg.Complete == 0 && g.numChanRangeRepliesRcvd < maxReplies {
18✔
1009
                        return nil
3✔
1010
                }
3✔
1011

1012
        // Otherwise, we'll look at the reply's height range.
1013
        default:
110✔
1014
                replyLastHeight := msg.LastBlockHeight()
110✔
1015
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
110✔
1016

110✔
1017
                // TODO(wilmer): This might require some padding if the remote
110✔
1018
                // node is not aware of the last height we sent them, i.e., is
110✔
1019
                // behind a few blocks from us.
110✔
1020
                if replyLastHeight < queryLastHeight &&
110✔
1021
                        g.numChanRangeRepliesRcvd < maxReplies {
215✔
1022

105✔
1023
                        return nil
105✔
1024
                }
105✔
1025
        }
1026

1027
        log.Infof("GossipSyncer(%x): filtering through %v chans",
17✔
1028
                g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))
17✔
1029

17✔
1030
        // Otherwise, this is the final response, so we'll now check to see
17✔
1031
        // which channels they know of that we don't.
17✔
1032
        newChans, err := g.cfg.channelSeries.FilterKnownChanIDs(
17✔
1033
                g.cfg.chainHash, g.bufferedChanRangeReplies,
17✔
1034
                g.cfg.isStillZombieChannel,
17✔
1035
        )
17✔
1036
        if err != nil {
17✔
1037
                return fmt.Errorf("unable to filter chan ids: %w", err)
×
1038
        }
×
1039

1040
        // As we've received the entirety of the reply, we no longer need to
1041
        // hold on to the set of buffered replies or the original query that
1042
        // prompted the replies, so we'll let that be garbage collected now.
1043
        g.curQueryRangeMsg = nil
17✔
1044
        g.prevReplyChannelRange = nil
17✔
1045
        g.bufferedChanRangeReplies = nil
17✔
1046
        g.numChanRangeRepliesRcvd = 0
17✔
1047

17✔
1048
        // If there aren't any channels that we don't know of, then we can
17✔
1049
        // switch straight to our terminal state.
17✔
1050
        if len(newChans) == 0 {
31✔
1051
                log.Infof("GossipSyncer(%x): remote peer has no new chans",
14✔
1052
                        g.cfg.peerPub[:])
14✔
1053

14✔
1054
                g.setSyncState(chansSynced)
14✔
1055

14✔
1056
                // Ensure that the sync manager becomes aware that the
14✔
1057
                // historical sync completed so synced_to_graph is updated over
14✔
1058
                // rpc.
14✔
1059
                g.cfg.markGraphSynced()
14✔
1060
                return nil
14✔
1061
        }
14✔
1062

1063
        // Otherwise, we'll set the set of channels that we need to query for
1064
        // the next state, and also transition our state.
1065
        g.newChansToQuery = newChans
3✔
1066
        g.setSyncState(queryNewChannels)
3✔
1067

3✔
1068
        log.Infof("GossipSyncer(%x): starting query for %v new chans",
3✔
1069
                g.cfg.peerPub[:], len(newChans))
3✔
1070

3✔
1071
        return nil
3✔
1072
}
1073

1074
// genChanRangeQuery generates the initial message we'll send to the remote
1075
// party when we're kicking off the channel graph synchronization upon
1076
// connection. The historicalQuery boolean can be used to generate a query from
1077
// the genesis block of the chain.
1078
func (g *GossipSyncer) genChanRangeQuery(ctx context.Context,
1079
        historicalQuery bool) (*lnwire.QueryChannelRange, error) {
80✔
1080

80✔
1081
        // First, we'll query our channel graph time series for its highest
80✔
1082
        // known channel ID.
80✔
1083
        newestChan, err := g.cfg.channelSeries.HighestChanID(
80✔
1084
                ctx, g.cfg.chainHash,
80✔
1085
        )
80✔
1086
        if err != nil {
80✔
1087
                return nil, err
×
1088
        }
×
1089

1090
        // Once we have the chan ID of the newest, we'll obtain the block height
1091
        // of the channel, then subtract our default horizon to ensure we don't
1092
        // miss any channels. By default, we go back 1 day from the newest
1093
        // channel, unless we're attempting a historical sync, where we'll
1094
        // actually start from the genesis block instead.
1095
        var startHeight uint32
80✔
1096
        switch {
80✔
1097
        case historicalQuery:
19✔
1098
                fallthrough
19✔
1099
        case newestChan.BlockHeight <= chanRangeQueryBuffer:
20✔
1100
                startHeight = 0
20✔
1101
        default:
60✔
1102
                startHeight = newestChan.BlockHeight - chanRangeQueryBuffer
60✔
1103
        }
1104

1105
        // Determine the number of blocks to request based on our best height.
1106
        // We'll take into account any potential underflows and explicitly set
1107
        // numBlocks to its minimum value of 1 if so.
1108
        bestHeight := g.cfg.bestHeight()
80✔
1109
        numBlocks := bestHeight - startHeight
80✔
1110
        if int64(numBlocks) < 1 {
80✔
1111
                numBlocks = 1
×
1112
        }
×
1113

1114
        log.Infof("GossipSyncer(%x): requesting new chans from height=%v "+
80✔
1115
                "and %v blocks after", g.cfg.peerPub[:], startHeight, numBlocks)
80✔
1116

80✔
1117
        // Finally, we'll craft the channel range query, using our starting
80✔
1118
        // height, then asking for all known channels to the foreseeable end of
80✔
1119
        // the main chain.
80✔
1120
        query := &lnwire.QueryChannelRange{
80✔
1121
                ChainHash:        g.cfg.chainHash,
80✔
1122
                FirstBlockHeight: startHeight,
80✔
1123
                NumBlocks:        numBlocks,
80✔
1124
        }
80✔
1125

80✔
1126
        if !g.cfg.noTimestampQueryOption {
152✔
1127
                query.QueryOptions = lnwire.NewTimestampQueryOption()
72✔
1128
        }
72✔
1129

1130
        g.curQueryRangeMsg = query
80✔
1131

80✔
1132
        return query, nil
80✔
1133
}
1134

1135
// replyPeerQueries is called in response to any query by the remote peer.
1136
// We'll examine our state and send back our best response.
1137
func (g *GossipSyncer) replyPeerQueries(ctx context.Context,
1138
        msg lnwire.Message) error {
5✔
1139

5✔
1140
        switch msg := msg.(type) {
5✔
1141

1142
        // In this state, we'll also handle any incoming channel range queries
1143
        // from the remote peer as they're trying to sync their state as well.
1144
        case *lnwire.QueryChannelRange:
3✔
1145
                return g.replyChanRangeQuery(ctx, msg)
3✔
1146

1147
        // If the remote peer skips straight to requesting new channels that
1148
        // they don't know of, then we'll ensure that we also handle this case.
1149
        case *lnwire.QueryShortChanIDs:
2✔
1150
                return g.replyShortChanIDs(ctx, msg)
2✔
1151

1152
        default:
×
1153
                return fmt.Errorf("unknown message: %T", msg)
×
1154
        }
1155
}
1156

1157
// replyChanRangeQuery will be dispatched in response to a channel range query
1158
// by the remote node. We'll query the channel time series for channels that
1159
// meet the channel range, then chunk our responses to the remote node. We also
1160
// ensure that our final fragment carries the "complete" bit to indicate the
1161
// end of our streaming response.
1162
func (g *GossipSyncer) replyChanRangeQuery(ctx context.Context,
1163
        query *lnwire.QueryChannelRange) error {
9✔
1164

9✔
1165
        // Before responding, we'll check to ensure that the remote peer is
9✔
1166
        // querying for the same chain that we're on. If not, we'll send back a
9✔
1167
        // response with a complete value of zero to indicate we're on a
9✔
1168
        // different chain.
9✔
1169
        if g.cfg.chainHash != query.ChainHash {
10✔
1170
                log.Warnf("Remote peer requested QueryChannelRange for "+
1✔
1171
                        "chain=%v, we're on chain=%v", query.ChainHash,
1✔
1172
                        g.cfg.chainHash)
1✔
1173

1✔
1174
                return g.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
1✔
1175
                        ChainHash:        query.ChainHash,
1✔
1176
                        FirstBlockHeight: query.FirstBlockHeight,
1✔
1177
                        NumBlocks:        query.NumBlocks,
1✔
1178
                        Complete:         0,
1✔
1179
                        EncodingType:     g.cfg.encodingType,
1✔
1180
                        ShortChanIDs:     nil,
1✔
1181
                })
1✔
1182
        }
1✔
1183

1184
        log.Infof("GossipSyncer(%x): filtering chan range: start_height=%v, "+
8✔
1185
                "num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
8✔
1186
                query.NumBlocks)
8✔
1187

8✔
1188
        // Check if the query asked for timestamps. We will only serve
8✔
1189
        // timestamps if this has not been disabled with
8✔
1190
        // noTimestampQueryOption.
8✔
1191
        withTimestamps := query.WithTimestamps() &&
8✔
1192
                !g.cfg.noTimestampQueryOption
8✔
1193

8✔
1194
        // Next, we'll consult the time series to obtain the set of known
8✔
1195
        // channel ID's that match their query.
8✔
1196
        startBlock := query.FirstBlockHeight
8✔
1197
        endBlock := query.LastBlockHeight()
8✔
1198
        channelRanges, err := g.cfg.channelSeries.FilterChannelRange(
8✔
1199
                query.ChainHash, startBlock, endBlock, withTimestamps,
8✔
1200
        )
8✔
1201
        if err != nil {
8✔
1202
                return err
×
1203
        }
×
1204

1205
        // TODO(roasbeef): means can't send max uint above?
1206
        //  * or make internal 64
1207

1208
        // We'll send our response in a streaming manner, chunk-by-chunk. We do
1209
        // this as there's a transport message size limit which we'll need to
1210
        // adhere to. We also need to make sure all of our replies cover the
1211
        // expected range of the query.
1212
        sendReplyForChunk := func(channelChunk []graphdb.ChannelUpdateInfo,
8✔
1213
                firstHeight, lastHeight uint32, finalChunk bool) error {
21✔
1214

13✔
1215
                // The number of blocks contained in the current chunk (the
13✔
1216
                // total span) is the difference between the last channel ID and
13✔
1217
                // the first in the range. We add one as even if all channels
13✔
1218
                // returned are in the same block, we need to count that.
13✔
1219
                numBlocks := lastHeight - firstHeight + 1
13✔
1220
                complete := uint8(0)
13✔
1221
                if finalChunk {
21✔
1222
                        complete = 1
8✔
1223
                }
8✔
1224

1225
                var timestamps lnwire.Timestamps
13✔
1226
                if withTimestamps {
13✔
UNCOV
1227
                        timestamps = make(lnwire.Timestamps, len(channelChunk))
×
UNCOV
1228
                }
×
1229

1230
                scids := make([]lnwire.ShortChannelID, len(channelChunk))
13✔
1231
                for i, info := range channelChunk {
27✔
1232
                        scids[i] = info.ShortChannelID
14✔
1233

14✔
1234
                        if !withTimestamps {
28✔
1235
                                continue
14✔
1236
                        }
1237

UNCOV
1238
                        timestamps[i].Timestamp1 = uint32(
×
UNCOV
1239
                                info.Node1UpdateTimestamp.Unix(),
×
UNCOV
1240
                        )
×
UNCOV
1241

×
UNCOV
1242
                        timestamps[i].Timestamp2 = uint32(
×
UNCOV
1243
                                info.Node2UpdateTimestamp.Unix(),
×
UNCOV
1244
                        )
×
1245
                }
1246

1247
                return g.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
13✔
1248
                        ChainHash:        query.ChainHash,
13✔
1249
                        NumBlocks:        numBlocks,
13✔
1250
                        FirstBlockHeight: firstHeight,
13✔
1251
                        Complete:         complete,
13✔
1252
                        EncodingType:     g.cfg.encodingType,
13✔
1253
                        ShortChanIDs:     scids,
13✔
1254
                        Timestamps:       timestamps,
13✔
1255
                })
13✔
1256
        }
1257

1258
        var (
8✔
1259
                firstHeight  = query.FirstBlockHeight
8✔
1260
                lastHeight   uint32
8✔
1261
                channelChunk []graphdb.ChannelUpdateInfo
8✔
1262
        )
8✔
1263

8✔
1264
        // chunkSize is the maximum number of SCIDs that we can safely put in a
8✔
1265
        // single message. If we also need to include timestamps though, then
8✔
1266
        // this number is halved since encoding two timestamps takes the same
8✔
1267
        // number of bytes as encoding an SCID.
8✔
1268
        chunkSize := g.cfg.chunkSize
8✔
1269
        if withTimestamps {
8✔
UNCOV
1270
                chunkSize /= 2
×
UNCOV
1271
        }
×
1272

1273
        for _, channelRange := range channelRanges {
22✔
1274
                channels := channelRange.Channels
14✔
1275
                numChannels := int32(len(channels))
14✔
1276
                numLeftToAdd := chunkSize - int32(len(channelChunk))
14✔
1277

14✔
1278
                // Include the current block in the ongoing chunk if it can fit
14✔
1279
                // and move on to the next block.
14✔
1280
                if numChannels <= numLeftToAdd {
23✔
1281
                        channelChunk = append(channelChunk, channels...)
9✔
1282
                        continue
9✔
1283
                }
1284

1285
                // Otherwise, we need to send our existing channel chunk as is
1286
                // as its own reply and start a new one for the current block.
1287
                // We'll mark the end of our current chunk as the height before
1288
                // the current block to ensure the whole query range is replied
1289
                // to.
1290
                log.Infof("GossipSyncer(%x): sending range chunk of size=%v",
5✔
1291
                        g.cfg.peerPub[:], len(channelChunk))
5✔
1292

5✔
1293
                lastHeight = channelRange.Height - 1
5✔
1294
                err := sendReplyForChunk(
5✔
1295
                        channelChunk, firstHeight, lastHeight, false,
5✔
1296
                )
5✔
1297
                if err != nil {
5✔
1298
                        return err
×
1299
                }
×
1300

1301
                // With the reply constructed, we'll start tallying channels for
1302
                // our next one keeping in mind our chunk size. This may result
1303
                // in channels for this block being left out from the reply, but
1304
                // this isn't an issue since we'll randomly shuffle them and we
1305
                // assume a historical gossip sync is performed at a later time.
1306
                firstHeight = channelRange.Height
5✔
1307
                finalChunkSize := numChannels
5✔
1308
                exceedsChunkSize := numChannels > chunkSize
5✔
1309
                if exceedsChunkSize {
5✔
1310
                        rand.Shuffle(len(channels), func(i, j int) {
×
1311
                                channels[i], channels[j] = channels[j], channels[i]
×
1312
                        })
×
1313
                        finalChunkSize = chunkSize
×
1314
                }
1315
                channelChunk = channels[:finalChunkSize]
5✔
1316

5✔
1317
                // Sort the chunk once again if we had to shuffle it.
5✔
1318
                if exceedsChunkSize {
5✔
1319
                        sort.Slice(channelChunk, func(i, j int) bool {
×
1320
                                id1 := channelChunk[i].ShortChannelID.ToUint64()
×
1321
                                id2 := channelChunk[j].ShortChannelID.ToUint64()
×
1322

×
1323
                                return id1 < id2
×
1324
                        })
×
1325
                }
1326
        }
1327

1328
        // Send the remaining chunk as the final reply.
1329
        log.Infof("GossipSyncer(%x): sending final chan range chunk, size=%v",
8✔
1330
                g.cfg.peerPub[:], len(channelChunk))
8✔
1331

8✔
1332
        return sendReplyForChunk(
8✔
1333
                channelChunk, firstHeight, query.LastBlockHeight(), true,
8✔
1334
        )
8✔
1335
}
1336

1337
// replyShortChanIDs will be dispatched in response to a query by the remote
1338
// node for information concerning a set of short channel ID's. Our response
1339
// will be sent in a streaming chunked manner to ensure that we remain below
1340
// the current transport level message size.
1341
func (g *GossipSyncer) replyShortChanIDs(ctx context.Context,
1342
        query *lnwire.QueryShortChanIDs) error {
4✔
1343

4✔
1344
        // Before responding, we'll check to ensure that the remote peer is
4✔
1345
        // querying for the same chain that we're on. If not, we'll send back a
4✔
1346
        // response with a complete value of zero to indicate we're on a
4✔
1347
        // different chain.
4✔
1348
        if g.cfg.chainHash != query.ChainHash {
5✔
1349
                log.Warnf("Remote peer requested QueryShortChanIDs for "+
1✔
1350
                        "chain=%v, we're on chain=%v", query.ChainHash,
1✔
1351
                        g.cfg.chainHash)
1✔
1352

1✔
1353
                return g.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
1✔
1354
                        ChainHash: query.ChainHash,
1✔
1355
                        Complete:  0,
1✔
1356
                })
1✔
1357
        }
1✔
1358

1359
        if len(query.ShortChanIDs) == 0 {
3✔
1360
                log.Infof("GossipSyncer(%x): ignoring query for blank short chan ID's",
×
1361
                        g.cfg.peerPub[:])
×
1362
                return nil
×
1363
        }
×
1364

1365
        log.Infof("GossipSyncer(%x): fetching chan anns for %v chans",
3✔
1366
                g.cfg.peerPub[:], len(query.ShortChanIDs))
3✔
1367

3✔
1368
        // Now that we know we're on the same chain, we'll query the channel
3✔
1369
        // time series for the set of messages that we know of which satisfies
3✔
1370
        // the requirement of being a chan ann, chan update, or a node ann
3✔
1371
        // related to the set of queried channels.
3✔
1372
        replyMsgs, err := g.cfg.channelSeries.FetchChanAnns(
3✔
1373
                query.ChainHash, query.ShortChanIDs,
3✔
1374
        )
3✔
1375
        if err != nil {
3✔
1376
                return fmt.Errorf("unable to fetch chan anns for %v..., %w",
×
1377
                        query.ShortChanIDs[0].ToUint64(), err)
×
1378
        }
×
1379

1380
        // Reply with any messages related to those channel ID's, we'll write
1381
        // each one individually and synchronously to throttle the sends and
1382
        // perform buffering of responses in the syncer as opposed to the peer.
1383
        for _, msg := range replyMsgs {
6✔
1384
                err := g.sendToPeerSync(ctx, msg)
3✔
1385
                if err != nil {
3✔
1386
                        return err
×
1387
                }
×
1388
        }
1389

1390
        // Regardless of whether we had any messages to reply with, send over
1391
        // the sentinel message to signal that the stream has terminated.
1392
        return g.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
3✔
1393
                ChainHash: query.ChainHash,
3✔
1394
                Complete:  1,
3✔
1395
        })
3✔
1396
}
1397

1398
// ApplyGossipFilter applies a gossiper filter sent by the remote node to the
1399
// state machine. Once applied, we'll ensure that we don't forward any messages
1400
// to the peer that aren't within the time range of the filter.
1401
func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
1402
        filter *lnwire.GossipTimestampRange) error {
68✔
1403

68✔
1404
        g.Lock()
68✔
1405

68✔
1406
        g.remoteUpdateHorizon = filter
68✔
1407

68✔
1408
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
68✔
1409
        endTime := startTime.Add(
68✔
1410
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
68✔
1411
        )
68✔
1412

68✔
1413
        g.Unlock()
68✔
1414

68✔
1415
        // If requested, don't reply with historical gossip data when the remote
68✔
1416
        // peer sets their gossip timestamp range.
68✔
1417
        if g.cfg.ignoreHistoricalFilters {
69✔
1418
                return nil
1✔
1419
        }
1✔
1420

1421
        // Check if a goroutine is already sending the backlog. If so, return
1422
        // early without attempting to acquire the semaphore.
1423
        if g.isSendingBacklog.Load() {
72✔
1424
                log.Debugf("GossipSyncer(%x): skipping ApplyGossipFilter, "+
5✔
1425
                        "backlog send already in progress", g.cfg.peerPub[:])
5✔
1426
                return nil
5✔
1427
        }
5✔
1428

1429
        select {
62✔
1430
        case <-g.syncerSema:
61✔
1431
        case <-g.cg.Done():
1✔
1432
                return ErrGossipSyncerExiting
1✔
1433
        case <-ctx.Done():
×
1434
                return ctx.Err()
×
1435
        }
1436

1437
        // We don't put this in a defer because if the goroutine is launched,
1438
        // it needs to be called when the goroutine is stopped.
1439
        returnSema := func() {
69✔
1440
                g.syncerSema <- struct{}{}
8✔
1441
        }
8✔
1442

1443
        // Now that the remote peer has applied their filter, we'll query the
1444
        // database for all the messages that are beyond this filter.
1445
        newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon(
61✔
1446
                g.cfg.chainHash, startTime, endTime,
61✔
1447
        )
61✔
1448
        if err != nil {
61✔
1449
                returnSema()
×
1450
                return err
×
1451
        }
×
1452

1453
        log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
8✔
1454
                "start=%v, end=%v, backlog_size=%v", g.cfg.peerPub[:],
8✔
1455
                startTime, endTime, len(newUpdatestoSend))
8✔
1456

8✔
1457
        // If we don't have any to send, then we can return early.
8✔
1458
        if len(newUpdatestoSend) == 0 {
14✔
1459
                returnSema()
6✔
1460
                return nil
6✔
1461
        }
6✔
1462

1463
        // Set the atomic flag to indicate we're starting to send the backlog.
1464
        // If the swap fails, it means another goroutine is already active, so
1465
        // we return early.
1466
        if !g.isSendingBacklog.CompareAndSwap(false, true) {
2✔
1467
                returnSema()
×
1468
                log.Debugf("GossipSyncer(%x): another goroutine already "+
×
1469
                        "sending backlog, skipping", g.cfg.peerPub[:])
×
1470

×
1471
                return nil
×
1472
        }
×
1473

1474
        // We'll conclude by launching a goroutine to send out any updates.
1475
        g.cg.WgAdd(1)
2✔
1476
        go func() {
4✔
1477
                defer g.cg.WgDone()
2✔
1478
                defer returnSema()
2✔
1479
                defer g.isSendingBacklog.Store(false)
2✔
1480

2✔
1481
                for _, msg := range newUpdatestoSend {
4✔
1482
                        err := g.sendToPeerSync(ctx, msg)
2✔
1483
                        switch {
2✔
1484
                        case err == ErrGossipSyncerExiting:
×
1485
                                return
×
1486

1487
                        case err == lnpeer.ErrPeerExiting:
×
1488
                                return
×
1489

1490
                        case err != nil:
×
1491
                                log.Errorf("Unable to send message for "+
×
1492
                                        "peer catch up: %v", err)
×
1493
                        }
1494
                }
1495
        }()
1496

1497
        return nil
2✔
1498
}
1499

1500
// FilterGossipMsgs takes a set of gossip messages, and only send it to a peer
1501
// iff the message is within the bounds of their set gossip filter. If the peer
1502
// doesn't have a gossip filter set, then no messages will be forwarded.
1503
func (g *GossipSyncer) FilterGossipMsgs(ctx context.Context,
1504
        msgs ...msgWithSenders) {
2✔
1505

2✔
1506
        // If the peer doesn't have an update horizon set, then we won't send
2✔
1507
        // it any new update messages.
2✔
1508
        if g.remoteUpdateHorizon == nil {
3✔
1509
                log.Tracef("GossipSyncer(%x): skipped due to nil "+
1✔
1510
                        "remoteUpdateHorizon", g.cfg.peerPub[:])
1✔
1511
                return
1✔
1512
        }
1✔
1513

1514
        // If we've been signaled to exit, or are exiting, then we'll stop
1515
        // short.
1516
        select {
1✔
1517
        case <-g.cg.Done():
×
1518
                return
×
1519
        case <-ctx.Done():
×
1520
                return
×
1521
        default:
1✔
1522
        }
1523

1524
        // TODO(roasbeef): need to ensure that peer still online...send msg to
1525
        // gossiper on peer termination to signal peer disconnect?
1526

1527
        var err error
1✔
1528

1✔
1529
        // Before we filter out the messages, we'll construct an index over the
1✔
1530
        // set of channel announcements and channel updates. This will allow us
1✔
1531
        // to quickly check if we should forward a chan ann, based on the known
1✔
1532
        // channel updates for a channel.
1✔
1533
        chanUpdateIndex := make(
1✔
1534
                map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate1,
1✔
1535
        )
1✔
1536
        for _, msg := range msgs {
11✔
1537
                chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate1)
10✔
1538
                if !ok {
17✔
1539
                        continue
7✔
1540
                }
1541

1542
                chanUpdateIndex[chanUpdate.ShortChannelID] = append(
3✔
1543
                        chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate,
3✔
1544
                )
3✔
1545
        }
1546

1547
        // We'll construct a helper function that we'll us below to determine
1548
        // if a given messages passes the gossip msg filter.
1549
        g.Lock()
1✔
1550
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
1✔
1551
        endTime := startTime.Add(
1✔
1552
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
1✔
1553
        )
1✔
1554
        g.Unlock()
1✔
1555

1✔
1556
        passesFilter := func(timeStamp uint32) bool {
11✔
1557
                t := time.Unix(int64(timeStamp), 0)
10✔
1558
                return t.Equal(startTime) ||
10✔
1559
                        (t.After(startTime) && t.Before(endTime))
10✔
1560
        }
10✔
1561

1562
        msgsToSend := make([]lnwire.Message, 0, len(msgs))
1✔
1563
        for _, msg := range msgs {
11✔
1564
                // If the target peer is the peer that sent us this message,
10✔
1565
                // then we'll exit early as we don't need to filter this
10✔
1566
                // message.
10✔
1567
                if _, ok := msg.senders[g.cfg.peerPub]; ok {
10✔
UNCOV
1568
                        continue
×
1569
                }
1570

1571
                switch msg := msg.msg.(type) {
10✔
1572

1573
                // For each channel announcement message, we'll only send this
1574
                // message if the channel updates for the channel are between
1575
                // our time range.
1576
                case *lnwire.ChannelAnnouncement1:
4✔
1577
                        // First, we'll check if the channel updates are in
4✔
1578
                        // this message batch.
4✔
1579
                        chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID]
4✔
1580
                        if !ok {
5✔
1581
                                // If not, we'll attempt to query the database
1✔
1582
                                // to see if we know of the updates.
1✔
1583
                                chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates(
1✔
1584
                                        g.cfg.chainHash, msg.ShortChannelID,
1✔
1585
                                )
1✔
1586
                                if err != nil {
1✔
1587
                                        log.Warnf("no channel updates found for "+
×
1588
                                                "short_chan_id=%v",
×
1589
                                                msg.ShortChannelID)
×
1590
                                        continue
×
1591
                                }
1592
                        }
1593

1594
                        for _, chanUpdate := range chanUpdates {
8✔
1595
                                if passesFilter(chanUpdate.Timestamp) {
5✔
1596
                                        msgsToSend = append(msgsToSend, msg)
1✔
1597
                                        break
1✔
1598
                                }
1599
                        }
1600

1601
                        if len(chanUpdates) == 0 {
4✔
UNCOV
1602
                                msgsToSend = append(msgsToSend, msg)
×
UNCOV
1603
                        }
×
1604

1605
                // For each channel update, we'll only send if it the timestamp
1606
                // is between our time range.
1607
                case *lnwire.ChannelUpdate1:
3✔
1608
                        if passesFilter(msg.Timestamp) {
4✔
1609
                                msgsToSend = append(msgsToSend, msg)
1✔
1610
                        }
1✔
1611

1612
                // Similarly, we only send node announcements if the update
1613
                // timestamp ifs between our set gossip filter time range.
1614
                case *lnwire.NodeAnnouncement:
3✔
1615
                        if passesFilter(msg.Timestamp) {
4✔
1616
                                msgsToSend = append(msgsToSend, msg)
1✔
1617
                        }
1✔
1618
                }
1619
        }
1620

1621
        log.Tracef("GossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
1✔
1622
                g.cfg.peerPub[:], len(msgs), len(msgsToSend))
1✔
1623

1✔
1624
        if len(msgsToSend) == 0 {
1✔
UNCOV
1625
                return
×
UNCOV
1626
        }
×
1627

1628
        if err = g.sendToPeer(ctx, msgsToSend...); err != nil {
1✔
1629
                log.Errorf("unable to send gossip msgs: %v", err)
×
1630
        }
×
1631

1632
}
1633

1634
// ProcessQueryMsg is used by outside callers to pass new channel time series
1635
// queries to the internal processing goroutine.
1636
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) error {
112✔
1637
        var msgChan chan lnwire.Message
112✔
1638
        switch msg.(type) {
112✔
UNCOV
1639
        case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs:
×
UNCOV
1640
                msgChan = g.queryMsgs
×
1641

1642
        // Reply messages should only be expected in states where we're waiting
1643
        // for a reply.
1644
        case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
112✔
1645
                g.Lock()
112✔
1646
                syncState := g.syncState()
112✔
1647
                g.Unlock()
112✔
1648

112✔
1649
                if syncState != waitingQueryRangeReply &&
112✔
1650
                        syncState != waitingQueryChanReply {
113✔
1651

1✔
1652
                        return fmt.Errorf("unexpected msg %T received in "+
1✔
1653
                                "state %v", msg, syncState)
1✔
1654
                }
1✔
1655
                msgChan = g.gossipMsgs
111✔
1656

1657
        default:
×
1658
                msgChan = g.gossipMsgs
×
1659
        }
1660

1661
        select {
111✔
1662
        case msgChan <- msg:
111✔
1663
        case <-peerQuit:
×
1664
        case <-g.cg.Done():
×
1665
        }
1666

1667
        return nil
111✔
1668
}
1669

1670
// setSyncState sets the gossip syncer's state to the given state.
1671
func (g *GossipSyncer) setSyncState(state syncerState) {
150✔
1672
        atomic.StoreUint32(&g.state, uint32(state))
150✔
1673
}
150✔
1674

1675
// syncState returns the current syncerState of the target GossipSyncer.
1676
func (g *GossipSyncer) syncState() syncerState {
536✔
1677
        return syncerState(atomic.LoadUint32(&g.state))
536✔
1678
}
536✔
1679

1680
// ResetSyncedSignal returns a channel that will be closed in order to serve as
1681
// a signal for when the GossipSyncer has reached its chansSynced state.
1682
func (g *GossipSyncer) ResetSyncedSignal() chan struct{} {
14✔
1683
        g.Lock()
14✔
1684
        defer g.Unlock()
14✔
1685

14✔
1686
        syncedSignal := make(chan struct{})
14✔
1687

14✔
1688
        syncState := syncerState(atomic.LoadUint32(&g.state))
14✔
1689
        if syncState == chansSynced {
16✔
1690
                close(syncedSignal)
2✔
1691
                return syncedSignal
2✔
1692
        }
2✔
1693

1694
        g.syncedSignal = syncedSignal
12✔
1695
        return g.syncedSignal
12✔
1696
}
1697

1698
// ProcessSyncTransition sends a request to the gossip syncer to transition its
1699
// sync type to a new one.
1700
//
1701
// NOTE: This can only be done once the gossip syncer has reached its final
1702
// chansSynced state.
1703
func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
14✔
1704
        errChan := make(chan error, 1)
14✔
1705
        select {
14✔
1706
        case g.syncTransitionReqs <- &syncTransitionReq{
1707
                newSyncType: newSyncType,
1708
                errChan:     errChan,
1709
        }:
14✔
1710
        case <-time.After(syncTransitionTimeout):
×
1711
                return ErrSyncTransitionTimeout
×
1712
        case <-g.cg.Done():
×
1713
                return ErrGossipSyncerExiting
×
1714
        }
1715

1716
        select {
14✔
1717
        case err := <-errChan:
14✔
1718
                return err
14✔
1719
        case <-g.cg.Done():
×
1720
                return ErrGossipSyncerExiting
×
1721
        }
1722
}
1723

1724
// handleSyncTransition handles a new sync type transition request.
1725
//
1726
// NOTE: The gossip syncer might have another sync state as a result of this
1727
// transition.
1728
func (g *GossipSyncer) handleSyncTransition(ctx context.Context,
1729
        req *syncTransitionReq) error {
14✔
1730

14✔
1731
        // Return early from any NOP sync transitions.
14✔
1732
        syncType := g.SyncType()
14✔
1733
        if syncType == req.newSyncType {
14✔
1734
                return nil
×
1735
        }
×
1736

1737
        log.Debugf("GossipSyncer(%x): transitioning from %v to %v",
14✔
1738
                g.cfg.peerPub, syncType, req.newSyncType)
14✔
1739

14✔
1740
        var (
14✔
1741
                firstTimestamp time.Time
14✔
1742
                timestampRange uint32
14✔
1743
        )
14✔
1744

14✔
1745
        switch req.newSyncType {
14✔
1746
        // If an active sync has been requested, then we should resume receiving
1747
        // new graph updates from the remote peer.
1748
        case ActiveSync, PinnedSync:
12✔
1749
                firstTimestamp = time.Now()
12✔
1750
                timestampRange = math.MaxUint32
12✔
1751

1752
        // If a PassiveSync transition has been requested, then we should no
1753
        // longer receive any new updates from the remote peer. We can do this
1754
        // by setting our update horizon to a range in the past ensuring no
1755
        // graph updates match the timestamp range.
1756
        case PassiveSync:
2✔
1757
                firstTimestamp = zeroTimestamp
2✔
1758
                timestampRange = 0
2✔
1759

1760
        default:
×
1761
                return fmt.Errorf("unhandled sync transition %v",
×
1762
                        req.newSyncType)
×
1763
        }
1764

1765
        err := g.sendGossipTimestampRange(ctx, firstTimestamp, timestampRange)
14✔
1766
        if err != nil {
14✔
1767
                return fmt.Errorf("unable to send local update horizon: %w",
×
1768
                        err)
×
1769
        }
×
1770

1771
        g.setSyncType(req.newSyncType)
14✔
1772

14✔
1773
        return nil
14✔
1774
}
1775

1776
// setSyncType sets the gossip syncer's sync type to the given type.
1777
func (g *GossipSyncer) setSyncType(syncType SyncerType) {
67✔
1778
        atomic.StoreUint32(&g.syncType, uint32(syncType))
67✔
1779
}
67✔
1780

1781
// SyncType returns the current SyncerType of the target GossipSyncer.
1782
func (g *GossipSyncer) SyncType() SyncerType {
407✔
1783
        return SyncerType(atomic.LoadUint32(&g.syncType))
407✔
1784
}
407✔
1785

1786
// historicalSync sends a request to the gossip syncer to perofmr a historical
1787
// sync.
1788
//
1789
// NOTE: This can only be done once the gossip syncer has reached its final
1790
// chansSynced state.
1791
func (g *GossipSyncer) historicalSync() error {
16✔
1792
        done := make(chan struct{})
16✔
1793

16✔
1794
        select {
16✔
1795
        case g.historicalSyncReqs <- &historicalSyncReq{
1796
                doneChan: done,
1797
        }:
16✔
1798
        case <-time.After(syncTransitionTimeout):
×
1799
                return ErrSyncTransitionTimeout
×
1800
        case <-g.cg.Done():
×
1801
                return ErrGossiperShuttingDown
×
1802
        }
1803

1804
        select {
16✔
1805
        case <-done:
16✔
1806
                return nil
16✔
1807
        case <-g.cg.Done():
×
1808
                return ErrGossiperShuttingDown
×
1809
        }
1810
}
1811

1812
// handleHistoricalSync handles a request to the gossip syncer to perform a
1813
// historical sync.
1814
func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
16✔
1815
        // We'll go back to our initial syncingChans state in order to request
16✔
1816
        // the remote peer to give us all of the channel IDs they know of
16✔
1817
        // starting from the genesis block.
16✔
1818
        g.genHistoricalChanRangeQuery = true
16✔
1819
        g.setSyncState(syncingChans)
16✔
1820
        close(req.doneChan)
16✔
1821
}
16✔
1822

1823
// sendToPeer sends a variadic number of messages to the remote peer. This
1824
// method should not block while waiting for sends to be written to the wire.
1825
func (g *GossipSyncer) sendToPeer(ctx context.Context,
1826
        msgs ...lnwire.Message) error {
110✔
1827

110✔
1828
        return g.sendMsgRateLimited(ctx, false, msgs...)
110✔
1829
}
110✔
1830

1831
// sendToPeerSync sends a variadic number of messages to the remote peer,
1832
// blocking until all messages have been sent successfully or a write error is
1833
// encountered.
1834
func (g *GossipSyncer) sendToPeerSync(ctx context.Context,
1835
        msgs ...lnwire.Message) error {
23✔
1836

23✔
1837
        return g.sendMsgRateLimited(ctx, true, msgs...)
23✔
1838
}
23✔
1839

1840
// sendMsgRateLimited sends a variadic number of messages to the remote peer,
1841
// applying our per-peer rate limit before each send. The sync boolean
1842
// determines if the send is blocking or not.
1843
func (g *GossipSyncer) sendMsgRateLimited(ctx context.Context, sync bool,
1844
        msgs ...lnwire.Message) error {
133✔
1845

133✔
1846
        for _, msg := range msgs {
268✔
1847
                err := maybeRateLimitMsg(
135✔
1848
                        ctx, g.rateLimiter, g.cfg.peerPub, msg, g.cg.Done(),
135✔
1849
                )
135✔
1850
                if err != nil {
135✔
1851
                        return err
×
1852
                }
×
1853

1854
                err = g.cfg.sendMsg(ctx, sync, msg)
135✔
1855
                if err != nil {
135✔
1856
                        return err
×
1857
                }
×
1858
        }
1859

1860
        return nil
133✔
1861
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc