lightningnetwork / lnd — build 13980275562
20 Mar 2025 10:06PM UTC coverage: 58.6% (-10.2%) from 68.789%
Pull Request #9623: Size msg test msg (merge b9b960345 into 09b674508)

0 of 1518 new or added lines in 42 files covered. (0.0%)
26603 existing lines in 443 files now uncovered.
96807 of 165200 relevant lines covered (58.6%)
1.82 hits per line

Source file: /discovery/syncer.go (74.03% of lines covered)
package discovery

import (
        "errors"
        "fmt"
        "math"
        "math/rand"
        "sort"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/graph"
        graphdb "github.com/lightningnetwork/lnd/graph/db"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lnwire"
        "golang.org/x/time/rate"
)

// SyncerType encapsulates the different types of syncing mechanisms for a
// gossip syncer.
type SyncerType uint8

const (
        // ActiveSync denotes that a gossip syncer:
        //
        // 1. Should not attempt to synchronize with the remote peer for
        //    missing channels.
        // 2. Should respond to queries from the remote peer.
        // 3. Should receive new updates from the remote peer.
        //
        // They are started in a chansSynced state in order to accomplish their
        // responsibilities above.
        ActiveSync SyncerType = iota

        // PassiveSync denotes that a gossip syncer:
        //
        // 1. Should not attempt to synchronize with the remote peer for
        //    missing channels.
        // 2. Should respond to queries from the remote peer.
        // 3. Should not receive new updates from the remote peer.
        //
        // They are started in a chansSynced state in order to accomplish their
        // responsibilities above.
        PassiveSync

        // PinnedSync denotes an ActiveSync that doesn't count towards the
        // default active syncer limits and is always active throughout the
        // duration of the peer's connection. Each pinned syncer will begin by
        // performing a historical sync to ensure we are well synchronized with
        // their routing table.
        PinnedSync
)

// String returns a human-readable string describing the target SyncerType.
func (t SyncerType) String() string {
        switch t {
        case ActiveSync:
                return "ActiveSync"
        case PassiveSync:
                return "PassiveSync"
        case PinnedSync:
                return "PinnedSync"
        default:
                return fmt.Sprintf("unknown sync type %d", t)
        }
}

// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange
// allowing new gossip messages to be received from the peer.
func (t SyncerType) IsActiveSync() bool {
        switch t {
        case ActiveSync, PinnedSync:
                return true
        default:
                return false
        }
}

// syncerState is an enum that represents the current state of the
// GossipSyncer. As the syncer is a state machine, we'll gate our actions
// based off of the current state and the next incoming message.
type syncerState uint32

const (
        // syncingChans is the default state of the GossipSyncer. We start in
        // this state when a new peer first connects and we don't yet know if
        // we're fully synchronized.
        syncingChans syncerState = iota

        // waitingQueryRangeReply is the second main phase of the GossipSyncer.
        // We enter this state after we send out our first QueryChannelRange
        // query. We'll stay in this state until the remote party has sent us
        // its final ReplyChannelRange message, indicating they've responded
        // to our query entirely. After that, we'll transition to
        // queryNewChannels, and then to waitingQueryChanReply once we send
        // out requests for all the new channel IDs.
        waitingQueryRangeReply

        // queryNewChannels is the third main phase of the GossipSyncer.  In
        // this phase we'll send out all of our QueryShortChanIDs messages in
        // response to the new channels that we don't yet know about.
        queryNewChannels

        // waitingQueryChanReply is the fourth main phase of the GossipSyncer.
        // We enter this phase once we've sent off a query chunk to the remote
        // peer.  We'll stay in this phase until we receive a
        // ReplyShortChanIDsEnd message which indicates that the remote party
        // has responded to all of our requests.
        waitingQueryChanReply

        // chansSynced is the terminal stage of the GossipSyncer. Once we enter
        // this phase, we'll send out our update horizon, which filters out the
        // set of channel updates that we're interested in. In this state,
        // we'll be able to accept any outgoing messages from the
        // AuthenticatedGossiper, and decide if we should forward them to our
        // target peer based on its update horizon.
        chansSynced

        // syncerIdle is a state in which the gossip syncer can handle external
        // requests to transition or perform historical syncs. It is used as
        // the initial state for pinned syncers, as well as a fallthrough case
        // for chansSynced, allowing fully synced peers to facilitate requests.
        syncerIdle
)
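
// Editor's note (summary, not part of the original file): the handlers below
// drive these states roughly as follows. A syncer starts in syncingChans and
// sends a QueryChannelRange, moving to waitingQueryRangeReply. Once the full
// ReplyChannelRange stream arrives, it moves to queryNewChannels (or straight
// to chansSynced if nothing new was learned). queryNewChannels and
// waitingQueryChanReply then alternate, one QueryShortChanIDs chunk at a
// time, until all new channels are fetched, ending in chansSynced, which
// falls through to syncerIdle to serve transition and historical-sync
// requests. Pinned syncers start directly in syncerIdle.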

// String returns a human-readable string describing the target syncerState.
func (s syncerState) String() string {
        switch s {
        case syncingChans:
                return "syncingChans"

        case waitingQueryRangeReply:
                return "waitingQueryRangeReply"

        case queryNewChannels:
                return "queryNewChannels"

        case waitingQueryChanReply:
                return "waitingQueryChanReply"

        case chansSynced:
                return "chansSynced"

        case syncerIdle:
                return "syncerIdle"

        default:
                return "UNKNOWN STATE"
        }
}

const (
        // DefaultMaxUndelayedQueryReplies specifies how many gossip queries we
        // will respond to immediately before starting to delay responses.
        DefaultMaxUndelayedQueryReplies = 10

        // DefaultDelayedQueryReplyInterval is the length of time we will wait
        // before responding to gossip queries after replying to
        // maxUndelayedQueryReplies queries.
        DefaultDelayedQueryReplyInterval = 5 * time.Second

        // maxQueryChanRangeReplies specifies the default limit of replies to
        // process for a single QueryChannelRange request.
        maxQueryChanRangeReplies = 500

        // maxQueryChanRangeRepliesZlibFactor specifies the factor applied to
        // the maximum number of replies allowed for zlib encoded replies.
        maxQueryChanRangeRepliesZlibFactor = 4

        // chanRangeQueryBuffer is the number of blocks back that we'll go when
        // asking the remote peer for any channels they know of beyond our
        // highest known channel ID.
        chanRangeQueryBuffer = 144

        // syncTransitionTimeout is the default timeout we'll wait up to when
        // attempting to perform a sync transition.
        syncTransitionTimeout = 5 * time.Second

        // requestBatchSize is the maximum number of channels we will query the
        // remote peer for in a QueryShortChanIDs message.
        requestBatchSize = 500

        // syncerBufferSize is the size of the syncer's buffers.
        syncerBufferSize = 5
)

var (
        // encodingTypeToChunkSize maps an encoding type to the max number of
        // short chan IDs using the encoding type that we can fit into a
        // single message safely.
        encodingTypeToChunkSize = map[lnwire.QueryEncoding]int32{
                lnwire.EncodingSortedPlain: 8000,
        }

        // ErrGossipSyncerExiting signals that the syncer has been killed.
        ErrGossipSyncerExiting = errors.New("gossip syncer exiting")

        // ErrSyncTransitionTimeout is an error returned when we've timed out
        // attempting to perform a sync transition.
        ErrSyncTransitionTimeout = errors.New("timed out attempting to " +
                "transition sync type")

        // zeroTimestamp is the timestamp we'll use when we want to indicate to
        // peers that we do not want to receive any new graph updates.
        zeroTimestamp time.Time
)
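
// effectiveChunkSize is an editor's illustrative sketch (not part of the
// original file) of how the chunk size above is consumed later in
// replyChanRangeQuery: when a peer requests timestamps alongside SCIDs, each
// entry costs twice as many bytes on the wire, so the per-message chunk size
// is halved. With EncodingSortedPlain this yields 8000 SCIDs per reply, or
// 4000 when timestamps are included.
func effectiveChunkSize(encoding lnwire.QueryEncoding,
        withTimestamps bool) int32 {

        chunkSize := encodingTypeToChunkSize[encoding]
        if withTimestamps {
                chunkSize /= 2
        }

        return chunkSize
}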

// syncTransitionReq encapsulates a request for a gossip syncer sync
// transition.
type syncTransitionReq struct {
        newSyncType SyncerType
        errChan     chan error
}

// historicalSyncReq encapsulates a request for a gossip syncer to perform a
// historical sync.
type historicalSyncReq struct {
        // doneChan is a channel that serves as a signal and is closed to
        // ensure the historical sync is attempted by the time we return to
        // the caller.
        doneChan chan struct{}
}

// gossipSyncerCfg is a struct that packages all the information a
// GossipSyncer needs to carry out its duties.
type gossipSyncerCfg struct {
        // chainHash is the chain that this syncer is responsible for.
        chainHash chainhash.Hash

        // peerPub is the public key of the peer we're syncing with, serialized
        // in compressed format.
        peerPub [33]byte

        // channelSeries is the primary interface that we'll use to generate
        // our queries and respond to the queries of the remote peer.
        channelSeries ChannelGraphTimeSeries

        // encodingType is the current encoding type we're aware of. Requests
        // with different encoding types will be rejected.
        encodingType lnwire.QueryEncoding

        // chunkSize is the max number of short chan IDs using the syncer's
        // encoding type that we can fit into a single message safely.
        chunkSize int32

        // batchSize is the max number of channels the syncer will query from
        // the remote node in a single QueryShortChanIDs request.
        batchSize int32

        // sendToPeer sends a variadic number of messages to the remote peer.
        // This method should not block while waiting for sends to be written
        // to the wire.
        sendToPeer func(...lnwire.Message) error

        // sendToPeerSync sends a variadic number of messages to the remote
        // peer, blocking until all messages have been sent successfully or a
        // write error is encountered.
        sendToPeerSync func(...lnwire.Message) error

        // maxUndelayedQueryReplies specifies how many gossip queries we will
        // respond to immediately before starting to delay responses.
        maxUndelayedQueryReplies int

        // delayedQueryReplyInterval is the length of time we will wait before
        // responding to gossip queries after replying to
        // maxUndelayedQueryReplies queries.
        delayedQueryReplyInterval time.Duration

        // noSyncChannels will prevent the GossipSyncer from spawning a
        // channelGraphSyncer, meaning we will not try to reconcile unknown
        // channels with the remote peer.
        noSyncChannels bool

        // noReplyQueries will prevent the GossipSyncer from spawning a
        // replyHandler, meaning we will not reply to queries from our remote
        // peer.
        noReplyQueries bool

        // noTimestampQueryOption will prevent the GossipSyncer from querying
        // timestamps of announcement messages from the peer, and it will
        // prevent it from responding to timestamp queries.
        noTimestampQueryOption bool

        // ignoreHistoricalFilters will prevent syncers from replying with
        // historical data when the remote peer sets a gossip_timestamp_range.
        // This prevents ranges with old start times from causing us to dump
        // the graph on connect.
        ignoreHistoricalFilters bool

        // bestHeight returns the latest height known of the chain.
        bestHeight func() uint32

        // markGraphSynced updates the SyncManager's perception of whether we
        // have completed at least one historical sync.
        markGraphSynced func()

        // maxQueryChanRangeReplies is the maximum number of replies we'll
        // allow for a single QueryChannelRange request.
        maxQueryChanRangeReplies uint32

        // isStillZombieChannel takes the timestamps of the latest channel
        // updates for a channel and returns true if the channel should be
        // considered a zombie based on these timestamps.
        isStillZombieChannel func(time.Time, time.Time) bool
}

// GossipSyncer is a struct that handles synchronizing the channel graph state
// with a remote peer. The GossipSyncer implements a state machine that will
// progressively ensure we're synchronized with the channel state of the remote
// node. Once both nodes have been synchronized, we'll use an update filter to
// filter out which messages should be sent to a remote peer based on their
// update horizon. If the update horizon isn't specified, then we won't send
// them any channel updates at all.
type GossipSyncer struct {
        started sync.Once
        stopped sync.Once

        // state is the current state of the GossipSyncer.
        //
        // NOTE: This variable MUST be used atomically.
        state uint32

        // syncType denotes the SyncerType the gossip syncer is currently
        // exercising.
        //
        // NOTE: This variable MUST be used atomically.
        syncType uint32

        // remoteUpdateHorizon is the update horizon of the remote peer. We'll
        // use this to properly filter out any messages.
        remoteUpdateHorizon *lnwire.GossipTimestampRange

        // localUpdateHorizon is our local update horizon, which we'll use to
        // determine if we've already sent out our update.
        localUpdateHorizon *lnwire.GossipTimestampRange

        // syncTransitionReqs is a channel through which new sync type
        // transition requests will be sent. These requests should only be
        // handled when the gossip syncer is in a chansSynced state to ensure
        // its state machine behaves as expected.
        syncTransitionReqs chan *syncTransitionReq

        // historicalSyncReqs is a channel that serves as a signal for the
        // gossip syncer to perform a historical sync. These can only be done
        // once the gossip syncer is in a chansSynced state to ensure its state
        // machine behaves as expected.
        historicalSyncReqs chan *historicalSyncReq

        // genHistoricalChanRangeQuery when true signals to the gossip syncer
        // that it should request the remote peer for all of its known channel
        // IDs starting from the genesis block of the chain. This can only
        // happen if the gossip syncer receives a request to attempt a
        // historical sync. It can be unset if the syncer ever transitions from
        // PassiveSync to ActiveSync.
        genHistoricalChanRangeQuery bool

        // gossipMsgs is a channel that all responses to our queries from the
        // target peer will be sent over; these will be read by the
        // channelGraphSyncer.
        gossipMsgs chan lnwire.Message

        // queryMsgs is a channel that all queries from the remote peer will be
        // received over; these will be read by the replyHandler.
        queryMsgs chan lnwire.Message

        // curQueryRangeMsg keeps track of the latest QueryChannelRange message
        // we've sent to a peer to ensure we've consumed all expected replies.
        // This field is primarily used within the waitingQueryChanReply state.
        curQueryRangeMsg *lnwire.QueryChannelRange

        // prevReplyChannelRange keeps track of the previous ReplyChannelRange
        // message we've received from a peer to ensure they've fully replied
        // to our query by ensuring they covered our requested block range.
        // This field is primarily used within the waitingQueryChanReply state.
        prevReplyChannelRange *lnwire.ReplyChannelRange

        // bufferedChanRangeReplies is used in the waitingQueryChanReply state
        // to buffer all the chunked responses to our query.
        bufferedChanRangeReplies []graphdb.ChannelUpdateInfo

        // numChanRangeRepliesRcvd is used to track the number of replies
        // received as part of a QueryChannelRange. This field is primarily
        // used within the waitingQueryChanReply state.
        numChanRangeRepliesRcvd uint32

        // newChansToQuery is used to pass the set of channels we should query
        // for from the waitingQueryChanReply state to the queryNewChannels
        // state.
        newChansToQuery []lnwire.ShortChannelID

        cfg gossipSyncerCfg

        // rateLimiter dictates the frequency with which we will reply to
        // gossip queries from a peer. This is used to delay responses to
        // peers to prevent DOS vulnerabilities if they are spamming with an
        // unreasonable number of queries.
        rateLimiter *rate.Limiter

        // syncedSignal is a channel that, if set, will be closed when the
        // GossipSyncer reaches its terminal chansSynced state.
        syncedSignal chan struct{}

        // syncerSema is used to more finely control the syncer's ability to
        // respond to gossip timestamp range messages.
        syncerSema chan struct{}

        sync.Mutex

        quit chan struct{}
        wg   sync.WaitGroup
}

// newGossipSyncer returns a new instance of the GossipSyncer populated using
// the passed config.
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
        // If no parameter was specified for max undelayed query replies, set
        // it to the default of 10 queries.
        if cfg.maxUndelayedQueryReplies <= 0 {
                cfg.maxUndelayedQueryReplies = DefaultMaxUndelayedQueryReplies
        }

        // If no parameter was specified for delayed query reply interval, set
        // it to the default of 5 seconds.
        if cfg.delayedQueryReplyInterval <= 0 {
                cfg.delayedQueryReplyInterval = DefaultDelayedQueryReplyInterval
        }

        // Construct a rate limiter that will govern how frequently we reply to
        // gossip queries from this peer. The limiter will automatically adjust
        // during periods of quiescence, and increase the reply interval under
        // load.
        interval := rate.Every(cfg.delayedQueryReplyInterval)
        rateLimiter := rate.NewLimiter(
                interval, cfg.maxUndelayedQueryReplies,
        )

        return &GossipSyncer{
                cfg:                cfg,
                rateLimiter:        rateLimiter,
                syncTransitionReqs: make(chan *syncTransitionReq),
                historicalSyncReqs: make(chan *historicalSyncReq),
                gossipMsgs:         make(chan lnwire.Message, syncerBufferSize),
                queryMsgs:          make(chan lnwire.Message, syncerBufferSize),
                syncerSema:         sema,
                quit:               make(chan struct{}),
        }
}
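
// exampleDefaultReplyLimiter is an editor's sketch (not part of the original
// file): the reply rate limiter built above is a token bucket. With the
// defaults it allows a burst of DefaultMaxUndelayedQueryReplies (10)
// immediate replies, then refills one token per
// DefaultDelayedQueryReplyInterval (5s), which replyPeerQueries later
// consumes via Reserve().
func exampleDefaultReplyLimiter() *rate.Limiter {
        return rate.NewLimiter(
                rate.Every(DefaultDelayedQueryReplyInterval),
                DefaultMaxUndelayedQueryReplies,
        )
}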

// Start starts the GossipSyncer and any goroutines that it needs to carry out
// its duties.
func (g *GossipSyncer) Start() {
        g.started.Do(func() {
                log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])

                // TODO(conner): only spawn channelGraphSyncer if remote
                // supports gossip queries, and only spawn replyHandler if we
                // advertise support
                if !g.cfg.noSyncChannels {
                        g.wg.Add(1)
                        go g.channelGraphSyncer()
                }
                if !g.cfg.noReplyQueries {
                        g.wg.Add(1)
                        go g.replyHandler()
                }
        })
}

// Stop signals the GossipSyncer for a graceful exit, then waits until it has
// exited.
func (g *GossipSyncer) Stop() {
        g.stopped.Do(func() {
                log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
                defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])

                close(g.quit)
                g.wg.Wait()
        })
}

// handleSyncingChans handles the state syncingChans for the GossipSyncer. When
// in this state, we will send a QueryChannelRange msg to our peer and advance
// the syncer's state to waitingQueryRangeReply.
func (g *GossipSyncer) handleSyncingChans() {
        // Prepare the query msg.
        queryRangeMsg, err := g.genChanRangeQuery(g.genHistoricalChanRangeQuery)
        if err != nil {
                log.Errorf("Unable to gen chan range query: %v", err)
                return
        }

        // Acquire a lock so the following state transition is atomic.
        //
        // NOTE: We must lock the following steps as it's possible we get an
        // immediate response (ReplyChannelRange) after sending the query msg.
        // The response is handled in ProcessQueryMsg, which requires the
        // current state to be waitingQueryRangeReply.
        g.Lock()
        defer g.Unlock()

        // Send the msg to the remote peer, which is non-blocking as
        // `sendToPeer` only queues the msg in Brontide.
        err = g.cfg.sendToPeer(queryRangeMsg)
        if err != nil {
                log.Errorf("Unable to send chan range query: %v", err)
                return
        }

        // With the message sent successfully, we'll transition into the next
        // state where we wait for their reply.
        g.setSyncState(waitingQueryRangeReply)
}

// channelGraphSyncer is the main goroutine responsible for ensuring that we
// properly sync the channel graph state with the remote peer, and also that
// we only send them messages which actually pass their defined update
// horizon.
func (g *GossipSyncer) channelGraphSyncer() {
        defer g.wg.Done()

        for {
                state := g.syncState()
                syncType := g.SyncType()

                log.Debugf("GossipSyncer(%x): state=%v, type=%v",
                        g.cfg.peerPub[:], state, syncType)

                switch state {
                // When we're in this state, we're trying to synchronize our
                // view of the network with the remote peer. We'll kick off
                // this sync by asking them for the set of channels they
                // understand, as well as responding to any other queries by
                // them.
                case syncingChans:
                        g.handleSyncingChans()

                // In this state, we've sent out our initial channel range
                // query and are waiting for the final response from the remote
                // peer before we perform a diff to see which channels they
                // know of that we don't.
                case waitingQueryRangeReply:
                        // We'll wait to either process a new message from the
                        // remote party, or exit due to the gossiper exiting,
                        // or us being signalled to do so.
                        select {
                        case msg := <-g.gossipMsgs:
                                // The remote peer is sending a response to our
                                // initial query, we'll collate this response,
                                // and see if it's the final one in the series.
                                // If so, we can then transition to querying
                                // for the new channels.
                                queryReply, ok := msg.(*lnwire.ReplyChannelRange)
                                if ok {
                                        err := g.processChanRangeReply(queryReply)
                                        if err != nil {
                                                log.Errorf("Unable to "+
                                                        "process chan range "+
                                                        "query: %v", err)
                                                return
                                        }
                                        continue
                                }

                                log.Warnf("Unexpected message: %T in state=%v",
                                        msg, state)

                        case <-g.quit:
                                return
                        }

                // We'll enter this state once we've discovered which channels
                // the remote party knows of that we don't yet know of
                // ourselves.
                case queryNewChannels:
                        // First, we'll attempt to continue our channel
                        // synchronization by continuing to send off another
                        // query chunk.
                        done := g.synchronizeChanIDs()

                        // If this wasn't our last query, then we'll need to
                        // transition to our waiting state.
                        if !done {
                                continue
                        }

                        // If we're fully synchronized, then we can transition
                        // to our terminal state.
                        g.setSyncState(chansSynced)

                        // Ensure that the sync manager becomes aware that the
                        // historical sync completed so synced_to_graph is
                        // updated over rpc.
                        g.cfg.markGraphSynced()

                // In this state, we've just sent off a new query for channels
                // that we don't yet know of. We'll remain in this state until
                // the remote party signals they've responded to our query in
                // totality.
                case waitingQueryChanReply:
                        // Once we've sent off our query, we'll wait for either
                        // an ending reply, or just another query from the
                        // remote peer.
                        select {
                        case msg := <-g.gossipMsgs:
                                // If this is the final reply to one of our
                                // queries, then we'll loop back into our query
                                // state to send off the remaining query
                                // chunks.
                                _, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
                                if ok {
                                        g.setSyncState(queryNewChannels)
                                        continue
                                }

                                log.Warnf("Unexpected message: %T in state=%v",
                                        msg, state)

                        case <-g.quit:
                                return
                        }

                // This is our final terminal state where we'll only reply to
                // any further queries by the remote peer.
                case chansSynced:
                        g.Lock()
                        if g.syncedSignal != nil {
                                close(g.syncedSignal)
                                g.syncedSignal = nil
                        }
                        g.Unlock()

                        // If we haven't yet sent out our update horizon, and
                        // we want to receive real-time channel updates, we'll
                        // do so now.
                        if g.localUpdateHorizon == nil &&
                                syncType.IsActiveSync() {

                                err := g.sendGossipTimestampRange(
                                        time.Now(), math.MaxUint32,
                                )
                                if err != nil {
                                        log.Errorf("Unable to send update "+
                                                "horizon to %x: %v",
                                                g.cfg.peerPub, err)
                                }
                        }

                        // With our horizon set, we'll simply reply to any new
                        // messages or process any state transitions and exit
                        // if needed.
                        fallthrough

                // Pinned peers will begin in this state, since they will
                // immediately receive a request to perform a historical sync.
                // Otherwise, we fall through after ending in chansSynced to
                // facilitate new requests.
                case syncerIdle:
                        select {
                        case req := <-g.syncTransitionReqs:
                                req.errChan <- g.handleSyncTransition(req)

                        case req := <-g.historicalSyncReqs:
                                g.handleHistoricalSync(req)

                        case <-g.quit:
                                return
                        }
                }
        }
}

// replyHandler is an event loop whose sole purpose is to reply to the remote
// peer's queries. Our replyHandler will respond to messages generated by
// their channelGraphSyncer, and vice versa. Each party's channelGraphSyncer
// drives the other's replyHandler, allowing the replyHandler to operate
// independently from the state machine maintained on the same node.
//
// NOTE: This method MUST be run as a goroutine.
func (g *GossipSyncer) replyHandler() {
        defer g.wg.Done()

        for {
                select {
                case msg := <-g.queryMsgs:
                        err := g.replyPeerQueries(msg)
                        switch {
                        case err == ErrGossipSyncerExiting:
                                return

                        case err == lnpeer.ErrPeerExiting:
                                return

                        case err != nil:
                                log.Errorf("Unable to reply to peer "+
                                        "query: %v", err)
                        }

                case <-g.quit:
                        return
                }
        }
}

// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
// syncer and sends it to the remote peer.
func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time,
        timestampRange uint32) error {

        endTimestamp := firstTimestamp.Add(
                time.Duration(timestampRange) * time.Second,
        )

        log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)",
                g.cfg.peerPub[:], firstTimestamp, endTimestamp)

        localUpdateHorizon := &lnwire.GossipTimestampRange{
                ChainHash:      g.cfg.chainHash,
                FirstTimestamp: uint32(firstTimestamp.Unix()),
                TimestampRange: timestampRange,
        }

        if err := g.cfg.sendToPeer(localUpdateHorizon); err != nil {
                return err
        }

        if firstTimestamp == zeroTimestamp && timestampRange == 0 {
                g.localUpdateHorizon = nil
        } else {
                g.localUpdateHorizon = localUpdateHorizon
        }

        return nil
}
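
// Editor's note (illustrative, not part of the original file): the two
// boundary cases of the method above. Passing the zero timestamp with a zero
// range clears our local horizon, telling the peer we want no new graph
// updates, while time.Now() with math.MaxUint32 (as done in the chansSynced
// state) requests all updates from now on:
//
//	_ = g.sendGossipTimestampRange(zeroTimestamp, 0)
//	_ = g.sendGossipTimestampRange(time.Now(), math.MaxUint32)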

// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
// the remote peer for its known set of channel IDs within a particular block
// range. This method will be called continually until the entire range has
// been queried for with a response received. We'll chunk our requests as
// required to ensure they fit into a single message. We may re-enter this
// state in the case that chunking is required.
func (g *GossipSyncer) synchronizeChanIDs() bool {
        // If we're in this state yet there are no more new channels to query
        // for, then we'll transition to our final synced state and return true
        // to signal that we're fully synchronized.
        if len(g.newChansToQuery) == 0 {
                log.Infof("GossipSyncer(%x): no more chans to query",
                        g.cfg.peerPub[:])

                return true
        }

        // Otherwise, we'll issue our next chunked query to receive replies
        // for.
        var queryChunk []lnwire.ShortChannelID

        // If the number of channels to query for is less than the chunk size,
        // then we can issue a single query.
        if int32(len(g.newChansToQuery)) < g.cfg.batchSize {
                queryChunk = g.newChansToQuery
                g.newChansToQuery = nil

        } else {
                // Otherwise, we'll need to only query for the next chunk.
                // We'll slice into our query chunk, then slide our main
                // pointer down by the chunk size.
                queryChunk = g.newChansToQuery[:g.cfg.batchSize]
                g.newChansToQuery = g.newChansToQuery[g.cfg.batchSize:]
        }

        log.Infof("GossipSyncer(%x): querying for %v new channels",
                g.cfg.peerPub[:], len(queryChunk))

        // Change the state before sending the query msg.
        g.setSyncState(waitingQueryChanReply)

        // With our chunk obtained, we'll send over our next query, then return
        // false indicating that we're not yet fully synced.
        err := g.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
                ChainHash:    g.cfg.chainHash,
                EncodingType: lnwire.EncodingSortedPlain,
                ShortChanIDs: queryChunk,
        })
        if err != nil {
                log.Errorf("Unable to sync chan IDs: %v", err)
        }

        return false
}
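
// numQueryChunks is an editor's illustrative sketch (not part of the original
// file) of the batching above: the number of QueryShortChanIDs messages
// needed to cover n new channels. For example, 1200 channels with the default
// batch size of 500 take three queries (500, 500, then 200), with the syncer
// bouncing between queryNewChannels and waitingQueryChanReply in between.
func numQueryChunks(n, batchSize int32) int32 {
        return (n + batchSize - 1) / batchSize
}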

// isLegacyReplyChannelRange determines whether a ReplyChannelRange message is
// considered legacy. There was a point where lnd used to include the same
// query over multiple replies, rather than including the portion of the query
// the reply is handling. We'll use this as a way of detecting whether we are
// communicating with a legacy node so we can properly sync with them.
func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
        reply *lnwire.ReplyChannelRange) bool {

        return (reply.ChainHash == query.ChainHash &&
                reply.FirstBlockHeight == query.FirstBlockHeight &&
                reply.NumBlocks == query.NumBlocks)
}
799

800
// processChanRangeReply is called each time the GossipSyncer receives a new
801
// reply to the initial range query to discover new channels that it didn't
802
// previously know of.
803
func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error {
3✔
804
        // isStale returns whether the timestamp is too far into the past.
3✔
805
        isStale := func(timestamp time.Time) bool {
6✔
806
                return time.Since(timestamp) > graph.DefaultChannelPruneExpiry
3✔
807
        }
3✔
808

809
        // isSkewed returns whether the timestamp is too far into the future.
810
        isSkewed := func(timestamp time.Time) bool {
6✔
811
                return time.Until(timestamp) > graph.DefaultChannelPruneExpiry
3✔
812
        }
3✔
813

814
        // If we're not communicating with a legacy node, we'll apply some
815
        // further constraints on their reply to ensure it satisfies our query.
816
        if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
3✔
UNCOV
817
                // The first block should be within our original request.
×
UNCOV
818
                if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight {
×
819
                        return fmt.Errorf("reply includes channels for height "+
×
820
                                "%v prior to query %v", msg.FirstBlockHeight,
×
821
                                g.curQueryRangeMsg.FirstBlockHeight)
×
822
                }
×
823

824
                // The last block should also be. We don't need to check the
825
                // intermediate ones because they should already be in sorted
826
                // order.
UNCOV
827
                replyLastHeight := msg.LastBlockHeight()
×
UNCOV
828
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
×
UNCOV
829
                if replyLastHeight > queryLastHeight {
×
830
                        return fmt.Errorf("reply includes channels for height "+
×
831
                                "%v after query %v", replyLastHeight,
×
832
                                queryLastHeight)
×
833
                }
×
834

835
                // If we've previously received a reply for this query, look at
836
                // its last block to ensure the current reply properly follows
837
                // it.
UNCOV
838
                if g.prevReplyChannelRange != nil {
×
UNCOV
839
                        prevReply := g.prevReplyChannelRange
×
UNCOV
840
                        prevReplyLastHeight := prevReply.LastBlockHeight()
×
UNCOV
841

×
UNCOV
842
                        // The current reply can either start from the previous
×
UNCOV
843
                        // reply's last block, if there are still more channels
×
UNCOV
844
                        // for the same block, or the block after.
×
UNCOV
845
                        if msg.FirstBlockHeight != prevReplyLastHeight &&
×
UNCOV
846
                                msg.FirstBlockHeight != prevReplyLastHeight+1 {
×
847

×
848
                                return fmt.Errorf("first block of reply %v "+
×
849
                                        "does not continue from last block of "+
×
850
                                        "previous %v", msg.FirstBlockHeight,
×
851
                                        prevReplyLastHeight)
×
852
                        }
×
853
                }
854
        }
855

856
        g.prevReplyChannelRange = msg
3✔
857

3✔
858
        for i, scid := range msg.ShortChanIDs {
6✔
859
                info := graphdb.NewChannelUpdateInfo(
3✔
860
                        scid, time.Time{}, time.Time{},
3✔
861
                )
3✔
862

3✔
863
                if len(msg.Timestamps) != 0 {
6✔
864
                        t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0)
3✔
865
                        info.Node1UpdateTimestamp = t1
3✔
866

3✔
867
                        t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0)
3✔
868
                        info.Node2UpdateTimestamp = t2
3✔
869

3✔
870
                        // Sort out all channels with outdated or skewed
3✔
871
                        // timestamps. Both timestamps need to be out of
3✔
872
                        // boundaries for us to skip the channel and not query
3✔
873
                        // it later on.
3✔
874
                        switch {
3✔
875
                        case isStale(info.Node1UpdateTimestamp) &&
UNCOV
876
                                isStale(info.Node2UpdateTimestamp):
×
UNCOV
877

×
UNCOV
878
                                continue
×
879

880
                        case isSkewed(info.Node1UpdateTimestamp) &&
UNCOV
881
                                isSkewed(info.Node2UpdateTimestamp):
×
UNCOV
882

×
UNCOV
883
                                continue
×
884

885
                        case isStale(info.Node1UpdateTimestamp) &&
UNCOV
886
                                isSkewed(info.Node2UpdateTimestamp):
×
UNCOV
887

×
UNCOV
888
                                continue
×
889

890
                        case isStale(info.Node2UpdateTimestamp) &&
UNCOV
891
                                isSkewed(info.Node1UpdateTimestamp):
×
UNCOV
892

×
UNCOV
893
                                continue
×
894
                        }
895
                }
896

897
                g.bufferedChanRangeReplies = append(
3✔
898
                        g.bufferedChanRangeReplies, info,
3✔
899
                )
3✔
900
        }
901

902
        switch g.cfg.encodingType {
3✔
903
        case lnwire.EncodingSortedPlain:
3✔
904
                g.numChanRangeRepliesRcvd++
3✔
905
        case lnwire.EncodingSortedZlib:
×
906
                g.numChanRangeRepliesRcvd += maxQueryChanRangeRepliesZlibFactor
×
907
        default:
×
908
                return fmt.Errorf("unhandled encoding type %v", g.cfg.encodingType)
×
909
        }
910

911
        log.Infof("GossipSyncer(%x): buffering chan range reply of size=%v",
3✔
912
                g.cfg.peerPub[:], len(msg.ShortChanIDs))
3✔
913

3✔
914
        // If this isn't the last response and we can continue to receive more,
3✔
915
        // then we can exit as we've already buffered the latest portion of the
3✔
916
        // streaming reply.
3✔
917
        maxReplies := g.cfg.maxQueryChanRangeReplies
3✔
918
        switch {
3✔
919
        // If we're communicating with a legacy node, we'll need to look at the
920
        // complete field.
921
        case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg):
3✔
922
                if msg.Complete == 0 && g.numChanRangeRepliesRcvd < maxReplies {
3✔
UNCOV
923
                        return nil
×
UNCOV
924
                }
×
925

926
        // Otherwise, we'll look at the reply's height range.
UNCOV
927
        default:
×
UNCOV
928
                replyLastHeight := msg.LastBlockHeight()
×
UNCOV
929
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
×
UNCOV
930

×
UNCOV
931
                // TODO(wilmer): This might require some padding if the remote
×
UNCOV
932
                // node is not aware of the last height we sent them, i.e., is
×
UNCOV
933
                // behind a few blocks from us.
×
UNCOV
934
                if replyLastHeight < queryLastHeight &&
×
UNCOV
935
                        g.numChanRangeRepliesRcvd < maxReplies {
×
UNCOV
936

×
UNCOV
937
                        return nil
×
UNCOV
938
                }
×
939
        }
940

941
        log.Infof("GossipSyncer(%x): filtering through %v chans",
3✔
942
                g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))
3✔
943

3✔
944
        // Otherwise, this is the final response, so we'll now check to see
3✔
945
        // which channels they know of that we don't.
3✔
946
        newChans, err := g.cfg.channelSeries.FilterKnownChanIDs(
3✔
947
                g.cfg.chainHash, g.bufferedChanRangeReplies,
3✔
948
                g.cfg.isStillZombieChannel,
3✔
949
        )
3✔
950
        if err != nil {
3✔
951
                return fmt.Errorf("unable to filter chan ids: %w", err)
×
952
        }
×
953

954
        // As we've received the entirety of the reply, we no longer need to
955
        // hold on to the set of buffered replies or the original query that
956
        // prompted the replies, so we'll let that be garbage collected now.
957
        g.curQueryRangeMsg = nil
3✔
958
        g.prevReplyChannelRange = nil
3✔
959
        g.bufferedChanRangeReplies = nil
3✔
960
        g.numChanRangeRepliesRcvd = 0
3✔
961

3✔
962
        // If there aren't any channels that we don't know of, then we can
3✔
963
        // switch straight to our terminal state.
3✔
964
        if len(newChans) == 0 {
6✔
965
                log.Infof("GossipSyncer(%x): remote peer has no new chans",
3✔
966
                        g.cfg.peerPub[:])
3✔
967

3✔
968
                g.setSyncState(chansSynced)
3✔
969

3✔
970
                // Ensure that the sync manager becomes aware that the
3✔
971
                // historical sync completed so synced_to_graph is updated over
3✔
972
                // rpc.
3✔
973
                g.cfg.markGraphSynced()
3✔
974
                return nil
3✔
975
        }
3✔
976

977
        // Otherwise, we'll set the set of channels that we need to query for
978
        // the next state, and also transition our state.
979
        g.newChansToQuery = newChans
3✔
980
        g.setSyncState(queryNewChannels)
3✔
981

3✔
982
        log.Infof("GossipSyncer(%x): starting query for %v new chans",
3✔
983
                g.cfg.peerPub[:], len(newChans))
3✔
984

3✔
985
        return nil
3✔
986
}
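
// bothTimestampsOutOfRange is an editor's illustrative sketch (not part of
// the original file) condensing the four-way switch above: a channel is
// skipped only when both of its update timestamps fall outside the window
// [now - DefaultChannelPruneExpiry, now + DefaultChannelPruneExpiry], in any
// combination of stale and skewed.
func bothTimestampsOutOfRange(t1, t2 time.Time) bool {
        outOfRange := func(t time.Time) bool {
                return time.Since(t) > graph.DefaultChannelPruneExpiry ||
                        time.Until(t) > graph.DefaultChannelPruneExpiry
        }

        return outOfRange(t1) && outOfRange(t2)
}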

// genChanRangeQuery generates the initial message we'll send to the remote
// party when we're kicking off the channel graph synchronization upon
// connection. The historicalQuery boolean can be used to generate a query from
// the genesis block of the chain.
func (g *GossipSyncer) genChanRangeQuery(
        historicalQuery bool) (*lnwire.QueryChannelRange, error) {

        // First, we'll query our channel graph time series for its highest
        // known channel ID.
        newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash)
        if err != nil {
                return nil, err
        }

        // Once we have the chan ID of the newest, we'll obtain the block
        // height of the channel, then subtract our default horizon to ensure
        // we don't miss any channels. By default, we go back 1 day from the
        // newest channel, unless we're attempting a historical sync, where
        // we'll actually start from the genesis block instead.
        var startHeight uint32
        switch {
        case historicalQuery:
                fallthrough
        case newestChan.BlockHeight <= chanRangeQueryBuffer:
                startHeight = 0
        default:
                startHeight = newestChan.BlockHeight - chanRangeQueryBuffer
        }

        // Determine the number of blocks to request based on our best height.
        // We'll take into account any potential underflows and explicitly set
        // numBlocks to its minimum value of 1 if so.
        bestHeight := g.cfg.bestHeight()
        numBlocks := bestHeight - startHeight
        if int64(numBlocks) < 1 {
                numBlocks = 1
        }

        log.Infof("GossipSyncer(%x): requesting new chans from height=%v "+
                "and %v blocks after", g.cfg.peerPub[:], startHeight, numBlocks)

        // Finally, we'll craft the channel range query, using our starting
        // height, then asking for all known channels to the foreseeable end of
        // the main chain.
        query := &lnwire.QueryChannelRange{
                ChainHash:        g.cfg.chainHash,
                FirstBlockHeight: startHeight,
                NumBlocks:        numBlocks,
        }

        if !g.cfg.noTimestampQueryOption {
                query.QueryOptions = lnwire.NewTimestampQueryOption()
        }

        g.curQueryRangeMsg = query

        return query, nil
}
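
// exampleStartHeight is an editor's illustrative sketch (not part of the
// original file) of the start-height arithmetic above: with the newest known
// channel confirmed at height 700000 and chanRangeQueryBuffer = 144 (roughly
// one day of blocks), a non-historical query starts at height 699856, while a
// historical query always starts at 0.
func exampleStartHeight(newestHeight uint32, historical bool) uint32 {
        if historical || newestHeight <= chanRangeQueryBuffer {
                return 0
        }

        return newestHeight - chanRangeQueryBuffer
}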

// replyPeerQueries is called in response to any query by the remote peer.
// We'll examine our state and send back our best response.
func (g *GossipSyncer) replyPeerQueries(msg lnwire.Message) error {
        reservation := g.rateLimiter.Reserve()
        delay := reservation.Delay()

        // If we've already replied a handful of times, we will start to delay
        // responses back to the remote peer. This can help prevent DOS attacks
        // where the remote peer spams us endlessly.
        if delay > 0 {
                log.Infof("GossipSyncer(%x): rate limiting gossip replies, "+
                        "responding in %s", g.cfg.peerPub[:], delay)

                select {
                case <-time.After(delay):
                case <-g.quit:
                        return ErrGossipSyncerExiting
                }
        }

        switch msg := msg.(type) {

        // In this state, we'll also handle any incoming channel range queries
        // from the remote peer as they're trying to sync their state as well.
        case *lnwire.QueryChannelRange:
                return g.replyChanRangeQuery(msg)

        // If the remote peer skips straight to requesting new channels that
        // they don't know of, then we'll ensure that we also handle this case.
        case *lnwire.QueryShortChanIDs:
                return g.replyShortChanIDs(msg)

        default:
                return fmt.Errorf("unknown message: %T", msg)
        }
}
1083

1084
// replyChanRangeQuery will be dispatched in response to a channel range query
1085
// by the remote node. We'll query the channel time series for channels that
1086
// meet the channel range, then chunk our responses to the remote node. We also
1087
// ensure that our final fragment carries the "complete" bit to indicate the
1088
// end of our streaming response.
1089
func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error {
3✔
1090
        // Before responding, we'll check to ensure that the remote peer is
3✔
1091
        // querying for the same chain that we're on. If not, we'll send back a
3✔
1092
        // response with a complete value of zero to indicate we're on a
3✔
1093
        // different chain.
3✔
1094
        if g.cfg.chainHash != query.ChainHash {
3✔
UNCOV
1095
                log.Warnf("Remote peer requested QueryChannelRange for "+
×
UNCOV
1096
                        "chain=%v, we're on chain=%v", query.ChainHash,
×
UNCOV
1097
                        g.cfg.chainHash)
×
UNCOV
1098

×
UNCOV
1099
                return g.cfg.sendToPeerSync(&lnwire.ReplyChannelRange{
×
UNCOV
1100
                        ChainHash:        query.ChainHash,
×
UNCOV
1101
                        FirstBlockHeight: query.FirstBlockHeight,
×
UNCOV
1102
                        NumBlocks:        query.NumBlocks,
×
UNCOV
1103
                        Complete:         0,
×
UNCOV
1104
                        EncodingType:     g.cfg.encodingType,
×
UNCOV
1105
                        ShortChanIDs:     nil,
×
UNCOV
1106
                })
×
UNCOV
1107
        }
×
1108

1109
        log.Infof("GossipSyncer(%x): filtering chan range: start_height=%v, "+
3✔
1110
                "num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
3✔
1111
                query.NumBlocks)
3✔
1112

3✔
1113
        // Check if the query asked for timestamps. We will only serve
3✔
1114
        // timestamps if this has not been disabled with
3✔
1115
        // noTimestampQueryOption.
3✔
1116
        withTimestamps := query.WithTimestamps() &&
3✔
1117
                !g.cfg.noTimestampQueryOption
3✔
1118

3✔
1119
        // Next, we'll consult the time series to obtain the set of known
3✔
1120
        // channel ID's that match their query.
3✔
1121
        startBlock := query.FirstBlockHeight
3✔
1122
        endBlock := query.LastBlockHeight()
3✔
1123
        channelRanges, err := g.cfg.channelSeries.FilterChannelRange(
3✔
1124
                query.ChainHash, startBlock, endBlock, withTimestamps,
3✔
1125
        )
3✔
1126
        if err != nil {
3✔
1127
                return err
×
1128
        }
×
1129

1130
        // TODO(roasbeef): means can't send max uint above?
1131
        //  * or make internal 64
1132

1133
        // We'll send our response in a streaming manner, chunk-by-chunk. We do
1134
        // this as there's a transport message size limit which we'll need to
1135
        // adhere to. We also need to make sure all of our replies cover the
1136
        // expected range of the query.
1137
        sendReplyForChunk := func(channelChunk []graphdb.ChannelUpdateInfo,
3✔
1138
                firstHeight, lastHeight uint32, finalChunk bool) error {
6✔
1139

3✔
1140
                // The number of blocks contained in the current chunk (the
3✔
1141
                // total span) is the difference between the last channel ID and
3✔
1142
                // the first in the range. We add one as even if all channels
3✔
1143
                // returned are in the same block, we need to count that.
                numBlocks := lastHeight - firstHeight + 1
                complete := uint8(0)
                if finalChunk {
                        complete = 1
                }

                var timestamps lnwire.Timestamps
                if withTimestamps {
                        timestamps = make(lnwire.Timestamps, len(channelChunk))
                }

                scids := make([]lnwire.ShortChannelID, len(channelChunk))
                for i, info := range channelChunk {
                        scids[i] = info.ShortChannelID

                        if !withTimestamps {
                                continue
                        }

                        timestamps[i].Timestamp1 = uint32(
                                info.Node1UpdateTimestamp.Unix(),
                        )

                        timestamps[i].Timestamp2 = uint32(
                                info.Node2UpdateTimestamp.Unix(),
                        )
                }

                return g.cfg.sendToPeerSync(&lnwire.ReplyChannelRange{
                        ChainHash:        query.ChainHash,
                        NumBlocks:        numBlocks,
                        FirstBlockHeight: firstHeight,
                        Complete:         complete,
                        EncodingType:     g.cfg.encodingType,
                        ShortChanIDs:     scids,
                        Timestamps:       timestamps,
                })
        }

        var (
                firstHeight  = query.FirstBlockHeight
                lastHeight   uint32
                channelChunk []graphdb.ChannelUpdateInfo
        )

        // chunkSize is the maximum number of SCIDs that we can safely put in a
        // single message. If we also need to include timestamps though, then
        // this number is halved since encoding two timestamps takes the same
        // number of bytes as encoding an SCID.
        chunkSize := g.cfg.chunkSize
        if withTimestamps {
                chunkSize /= 2
        }
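
        // To illustrate the halving (the concrete chunk size is configuration
        // dependent): an encoded SCID occupies 8 bytes on the wire, and a pair
        // of uint32 timestamps occupies 2*4 = 8 bytes as well, so a chunk size
        // of, say, 8,000 SCIDs without timestamps leaves room for only 4,000
        // entries once timestamps are included.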

        for _, channelRange := range channelRanges {
                channels := channelRange.Channels
                numChannels := int32(len(channels))
                numLeftToAdd := chunkSize - int32(len(channelChunk))

                // Include the current block in the ongoing chunk if it can fit
                // and move on to the next block.
                if numChannels <= numLeftToAdd {
                        channelChunk = append(channelChunk, channels...)
                        continue
                }

                // Otherwise, we need to send our existing channel chunk as is
                // as its own reply and start a new one for the current block.
                // We'll mark the end of our current chunk as the height before
                // the current block to ensure the whole query range is replied
                // to.
                log.Infof("GossipSyncer(%x): sending range chunk of size=%v",
                        g.cfg.peerPub[:], len(channelChunk))

                lastHeight = channelRange.Height - 1
                err := sendReplyForChunk(
                        channelChunk, firstHeight, lastHeight, false,
                )
                if err != nil {
                        return err
                }

                // With the reply constructed, we'll start tallying channels
                // for our next one keeping in mind our chunk size. This may
                // result in channels for this block being left out from the
                // reply, but this isn't an issue since we'll randomly shuffle
                // them and we assume a historical gossip sync is performed at
                // a later time.
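                //
                // Shuffling before truncating means each reply drops a
                // different random subset of an oversized block, so repeated
                // queries should eventually cover all of its channels (an
                // inference from the shuffle below rather than an explicit
                // protocol guarantee).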
                firstHeight = channelRange.Height
                finalChunkSize := numChannels
                exceedsChunkSize := numChannels > chunkSize
                if exceedsChunkSize {
                        rand.Shuffle(len(channels), func(i, j int) {
                                channels[i], channels[j] = channels[j], channels[i]
                        })
                        finalChunkSize = chunkSize
                }
                channelChunk = channels[:finalChunkSize]

                // Sort the chunk once again if we had to shuffle it.
                if exceedsChunkSize {
                        sort.Slice(channelChunk, func(i, j int) bool {
                                id1 := channelChunk[i].ShortChannelID.ToUint64()
                                id2 := channelChunk[j].ShortChannelID.ToUint64()

                                return id1 < id2
                        })
                }
        }

        // Send the remaining chunk as the final reply.
        log.Infof("GossipSyncer(%x): sending final chan range chunk, size=%v",
                g.cfg.peerPub[:], len(channelChunk))

        return sendReplyForChunk(
                channelChunk, firstHeight, query.LastBlockHeight(), true,
        )
}

// replyShortChanIDs will be dispatched in response to a query by the remote
// node for information concerning a set of short channel ID's. Our response
// will be sent in a streaming chunked manner to ensure that we remain below
// the current transport level message size.
func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error {
        // Before responding, we'll check to ensure that the remote peer is
        // querying for the same chain that we're on. If not, we'll send back a
        // response with a complete value of zero to indicate we're on a
        // different chain.
        if g.cfg.chainHash != query.ChainHash {
                log.Warnf("Remote peer requested QueryShortChanIDs for "+
                        "chain=%v, we're on chain=%v", query.ChainHash,
                        g.cfg.chainHash)

                return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
                        ChainHash: query.ChainHash,
                        Complete:  0,
                })
        }

        if len(query.ShortChanIDs) == 0 {
                log.Infof("GossipSyncer(%x): ignoring query for blank short "+
                        "chan ID's", g.cfg.peerPub[:])
                return nil
        }

        log.Infof("GossipSyncer(%x): fetching chan anns for %v chans",
                g.cfg.peerPub[:], len(query.ShortChanIDs))

        // Now that we know we're on the same chain, we'll query the channel
        // time series for the set of messages that we know of which satisfies
        // the requirement of being a chan ann, chan update, or a node ann
        // related to the set of queried channels.
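        //
        // In practice the reply for each queried SCID typically consists of
        // the channel_announcement itself, up to two channel_updates (one per
        // direction), and the node_announcements of both endpoints, though the
        // exact composition depends on what the time series has stored.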
        replyMsgs, err := g.cfg.channelSeries.FetchChanAnns(
                query.ChainHash, query.ShortChanIDs,
        )
        if err != nil {
                return fmt.Errorf("unable to fetch chan anns for %v..., %w",
                        query.ShortChanIDs[0].ToUint64(), err)
        }

        // Reply with any messages related to those channel ID's. We'll write
        // each one individually and synchronously to throttle the sends and
        // perform buffering of responses in the syncer as opposed to the peer.
        for _, msg := range replyMsgs {
                err := g.cfg.sendToPeerSync(msg)
                if err != nil {
                        return err
                }
        }

        // Regardless of whether we had any messages to reply with, send over
        // the sentinel message to signal that the stream has terminated.
        return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
                ChainHash: query.ChainHash,
                Complete:  1,
        })
}

// ApplyGossipFilter applies a gossip filter sent by the remote node to the
// state machine. Once applied, we'll ensure that we don't forward any messages
// to the peer that aren't within the time range of the filter.
func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error {
        g.Lock()

        g.remoteUpdateHorizon = filter

        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
        endTime := startTime.Add(
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
        )

        g.Unlock()

        // If requested, don't reply with historical gossip data when the
        // remote peer sets their gossip timestamp range.
        if g.cfg.ignoreHistoricalFilters {
                return nil
        }

        select {
        case <-g.syncerSema:
        case <-g.quit:
                return ErrGossipSyncerExiting
        }
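
        // Taking a slot from syncerSema above bounds how many syncers may run
        // the horizon catch-up below at the same time, which keeps a burst of
        // peers (re)applying their filters from piling up unbounded database
        // scans (an inference from the semaphore usage, not a documented
        // guarantee).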

        // We don't put this in a defer because if the goroutine is launched,
        // it needs to be called when the goroutine is stopped.
        returnSema := func() {
                g.syncerSema <- struct{}{}
        }

        // Now that the remote peer has applied their filter, we'll query the
        // database for all the messages that are beyond this filter.
        newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon(
                g.cfg.chainHash, startTime, endTime,
        )
        if err != nil {
                returnSema()
                return err
        }

        log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
                "start=%v, end=%v, backlog_size=%v", g.cfg.peerPub[:],
                startTime, endTime, len(newUpdatestoSend))

        // If we don't have any to send, then we can return early.
        if len(newUpdatestoSend) == 0 {
                returnSema()
                return nil
        }

        // We'll conclude by launching a goroutine to send out any updates.
        g.wg.Add(1)
        go func() {
                defer g.wg.Done()
                defer returnSema()

                for _, msg := range newUpdatestoSend {
                        err := g.cfg.sendToPeerSync(msg)
                        switch {
                        case err == ErrGossipSyncerExiting:
                                return

                        case err == lnpeer.ErrPeerExiting:
                                return

                        case err != nil:
                                log.Errorf("Unable to send message for "+
                                        "peer catch up: %v", err)
                        }
                }
        }()

        return nil
}

// FilterGossipMsgs takes a set of gossip messages, and only sends a message to
// the peer iff it is within the bounds of their set gossip filter. If the peer
// doesn't have a gossip filter set, then no messages will be forwarded.
func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
        // If the peer doesn't have an update horizon set, then we won't send
        // it any new update messages.
        if g.remoteUpdateHorizon == nil {
                log.Tracef("GossipSyncer(%x): skipped due to nil "+
                        "remoteUpdateHorizon", g.cfg.peerPub[:])
                return
        }

        // If we've been signaled to exit, or are exiting, then we'll stop
        // short.
        select {
        case <-g.quit:
                return
        default:
        }

        // TODO(roasbeef): need to ensure that peer still online...send msg to
        // gossiper on peer termination to signal peer disconnect?

        var err error

        // Before we filter out the messages, we'll construct an index over the
        // set of channel announcements and channel updates. This will allow us
        // to quickly check if we should forward a chan ann, based on the known
        // channel updates for a channel.
        chanUpdateIndex := make(
                map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate1,
        )
        for _, msg := range msgs {
                chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate1)
                if !ok {
                        continue
                }

                chanUpdateIndex[chanUpdate.ShortChannelID] = append(
                        chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate,
                )
        }

        // We'll construct a helper function that we'll use below to determine
        // if a given message passes the gossip msg filter.
        g.Lock()
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
        endTime := startTime.Add(
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
        )
        g.Unlock()

        passesFilter := func(timeStamp uint32) bool {
                t := time.Unix(int64(timeStamp), 0)
                return t.Equal(startTime) ||
                        (t.After(startTime) && t.Before(endTime))
        }
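
        // Note that the accepted window is half-open: a timestamp equal to
        // startTime passes, while one equal to endTime does not. For example,
        // with FirstTimestamp=1000 and TimestampRange=600, timestamps 1000
        // through 1599 pass and 1600 is rejected.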

        msgsToSend := make([]lnwire.Message, 0, len(msgs))
        for _, msg := range msgs {
                // If the target peer is the peer that sent us this message,
                // then we'll exit early as we don't need to filter this
                // message.
                if _, ok := msg.senders[g.cfg.peerPub]; ok {
                        continue
                }

                switch msg := msg.msg.(type) {

                // For each channel announcement message, we'll only send this
                // message if the channel updates for the channel are within
                // our time range.
                case *lnwire.ChannelAnnouncement1:
                        // First, we'll check if the channel updates are in
                        // this message batch.
                        chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID]
                        if !ok {
                                // If not, we'll attempt to query the database
                                // to see if we know of the updates.
                                chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates(
                                        g.cfg.chainHash, msg.ShortChannelID,
                                )
                                if err != nil {
                                        log.Warnf("no channel updates found for "+
                                                "short_chan_id=%v",
                                                msg.ShortChannelID)
                                        continue
                                }
                        }

                        for _, chanUpdate := range chanUpdates {
                                if passesFilter(chanUpdate.Timestamp) {
                                        msgsToSend = append(msgsToSend, msg)
                                        break
                                }
                        }

                        if len(chanUpdates) == 0 {
                                msgsToSend = append(msgsToSend, msg)
                        }
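
                        // Note that an announcement with no known updates at
                        // all is forwarded unconditionally by the check above,
                        // rather than being withheld until an update arrives.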

                // For each channel update, we'll only send it if the timestamp
                // is within our time range.
                case *lnwire.ChannelUpdate1:
                        if passesFilter(msg.Timestamp) {
                                msgsToSend = append(msgsToSend, msg)
                        }

                // Similarly, we only send node announcements if the update
                // timestamp is within our set gossip filter time range.
                case *lnwire.NodeAnnouncement:
                        if passesFilter(msg.Timestamp) {
                                msgsToSend = append(msgsToSend, msg)
                        }
                }
        }

        log.Tracef("GossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
                g.cfg.peerPub[:], len(msgs), len(msgsToSend))

        if len(msgsToSend) == 0 {
                return
        }

        g.cfg.sendToPeer(msgsToSend...)
}

// ProcessQueryMsg is used by outside callers to pass new channel time series
// queries to the internal processing goroutine.
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) error {
        var msgChan chan lnwire.Message
        switch msg.(type) {
        case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs:
                msgChan = g.queryMsgs

        // Reply messages should only be expected in states where we're waiting
        // for a reply.
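        // Receiving one in any other state is surfaced as an error below; the
        // caller is expected to treat that as a misbehaving peer (an
        // assumption about caller behavior, not something enforced here).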
        case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
                g.Lock()
                syncState := g.syncState()
                g.Unlock()

                if syncState != waitingQueryRangeReply &&
                        syncState != waitingQueryChanReply {

                        return fmt.Errorf("unexpected msg %T received in "+
                                "state %v", msg, syncState)
                }
                msgChan = g.gossipMsgs

        default:
                msgChan = g.gossipMsgs
        }

        select {
        case msgChan <- msg:
        case <-peerQuit:
        case <-g.quit:
        }

        return nil
}

// setSyncState sets the gossip syncer's state to the given state.
func (g *GossipSyncer) setSyncState(state syncerState) {
        atomic.StoreUint32(&g.state, uint32(state))
}

// syncState returns the current syncerState of the target GossipSyncer.
func (g *GossipSyncer) syncState() syncerState {
        return syncerState(atomic.LoadUint32(&g.state))
}

// ResetSyncedSignal returns a channel that will be closed in order to serve as
// a signal for when the GossipSyncer has reached its chansSynced state.
func (g *GossipSyncer) ResetSyncedSignal() chan struct{} {
        g.Lock()
        defer g.Unlock()

        syncedSignal := make(chan struct{})

        syncState := syncerState(atomic.LoadUint32(&g.state))
        if syncState == chansSynced {
                close(syncedSignal)
                return syncedSignal
        }

        g.syncedSignal = syncedSignal
        return g.syncedSignal
}

// ProcessSyncTransition sends a request to the gossip syncer to transition its
// sync type to a new one.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
        errChan := make(chan error, 1)
        select {
        case g.syncTransitionReqs <- &syncTransitionReq{
                newSyncType: newSyncType,
                errChan:     errChan,
        }:
        case <-time.After(syncTransitionTimeout):
                return ErrSyncTransitionTimeout
        case <-g.quit:
                return ErrGossipSyncerExiting
        }

        select {
        case err := <-errChan:
                return err
        case <-g.quit:
                return ErrGossipSyncerExiting
        }
}
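
// The request/ack shape above (hand the event loop a request carrying a
// buffered reply channel, then block on the reply, a timeout, or quit) is the
// same pattern historicalSync uses below; it gives callers a synchronous API
// while leaving the actual work to the syncer's internal goroutine.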

// handleSyncTransition handles a new sync type transition request.
//
// NOTE: The gossip syncer may end up in a different sync state as a result of
// this transition.
func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error {
        // Return early from any NOP sync transitions.
        syncType := g.SyncType()
        if syncType == req.newSyncType {
                return nil
        }

        log.Debugf("GossipSyncer(%x): transitioning from %v to %v",
                g.cfg.peerPub, syncType, req.newSyncType)

        var (
                firstTimestamp time.Time
                timestampRange uint32
        )

        switch req.newSyncType {
        // If an active sync has been requested, then we should resume
        // receiving new graph updates from the remote peer.
        case ActiveSync, PinnedSync:
                firstTimestamp = time.Now()
                timestampRange = math.MaxUint32

        // If a PassiveSync transition has been requested, then we should no
        // longer receive any new updates from the remote peer. We can do this
        // by setting our update horizon to a range in the past ensuring no
        // graph updates match the timestamp range.
        case PassiveSync:
                firstTimestamp = zeroTimestamp
                timestampRange = 0

        default:
                return fmt.Errorf("unhandled sync transition %v",
                        req.newSyncType)
        }
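
        // Concretely, an ActiveSync or PinnedSync transition advertises the
        // horizon [now, now+MaxUint32 seconds), so any freshly timestamped
        // update passes the peer's filter, while a PassiveSync transition
        // advertises an empty window at zeroTimestamp that effectively no
        // real update timestamp can fall into.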

        err := g.sendGossipTimestampRange(firstTimestamp, timestampRange)
        if err != nil {
                return fmt.Errorf("unable to send local update horizon: %w",
                        err)
        }

        g.setSyncType(req.newSyncType)

        return nil
}

// setSyncType sets the gossip syncer's sync type to the given type.
func (g *GossipSyncer) setSyncType(syncType SyncerType) {
        atomic.StoreUint32(&g.syncType, uint32(syncType))
}

// SyncType returns the current SyncerType of the target GossipSyncer.
func (g *GossipSyncer) SyncType() SyncerType {
        return SyncerType(atomic.LoadUint32(&g.syncType))
}

// historicalSync sends a request to the gossip syncer to perform a historical
// sync.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) historicalSync() error {
        done := make(chan struct{})

        select {
        case g.historicalSyncReqs <- &historicalSyncReq{
                doneChan: done,
        }:
        case <-time.After(syncTransitionTimeout):
                return ErrSyncTransitionTimeout
        case <-g.quit:
                return ErrGossiperShuttingDown
        }

        select {
        case <-done:
                return nil
        case <-g.quit:
                return ErrGossiperShuttingDown
        }
}

// handleHistoricalSync handles a request to the gossip syncer to perform a
// historical sync.
func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
        // We'll go back to our initial syncingChans state in order to request
        // the remote peer to give us all of the channel IDs they know of
        // starting from the genesis block.
        g.genHistoricalChanRangeQuery = true
        g.setSyncState(syncingChans)
        close(req.doneChan)
}