
lightningnetwork / lnd / 13436790699

20 Feb 2025 01:40PM UTC coverage: 58.78% (-0.01%) from 58.794%

Pull #9534

ellemouton
graph: refactor Builder network message handling

The exposed AddNode, AddEdge and UpdateEdge methods of the Builder are
currently synchronous: although they pass messages to the network
handler, which spins off the handling in a goroutine, the public
methods still wait for a response from that handling before returning.
The only part that is actually done asynchronously is the topology
notifications.
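
Concretely, the pre-refactor pattern described above looks roughly like
the following. This is a minimal, self-contained sketch with
hypothetical names (`oldBuilder`, `addEdgeReq`), not the actual Builder
code: the public method hands the update to a central handler goroutine
and then blocks on a reply channel.

```go
package main

import "fmt"

// addEdgeReq pairs an update with a reply channel, so the caller can
// block until the central handler has processed it.
type addEdgeReq struct {
	name string
	err  chan error
}

type oldBuilder struct {
	networkUpdates chan addEdgeReq
}

// networkHandler is the central goroutine that owns all graph state.
func (b *oldBuilder) networkHandler() {
	for req := range b.networkUpdates {
		// ... validate and store the edge ...
		fmt.Println("handled:", req.name)
		req.err <- nil // replying unblocks the caller
	}
}

// AddEdge is synchronous despite the goroutine hop: it waits for the
// handler's response before returning.
func (b *oldBuilder) AddEdge(name string) error {
	req := addEdgeReq{name: name, err: make(chan error, 1)}
	b.networkUpdates <- req
	return <-req.err
}

func main() {
	b := &oldBuilder{networkUpdates: make(chan addEdgeReq)}
	go b.networkHandler()
	fmt.Println(b.AddEdge("edge-1")) // prints "handled: edge-1", then <nil>
}
```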

We previously tried to simplify things in [this
commit](https://github.com/lightningnetwork/lnd/pull/9476/commits/d757b3bcf),
but we soon realised that there was a reason for sending the messages
to the central/synchronous network handler first: it ensures
consistency for topology clients. That is, the ordering of new topology
client subscriptions and cancellations relative to new network updates
must be consistent and handled synchronously. For example, if a new
update comes in right after a topology client cancels its subscription,
then it should _not_ be notified. Similarly for new subscriptions. So
that commit was reverted soon after.
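
The ordering requirement is easiest to see with a toy event loop
(again a hedged sketch with made-up names, not lnd code): because one
goroutine consumes subscriptions, cancellations and updates from a
single channel, a cancellation processed before an update can never be
raced by that update.

```go
package main

import "fmt"

type event interface{}

type subscribe struct {
	id  int
	out chan string
}

type cancel struct{ id int }

type update struct{ msg string }

// eventLoop serializes client registration, cancellation and update
// delivery, which is exactly the consistency property described above.
func eventLoop(events <-chan event, quit <-chan struct{}) {
	clients := make(map[int]chan string)
	for {
		select {
		case e := <-events:
			switch e := e.(type) {
			case subscribe:
				clients[e.id] = e.out
			case cancel:
				delete(clients, e.id)
			case update:
				// Only clients subscribed before this update
				// and not yet cancelled are notified.
				for _, out := range clients {
					out <- e.msg
				}
			}
		case <-quit:
			return
		}
	}
}

func main() {
	events := make(chan event)
	quit := make(chan struct{})
	go eventLoop(events, quit)

	out := make(chan string, 1)
	events <- subscribe{id: 1, out: out}
	events <- update{msg: "update A"} // delivered: subscribe came first
	fmt.Println(<-out)

	events <- cancel{id: 1}
	events <- update{msg: "update B"} // dropped: cancel came first
	close(quit)
}
```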

We can, however, still simplify things as is done in this commit by
noting that _only topology subscriptions and notifications_ need to be
handled separately. The actual network updates do not need to. So that
is what is done here.

This refactor will make moving the topology subscription logic to a new
subsystem later on much easier.
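
The shape after this refactor, then, is roughly the following
(hypothetical names again, a sketch of the described design rather than
the PR's actual diff): mutations run synchronously in the caller's
goroutine, and only topology notifications flow through the dedicated
loop.

```go
package main

import "fmt"

type topoEvent struct{ msg string }

// builder sketches the refactored design: no request/response round
// trip for network updates, a serialized loop only for notifications.
type builder struct {
	graph      map[string]bool
	topoEvents chan topoEvent
}

// AddEdge applies the update directly, then forwards a notification to
// the topology loop, keeping the subscription ordering guarantees.
func (b *builder) AddEdge(name string) error {
	if b.graph[name] {
		return fmt.Errorf("edge %s already known", name)
	}
	b.graph[name] = true
	b.topoEvents <- topoEvent{msg: "new edge: " + name}
	return nil
}

// notificationLoop is the only goroutine that touches topology
// clients; in the real Builder this is where subscribe/cancel handling
// would also live.
func (b *builder) notificationLoop(done chan<- struct{}) {
	defer close(done)
	for ev := range b.topoEvents {
		fmt.Println("notify:", ev.msg)
	}
}

func main() {
	b := &builder{
		graph:      make(map[string]bool),
		topoEvents: make(chan topoEvent, 1),
	}
	done := make(chan struct{})
	go b.notificationLoop(done)

	if err := b.AddEdge("edge-1"); err != nil {
		fmt.Println(err)
	}
	close(b.topoEvents)
	<-done
}
```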
Pull Request #9534: graph: refactor Builder network message handling

38 of 44 new or added lines in 1 file covered. (86.36%)

55 existing lines in 11 files now uncovered.

136048 of 231453 relevant lines covered (58.78%)

19264.6 hits per line

Source File: /discovery/syncer.go (85.75% covered)

package discovery

import (
        "errors"
        "fmt"
        "math"
        "math/rand"
        "sort"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/graph"
        graphdb "github.com/lightningnetwork/lnd/graph/db"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lnwire"
        "golang.org/x/time/rate"
)

// SyncerType encapsulates the different types of syncing mechanisms for a
// gossip syncer.
type SyncerType uint8

const (
        // ActiveSync denotes that a gossip syncer:
        //
        // 1. Should not attempt to synchronize with the remote peer for
        //    missing channels.
        // 2. Should respond to queries from the remote peer.
        // 3. Should receive new updates from the remote peer.
        //
        // They are started in a chansSynced state in order to accomplish their
        // responsibilities above.
        ActiveSync SyncerType = iota

        // PassiveSync denotes that a gossip syncer:
        //
        // 1. Should not attempt to synchronize with the remote peer for
        //    missing channels.
        // 2. Should respond to queries from the remote peer.
        // 3. Should not receive new updates from the remote peer.
        //
        // They are started in a chansSynced state in order to accomplish their
        // responsibilities above.
        PassiveSync

        // PinnedSync denotes an ActiveSync that doesn't count towards the
        // default active syncer limits and is always active throughout the
        // duration of the peer's connection. Each pinned syncer will begin by
        // performing a historical sync to ensure we are well synchronized with
        // their routing table.
        PinnedSync
)

// String returns a human readable string describing the target SyncerType.
func (t SyncerType) String() string {
        switch t {
        case ActiveSync:
                return "ActiveSync"
        case PassiveSync:
                return "PassiveSync"
        case PinnedSync:
                return "PinnedSync"
        default:
                return fmt.Sprintf("unknown sync type %d", t)
        }
}

// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange
// allowing new gossip messages to be received from the peer.
func (t SyncerType) IsActiveSync() bool {
        switch t {
        case ActiveSync, PinnedSync:
                return true
        default:
                return false
        }
}

// syncerState is an enum that represents the current state of the GossipSyncer.
// As the syncer is a state machine, we'll gate our actions based off of the
// current state and the next incoming message.
type syncerState uint32

const (
        // syncingChans is the default state of the GossipSyncer. We start in
        // this state when a new peer first connects and we don't yet know if
        // we're fully synchronized.
        syncingChans syncerState = iota

        // waitingQueryRangeReply is the second main phase of the GossipSyncer.
        // We enter this state after we send out our first QueryChannelRange
        // query. We'll stay in this state until the remote party sends us the
        // final ReplyChannelRange message that indicates they've responded to
        // our query entirely. After this state, we'll transition to
        // waitingQueryChanReply after we send out requests for all the chan
        // IDs that are new to us.
        waitingQueryRangeReply

        // queryNewChannels is the third main phase of the GossipSyncer.  In
        // this phase we'll send out all of our QueryShortChanIDs messages in
        // response to the new channels that we don't yet know about.
        queryNewChannels

        // waitingQueryChanReply is the fourth main phase of the GossipSyncer.
        // We enter this phase once we've sent off a query chunk to the remote
        // peer.  We'll stay in this phase until we receive a
        // ReplyShortChanIDsEnd message which indicates that the remote party
        // has responded to all of our requests.
        waitingQueryChanReply

        // chansSynced is the terminal stage of the GossipSyncer. Once we enter
        // this phase, we'll send out our update horizon, which filters out the
        // set of channel updates that we're interested in. In this state,
        // we'll be able to accept any outgoing messages from the
        // AuthenticatedGossiper, and decide if we should forward them to our
        // target peer based on its update horizon.
        chansSynced

        // syncerIdle is a state in which the gossip syncer can handle external
        // requests to transition or perform historical syncs. It is used as the
        // initial state for pinned syncers, as well as a fallthrough case for
        // chansSynced allowing fully synced peers to facilitate requests.
        syncerIdle
)

// String returns a human readable string describing the target syncerState.
func (s syncerState) String() string {
        switch s {
        case syncingChans:
                return "syncingChans"

        case waitingQueryRangeReply:
                return "waitingQueryRangeReply"

        case queryNewChannels:
                return "queryNewChannels"

        case waitingQueryChanReply:
                return "waitingQueryChanReply"

        case chansSynced:
                return "chansSynced"

        case syncerIdle:
                return "syncerIdle"

        default:
                return "UNKNOWN STATE"
        }
}

const (
        // DefaultMaxUndelayedQueryReplies specifies how many gossip queries we
        // will respond to immediately before starting to delay responses.
        DefaultMaxUndelayedQueryReplies = 10

        // DefaultDelayedQueryReplyInterval is the length of time we will wait
        // before responding to gossip queries after replying to
        // maxUndelayedQueryReplies queries.
        DefaultDelayedQueryReplyInterval = 5 * time.Second

        // maxQueryChanRangeReplies specifies the default limit of replies to
        // process for a single QueryChannelRange request.
        maxQueryChanRangeReplies = 500

        // maxQueryChanRangeRepliesZlibFactor specifies the factor applied to
        // the maximum number of replies allowed for zlib encoded replies.
        maxQueryChanRangeRepliesZlibFactor = 4

        // chanRangeQueryBuffer is the number of blocks back that we'll go when
        // asking the remote peer for any channels they know of beyond our
        // highest known channel ID.
        chanRangeQueryBuffer = 144

        // syncTransitionTimeout is the default amount of time we'll wait when
        // attempting to perform a sync transition.
        syncTransitionTimeout = 5 * time.Second

        // requestBatchSize is the maximum number of channels we will query the
        // remote peer for in a QueryShortChanIDs message.
        requestBatchSize = 500
)

var (
        // encodingTypeToChunkSize maps an encoding type to the max number of
        // short chan IDs using the encoding type that we can fit into a
        // single message safely.
        encodingTypeToChunkSize = map[lnwire.QueryEncoding]int32{
                lnwire.EncodingSortedPlain: 8000,
        }

        // ErrGossipSyncerExiting signals that the syncer has been killed.
        ErrGossipSyncerExiting = errors.New("gossip syncer exiting")

        // ErrSyncTransitionTimeout is an error returned when we've timed out
        // attempting to perform a sync transition.
        ErrSyncTransitionTimeout = errors.New("timed out attempting to " +
                "transition sync type")

        // zeroTimestamp is the timestamp we'll use when we want to indicate to
        // peers that we do not want to receive any new graph updates.
        zeroTimestamp time.Time
)

// syncTransitionReq encapsulates a request for a gossip syncer sync transition.
type syncTransitionReq struct {
        newSyncType SyncerType
        errChan     chan error
}

// historicalSyncReq encapsulates a request for a gossip syncer to perform a
// historical sync.
type historicalSyncReq struct {
        // doneChan is a channel that serves as a signal and is closed to ensure
        // the historical sync is attempted by the time we return to the caller.
        doneChan chan struct{}
}

// gossipSyncerCfg is a struct that packages all the information a GossipSyncer
// needs to carry out its duties.
type gossipSyncerCfg struct {
        // chainHash is the chain that this syncer is responsible for.
        chainHash chainhash.Hash

        // peerPub is the public key of the peer we're syncing with, serialized
        // in compressed format.
        peerPub [33]byte

        // channelSeries is the primary interface that we'll use to generate
        // our queries and respond to the queries of the remote peer.
        channelSeries ChannelGraphTimeSeries

        // encodingType is the current encoding type we're aware of. Requests
        // with different encoding types will be rejected.
        encodingType lnwire.QueryEncoding

        // chunkSize is the max number of short chan IDs using the syncer's
        // encoding type that we can fit into a single message safely.
        chunkSize int32

        // batchSize is the max number of channels the syncer will query from
        // the remote node in a single QueryShortChanIDs request.
        batchSize int32

        // sendToPeer sends a variadic number of messages to the remote peer.
        // This method should not block while waiting for sends to be written
        // to the wire.
        sendToPeer func(...lnwire.Message) error

        // sendToPeerSync sends a variadic number of messages to the remote
        // peer, blocking until all messages have been sent successfully or a
        // write error is encountered.
        sendToPeerSync func(...lnwire.Message) error

        // maxUndelayedQueryReplies specifies how many gossip queries we will
        // respond to immediately before starting to delay responses.
        maxUndelayedQueryReplies int

        // delayedQueryReplyInterval is the length of time we will wait before
        // responding to gossip queries after replying to
        // maxUndelayedQueryReplies queries.
        delayedQueryReplyInterval time.Duration

        // noSyncChannels will prevent the GossipSyncer from spawning a
        // channelGraphSyncer, meaning we will not try to reconcile unknown
        // channels with the remote peer.
        noSyncChannels bool

        // noReplyQueries will prevent the GossipSyncer from spawning a
        // replyHandler, meaning we will not reply to queries from our remote
        // peer.
        noReplyQueries bool

        // noTimestampQueryOption will prevent the GossipSyncer from querying
        // timestamps of announcement messages from the peer, and it will
        // prevent it from responding to timestamp queries.
        noTimestampQueryOption bool

        // ignoreHistoricalFilters will prevent syncers from replying with
        // historical data when the remote peer sets a gossip_timestamp_range.
        // This prevents ranges with old start times from causing us to dump the
        // graph on connect.
        ignoreHistoricalFilters bool

        // bestHeight returns the latest height known of the chain.
        bestHeight func() uint32

        // markGraphSynced updates the SyncManager's perception of whether we
        // have completed at least one historical sync.
        markGraphSynced func()

        // maxQueryChanRangeReplies is the maximum number of replies we'll allow
        // for a single QueryChannelRange request.
        maxQueryChanRangeReplies uint32

        // isStillZombieChannel takes the timestamps of the latest channel
        // updates for a channel and returns true if the channel should be
        // considered a zombie based on these timestamps.
        isStillZombieChannel func(time.Time, time.Time) bool
}

// GossipSyncer is a struct that handles synchronizing the channel graph state
// with a remote peer. The GossipSyncer implements a state machine that will
// progressively ensure we're synchronized with the channel state of the remote
// node. Once both nodes have been synchronized, we'll use an update filter to
// filter out which messages should be sent to a remote peer based on their
// update horizon. If the update horizon isn't specified, then we won't send
// them any channel updates at all.
type GossipSyncer struct {
        started sync.Once
        stopped sync.Once

        // state is the current state of the GossipSyncer.
        //
        // NOTE: This variable MUST be used atomically.
        state uint32

        // syncType denotes the SyncerType the gossip syncer is currently
        // exercising.
        //
        // NOTE: This variable MUST be used atomically.
        syncType uint32

        // remoteUpdateHorizon is the update horizon of the remote peer. We'll
        // use this to properly filter out any messages.
        remoteUpdateHorizon *lnwire.GossipTimestampRange

        // localUpdateHorizon is our local update horizon; we'll use this to
        // determine if we've already sent out our update.
        localUpdateHorizon *lnwire.GossipTimestampRange

        // syncTransitionReqs is a channel through which new sync type
        // transition requests will be sent. These requests should only be
        // handled when the gossip syncer is in a chansSynced state to ensure
        // its state machine behaves as expected.
        syncTransitionReqs chan *syncTransitionReq

        // historicalSyncReqs is a channel that serves as a signal for the
        // gossip syncer to perform a historical sync. These can only be done
        // once the gossip syncer is in a chansSynced state to ensure its state
        // machine behaves as expected.
        historicalSyncReqs chan *historicalSyncReq

        // genHistoricalChanRangeQuery when true signals to the gossip syncer
        // that it should request the remote peer for all of its known channel
        // IDs starting from the genesis block of the chain. This can only
        // happen if the gossip syncer receives a request to attempt a
        // historical sync. It can be unset if the syncer ever transitions from
        // PassiveSync to ActiveSync.
        genHistoricalChanRangeQuery bool

        // gossipMsgs is a channel that all responses to our queries from the
        // target peer will be sent over; these will be read by the
        // channelGraphSyncer.
        gossipMsgs chan lnwire.Message

        // queryMsgs is a channel that all queries from the remote peer will be
        // received over; these will be read by the replyHandler.
        queryMsgs chan lnwire.Message

        // curQueryRangeMsg keeps track of the latest QueryChannelRange message
        // we've sent to a peer to ensure we've consumed all expected replies.
        // This field is primarily used within the waitingQueryChanReply state.
        curQueryRangeMsg *lnwire.QueryChannelRange

        // prevReplyChannelRange keeps track of the previous ReplyChannelRange
        // message we've received from a peer to ensure they've fully replied to
        // our query by ensuring they covered our requested block range. This
        // field is primarily used within the waitingQueryChanReply state.
        prevReplyChannelRange *lnwire.ReplyChannelRange

        // bufferedChanRangeReplies is used in the waitingQueryChanReply state
        // to buffer all the chunked responses to our query.
        bufferedChanRangeReplies []graphdb.ChannelUpdateInfo

        // numChanRangeRepliesRcvd is used to track the number of replies
        // received as part of a QueryChannelRange. This field is primarily used
        // within the waitingQueryChanReply state.
        numChanRangeRepliesRcvd uint32

        // newChansToQuery is used to pass the set of channels we should query
        // for from the waitingQueryChanReply state to the queryNewChannels
        // state.
        newChansToQuery []lnwire.ShortChannelID

        cfg gossipSyncerCfg

        // rateLimiter dictates the frequency with which we will reply to gossip
        // queries from a peer. This is used to delay responses to peers to
        // prevent DOS vulnerabilities if they are spamming with an unreasonable
        // number of queries.
        rateLimiter *rate.Limiter

        // syncedSignal is a channel that, if set, will be closed when the
        // GossipSyncer reaches its terminal chansSynced state.
        syncedSignal chan struct{}

        // syncerSema is used to more finely control the syncer's ability to
        // respond to gossip timestamp range messages.
        syncerSema chan struct{}

        sync.Mutex

        quit chan struct{}
        wg   sync.WaitGroup
}

// newGossipSyncer returns a new instance of the GossipSyncer populated using
// the passed config.
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
        // If no parameter was specified for max undelayed query replies, set it
        // to the default.
        if cfg.maxUndelayedQueryReplies <= 0 {
                cfg.maxUndelayedQueryReplies = DefaultMaxUndelayedQueryReplies
        }

        // If no parameter was specified for delayed query reply interval, set
        // to the default of 5 seconds.
        if cfg.delayedQueryReplyInterval <= 0 {
                cfg.delayedQueryReplyInterval = DefaultDelayedQueryReplyInterval
        }

        // Construct a rate limiter that will govern how frequently we reply to
        // gossip queries from this peer. The limiter will automatically adjust
        // during periods of quiescence, and increase the reply interval under
        // load.
        interval := rate.Every(cfg.delayedQueryReplyInterval)
        rateLimiter := rate.NewLimiter(
                interval, cfg.maxUndelayedQueryReplies,
        )

        return &GossipSyncer{
                cfg:                cfg,
                rateLimiter:        rateLimiter,
                syncTransitionReqs: make(chan *syncTransitionReq),
                historicalSyncReqs: make(chan *historicalSyncReq),
                gossipMsgs:         make(chan lnwire.Message, 100),
                queryMsgs:          make(chan lnwire.Message, 100),
                syncerSema:         sema,
                quit:               make(chan struct{}),
        }
}

// Start starts the GossipSyncer and any goroutines that it needs to carry out
// its duties.
func (g *GossipSyncer) Start() {
        g.started.Do(func() {
                log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])

                // TODO(conner): only spawn channelGraphSyncer if remote
                // supports gossip queries, and only spawn replyHandler if we
                // advertise support
                if !g.cfg.noSyncChannels {
                        g.wg.Add(1)
                        go g.channelGraphSyncer()
                }
                if !g.cfg.noReplyQueries {
                        g.wg.Add(1)
                        go g.replyHandler()
                }
        })
}

// Stop signals the GossipSyncer for a graceful exit, then waits until it has
// exited.
func (g *GossipSyncer) Stop() {
        g.stopped.Do(func() {
                log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
                defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])

                close(g.quit)
                g.wg.Wait()
        })
}

// handleSyncingChans handles the state syncingChans for the GossipSyncer. When
// in this state, we will send a QueryChannelRange msg to our peer and advance
// the syncer's state to waitingQueryRangeReply.
func (g *GossipSyncer) handleSyncingChans() {
        // Prepare the query msg.
        queryRangeMsg, err := g.genChanRangeQuery(g.genHistoricalChanRangeQuery)
        if err != nil {
                log.Errorf("Unable to gen chan range query: %v", err)
                return
        }

        // Acquire a lock so the following state transition is atomic.
        //
        // NOTE: We must lock the following steps as it's possible we get an
        // immediate response (ReplyChannelRange) after sending the query msg.
        // The response is handled in ProcessQueryMsg, which requires the
        // current state to be waitingQueryRangeReply.
        g.Lock()
        defer g.Unlock()

        // Send the msg to the remote peer, which is non-blocking as
        // `sendToPeer` only queues the msg in Brontide.
        err = g.cfg.sendToPeer(queryRangeMsg)
        if err != nil {
                log.Errorf("Unable to send chan range query: %v", err)
                return
        }

        // With the message sent successfully, we'll transition into the next
        // state where we wait for their reply.
        g.setSyncState(waitingQueryRangeReply)
}

// channelGraphSyncer is the main goroutine responsible for ensuring that we
// properly sync channel graph state with the remote peer, and also that we
// only send them messages which actually pass their defined update horizon.
func (g *GossipSyncer) channelGraphSyncer() {
        defer g.wg.Done()

        for {
                state := g.syncState()
                syncType := g.SyncType()

                log.Debugf("GossipSyncer(%x): state=%v, type=%v",
                        g.cfg.peerPub[:], state, syncType)

                switch state {
                // When we're in this state, we're trying to synchronize our
                // view of the network with the remote peer. We'll kick off
                // this sync by asking them for the set of channels they
                // understand, as well as responding to any other queries by
                // them.
                case syncingChans:
                        g.handleSyncingChans()

                // In this state, we've sent out our initial channel range
                // query and are waiting for the final response from the remote
                // peer before we perform a diff to see which channels they
                // know of that we don't.
                case waitingQueryRangeReply:
                        // We'll wait to either process a new message from the
                        // remote party, or exit due to the gossiper exiting,
                        // or us being signalled to do so.
                        select {
                        case msg := <-g.gossipMsgs:
                                // The remote peer is sending a response to our
                                // initial query, we'll collate this response,
                                // and see if it's the final one in the series.
                                // If so, we can then transition to querying
                                // for the new channels.
                                queryReply, ok := msg.(*lnwire.ReplyChannelRange)
                                if ok {
                                        err := g.processChanRangeReply(queryReply)
                                        if err != nil {
                                                log.Errorf("Unable to "+
                                                        "process chan range "+
                                                        "query: %v", err)
                                                return
                                        }
                                        continue
                                }

                                log.Warnf("Unexpected message: %T in state=%v",
                                        msg, state)

                        case <-g.quit:
                                return
                        }

                // We'll enter this state once we've discovered which channels
                // the remote party knows of that we don't yet know of
                // ourselves.
                case queryNewChannels:
                        // First, we'll attempt to continue our channel
                        // synchronization by continuing to send off another
                        // query chunk.
                        done, err := g.synchronizeChanIDs()
                        if err != nil {
                                log.Errorf("Unable to sync chan IDs: %v", err)
                        }

                        // If this wasn't our last query, then we'll need to
                        // transition to our waiting state.
                        if !done {
                                g.setSyncState(waitingQueryChanReply)
                                continue
                        }

                        // If we're fully synchronized, then we can transition
                        // to our terminal state.
                        g.setSyncState(chansSynced)

                        // Ensure that the sync manager becomes aware that the
                        // historical sync completed so synced_to_graph is
                        // updated over rpc.
                        g.cfg.markGraphSynced()

                // In this state, we've just sent off a new query for channels
                // that we don't yet know of. We'll remain in this state until
                // the remote party signals they've responded to our query in
                // totality.
                case waitingQueryChanReply:
                        // Once we've sent off our query, we'll wait for either
                        // an ending reply, or just another query from the
                        // remote peer.
                        select {
                        case msg := <-g.gossipMsgs:
                                // If this is the final reply to one of our
                                // queries, then we'll loop back into our query
                                // state to send off the remaining query chunks.
                                _, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
                                if ok {
                                        g.setSyncState(queryNewChannels)
                                        continue
                                }

                                log.Warnf("Unexpected message: %T in state=%v",
                                        msg, state)

                        case <-g.quit:
                                return
                        }

                // This is our final terminal state where we'll only reply to
                // any further queries by the remote peer.
                case chansSynced:
                        g.Lock()
                        if g.syncedSignal != nil {
                                close(g.syncedSignal)
                                g.syncedSignal = nil
                        }
                        g.Unlock()

                        // If we haven't yet sent out our update horizon, and
                        // we want to receive real-time channel updates, we'll
                        // do so now.
                        if g.localUpdateHorizon == nil &&
                                syncType.IsActiveSync() {

                                err := g.sendGossipTimestampRange(
                                        time.Now(), math.MaxUint32,
                                )
                                if err != nil {
                                        log.Errorf("Unable to send update "+
                                                "horizon to %x: %v",
                                                g.cfg.peerPub, err)
                                }
                        }
                        // With our horizon set, we'll simply reply to any new
                        // messages or process any state transitions and exit if
                        // needed.
                        fallthrough

                // Pinned peers will begin in this state, since they will
                // immediately receive a request to perform a historical sync.
                // Otherwise, we fall through after ending in chansSynced to
                // facilitate new requests.
                case syncerIdle:
                        select {
                        case req := <-g.syncTransitionReqs:
                                req.errChan <- g.handleSyncTransition(req)

                        case req := <-g.historicalSyncReqs:
                                g.handleHistoricalSync(req)

                        case <-g.quit:
                                return
                        }
                }
        }
}

// replyHandler is an event loop whose sole purpose is to reply to the remote
// peer's queries. Our replyHandler will respond to messages generated by their
// channelGraphSyncer, and vice versa. Each party's channelGraphSyncer drives
// the other's replyHandler, allowing the replyHandler to operate independently
// from the state machine maintained on the same node.
//
// NOTE: This method MUST be run as a goroutine.
func (g *GossipSyncer) replyHandler() {
        defer g.wg.Done()

        for {
                select {
                case msg := <-g.queryMsgs:
                        err := g.replyPeerQueries(msg)
                        switch {
                        case err == ErrGossipSyncerExiting:
                                return

                        case err == lnpeer.ErrPeerExiting:
                                return

                        case err != nil:
                                log.Errorf("Unable to reply to peer "+
                                        "query: %v", err)
                        }

                case <-g.quit:
                        return
                }
        }
}

// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
// syncer and sends it to the remote peer.
func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time,
        timestampRange uint32) error {

        endTimestamp := firstTimestamp.Add(
                time.Duration(timestampRange) * time.Second,
        )

        log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)",
                g.cfg.peerPub[:], firstTimestamp, endTimestamp)

        localUpdateHorizon := &lnwire.GossipTimestampRange{
                ChainHash:      g.cfg.chainHash,
                FirstTimestamp: uint32(firstTimestamp.Unix()),
                TimestampRange: timestampRange,
        }

        if err := g.cfg.sendToPeer(localUpdateHorizon); err != nil {
                return err
        }

        if firstTimestamp == zeroTimestamp && timestampRange == 0 {
                g.localUpdateHorizon = nil
        } else {
                g.localUpdateHorizon = localUpdateHorizon
        }

        return nil
}

// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
// the remote peer for its known set of channel IDs within a particular block
// range. This method will be called continually until the entire range has
// been queried for with a response received. We'll chunk our requests as
// required to ensure they fit into a single message. We may re-enter this
// state in the case that chunking is required.
func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
        // If we're in this state yet there are no more new channels to query
        // for, then we'll transition to our final synced state and return true
        // to signal that we're fully synchronized.
        if len(g.newChansToQuery) == 0 {
                log.Infof("GossipSyncer(%x): no more chans to query",
                        g.cfg.peerPub[:])
                return true, nil
        }

        // Otherwise, we'll issue our next chunked query to receive replies
        // for.
        var queryChunk []lnwire.ShortChannelID

        // If the number of channels to query for is less than the chunk size,
        // then we can issue a single query.
        if int32(len(g.newChansToQuery)) < g.cfg.batchSize {
                queryChunk = g.newChansToQuery
                g.newChansToQuery = nil

        } else {
                // Otherwise, we'll need to only query for the next chunk.
                // We'll slice into our query chunk, then slide our main
                // pointer down by the chunk size.
                queryChunk = g.newChansToQuery[:g.cfg.batchSize]
                g.newChansToQuery = g.newChansToQuery[g.cfg.batchSize:]
        }

        log.Infof("GossipSyncer(%x): querying for %v new channels",
                g.cfg.peerPub[:], len(queryChunk))

        // With our chunk obtained, we'll send over our next query, then return
        // false indicating that we're not yet fully synced.
        err := g.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
                ChainHash:    g.cfg.chainHash,
                EncodingType: lnwire.EncodingSortedPlain,
                ShortChanIDs: queryChunk,
        })

        return false, err
}

// isLegacyReplyChannelRange determines whether a ReplyChannelRange message is
// considered legacy. There was a point where lnd used to include the same query
// over multiple replies, rather than including the portion of the query the
// reply is handling. We'll use this as a way of detecting whether we are
// communicating with a legacy node so we can properly sync with them.
func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
        reply *lnwire.ReplyChannelRange) bool {

        return (reply.ChainHash == query.ChainHash &&
                reply.FirstBlockHeight == query.FirstBlockHeight &&
                reply.NumBlocks == query.NumBlocks)
}

// processChanRangeReply is called each time the GossipSyncer receives a new
// reply to the initial range query to discover new channels that it didn't
// previously know of.
func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error {
        // isStale returns whether the timestamp is too far into the past.
        isStale := func(timestamp time.Time) bool {
                return time.Since(timestamp) > graph.DefaultChannelPruneExpiry
        }

        // isSkewed returns whether the timestamp is too far into the future.
        isSkewed := func(timestamp time.Time) bool {
                return time.Until(timestamp) > graph.DefaultChannelPruneExpiry
        }

        // If we're not communicating with a legacy node, we'll apply some
        // further constraints on their reply to ensure it satisfies our query.
        if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
                // The first block should be within our original request.
                if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight {
                        return fmt.Errorf("reply includes channels for height "+
                                "%v prior to query %v", msg.FirstBlockHeight,
                                g.curQueryRangeMsg.FirstBlockHeight)
                }

                // The last block should also be. We don't need to check the
                // intermediate ones because they should already be in sorted
                // order.
                replyLastHeight := msg.LastBlockHeight()
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
                if replyLastHeight > queryLastHeight {
                        return fmt.Errorf("reply includes channels for height "+
                                "%v after query %v", replyLastHeight,
                                queryLastHeight)
                }

                // If we've previously received a reply for this query, look at
                // its last block to ensure the current reply properly follows
                // it.
                if g.prevReplyChannelRange != nil {
                        prevReply := g.prevReplyChannelRange
                        prevReplyLastHeight := prevReply.LastBlockHeight()

                        // The current reply can either start from the previous
                        // reply's last block, if there are still more channels
                        // for the same block, or the block after.
                        if msg.FirstBlockHeight != prevReplyLastHeight &&
                                msg.FirstBlockHeight != prevReplyLastHeight+1 {

                                return fmt.Errorf("first block of reply %v "+
                                        "does not continue from last block of "+
                                        "previous %v", msg.FirstBlockHeight,
                                        prevReplyLastHeight)
                        }
                }
        }

        g.prevReplyChannelRange = msg

        for i, scid := range msg.ShortChanIDs {
                info := graphdb.NewChannelUpdateInfo(
                        scid, time.Time{}, time.Time{},
                )

                if len(msg.Timestamps) != 0 {
                        t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0)
                        info.Node1UpdateTimestamp = t1

                        t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0)
                        info.Node2UpdateTimestamp = t2

                        // Sort out all channels with outdated or skewed
                        // timestamps. Both timestamps need to be out of
                        // boundaries for us to skip the channel and not query
                        // it later on.
                        switch {
                        case isStale(info.Node1UpdateTimestamp) &&
                                isStale(info.Node2UpdateTimestamp):

                                continue

                        case isSkewed(info.Node1UpdateTimestamp) &&
                                isSkewed(info.Node2UpdateTimestamp):

                                continue

                        case isStale(info.Node1UpdateTimestamp) &&
                                isSkewed(info.Node2UpdateTimestamp):

                                continue

                        case isStale(info.Node2UpdateTimestamp) &&
                                isSkewed(info.Node1UpdateTimestamp):

                                continue
                        }
                }

                g.bufferedChanRangeReplies = append(
                        g.bufferedChanRangeReplies, info,
                )
        }

        switch g.cfg.encodingType {
        case lnwire.EncodingSortedPlain:
                g.numChanRangeRepliesRcvd++
        case lnwire.EncodingSortedZlib:
                g.numChanRangeRepliesRcvd += maxQueryChanRangeRepliesZlibFactor
        default:
                return fmt.Errorf("unhandled encoding type %v", g.cfg.encodingType)
        }

        log.Infof("GossipSyncer(%x): buffering chan range reply of size=%v",
                g.cfg.peerPub[:], len(msg.ShortChanIDs))

        // If this isn't the last response and we can continue to receive more,
        // then we can exit as we've already buffered the latest portion of the
        // streaming reply.
        maxReplies := g.cfg.maxQueryChanRangeReplies
        switch {
        // If we're communicating with a legacy node, we'll need to look at the
        // complete field.
        case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg):
                if msg.Complete == 0 && g.numChanRangeRepliesRcvd < maxReplies {
                        return nil
                }

        // Otherwise, we'll look at the reply's height range.
        default:
                replyLastHeight := msg.LastBlockHeight()
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()

                // TODO(wilmer): This might require some padding if the remote
                // node is not aware of the last height we sent them, i.e., is
                // behind a few blocks from us.
                if replyLastHeight < queryLastHeight &&
                        g.numChanRangeRepliesRcvd < maxReplies {

                        return nil
                }
        }

        log.Infof("GossipSyncer(%x): filtering through %v chans",
                g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))

        // Otherwise, this is the final response, so we'll now check to see
        // which channels they know of that we don't.
        newChans, err := g.cfg.channelSeries.FilterKnownChanIDs(
                g.cfg.chainHash, g.bufferedChanRangeReplies,
                g.cfg.isStillZombieChannel,
        )
        if err != nil {
                return fmt.Errorf("unable to filter chan ids: %w", err)
        }

        // As we've received the entirety of the reply, we no longer need to
        // hold on to the set of buffered replies or the original query that
        // prompted the replies, so we'll let that be garbage collected now.
        g.curQueryRangeMsg = nil
        g.prevReplyChannelRange = nil
        g.bufferedChanRangeReplies = nil
        g.numChanRangeRepliesRcvd = 0

        // If there aren't any channels that we don't know of, then we can
        // switch straight to our terminal state.
        if len(newChans) == 0 {
                log.Infof("GossipSyncer(%x): remote peer has no new chans",
                        g.cfg.peerPub[:])

                g.setSyncState(chansSynced)

                // Ensure that the sync manager becomes aware that the
                // historical sync completed so synced_to_graph is updated over
                // rpc.
                g.cfg.markGraphSynced()
                return nil
        }

        // Otherwise, we'll set the set of channels that we need to query for
        // the next state, and also transition our state.
        g.newChansToQuery = newChans
        g.setSyncState(queryNewChannels)

        log.Infof("GossipSyncer(%x): starting query for %v new chans",
                g.cfg.peerPub[:], len(newChans))

        return nil
}

// genChanRangeQuery generates the initial message we'll send to the remote
// party when we're kicking off the channel graph synchronization upon
// connection. The historicalQuery boolean can be used to generate a query from
// the genesis block of the chain.
func (g *GossipSyncer) genChanRangeQuery(
        historicalQuery bool) (*lnwire.QueryChannelRange, error) {

        // First, we'll query our channel graph time series for its highest
        // known channel ID.
        newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash)
        if err != nil {
                return nil, err
        }

        // Once we have the chan ID of the newest, we'll obtain the block height
        // of the channel, then subtract our default horizon to ensure we don't
        // miss any channels. By default, we go back 1 day from the newest
        // channel, unless we're attempting a historical sync, where we'll
        // actually start from the genesis block instead.
        var startHeight uint32
        switch {
        case historicalQuery:
                fallthrough
        case newestChan.BlockHeight <= chanRangeQueryBuffer:
                startHeight = 0
        default:
                startHeight = newestChan.BlockHeight - chanRangeQueryBuffer
        }

        // Determine the number of blocks to request based on our best height.
        // We'll take into account any potential underflows and explicitly set
        // numBlocks to its minimum value of 1 if so.
        bestHeight := g.cfg.bestHeight()
        numBlocks := bestHeight - startHeight
        if int64(numBlocks) < 1 {
                numBlocks = 1
        }

        log.Infof("GossipSyncer(%x): requesting new chans from height=%v "+
                "and %v blocks after", g.cfg.peerPub[:], startHeight, numBlocks)

        // Finally, we'll craft the channel range query, using our starting
        // height, then asking for all known channels to the foreseeable end of
        // the main chain.
        query := &lnwire.QueryChannelRange{
                ChainHash:        g.cfg.chainHash,
                FirstBlockHeight: startHeight,
                NumBlocks:        numBlocks,
        }

        if !g.cfg.noTimestampQueryOption {
                query.QueryOptions = lnwire.NewTimestampQueryOption()
        }

        g.curQueryRangeMsg = query

        return query, nil
}
1040

1041
// replyPeerQueries is called in response to any query by the remote peer.
1042
// We'll examine our state and send back our best response.
1043
func (g *GossipSyncer) replyPeerQueries(msg lnwire.Message) error {
20✔
1044
        reservation := g.rateLimiter.Reserve()
20✔
1045
        delay := reservation.Delay()
20✔
1046

20✔
1047
        // If we've already replied a handful of times, we will start to delay
20✔
1048
        // responses back to the remote peer. This can help prevent DOS attacks
20✔
1049
        // where the remote peer spams us endlessly.
20✔
1050
        if delay > 0 {
22✔
1051
                log.Infof("GossipSyncer(%x): rate limiting gossip replies, "+
2✔
1052
                        "responding in %s", g.cfg.peerPub[:], delay)
2✔
1053

2✔
1054
                select {
2✔
1055
                case <-time.After(delay):
2✔
1056
                case <-g.quit:
×
1057
                        return ErrGossipSyncerExiting
×
1058
                }
1059
        }
1060

1061
        switch msg := msg.(type) {
20✔
1062

1063
        // In this state, we'll also handle any incoming channel range queries
1064
        // from the remote peer as they're trying to sync their state as well.
1065
        case *lnwire.QueryChannelRange:
7✔
1066
                return g.replyChanRangeQuery(msg)
7✔
1067

1068
        // If the remote peer skips straight to requesting new channels that
1069
        // they don't know of, then we'll ensure that we also handle this case.
1070
        case *lnwire.QueryShortChanIDs:
16✔
1071
                return g.replyShortChanIDs(msg)
16✔
1072

1073
        default:
×
1074
                return fmt.Errorf("unknown message: %T", msg)
×
1075
        }
1076
}

// replyChanRangeQuery will be dispatched in response to a channel range query
// by the remote node. We'll query the channel time series for channels that
// meet the channel range, then chunk our responses to the remote node. We also
// ensure that our final fragment carries the "complete" bit to indicate the
// end of our streaming response.
func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error {
        // Before responding, we'll check to ensure that the remote peer is
        // querying for the same chain that we're on. If not, we'll send back a
        // response with a complete value of zero to indicate we're on a
        // different chain.
        if g.cfg.chainHash != query.ChainHash {
                log.Warnf("Remote peer requested QueryChannelRange for "+
                        "chain=%v, we're on chain=%v", query.ChainHash,
                        g.cfg.chainHash)

                return g.cfg.sendToPeerSync(&lnwire.ReplyChannelRange{
                        ChainHash:        query.ChainHash,
                        FirstBlockHeight: query.FirstBlockHeight,
                        NumBlocks:        query.NumBlocks,
                        Complete:         0,
                        EncodingType:     g.cfg.encodingType,
                        ShortChanIDs:     nil,
                })
        }

        log.Infof("GossipSyncer(%x): filtering chan range: start_height=%v, "+
                "num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
                query.NumBlocks)

        // Check if the query asked for timestamps. We will only serve
        // timestamps if this has not been disabled with
        // noTimestampQueryOption.
        withTimestamps := query.WithTimestamps() &&
                !g.cfg.noTimestampQueryOption

        // Next, we'll consult the time series to obtain the set of known
        // channel ID's that match their query.
        startBlock := query.FirstBlockHeight
        endBlock := query.LastBlockHeight()
        channelRanges, err := g.cfg.channelSeries.FilterChannelRange(
                query.ChainHash, startBlock, endBlock, withTimestamps,
        )
        if err != nil {
                return err
        }

        // TODO(roasbeef): means can't send max uint above?
        //  * or make internal 64

        // We'll send our response in a streaming manner, chunk-by-chunk. We do
        // this as there's a transport message size limit which we'll need to
        // adhere to. We also need to make sure all of our replies cover the
        // expected range of the query.
        sendReplyForChunk := func(channelChunk []graphdb.ChannelUpdateInfo,
                firstHeight, lastHeight uint32, finalChunk bool) error {

                // The number of blocks contained in the current chunk (the
                // total span) is the difference between the last block height
                // and the first in the range. We add one since even if all
                // returned channels are in the same block, that block still
                // needs to be counted.
                numBlocks := lastHeight - firstHeight + 1
                complete := uint8(0)
                if finalChunk {
                        complete = 1
                }

                var timestamps lnwire.Timestamps
                if withTimestamps {
                        timestamps = make(lnwire.Timestamps, len(channelChunk))
                }

                scids := make([]lnwire.ShortChannelID, len(channelChunk))
                for i, info := range channelChunk {
                        scids[i] = info.ShortChannelID

                        if !withTimestamps {
                                continue
                        }

                        timestamps[i].Timestamp1 = uint32(
                                info.Node1UpdateTimestamp.Unix(),
                        )

                        timestamps[i].Timestamp2 = uint32(
                                info.Node2UpdateTimestamp.Unix(),
                        )
                }

                return g.cfg.sendToPeerSync(&lnwire.ReplyChannelRange{
                        ChainHash:        query.ChainHash,
                        NumBlocks:        numBlocks,
                        FirstBlockHeight: firstHeight,
                        Complete:         complete,
                        EncodingType:     g.cfg.encodingType,
                        ShortChanIDs:     scids,
                        Timestamps:       timestamps,
                })
        }

        var (
                firstHeight  = query.FirstBlockHeight
                lastHeight   uint32
                channelChunk []graphdb.ChannelUpdateInfo
        )

        // chunkSize is the maximum number of SCIDs that we can safely put in a
        // single message. If we also need to include timestamps though, then
        // this number is halved since encoding two timestamps takes the same
        // number of bytes as encoding an SCID.
        chunkSize := g.cfg.chunkSize
        if withTimestamps {
                chunkSize /= 2
        }

        for _, channelRange := range channelRanges {
                channels := channelRange.Channels
                numChannels := int32(len(channels))
                numLeftToAdd := chunkSize - int32(len(channelChunk))

                // Include the current block in the ongoing chunk if it can fit
                // and move on to the next block.
                if numChannels <= numLeftToAdd {
                        channelChunk = append(channelChunk, channels...)
                        continue
                }

                // Otherwise, we need to send our existing channel chunk as is
                // as its own reply and start a new one for the current block.
                // We'll mark the end of our current chunk as the height before
                // the current block to ensure the whole query range is replied
                // to.
                log.Infof("GossipSyncer(%x): sending range chunk of size=%v",
                        g.cfg.peerPub[:], len(channelChunk))

                lastHeight = channelRange.Height - 1
                err := sendReplyForChunk(
                        channelChunk, firstHeight, lastHeight, false,
                )
                if err != nil {
                        return err
                }

                // With the reply constructed, we'll start tallying channels for
                // our next one keeping in mind our chunk size. This may result
                // in channels for this block being left out of the reply, but
                // this isn't an issue since we'll randomly shuffle them and we
                // assume a historical gossip sync is performed at a later time.
                firstHeight = channelRange.Height
                finalChunkSize := numChannels
                exceedsChunkSize := numChannels > chunkSize
                if exceedsChunkSize {
                        rand.Shuffle(len(channels), func(i, j int) {
                                channels[i], channels[j] = channels[j], channels[i]
                        })
                        finalChunkSize = chunkSize
                }
                channelChunk = channels[:finalChunkSize]

                // Sort the chunk once again if we had to shuffle it.
                if exceedsChunkSize {
                        sort.Slice(channelChunk, func(i, j int) bool {
                                id1 := channelChunk[i].ShortChannelID.ToUint64()
                                id2 := channelChunk[j].ShortChannelID.ToUint64()

                                return id1 < id2
                        })
                }
        }

        // Send the remaining chunk as the final reply.
        log.Infof("GossipSyncer(%x): sending final chan range chunk, size=%v",
                g.cfg.peerPub[:], len(channelChunk))

        return sendReplyForChunk(
                channelChunk, firstHeight, query.LastBlockHeight(), true,
        )
}
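
// The sketch below is illustrative only and not part of this file: it is a
// simplified, pure version of the chunking loop above. It groups per-block
// SCID counts into chunks of at most chunkSize, recording the [first, last]
// height span each chunk covers so the full query range is accounted for.
// The blockCount and heightChunk types and the chunkByHeight helper are
// hypothetical names, and we assume no single block holds more than
// chunkSize SCIDs (the real loop above shuffles and truncates that case).
type blockCount struct {
        height uint32 // block height the SCIDs were mined in
        scids  int32  // number of SCIDs found in that block
}

type heightChunk struct {
        firstHeight, lastHeight uint32
        numSCIDs                int32
}

func chunkByHeight(blocks []blockCount, chunkSize int32,
        queryFirst, queryLast uint32) []heightChunk {

        var (
                chunks []heightChunk
                cur    = heightChunk{firstHeight: queryFirst}
        )
        for _, b := range blocks {
                // If this block overflows the current chunk, flush the chunk;
                // it covers everything up to the block before this one.
                if cur.numSCIDs+b.scids > chunkSize {
                        cur.lastHeight = b.height - 1
                        chunks = append(chunks, cur)
                        cur = heightChunk{firstHeight: b.height}
                }
                cur.numSCIDs += b.scids
        }

        // The final chunk always extends to the end of the queried range.
        cur.lastHeight = queryLast
        return append(chunks, cur)
}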

// replyShortChanIDs will be dispatched in response to a query by the remote
// node for information concerning a set of short channel ID's. Our response
// will be sent in a streaming chunked manner to ensure that we remain below
// the current transport level message size.
func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error {
        // Before responding, we'll check to ensure that the remote peer is
        // querying for the same chain that we're on. If not, we'll send back a
        // response with a complete value of zero to indicate we're on a
        // different chain.
        if g.cfg.chainHash != query.ChainHash {
                log.Warnf("Remote peer requested QueryShortChanIDs for "+
                        "chain=%v, we're on chain=%v", query.ChainHash,
                        g.cfg.chainHash)

                return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
                        ChainHash: query.ChainHash,
                        Complete:  0,
                })
        }

        if len(query.ShortChanIDs) == 0 {
                log.Infof("GossipSyncer(%x): ignoring query for blank short chan ID's",
                        g.cfg.peerPub[:])
                return nil
        }

        log.Infof("GossipSyncer(%x): fetching chan anns for %v chans",
                g.cfg.peerPub[:], len(query.ShortChanIDs))

        // Now that we know we're on the same chain, we'll query the channel
        // time series for the set of messages that we know of which satisfy
        // the requirement of being a chan ann, chan update, or a node ann
        // related to the set of queried channels.
        replyMsgs, err := g.cfg.channelSeries.FetchChanAnns(
                query.ChainHash, query.ShortChanIDs,
        )
        if err != nil {
                return fmt.Errorf("unable to fetch chan anns for %v..., %w",
                        query.ShortChanIDs[0].ToUint64(), err)
        }

        // Reply with any messages related to those channel ID's. We'll write
        // each one individually and synchronously to throttle the sends and
        // perform buffering of responses in the syncer as opposed to the peer.
        for _, msg := range replyMsgs {
                err := g.cfg.sendToPeerSync(msg)
                if err != nil {
                        return err
                }
        }

        // Regardless of whether we had any messages to reply with, send over
        // the sentinel message to signal that the stream has terminated.
        return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
                ChainHash: query.ChainHash,
                Complete:  1,
        })
}
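
// Illustrative only, not part of this file: the reply pattern above reduced
// to its core. Each message is streamed synchronously, and the stream always
// terminates with a sentinel so the remote peer learns the stream is done
// even when there were no replies at all. The streamWithSentinel name and
// the sendFn parameter are hypothetical stand-ins for g.cfg.sendToPeerSync.
func streamWithSentinel(msgs []lnwire.Message, sentinel lnwire.Message,
        sendFn func(...lnwire.Message) error) error {

        // Stream the replies one at a time, bailing out on the first error.
        for _, msg := range msgs {
                if err := sendFn(msg); err != nil {
                        return err
                }
        }

        // The sentinel is sent regardless of how many replies preceded it.
        return sendFn(sentinel)
}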

// ApplyGossipFilter applies a gossip filter sent by the remote node to the
// state machine. Once applied, we'll ensure that we don't forward any messages
// to the peer that aren't within the time range of the filter.
func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error {
        g.Lock()

        g.remoteUpdateHorizon = filter

        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
        endTime := startTime.Add(
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
        )

        g.Unlock()

        // If requested, don't reply with historical gossip data when the remote
        // peer sets their gossip timestamp range.
        if g.cfg.ignoreHistoricalFilters {
                return nil
        }

        select {
        case <-g.syncerSema:
        case <-g.quit:
                return ErrGossipSyncerExiting
        }

        // We don't put this in a defer because if the goroutine is launched,
        // the semaphore needs to be returned when that goroutine stops, not
        // when this method returns.
        returnSema := func() {
                g.syncerSema <- struct{}{}
        }

        // Now that the remote peer has applied their filter, we'll query the
        // database for all the messages that are beyond this filter.
        newUpdatesToSend, err := g.cfg.channelSeries.UpdatesInHorizon(
                g.cfg.chainHash, startTime, endTime,
        )
        if err != nil {
                returnSema()
                return err
        }

        log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
                "start=%v, end=%v, backlog_size=%v", g.cfg.peerPub[:],
                startTime, endTime, len(newUpdatesToSend))

        // If we don't have any to send, then we can return early.
        if len(newUpdatesToSend) == 0 {
                returnSema()
                return nil
        }

        // We'll conclude by launching a goroutine to send out any updates.
        g.wg.Add(1)
        go func() {
                defer g.wg.Done()
                defer returnSema()

                for _, msg := range newUpdatesToSend {
                        err := g.cfg.sendToPeerSync(msg)
                        switch {
                        case err == ErrGossipSyncerExiting:
                                return

                        case err == lnpeer.ErrPeerExiting:
                                return

                        case err != nil:
                                log.Errorf("Unable to send message for "+
                                        "peer catch up: %v", err)
                        }
                }
        }()

        return nil
}
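
// Illustrative only, not part of this file: the semaphore discipline used by
// ApplyGossipFilter above. A token is taken from a buffered channel before
// the backlog work starts, and the release closure is handed to whichever
// goroutine actually finishes the work, which is why the release cannot
// simply be deferred in the acquiring function. The acquireSendToken name is
// hypothetical.
func acquireSendToken(sema, quit chan struct{}) (release func(), err error) {
        select {
        case <-sema:
        case <-quit:
                return nil, ErrGossipSyncerExiting
        }

        // Returning the token re-buffers it so the next caller can proceed.
        return func() { sema <- struct{}{} }, nil
}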

// FilterGossipMsgs takes a set of gossip messages, and only sends a message
// to the peer iff it is within the bounds of their set gossip filter. If the
// peer doesn't have a gossip filter set, then no messages will be forwarded.
func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
        // If the peer doesn't have an update horizon set, then we won't send
        // it any new update messages.
        if g.remoteUpdateHorizon == nil {
                log.Tracef("GossipSyncer(%x): skipped due to nil "+
                        "remoteUpdateHorizon", g.cfg.peerPub[:])
                return
        }

        // If we've been signaled to exit, or are exiting, then we'll stop
        // short.
        select {
        case <-g.quit:
                return
        default:
        }

        // TODO(roasbeef): need to ensure that peer still online...send msg to
        // gossiper on peer termination to signal peer disconnect?

        var err error

        // Before we filter out the messages, we'll construct an index over the
        // set of channel announcements and channel updates. This will allow us
        // to quickly check if we should forward a chan ann, based on the known
        // channel updates for a channel.
        chanUpdateIndex := make(
                map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate1,
        )
        for _, msg := range msgs {
                chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate1)
                if !ok {
                        continue
                }

                chanUpdateIndex[chanUpdate.ShortChannelID] = append(
                        chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate,
                )
        }

        // We'll construct a helper function that we'll use below to determine
        // if a given message passes the gossip msg filter.
        g.Lock()
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
        endTime := startTime.Add(
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
        )
        g.Unlock()

        passesFilter := func(timeStamp uint32) bool {
                t := time.Unix(int64(timeStamp), 0)
                return t.Equal(startTime) ||
                        (t.After(startTime) && t.Before(endTime))
        }

        msgsToSend := make([]lnwire.Message, 0, len(msgs))
        for _, msg := range msgs {
                // If the target peer is the peer that sent us this message,
                // then we'll exit early as we don't need to filter this
                // message.
                if _, ok := msg.senders[g.cfg.peerPub]; ok {
                        continue
                }

                switch msg := msg.msg.(type) {

                // For each channel announcement message, we'll only send this
                // message if the channel updates for the channel are between
                // our time range.
                case *lnwire.ChannelAnnouncement1:
                        // First, we'll check if the channel updates are in
                        // this message batch.
                        chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID]
                        if !ok {
                                // If not, we'll attempt to query the database
                                // to see if we know of the updates.
                                chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates(
                                        g.cfg.chainHash, msg.ShortChannelID,
                                )
                                if err != nil {
                                        log.Warnf("no channel updates found for "+
                                                "short_chan_id=%v",
                                                msg.ShortChannelID)
                                        continue
                                }
                        }

                        for _, chanUpdate := range chanUpdates {
                                if passesFilter(chanUpdate.Timestamp) {
                                        msgsToSend = append(msgsToSend, msg)
                                        break
                                }
                        }

                        if len(chanUpdates) == 0 {
                                msgsToSend = append(msgsToSend, msg)
                        }

                // For each channel update, we'll only send it if the timestamp
                // is within our time range.
                case *lnwire.ChannelUpdate1:
                        if passesFilter(msg.Timestamp) {
                                msgsToSend = append(msgsToSend, msg)
                        }

                // Similarly, we only send node announcements if the update
                // timestamp is within our set gossip filter time range.
                case *lnwire.NodeAnnouncement:
                        if passesFilter(msg.Timestamp) {
                                msgsToSend = append(msgsToSend, msg)
                        }
                }
        }

        log.Tracef("GossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
                g.cfg.peerPub[:], len(msgs), len(msgsToSend))

        if len(msgsToSend) == 0 {
                return
        }

        g.cfg.sendToPeer(msgsToSend...)
}
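
// Illustrative only, not part of this file: the horizon check implemented by
// passesFilter above, as a standalone helper. The window is inclusive of the
// start time and exclusive of the end time, i.e. [start, start+range). The
// inHorizon name is hypothetical.
func inHorizon(ts, firstTimestamp, timestampRange uint32) bool {
        start := time.Unix(int64(firstTimestamp), 0)
        end := start.Add(time.Duration(timestampRange) * time.Second)
        t := time.Unix(int64(ts), 0)

        // Equal to the start counts as inside; equal to the end does not.
        return t.Equal(start) || (t.After(start) && t.Before(end))
}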

// ProcessQueryMsg is used by outside callers to pass new channel time series
// queries to the internal processing goroutine.
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) error {
        var msgChan chan lnwire.Message
        switch msg.(type) {
        case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs:
                msgChan = g.queryMsgs

        // Reply messages should only be expected in states where we're waiting
        // for a reply.
        case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
                g.Lock()
                syncState := g.syncState()
                g.Unlock()

                if syncState != waitingQueryRangeReply &&
                        syncState != waitingQueryChanReply {

                        return fmt.Errorf("unexpected msg %T received in "+
                                "state %v", msg, syncState)
                }
                msgChan = g.gossipMsgs

        default:
                msgChan = g.gossipMsgs
        }

        select {
        case msgChan <- msg:
        case <-peerQuit:
        case <-g.quit:
        }

        return nil
}
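
// Illustrative only, not part of this file: the state gate applied to reply
// messages above. Replies are only legal while the syncer is blocked waiting
// on them; in any other state they are rejected. The isReplyExpected name is
// hypothetical.
func isReplyExpected(state syncerState) bool {
        return state == waitingQueryRangeReply ||
                state == waitingQueryChanReply
}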

// setSyncState sets the gossip syncer's state to the given state.
func (g *GossipSyncer) setSyncState(state syncerState) {
        atomic.StoreUint32(&g.state, uint32(state))
}

// syncState returns the current syncerState of the target GossipSyncer.
func (g *GossipSyncer) syncState() syncerState {
        return syncerState(atomic.LoadUint32(&g.state))
}

// ResetSyncedSignal returns a channel that will be closed in order to serve as
// a signal for when the GossipSyncer has reached its chansSynced state.
func (g *GossipSyncer) ResetSyncedSignal() chan struct{} {
        g.Lock()
        defer g.Unlock()

        syncedSignal := make(chan struct{})

        syncState := syncerState(atomic.LoadUint32(&g.state))
        if syncState == chansSynced {
                close(syncedSignal)
                return syncedSignal
        }

        g.syncedSignal = syncedSignal
        return g.syncedSignal
}
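
// Illustrative only, not part of this file: the already-done signal pattern
// used by ResetSyncedSignal above. If the awaited condition already holds,
// the caller gets back a closed channel so its receive never blocks. The
// signalFor name is hypothetical.
func signalFor(alreadyDone bool) chan struct{} {
        c := make(chan struct{})
        if alreadyDone {
                close(c)
        }
        return c
}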

// ProcessSyncTransition sends a request to the gossip syncer to transition its
// sync type to a new one.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
        errChan := make(chan error, 1)
        select {
        case g.syncTransitionReqs <- &syncTransitionReq{
                newSyncType: newSyncType,
                errChan:     errChan,
        }:
        case <-time.After(syncTransitionTimeout):
                return ErrSyncTransitionTimeout
        case <-g.quit:
                return ErrGossipSyncerExiting
        }

        select {
        case err := <-errChan:
                return err
        case <-g.quit:
                return ErrGossipSyncerExiting
        }
}

// handleSyncTransition handles a new sync type transition request.
//
// NOTE: The gossip syncer might have another sync state as a result of this
// transition.
func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error {
        // Return early from any NOP sync transitions.
        syncType := g.SyncType()
        if syncType == req.newSyncType {
                return nil
        }

        log.Debugf("GossipSyncer(%x): transitioning from %v to %v",
                g.cfg.peerPub, syncType, req.newSyncType)

        var (
                firstTimestamp time.Time
                timestampRange uint32
        )

        switch req.newSyncType {
        // If an active sync has been requested, then we should resume receiving
        // new graph updates from the remote peer.
        case ActiveSync, PinnedSync:
                firstTimestamp = time.Now()
                timestampRange = math.MaxUint32

        // If a PassiveSync transition has been requested, then we should no
        // longer receive any new updates from the remote peer. We can do this
        // by setting our update horizon to a range in the past ensuring no
        // graph updates match the timestamp range.
        case PassiveSync:
                firstTimestamp = zeroTimestamp
                timestampRange = 0

        default:
                return fmt.Errorf("unhandled sync transition %v",
                        req.newSyncType)
        }

        err := g.sendGossipTimestampRange(firstTimestamp, timestampRange)
        if err != nil {
                return fmt.Errorf("unable to send local update horizon: %w",
                        err)
        }

        g.setSyncType(req.newSyncType)

        return nil
}
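
// Illustrative only, not part of this file: the two update horizons that
// handleSyncTransition above installs, as plain values. An active horizon
// starts now and never ends, while a passive horizon is an empty window in
// the past so no update's timestamp can fall inside it. The horizonFor name
// is hypothetical, and time.Unix(0, 0) stands in for zeroTimestamp.
func horizonFor(active bool) (firstTimestamp time.Time, timestampRange uint32) {
        if active {
                // Accept everything from this moment onward.
                return time.Now(), math.MaxUint32
        }

        // An empty window: nothing can match it, so updates stop flowing.
        return time.Unix(0, 0), 0
}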

// setSyncType sets the gossip syncer's sync type to the given type.
func (g *GossipSyncer) setSyncType(syncType SyncerType) {
        atomic.StoreUint32(&g.syncType, uint32(syncType))
}

// SyncType returns the current SyncerType of the target GossipSyncer.
func (g *GossipSyncer) SyncType() SyncerType {
        return SyncerType(atomic.LoadUint32(&g.syncType))
}

// historicalSync sends a request to the gossip syncer to perform a historical
// sync.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) historicalSync() error {
        done := make(chan struct{})

        select {
        case g.historicalSyncReqs <- &historicalSyncReq{
                doneChan: done,
        }:
        case <-time.After(syncTransitionTimeout):
                return ErrSyncTransitionTimeout
        case <-g.quit:
                return ErrGossiperShuttingDown
        }

        select {
        case <-done:
                return nil
        case <-g.quit:
                return ErrGossiperShuttingDown
        }
}
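
// Illustrative only, not part of this file: the request/acknowledge
// handshake shared by historicalSync and ProcessSyncTransition above. The
// request is handed to the event loop with a timeout so a wedged loop cannot
// block the caller forever, then the caller waits for the loop to signal
// completion. The requestAndWait name and reqChan parameter are hypothetical.
func requestAndWait(reqChan chan chan struct{}, timeout time.Duration,
        quit chan struct{}) error {

        done := make(chan struct{})

        // Hand the request to the event loop, bailing on timeout or shutdown.
        select {
        case reqChan <- done:
        case <-time.After(timeout):
                return ErrSyncTransitionTimeout
        case <-quit:
                return ErrGossipSyncerExiting
        }

        // Wait for the loop to close the done channel.
        select {
        case <-done:
                return nil
        case <-quit:
                return ErrGossipSyncerExiting
        }
}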

// handleHistoricalSync handles a request to the gossip syncer to perform a
// historical sync.
func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
        // We'll go back to our initial syncingChans state in order to request
        // the remote peer to give us all of the channel IDs they know of
        // starting from the genesis block.
        g.genHistoricalChanRangeQuery = true
        g.setSyncState(syncingChans)
        close(req.doneChan)
}