lightningnetwork / lnd / build 12312390362

13 Dec 2024 08:44AM UTC coverage: 57.458% (+8.5%) from 48.92%

Pull #9343 (github), author ellemouton

fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is re-worked such that the
context that its new main WithCtx method provides is cancelled in sync
with a parent context being cancelled or with its quit channel being
closed. Tests are added to assert the behaviour. In order for the
close of the quit channel to be consistent with the cancelling of the
derived context, the quit channel _must_ be contained internal to the
ContextGuard so that callers are only able to close the channel via the
exposed Quit method, which will then take care to first cancel any
derived contexts that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests
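
The guard pattern the message describes can be sketched in a few lines. This
is an illustration only: the real implementation lives in lnd's fn package,
and everything here beyond the WithCtx and Quit names (taken from the message
above) is assumed.

package fn

import (
        "context"
        "sync"
)

// ContextGuard ties derived contexts to an internal quit channel.
type ContextGuard struct {
        mu      sync.Mutex
        quit    chan struct{}
        cancels []context.CancelFunc
}

func NewContextGuard() *ContextGuard {
        return &ContextGuard{quit: make(chan struct{})}
}

// WithCtx derives a context that is cancelled when either the parent is
// cancelled or Quit is called.
func (g *ContextGuard) WithCtx(parent context.Context) (context.Context,
        context.CancelFunc) {

        ctx, cancel := context.WithCancel(parent)

        g.mu.Lock()
        g.cancels = append(g.cancels, cancel)
        g.mu.Unlock()

        return ctx, cancel
}

// Quit first cancels every derived context and only then closes the internal
// quit channel, keeping the two signals consistent as the message requires.
func (g *ContextGuard) Quit() {
        g.mu.Lock()
        defer g.mu.Unlock()

        for _, cancel := range g.cancels {
                cancel()
        }
        close(g.quit)
}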

101853 of 177264 relevant lines covered (57.46%)

24972.93 hits per line

Source File: /discovery/syncer.go (81.62% covered)
package discovery

import (
        "errors"
        "fmt"
        "math"
        "math/rand"
        "sort"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/graph"
        graphdb "github.com/lightningnetwork/lnd/graph/db"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lnwire"
        "golang.org/x/time/rate"
)

// SyncerType encapsulates the different types of syncing mechanisms for a
// gossip syncer.
type SyncerType uint8

const (
        // ActiveSync denotes that a gossip syncer:
        //
        // 1. Should not attempt to synchronize with the remote peer for
        //    missing channels.
        // 2. Should respond to queries from the remote peer.
        // 3. Should receive new updates from the remote peer.
        //
        // They are started in a chansSynced state in order to accomplish their
        // responsibilities above.
        ActiveSync SyncerType = iota

        // PassiveSync denotes that a gossip syncer:
        //
        // 1. Should not attempt to synchronize with the remote peer for
        //    missing channels.
        // 2. Should respond to queries from the remote peer.
        // 3. Should not receive new updates from the remote peer.
        //
        // They are started in a chansSynced state in order to accomplish their
        // responsibilities above.
        PassiveSync

        // PinnedSync denotes an ActiveSync that doesn't count towards the
        // default active syncer limits and is always active throughout the
        // duration of the peer's connection. Each pinned syncer will begin by
        // performing a historical sync to ensure we are well synchronized with
        // their routing table.
        PinnedSync
)

// String returns a human readable string describing the target SyncerType.
func (t SyncerType) String() string {
        switch t {
        case ActiveSync:
                return "ActiveSync"
        case PassiveSync:
                return "PassiveSync"
        case PinnedSync:
                return "PinnedSync"
        default:
                return fmt.Sprintf("unknown sync type %d", t)
        }
}

// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange
// allowing new gossip messages to be received from the peer.
func (t SyncerType) IsActiveSync() bool {
        switch t {
        case ActiveSync, PinnedSync:
                return true
        default:
                return false
        }
}
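
A hypothetical example test (not part of syncer.go) makes the semantics of
the two methods above concrete: only ActiveSync and PinnedSync peers receive
a gossip timestamp range.

func ExampleSyncerType() {
        for _, t := range []SyncerType{ActiveSync, PassiveSync, PinnedSync} {
                fmt.Printf("%v active=%v\n", t, t.IsActiveSync())
        }
        // Output:
        // ActiveSync active=true
        // PassiveSync active=false
        // PinnedSync active=true
}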

// syncerState is an enum that represents the current state of the GossipSyncer.
// As the syncer is a state machine, we'll gate our actions based off of the
// current state and the next incoming message.
type syncerState uint32

const (
        // syncingChans is the default state of the GossipSyncer. We start in
        // this state when a new peer first connects and we don't yet know if
        // we're fully synchronized.
        syncingChans syncerState = iota

        // waitingQueryRangeReply is the second main phase of the GossipSyncer.
        // We enter this state after we send out our first QueryChannelRange
        // query. We'll stay in this state until the remote party sends us a
        // ReplyShortChanIDsEnd message that indicates they've responded to our
        // query entirely. After this state, we'll transition to
        // waitingQueryChanReply after we send out requests for all of the new
        // chan IDs.
        waitingQueryRangeReply

        // queryNewChannels is the third main phase of the GossipSyncer. In
        // this phase we'll send out all of our QueryShortChanIDs messages in
        // response to the new channels that we don't yet know about.
        queryNewChannels

        // waitingQueryChanReply is the fourth main phase of the GossipSyncer.
        // We enter this phase once we've sent off a query chunk to the remote
        // peer. We'll stay in this phase until we receive a
        // ReplyShortChanIDsEnd message which indicates that the remote party
        // has responded to all of our requests.
        waitingQueryChanReply

        // chansSynced is the terminal stage of the GossipSyncer. Once we enter
        // this phase, we'll send out our update horizon, which filters out the
        // set of channel updates that we're interested in. In this state,
        // we'll be able to accept any outgoing messages from the
        // AuthenticatedGossiper, and decide if we should forward them to our
        // target peer based on its update horizon.
        chansSynced

        // syncerIdle is a state in which the gossip syncer can handle external
        // requests to transition or perform historical syncs. It is used as the
        // initial state for pinned syncers, as well as a fallthrough case for
        // chansSynced allowing fully synced peers to facilitate requests.
        syncerIdle
)

// String returns a human readable string describing the target syncerState.
func (s syncerState) String() string {
        switch s {
        case syncingChans:
                return "syncingChans"

        case waitingQueryRangeReply:
                return "waitingQueryRangeReply"

        case queryNewChannels:
                return "queryNewChannels"

        case waitingQueryChanReply:
                return "waitingQueryChanReply"

        case chansSynced:
                return "chansSynced"

        case syncerIdle:
                return "syncerIdle"

        default:
                return "UNKNOWN STATE"
        }
}

const (
        // DefaultMaxUndelayedQueryReplies specifies how many gossip queries we
        // will respond to immediately before starting to delay responses.
        DefaultMaxUndelayedQueryReplies = 10

        // DefaultDelayedQueryReplyInterval is the length of time we will wait
        // before responding to gossip queries after replying to
        // maxUndelayedQueryReplies queries.
        DefaultDelayedQueryReplyInterval = 5 * time.Second

        // maxQueryChanRangeReplies specifies the default limit of replies to
        // process for a single QueryChannelRange request.
        maxQueryChanRangeReplies = 500

        // maxQueryChanRangeRepliesZlibFactor specifies the factor applied to
        // the maximum number of replies allowed for zlib encoded replies.
        maxQueryChanRangeRepliesZlibFactor = 4

        // chanRangeQueryBuffer is the number of blocks back that we'll go when
        // asking the remote peer for any channels they know of beyond our
        // highest known channel ID.
        chanRangeQueryBuffer = 144

        // syncTransitionTimeout is the default timeout we'll wait up to when
        // attempting to perform a sync transition.
        syncTransitionTimeout = 5 * time.Second

        // requestBatchSize is the maximum number of channels we will query the
        // remote peer for in a QueryShortChanIDs message.
        requestBatchSize = 500
)

var (
        // encodingTypeToChunkSize maps an encoding type to the max number of
        // short chan IDs using that encoding type that we can fit into a
        // single message safely.
        encodingTypeToChunkSize = map[lnwire.QueryEncoding]int32{
                lnwire.EncodingSortedPlain: 8000,
        }

        // ErrGossipSyncerExiting signals that the syncer has been killed.
        ErrGossipSyncerExiting = errors.New("gossip syncer exiting")

        // ErrSyncTransitionTimeout is an error returned when we've timed out
        // attempting to perform a sync transition.
        ErrSyncTransitionTimeout = errors.New("timed out attempting to " +
                "transition sync type")

        // zeroTimestamp is the timestamp we'll use when we want to indicate to
        // peers that we do not want to receive any new graph updates.
        zeroTimestamp time.Time
)

// syncTransitionReq encapsulates a request for a gossip syncer sync transition.
type syncTransitionReq struct {
        newSyncType SyncerType
        errChan     chan error
}

// historicalSyncReq encapsulates a request for a gossip syncer to perform a
// historical sync.
type historicalSyncReq struct {
        // doneChan is a channel that serves as a signal and is closed to ensure
        // the historical sync is attempted by the time we return to the caller.
        doneChan chan struct{}
}

// gossipSyncerCfg is a struct that packages all the information a GossipSyncer
// needs to carry out its duties.
type gossipSyncerCfg struct {
        // chainHash is the chain that this syncer is responsible for.
        chainHash chainhash.Hash

        // peerPub is the public key of the peer we're syncing with, serialized
        // in compressed format.
        peerPub [33]byte

        // channelSeries is the primary interface that we'll use to generate
        // our queries and respond to the queries of the remote peer.
        channelSeries ChannelGraphTimeSeries

        // encodingType is the current encoding type we're aware of. Requests
        // with different encoding types will be rejected.
        encodingType lnwire.QueryEncoding

        // chunkSize is the max number of short chan IDs using the syncer's
        // encoding type that we can fit into a single message safely.
        chunkSize int32

        // batchSize is the max number of channels the syncer will query from
        // the remote node in a single QueryShortChanIDs request.
        batchSize int32

        // sendToPeer sends a variadic number of messages to the remote peer.
        // This method should not block while waiting for sends to be written
        // to the wire.
        sendToPeer func(...lnwire.Message) error

        // sendToPeerSync sends a variadic number of messages to the remote
        // peer, blocking until all messages have been sent successfully or a
        // write error is encountered.
        sendToPeerSync func(...lnwire.Message) error

        // maxUndelayedQueryReplies specifies how many gossip queries we will
        // respond to immediately before starting to delay responses.
        maxUndelayedQueryReplies int

        // delayedQueryReplyInterval is the length of time we will wait before
        // responding to gossip queries after replying to
        // maxUndelayedQueryReplies queries.
        delayedQueryReplyInterval time.Duration

        // noSyncChannels will prevent the GossipSyncer from spawning a
        // channelGraphSyncer, meaning we will not try to reconcile unknown
        // channels with the remote peer.
        noSyncChannels bool

        // noReplyQueries will prevent the GossipSyncer from spawning a
        // replyHandler, meaning we will not reply to queries from our remote
        // peer.
        noReplyQueries bool

        // noTimestampQueryOption will prevent the GossipSyncer from querying
        // timestamps of announcement messages from the peer, and it will
        // prevent it from responding to timestamp queries.
        noTimestampQueryOption bool

        // ignoreHistoricalFilters will prevent syncers from replying with
        // historical data when the remote peer sets a gossip_timestamp_range.
        // This prevents ranges with old start times from causing us to dump the
        // graph on connect.
        ignoreHistoricalFilters bool

        // bestHeight returns the latest height known of the chain.
        bestHeight func() uint32

        // markGraphSynced updates the SyncManager's perception of whether we
        // have completed at least one historical sync.
        markGraphSynced func()

        // maxQueryChanRangeReplies is the maximum number of replies we'll allow
        // for a single QueryChannelRange request.
        maxQueryChanRangeReplies uint32

        // isStillZombieChannel takes the timestamps of the latest channel
        // updates for a channel and returns true if the channel should be
        // considered a zombie based on these timestamps.
        isStillZombieChannel func(time.Time, time.Time) bool
}

// GossipSyncer is a struct that handles synchronizing the channel graph state
// with a remote peer. The GossipSyncer implements a state machine that will
// progressively ensure we're synchronized with the channel state of the remote
// node. Once both nodes have been synchronized, we'll use an update filter to
// filter out which messages should be sent to a remote peer based on their
// update horizon. If the update horizon isn't specified, then we won't send
// them any channel updates at all.
type GossipSyncer struct {
        started sync.Once
        stopped sync.Once

        // state is the current state of the GossipSyncer.
        //
        // NOTE: This variable MUST be used atomically.
        state uint32

        // syncType denotes the SyncerType the gossip syncer is currently
        // exercising.
        //
        // NOTE: This variable MUST be used atomically.
        syncType uint32

        // remoteUpdateHorizon is the update horizon of the remote peer. We'll
        // use this to properly filter out any messages.
        remoteUpdateHorizon *lnwire.GossipTimestampRange

        // localUpdateHorizon is our local update horizon, we'll use this to
        // determine if we've already sent out our update.
        localUpdateHorizon *lnwire.GossipTimestampRange

        // syncTransitionReqs is a channel through which new sync type
        // transition requests will be sent. These requests should only be
        // handled when the gossip syncer is in a chansSynced state to ensure
        // its state machine behaves as expected.
        syncTransitionReqs chan *syncTransitionReq

        // historicalSyncReqs is a channel that serves as a signal for the
        // gossip syncer to perform a historical sync. These can only be done
        // once the gossip syncer is in a chansSynced state to ensure its state
        // machine behaves as expected.
        historicalSyncReqs chan *historicalSyncReq

        // genHistoricalChanRangeQuery when true signals to the gossip syncer
        // that it should request the remote peer for all of its known channel
        // IDs starting from the genesis block of the chain. This can only
        // happen if the gossip syncer receives a request to attempt a
        // historical sync. It can be unset if the syncer ever transitions from
        // PassiveSync to ActiveSync.
        genHistoricalChanRangeQuery bool

        // gossipMsgs is a channel that all responses to our queries from the
        // target peer will be sent over; these will be read by the
        // channelGraphSyncer.
        gossipMsgs chan lnwire.Message

        // queryMsgs is a channel that all queries from the remote peer will be
        // received over; these will be read by the replyHandler.
        queryMsgs chan lnwire.Message

        // curQueryRangeMsg keeps track of the latest QueryChannelRange message
        // we've sent to a peer to ensure we've consumed all expected replies.
        // This field is primarily used within the waitingQueryChanReply state.
        curQueryRangeMsg *lnwire.QueryChannelRange

        // prevReplyChannelRange keeps track of the previous ReplyChannelRange
        // message we've received from a peer to ensure they've fully replied to
        // our query by ensuring they covered our requested block range. This
        // field is primarily used within the waitingQueryChanReply state.
        prevReplyChannelRange *lnwire.ReplyChannelRange

        // bufferedChanRangeReplies is used in the waitingQueryChanReply state
        // to buffer all the chunked responses to our query.
        bufferedChanRangeReplies []graphdb.ChannelUpdateInfo

        // numChanRangeRepliesRcvd is used to track the number of replies
        // received as part of a QueryChannelRange. This field is primarily used
        // within the waitingQueryChanReply state.
        numChanRangeRepliesRcvd uint32

        // newChansToQuery is used to pass the set of channels we should query
        // for from the waitingQueryChanReply state to the queryNewChannels
        // state.
        newChansToQuery []lnwire.ShortChannelID

        cfg gossipSyncerCfg

        // rateLimiter dictates the frequency with which we will reply to gossip
        // queries from a peer. This is used to delay responses to peers to
        // prevent DOS vulnerabilities if they are spamming with an unreasonable
        // number of queries.
        rateLimiter *rate.Limiter

        // syncedSignal is a channel that, if set, will be closed when the
        // GossipSyncer reaches its terminal chansSynced state.
        syncedSignal chan struct{}

        // syncerSema is used to more finely control the syncer's ability to
        // respond to gossip timestamp range messages.
        syncerSema chan struct{}

        sync.Mutex

        quit chan struct{}
        wg   sync.WaitGroup
}
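
The state and syncType fields above are read and written only through atomic
operations. The accessors they rely on (syncState, setSyncState and SyncType,
all used later in this file) fall outside this excerpt; a minimal sketch of
what they plausibly look like, given the atomic contract documented above:

func (g *GossipSyncer) syncState() syncerState {
        return syncerState(atomic.LoadUint32(&g.state))
}

func (g *GossipSyncer) setSyncState(state syncerState) {
        atomic.StoreUint32(&g.state, uint32(state))
}

// SyncType returns the SyncerType the gossip syncer is currently exercising.
func (g *GossipSyncer) SyncType() SyncerType {
        return SyncerType(atomic.LoadUint32(&g.syncType))
}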

// newGossipSyncer returns a new instance of the GossipSyncer populated using
// the passed config.
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
        // If no parameter was specified for max undelayed query replies, set it
        // to the default of 10 queries.
        if cfg.maxUndelayedQueryReplies <= 0 {
                cfg.maxUndelayedQueryReplies = DefaultMaxUndelayedQueryReplies
        }

        // If no parameter was specified for delayed query reply interval, set
        // to the default of 5 seconds.
        if cfg.delayedQueryReplyInterval <= 0 {
                cfg.delayedQueryReplyInterval = DefaultDelayedQueryReplyInterval
        }

        // Construct a rate limiter that will govern how frequently we reply to
        // gossip queries from this peer. The limiter will automatically adjust
        // during periods of quiescence, and increase the reply interval under
        // load.
        interval := rate.Every(cfg.delayedQueryReplyInterval)
        rateLimiter := rate.NewLimiter(
                interval, cfg.maxUndelayedQueryReplies,
        )

        return &GossipSyncer{
                cfg:                cfg,
                rateLimiter:        rateLimiter,
                syncTransitionReqs: make(chan *syncTransitionReq),
                historicalSyncReqs: make(chan *historicalSyncReq),
                gossipMsgs:         make(chan lnwire.Message, 100),
                queryMsgs:          make(chan lnwire.Message, 100),
                syncerSema:         sema,
                quit:               make(chan struct{}),
        }
}
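
With the defaults above, the limiter permits a burst of
DefaultMaxUndelayedQueryReplies immediate replies and thereafter refills one
token every DefaultDelayedQueryReplyInterval. A standalone sketch of that
behavior, using the same golang.org/x/time/rate API:

limiter := rate.NewLimiter(rate.Every(5*time.Second), 10)

// The first 10 reservations report no delay; the 11th must wait roughly
// the full 5-second interval for a token to refill.
for i := 0; i < 11; i++ {
        fmt.Printf("reply %d delayed by %v\n", i+1, limiter.Reserve().Delay())
}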

// Start starts the GossipSyncer and any goroutines that it needs to carry out
// its duties.
func (g *GossipSyncer) Start() {
        g.started.Do(func() {
                log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])

                // TODO(conner): only spawn channelGraphSyncer if remote
                // supports gossip queries, and only spawn replyHandler if we
                // advertise support
                if !g.cfg.noSyncChannels {
                        g.wg.Add(1)
                        go g.channelGraphSyncer()
                }
                if !g.cfg.noReplyQueries {
                        g.wg.Add(1)
                        go g.replyHandler()
                }
        })
}

// Stop signals the GossipSyncer for a graceful exit, then waits until it has
// exited.
func (g *GossipSyncer) Stop() {
        g.stopped.Do(func() {
                log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
                defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])

                close(g.quit)
                g.wg.Wait()
        })
}

// channelGraphSyncer is the main goroutine responsible for ensuring that we
// properly sync channel graph state with the remote peer, and also that we
// only send them messages which actually pass their defined update horizon.
func (g *GossipSyncer) channelGraphSyncer() {
        defer g.wg.Done()

        for {
                state := g.syncState()
                syncType := g.SyncType()

                log.Debugf("GossipSyncer(%x): state=%v, type=%v",
                        g.cfg.peerPub[:], state, syncType)

                switch state {
                // When we're in this state, we're trying to synchronize our
                // view of the network with the remote peer. We'll kick off
                // this sync by asking them for the set of channels they
                // understand, as well as responding to any other queries by
                // them.
                case syncingChans:
                        // If we're in this state, then we'll send the remote
                        // peer our opening QueryChannelRange message.
                        queryRangeMsg, err := g.genChanRangeQuery(
                                g.genHistoricalChanRangeQuery,
                        )
                        if err != nil {
                                log.Errorf("Unable to gen chan range "+
                                        "query: %v", err)
                                return
                        }

                        err = g.cfg.sendToPeer(queryRangeMsg)
                        if err != nil {
                                log.Errorf("Unable to send chan range "+
                                        "query: %v", err)
                                return
                        }

                        // With the message sent successfully, we'll transition
                        // into the next state where we wait for their reply.
                        g.setSyncState(waitingQueryRangeReply)

                // In this state, we've sent out our initial channel range
                // query and are waiting for the final response from the remote
                // peer before we perform a diff to see which channels they
                // know of that we don't.
                case waitingQueryRangeReply:
                        // We'll wait to either process a new message from the
                        // remote party, or exit due to the gossiper exiting,
                        // or us being signalled to do so.
                        select {
                        case msg := <-g.gossipMsgs:
                                // The remote peer is sending a response to our
                                // initial query, we'll collate this response,
                                // and see if it's the final one in the series.
                                // If so, we can then transition to querying
                                // for the new channels.
                                queryReply, ok := msg.(*lnwire.ReplyChannelRange)
                                if ok {
                                        err := g.processChanRangeReply(queryReply)
                                        if err != nil {
                                                log.Errorf("Unable to "+
                                                        "process chan range "+
                                                        "query: %v", err)
                                                return
                                        }
                                        continue
                                }

                                log.Warnf("Unexpected message: %T in state=%v",
                                        msg, state)

                        case <-g.quit:
                                return
                        }

                // We'll enter this state once we've discovered which channels
                // the remote party knows of that we don't yet know of
                // ourselves.
                case queryNewChannels:
                        // First, we'll attempt to continue our channel
                        // synchronization by continuing to send off another
                        // query chunk.
                        done, err := g.synchronizeChanIDs()
                        if err != nil {
                                log.Errorf("Unable to sync chan IDs: %v", err)
                        }

                        // If this wasn't our last query, then we'll need to
                        // transition to our waiting state.
                        if !done {
                                g.setSyncState(waitingQueryChanReply)
                                continue
                        }

                        // If we're fully synchronized, then we can transition
                        // to our terminal state.
                        g.setSyncState(chansSynced)

                        // Ensure that the sync manager becomes aware that the
                        // historical sync completed so synced_to_graph is
                        // updated over rpc.
                        g.cfg.markGraphSynced()

                // In this state, we've just sent off a new query for channels
                // that we don't yet know of. We'll remain in this state until
                // the remote party signals they've responded to our query in
                // totality.
                case waitingQueryChanReply:
                        // Once we've sent off our query, we'll wait for either
                        // an ending reply, or just another query from the
                        // remote peer.
                        select {
                        case msg := <-g.gossipMsgs:
                                // If this is the final reply to one of our
                                // queries, then we'll loop back into our query
                                // state to send off the remaining query chunks.
                                _, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
                                if ok {
                                        g.setSyncState(queryNewChannels)
                                        continue
                                }

                                log.Warnf("Unexpected message: %T in state=%v",
                                        msg, state)

                        case <-g.quit:
                                return
                        }

                // This is our final terminal state where we'll only reply to
                // any further queries by the remote peer.
                case chansSynced:
                        g.Lock()
                        if g.syncedSignal != nil {
                                close(g.syncedSignal)
                                g.syncedSignal = nil
                        }
                        g.Unlock()

                        // If we haven't yet sent out our update horizon, and
                        // we want to receive real-time channel updates, we'll
                        // do so now.
                        if g.localUpdateHorizon == nil &&
                                syncType.IsActiveSync() {

                                err := g.sendGossipTimestampRange(
                                        time.Now(), math.MaxUint32,
                                )
                                if err != nil {
                                        log.Errorf("Unable to send update "+
                                                "horizon to %x: %v",
                                                g.cfg.peerPub, err)
                                }
                        }
                        // With our horizon set, we'll simply reply to any new
                        // messages or process any state transitions and exit if
                        // needed.
                        fallthrough

                // Pinned peers will begin in this state, since they will
                // immediately receive a request to perform a historical sync.
                // Otherwise, we fall through after ending in chansSynced to
                // facilitate new requests.
                case syncerIdle:
                        select {
                        case req := <-g.syncTransitionReqs:
                                req.errChan <- g.handleSyncTransition(req)

                        case req := <-g.historicalSyncReqs:
                                g.handleHistoricalSync(req)

                        case <-g.quit:
                                return
                        }
                }
        }
}

// replyHandler is an event loop whose sole purpose is to reply to the remote
// peer's queries. Our replyHandler will respond to messages generated by their
// channelGraphSyncer, and vice versa. Each party's channelGraphSyncer drives
// the other's replyHandler, allowing the replyHandler to operate independently
// from the state machine maintained on the same node.
//
// NOTE: This method MUST be run as a goroutine.
func (g *GossipSyncer) replyHandler() {
        defer g.wg.Done()

        for {
                select {
                case msg := <-g.queryMsgs:
                        err := g.replyPeerQueries(msg)
                        switch {
                        case err == ErrGossipSyncerExiting:
                                return

                        case err == lnpeer.ErrPeerExiting:
                                return

                        case err != nil:
                                log.Errorf("Unable to reply to peer "+
                                        "query: %v", err)
                        }

                case <-g.quit:
                        return
                }
        }
}

// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
// syncer and sends it to the remote peer.
func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time,
        timestampRange uint32) error {

        endTimestamp := firstTimestamp.Add(
                time.Duration(timestampRange) * time.Second,
        )

        log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)",
                g.cfg.peerPub[:], firstTimestamp, endTimestamp)

        localUpdateHorizon := &lnwire.GossipTimestampRange{
                ChainHash:      g.cfg.chainHash,
                FirstTimestamp: uint32(firstTimestamp.Unix()),
                TimestampRange: timestampRange,
        }

        if err := g.cfg.sendToPeer(localUpdateHorizon); err != nil {
                return err
        }

        if firstTimestamp == zeroTimestamp && timestampRange == 0 {
                g.localUpdateHorizon = nil
        } else {
                g.localUpdateHorizon = localUpdateHorizon
        }

        return nil
}

// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
// the remote peer for its known set of channel IDs within a particular block
// range. This method will be called continually until the entire range has
// been queried for with a response received. We'll chunk our requests as
// required to ensure they fit into a single message. We may re-enter this
// state in the case that chunking is required.
func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
        // If we're in this state yet there are no more new channels to query
        // for, then we'll transition to our final synced state and return true
        // to signal that we're fully synchronized.
        if len(g.newChansToQuery) == 0 {
                log.Infof("GossipSyncer(%x): no more chans to query",
                        g.cfg.peerPub[:])
                return true, nil
        }

        // Otherwise, we'll issue our next chunked query to receive replies
        // for.
        var queryChunk []lnwire.ShortChannelID

        // If the number of channels to query for is less than the chunk size,
        // then we can issue a single query.
        if int32(len(g.newChansToQuery)) < g.cfg.batchSize {
                queryChunk = g.newChansToQuery
                g.newChansToQuery = nil

        } else {
                // Otherwise, we'll need to only query for the next chunk.
                // We'll slice into our query chunk, then slide our main
                // pointer down by the chunk size.
                queryChunk = g.newChansToQuery[:g.cfg.batchSize]
                g.newChansToQuery = g.newChansToQuery[g.cfg.batchSize:]
        }

        log.Infof("GossipSyncer(%x): querying for %v new channels",
                g.cfg.peerPub[:], len(queryChunk))

        // With our chunk obtained, we'll send over our next query, then return
        // false indicating that we're not yet fully synced.
        err := g.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
                ChainHash:    g.cfg.chainHash,
                EncodingType: lnwire.EncodingSortedPlain,
                ShortChanIDs: queryChunk,
        })

        return false, err
}

// isLegacyReplyChannelRange determines whether a ReplyChannelRange message is
// considered legacy. There was a point where lnd used to include the same query
// over multiple replies, rather than including the portion of the query the
// reply is handling. We'll use this as a way of detecting whether we are
// communicating with a legacy node so we can properly sync with them.
func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
        reply *lnwire.ReplyChannelRange) bool {

        return (reply.ChainHash == query.ChainHash &&
                reply.FirstBlockHeight == query.FirstBlockHeight &&
                reply.NumBlocks == query.NumBlocks)
}
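
For instance, a reply that echoes the query's exact bounds is treated as
legacy, while one scoped to the portion it covers is not (hypothetical
values, zero-valued chain hash on both sides):

query := &lnwire.QueryChannelRange{
        FirstBlockHeight: 500_000,
        NumBlocks:        10_000,
}

// A legacy node repeats the full query range in every reply chunk.
legacy := &lnwire.ReplyChannelRange{
        FirstBlockHeight: 500_000,
        NumBlocks:        10_000,
}

// A modern node narrows each reply to the sub-range it answers.
modern := &lnwire.ReplyChannelRange{
        FirstBlockHeight: 500_000,
        NumBlocks:        2_000,
}

fmt.Println(isLegacyReplyChannelRange(query, legacy)) // true
fmt.Println(isLegacyReplyChannelRange(query, modern)) // false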

// processChanRangeReply is called each time the GossipSyncer receives a new
// reply to the initial range query to discover new channels that it didn't
// previously know of.
func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error {
        // isStale returns whether the timestamp is too far into the past.
        isStale := func(timestamp time.Time) bool {
                return time.Since(timestamp) > graph.DefaultChannelPruneExpiry
        }

        // isSkewed returns whether the timestamp is too far into the future.
        isSkewed := func(timestamp time.Time) bool {
                return time.Until(timestamp) > graph.DefaultChannelPruneExpiry
        }

        // If we're not communicating with a legacy node, we'll apply some
        // further constraints on their reply to ensure it satisfies our query.
        if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
                // The first block should be within our original request.
                if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight {
                        return fmt.Errorf("reply includes channels for height "+
                                "%v prior to query %v", msg.FirstBlockHeight,
                                g.curQueryRangeMsg.FirstBlockHeight)
                }

                // The last block should also be. We don't need to check the
                // intermediate ones because they should already be in sorted
                // order.
                replyLastHeight := msg.LastBlockHeight()
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
                if replyLastHeight > queryLastHeight {
                        return fmt.Errorf("reply includes channels for height "+
                                "%v after query %v", replyLastHeight,
                                queryLastHeight)
                }

                // If we've previously received a reply for this query, look at
                // its last block to ensure the current reply properly follows
                // it.
                if g.prevReplyChannelRange != nil {
                        prevReply := g.prevReplyChannelRange
                        prevReplyLastHeight := prevReply.LastBlockHeight()

                        // The current reply can either start from the previous
                        // reply's last block, if there are still more channels
                        // for the same block, or the block after.
                        if msg.FirstBlockHeight != prevReplyLastHeight &&
                                msg.FirstBlockHeight != prevReplyLastHeight+1 {

                                return fmt.Errorf("first block of reply %v "+
                                        "does not continue from last block of "+
                                        "previous %v", msg.FirstBlockHeight,
                                        prevReplyLastHeight)
                        }
                }
        }

        g.prevReplyChannelRange = msg

        for i, scid := range msg.ShortChanIDs {
                info := graphdb.NewChannelUpdateInfo(
                        scid, time.Time{}, time.Time{},
                )

                if len(msg.Timestamps) != 0 {
                        t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0)
                        info.Node1UpdateTimestamp = t1

                        t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0)
                        info.Node2UpdateTimestamp = t2

                        // Sort out all channels with outdated or skewed
                        // timestamps. Both timestamps need to be out of
                        // boundaries for us to skip the channel and not query
                        // it later on.
                        switch {
                        case isStale(info.Node1UpdateTimestamp) &&
                                isStale(info.Node2UpdateTimestamp):

                                continue

                        case isSkewed(info.Node1UpdateTimestamp) &&
                                isSkewed(info.Node2UpdateTimestamp):

                                continue

                        case isStale(info.Node1UpdateTimestamp) &&
                                isSkewed(info.Node2UpdateTimestamp):

                                continue

                        case isStale(info.Node2UpdateTimestamp) &&
                                isSkewed(info.Node1UpdateTimestamp):

                                continue
                        }
                }

                g.bufferedChanRangeReplies = append(
                        g.bufferedChanRangeReplies, info,
                )
        }

        switch g.cfg.encodingType {
        case lnwire.EncodingSortedPlain:
                g.numChanRangeRepliesRcvd++
        case lnwire.EncodingSortedZlib:
                g.numChanRangeRepliesRcvd += maxQueryChanRangeRepliesZlibFactor
        default:
                return fmt.Errorf("unhandled encoding type %v", g.cfg.encodingType)
        }

        log.Infof("GossipSyncer(%x): buffering chan range reply of size=%v",
                g.cfg.peerPub[:], len(msg.ShortChanIDs))

        // If this isn't the last response and we can continue to receive more,
        // then we can exit as we've already buffered the latest portion of the
        // streaming reply.
        maxReplies := g.cfg.maxQueryChanRangeReplies
        switch {
        // If we're communicating with a legacy node, we'll need to look at the
        // complete field.
        case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg):
                if msg.Complete == 0 && g.numChanRangeRepliesRcvd < maxReplies {
                        return nil
                }

        // Otherwise, we'll look at the reply's height range.
        default:
                replyLastHeight := msg.LastBlockHeight()
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()

                // TODO(wilmer): This might require some padding if the remote
                // node is not aware of the last height we sent them, i.e., is
                // behind a few blocks from us.
                if replyLastHeight < queryLastHeight &&
                        g.numChanRangeRepliesRcvd < maxReplies {

                        return nil
                }
        }

        log.Infof("GossipSyncer(%x): filtering through %v chans",
                g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))

        // Otherwise, this is the final response, so we'll now check to see
        // which channels they know of that we don't.
        newChans, err := g.cfg.channelSeries.FilterKnownChanIDs(
                g.cfg.chainHash, g.bufferedChanRangeReplies,
                g.cfg.isStillZombieChannel,
        )
        if err != nil {
                return fmt.Errorf("unable to filter chan ids: %w", err)
        }

        // As we've received the entirety of the reply, we no longer need to
        // hold on to the set of buffered replies or the original query that
        // prompted the replies, so we'll let that be garbage collected now.
        g.curQueryRangeMsg = nil
        g.prevReplyChannelRange = nil
        g.bufferedChanRangeReplies = nil
        g.numChanRangeRepliesRcvd = 0

        // If there aren't any channels that we don't know of, then we can
        // switch straight to our terminal state.
        if len(newChans) == 0 {
                log.Infof("GossipSyncer(%x): remote peer has no new chans",
                        g.cfg.peerPub[:])

                g.setSyncState(chansSynced)

                // Ensure that the sync manager becomes aware that the
                // historical sync completed so synced_to_graph is updated over
                // rpc.
                g.cfg.markGraphSynced()
                return nil
        }

        // Otherwise, we'll set the set of channels that we need to query for
        // the next state, and also transition our state.
        g.newChansToQuery = newChans
        g.setSyncState(queryNewChannels)

        log.Infof("GossipSyncer(%x): starting query for %v new chans",
                g.cfg.peerPub[:], len(newChans))

        return nil
}
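
The four-case switch above skips a channel exactly when each node's timestamp
is out of bounds in one direction or the other; it is equivalent to this
single predicate over the isStale/isSkewed closures defined at the top of the
function:

badTimestamp := func(t time.Time) bool {
        return isStale(t) || isSkewed(t)
}
if badTimestamp(info.Node1UpdateTimestamp) &&
        badTimestamp(info.Node2UpdateTimestamp) {

        continue
}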

// genChanRangeQuery generates the initial message we'll send to the remote
// party when we're kicking off the channel graph synchronization upon
// connection. The historicalQuery boolean can be used to generate a query from
// the genesis block of the chain.
func (g *GossipSyncer) genChanRangeQuery(
        historicalQuery bool) (*lnwire.QueryChannelRange, error) {

        // First, we'll query our channel graph time series for its highest
        // known channel ID.
        newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash)
        if err != nil {
                return nil, err
        }

        // Once we have the chan ID of the newest, we'll obtain the block height
        // of the channel, then subtract our default horizon to ensure we don't
        // miss any channels. By default, we go back 1 day from the newest
        // channel, unless we're attempting a historical sync, where we'll
        // actually start from the genesis block instead.
        var startHeight uint32
        switch {
        case historicalQuery:
                fallthrough
        case newestChan.BlockHeight <= chanRangeQueryBuffer:
                startHeight = 0
        default:
                startHeight = newestChan.BlockHeight - chanRangeQueryBuffer
        }

        // Determine the number of blocks to request based on our best height.
        // We'll take into account any potential underflows and explicitly set
        // numBlocks to its minimum value of 1 if so.
        bestHeight := g.cfg.bestHeight()
        numBlocks := bestHeight - startHeight
        if int64(numBlocks) < 1 {
                numBlocks = 1
        }

        log.Infof("GossipSyncer(%x): requesting new chans from height=%v "+
                "and %v blocks after", g.cfg.peerPub[:], startHeight, numBlocks)

        // Finally, we'll craft the channel range query, using our starting
        // height, then asking for all known channels to the foreseeable end of
        // the main chain.
        query := &lnwire.QueryChannelRange{
                ChainHash:        g.cfg.chainHash,
                FirstBlockHeight: startHeight,
                NumBlocks:        numBlocks,
        }

        if !g.cfg.noTimestampQueryOption {
                query.QueryOptions = lnwire.NewTimestampQueryOption()
        }

        g.curQueryRangeMsg = query

        return query, nil
}
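
Concretely, with the 144-block chanRangeQueryBuffer and some hypothetical
heights:

// Newest known channel at height 700_000, best chain height 700_050.
startHeight := uint32(700_000 - chanRangeQueryBuffer) // 699_856
numBlocks := uint32(700_050) - startHeight            // 194

// With historicalQuery=true, or a newest channel at height <= 144,
// startHeight would instead be 0 and the query spans the whole chain.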

// replyPeerQueries is called in response to any query by the remote peer.
// We'll examine our state and send back our best response.
func (g *GossipSyncer) replyPeerQueries(msg lnwire.Message) error {
        reservation := g.rateLimiter.Reserve()
        delay := reservation.Delay()

        // If we've already replied a handful of times, we will start to delay
        // responses back to the remote peer. This can help prevent DOS attacks
        // where the remote peer spams us endlessly.
        if delay > 0 {
                log.Infof("GossipSyncer(%x): rate limiting gossip replies, "+
                        "responding in %s", g.cfg.peerPub[:], delay)

                select {
                case <-time.After(delay):
                case <-g.quit:
                        return ErrGossipSyncerExiting
                }
        }

        switch msg := msg.(type) {

        // In this state, we'll also handle any incoming channel range queries
        // from the remote peer as they're trying to sync their state as well.
        case *lnwire.QueryChannelRange:
                return g.replyChanRangeQuery(msg)

        // If the remote peer skips straight to requesting new channels that
        // they don't know of, then we'll ensure that we also handle this case.
        case *lnwire.QueryShortChanIDs:
                return g.replyShortChanIDs(msg)

        default:
                return fmt.Errorf("unknown message: %T", msg)
        }
}

1065
// replyChanRangeQuery will be dispatched in response to a channel range query
1066
// by the remote node. We'll query the channel time series for channels that
1067
// meet the channel range, then chunk our responses to the remote node. We also
1068
// ensure that our final fragment carries the "complete" bit to indicate the
1069
// end of our streaming response.
1070
func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error {
10✔
1071
        // Before responding, we'll check to ensure that the remote peer is
10✔
1072
        // querying for the same chain that we're on. If not, we'll send back a
10✔
1073
        // response with a complete value of zero to indicate we're on a
10✔
1074
        // different chain.
10✔
1075
        if g.cfg.chainHash != query.ChainHash {
11✔
1076
                log.Warnf("Remote peer requested QueryChannelRange for "+
1✔
1077
                        "chain=%v, we're on chain=%v", query.ChainHash,
1✔
1078
                        g.cfg.chainHash)
1✔
1079

1✔
1080
                return g.cfg.sendToPeerSync(&lnwire.ReplyChannelRange{
1✔
1081
                        ChainHash:        query.ChainHash,
1✔
1082
                        FirstBlockHeight: query.FirstBlockHeight,
1✔
1083
                        NumBlocks:        query.NumBlocks,
1✔
1084
                        Complete:         0,
1✔
1085
                        EncodingType:     g.cfg.encodingType,
1✔
1086
                        ShortChanIDs:     nil,
1✔
1087
                })
1✔
1088
        }
1✔
1089

1090
        log.Infof("GossipSyncer(%x): filtering chan range: start_height=%v, "+
9✔
1091
                "num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
9✔
1092
                query.NumBlocks)
9✔
1093

9✔
1094
        // Check if the query asked for timestamps. We will only serve
9✔
1095
        // timestamps if this has not been disabled with
9✔
1096
        // noTimestampQueryOption.
9✔
1097
        withTimestamps := query.WithTimestamps() &&
9✔
1098
                !g.cfg.noTimestampQueryOption
9✔
1099

9✔
1100
        // Next, we'll consult the time series to obtain the set of known
9✔
1101
        // channel ID's that match their query.
9✔
1102
        startBlock := query.FirstBlockHeight
9✔
1103
        endBlock := query.LastBlockHeight()
9✔
1104
        channelRanges, err := g.cfg.channelSeries.FilterChannelRange(
9✔
1105
                query.ChainHash, startBlock, endBlock, withTimestamps,
9✔
1106
        )
9✔
1107
        if err != nil {
9✔
1108
                return err
×
1109
        }
×
1110

1111
        // TODO(roasbeef): means can't send max uint above?
1112
        //  * or make internal 64
1113

1114
        // We'll send our response in a streaming manner, chunk-by-chunk. We do
1115
        // this as there's a transport message size limit which we'll need to
1116
        // adhere to. We also need to make sure all of our replies cover the
1117
        // expected range of the query.
1118
        sendReplyForChunk := func(channelChunk []graphdb.ChannelUpdateInfo,
9✔
1119
                firstHeight, lastHeight uint32, finalChunk bool) error {
33✔
1120

24✔
1121
                // The number of blocks contained in the current chunk (the
24✔
1122
                // total span) is the difference between the last channel ID and
24✔
1123
                // the first in the range. We add one as even if all channels
24✔
1124
                // returned are in the same block, we need to count that.
24✔
1125
                numBlocks := lastHeight - firstHeight + 1
                complete := uint8(0)
                if finalChunk {
                        complete = 1
                }

                var timestamps lnwire.Timestamps
                if withTimestamps {
                        timestamps = make(lnwire.Timestamps, len(channelChunk))
                }

                scids := make([]lnwire.ShortChannelID, len(channelChunk))
                for i, info := range channelChunk {
                        scids[i] = info.ShortChannelID

                        if !withTimestamps {
                                continue
                        }

                        timestamps[i].Timestamp1 = uint32(
                                info.Node1UpdateTimestamp.Unix(),
                        )

                        timestamps[i].Timestamp2 = uint32(
                                info.Node2UpdateTimestamp.Unix(),
                        )
                }

                return g.cfg.sendToPeerSync(&lnwire.ReplyChannelRange{
                        ChainHash:        query.ChainHash,
                        NumBlocks:        numBlocks,
                        FirstBlockHeight: firstHeight,
                        Complete:         complete,
                        EncodingType:     g.cfg.encodingType,
                        ShortChanIDs:     scids,
                        Timestamps:       timestamps,
                })
        }

        var (
                firstHeight  = query.FirstBlockHeight
                lastHeight   uint32
                channelChunk []graphdb.ChannelUpdateInfo
        )

        // chunkSize is the maximum number of SCIDs that we can safely put in a
        // single message. If we also need to include timestamps though, then
        // this number is halved since encoding two timestamps takes the same
        // number of bytes as encoding an SCID.
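        // As a rough illustration (the concrete value of g.cfg.chunkSize is
        // configuration-dependent and hypothetical here): an SCID encodes to
        // 8 bytes, and a pair of uint32 timestamps also encodes to 8 bytes,
        // so a chunk size of, say, 8,000 SCIDs drops to 4,000 once timestamps
        // are included.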
        chunkSize := g.cfg.chunkSize
        if withTimestamps {
                chunkSize /= 2
        }

        for _, channelRange := range channelRanges {
                channels := channelRange.Channels
                numChannels := int32(len(channels))
                numLeftToAdd := chunkSize - int32(len(channelChunk))

                // Include the current block in the ongoing chunk if it can fit
                // and move on to the next block.
                if numChannels <= numLeftToAdd {
                        channelChunk = append(channelChunk, channels...)
                        continue
                }

                // Otherwise, we need to send our existing channel chunk as is
                // as its own reply and start a new one for the current block.
                // We'll mark the end of our current chunk as the height before
                // the current block to ensure the whole query range is replied
                // to.
                log.Infof("GossipSyncer(%x): sending range chunk of size=%v",
                        g.cfg.peerPub[:], len(channelChunk))

                lastHeight = channelRange.Height - 1
                err := sendReplyForChunk(
                        channelChunk, firstHeight, lastHeight, false,
                )
                if err != nil {
                        return err
                }

                // With the reply constructed, we'll start tallying channels
                // for our next one, keeping in mind our chunk size. This may
                // result in channels for this block being left out of the
                // reply, but this isn't an issue since we'll randomly shuffle
                // them and we assume a historical gossip sync is performed at
                // a later time.
                firstHeight = channelRange.Height
                finalChunkSize := numChannels
                exceedsChunkSize := numChannels > chunkSize
                if exceedsChunkSize {
                        rand.Shuffle(len(channels), func(i, j int) {
                                channels[i], channels[j] = channels[j], channels[i]
                        })
                        finalChunkSize = chunkSize
                }
                channelChunk = channels[:finalChunkSize]

                // Sort the chunk once again if we had to shuffle it.
                if exceedsChunkSize {
                        sort.Slice(channelChunk, func(i, j int) bool {
                                id1 := channelChunk[i].ShortChannelID.ToUint64()
                                id2 := channelChunk[j].ShortChannelID.ToUint64()

                                return id1 < id2
                        })
                }
        }

        // Send the remaining chunk as the final reply.
        log.Infof("GossipSyncer(%x): sending final chan range chunk, size=%v",
                g.cfg.peerPub[:], len(channelChunk))

        return sendReplyForChunk(
                channelChunk, firstHeight, query.LastBlockHeight(), true,
        )
}

// replyShortChanIDs will be dispatched in response to a query by the remote
// node for information concerning a set of short channel ID's. Our response
// will be sent in a streaming chunked manner to ensure that we remain below
// the current transport level message size.
func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error {
        // Before responding, we'll check to ensure that the remote peer is
        // querying for the same chain that we're on. If not, we'll send back a
        // response with a complete value of zero to indicate we're on a
        // different chain.
        if g.cfg.chainHash != query.ChainHash {
                log.Warnf("Remote peer requested QueryShortChanIDs for "+
                        "chain=%v, we're on chain=%v", query.ChainHash,
                        g.cfg.chainHash)

                return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
                        ChainHash: query.ChainHash,
                        Complete:  0,
                })
        }

        if len(query.ShortChanIDs) == 0 {
                log.Infof("GossipSyncer(%x): ignoring query for blank short chan ID's",
                        g.cfg.peerPub[:])
                return nil
        }

        log.Infof("GossipSyncer(%x): fetching chan anns for %v chans",
                g.cfg.peerPub[:], len(query.ShortChanIDs))

        // Now that we know we're on the same chain, we'll query the channel
        // time series for the set of messages that we know of which satisfies
        // the requirement of being a chan ann, chan update, or a node ann
        // related to the set of queried channels.
        replyMsgs, err := g.cfg.channelSeries.FetchChanAnns(
                query.ChainHash, query.ShortChanIDs,
        )
        if err != nil {
                return fmt.Errorf("unable to fetch chan anns for %v..., %w",
                        query.ShortChanIDs[0].ToUint64(), err)
        }

        // Reply with any messages related to those channel ID's, we'll write
        // each one individually and synchronously to throttle the sends and
        // perform buffering of responses in the syncer as opposed to the peer.
        for _, msg := range replyMsgs {
                err := g.cfg.sendToPeerSync(msg)
                if err != nil {
                        return err
                }
        }

        // Regardless of whether we had any messages to reply with, send over
        // the sentinel message to signal that the stream has terminated.
        return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
                ChainHash: query.ChainHash,
                Complete:  1,
        })
}
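
// To illustrate the stream above (a hypothetical trace, not output of this
// code): a query for two known SCIDs would elicit their announcements and any
// known updates one message at a time, followed by a single
// ReplyShortChanIDsEnd{Complete: 1} sentinel marking the end of the stream.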

// ApplyGossipFilter applies a gossiper filter sent by the remote node to the
// state machine. Once applied, we'll ensure that we don't forward any messages
// to the peer that aren't within the time range of the filter.
func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error {
        g.Lock()

        g.remoteUpdateHorizon = filter

        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
        endTime := startTime.Add(
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
        )
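
        // As a worked example (illustrative numbers): a filter with
        // FirstTimestamp = 1,700,000,000 and TimestampRange = 3,600 yields a
        // horizon running from Unix time 1,700,000,000 to 1,700,003,600,
        // i.e. a one-hour window.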

        g.Unlock()

        // If requested, don't reply with historical gossip data when the
        // remote peer sets their gossip timestamp range.
        if g.cfg.ignoreHistoricalFilters {
                return nil
        }

        select {
        case <-g.syncerSema:
        case <-g.quit:
                return ErrGossipSyncerExiting
        }

        // We don't put this in a defer because if the goroutine is launched,
        // it needs to be called when the goroutine is stopped.
        returnSema := func() {
                g.syncerSema <- struct{}{}
        }
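
        // Note on the pattern above: g.syncerSema acts as a semaphore
        // implemented as a (presumably buffered) channel. Receiving from it
        // acquires a slot, and returnSema hands the token back, bounding how
        // many of these horizon backlogs are served concurrently.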

        // Now that the remote peer has applied their filter, we'll query the
        // database for all the messages that are beyond this filter.
        newUpdatesToSend, err := g.cfg.channelSeries.UpdatesInHorizon(
                g.cfg.chainHash, startTime, endTime,
        )
        if err != nil {
                returnSema()
                return err
        }

        log.Infof("GossipSyncer(%x): applying new update horizon: start=%v, "+
                "end=%v, backlog_size=%v", g.cfg.peerPub[:], startTime, endTime,
                len(newUpdatesToSend))

        // If we don't have any to send, then we can return early.
        if len(newUpdatesToSend) == 0 {
                returnSema()
                return nil
        }

        // We'll conclude by launching a goroutine to send out any updates.
        g.wg.Add(1)
        go func() {
                defer g.wg.Done()
                defer returnSema()

                for _, msg := range newUpdatesToSend {
                        err := g.cfg.sendToPeerSync(msg)
                        switch {
                        case err == ErrGossipSyncerExiting:
                                return

                        case err == lnpeer.ErrPeerExiting:
                                return

                        case err != nil:
                                log.Errorf("Unable to send message for "+
                                        "peer catch up: %v", err)
                        }
                }
        }()

        return nil
}

// FilterGossipMsgs takes a set of gossip messages, and only sends them to a
// peer iff each message is within the bounds of their set gossip filter. If
// the peer doesn't have a gossip filter set, then no messages will be
// forwarded.
func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
        // If the peer doesn't have an update horizon set, then we won't send
        // it any new update messages.
        if g.remoteUpdateHorizon == nil {
                log.Tracef("GossipSyncer(%x): skipped due to nil "+
                        "remoteUpdateHorizon", g.cfg.peerPub[:])
                return
        }

        // If we've been signaled to exit, or are exiting, then we'll stop
        // short.
        select {
        case <-g.quit:
                return
        default:
        }

        // TODO(roasbeef): need to ensure that peer still online...send msg to
        // gossiper on peer termination to signal peer disconnect?

        var err error

        // Before we filter out the messages, we'll construct an index over the
        // set of channel announcements and channel updates. This will allow us
        // to quickly check if we should forward a chan ann, based on the known
        // channel updates for a channel.
        chanUpdateIndex := make(
                map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate1,
        )
        for _, msg := range msgs {
                chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate1)
                if !ok {
                        continue
                }

                chanUpdateIndex[chanUpdate.ShortChannelID] = append(
                        chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate,
                )
        }

        // We'll construct a helper function that we'll use below to determine
        // whether a given message passes the gossip msg filter.
        g.Lock()
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
        endTime := startTime.Add(
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
        )
        g.Unlock()

        passesFilter := func(timeStamp uint32) bool {
                t := time.Unix(int64(timeStamp), 0)
                return t.Equal(startTime) ||
                        (t.After(startTime) && t.Before(endTime))
        }
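
        // In other words, passesFilter accepts timestamps in the half-open
        // interval [startTime, endTime): e.g. with startTime = 1,000 and
        // endTime = 2,000 (illustrative Unix seconds), both 1,000 and 1,999
        // pass while 2,000 does not.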

        msgsToSend := make([]lnwire.Message, 0, len(msgs))
        for _, msg := range msgs {
                // If the target peer is the peer that sent us this message,
                // then we'll exit early as we don't need to filter this
                // message.
                if _, ok := msg.senders[g.cfg.peerPub]; ok {
                        continue
                }

                switch msg := msg.msg.(type) {

                // For each channel announcement message, we'll only send this
                // message if the channel updates for the channel are within
                // our time range.
                case *lnwire.ChannelAnnouncement1:
                        // First, we'll check if the channel updates are in
                        // this message batch.
                        chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID]
                        if !ok {
                                // If not, we'll attempt to query the database
                                // to see if we know of the updates.
                                chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates(
                                        g.cfg.chainHash, msg.ShortChannelID,
                                )
                                if err != nil {
                                        log.Warnf("no channel updates found for "+
                                                "short_chan_id=%v",
                                                msg.ShortChannelID)
                                        continue
                                }
                        }

                        for _, chanUpdate := range chanUpdates {
                                if passesFilter(chanUpdate.Timestamp) {
                                        msgsToSend = append(msgsToSend, msg)
                                        break
                                }
                        }

                        if len(chanUpdates) == 0 {
                                msgsToSend = append(msgsToSend, msg)
                        }

                // For each channel update, we'll only send it if its
                // timestamp is within our time range.
                case *lnwire.ChannelUpdate1:
                        if passesFilter(msg.Timestamp) {
                                msgsToSend = append(msgsToSend, msg)
                        }

                // Similarly, we only send node announcements if the update
                // timestamp is within our set gossip filter time range.
                case *lnwire.NodeAnnouncement:
                        if passesFilter(msg.Timestamp) {
                                msgsToSend = append(msgsToSend, msg)
                        }
                }
        }

        log.Tracef("GossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
                g.cfg.peerPub[:], len(msgs), len(msgsToSend))

        if len(msgsToSend) == 0 {
                return
        }

        g.cfg.sendToPeer(msgsToSend...)
}

// ProcessQueryMsg is used by outside callers to pass new channel time series
// queries to the internal processing goroutine.
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) error {
        var msgChan chan lnwire.Message
        switch msg.(type) {
        case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs:
                msgChan = g.queryMsgs

        // Reply messages should only be expected in states where we're waiting
        // for a reply.
        case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
                syncState := g.syncState()
                if syncState != waitingQueryRangeReply &&
                        syncState != waitingQueryChanReply {

                        return fmt.Errorf("received unexpected query reply "+
                                "message %T", msg)
                }
                msgChan = g.gossipMsgs

        default:
                msgChan = g.gossipMsgs
        }

        select {
        case msgChan <- msg:
        case <-peerQuit:
        case <-g.quit:
        }

        return nil
}

// setSyncState sets the gossip syncer's state to the given state.
func (g *GossipSyncer) setSyncState(state syncerState) {
        atomic.StoreUint32(&g.state, uint32(state))
}

// syncState returns the current syncerState of the target GossipSyncer.
func (g *GossipSyncer) syncState() syncerState {
        return syncerState(atomic.LoadUint32(&g.state))
}

// ResetSyncedSignal returns a channel that will be closed in order to serve as
// a signal for when the GossipSyncer has reached its chansSynced state.
func (g *GossipSyncer) ResetSyncedSignal() chan struct{} {
        g.Lock()
        defer g.Unlock()

        syncedSignal := make(chan struct{})

        syncState := syncerState(atomic.LoadUint32(&g.state))
        if syncState == chansSynced {
                close(syncedSignal)
                return syncedSignal
        }

        g.syncedSignal = syncedSignal
        return g.syncedSignal
}
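
// A hypothetical caller-side sketch (not part of this file) of how the signal
// above is typically consumed:
//
//        select {
//        case <-syncer.ResetSyncedSignal():
//                // The syncer has reached chansSynced.
//        case <-time.After(timeout):
//                // Gave up waiting for the sync to complete.
//        }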

// ProcessSyncTransition sends a request to the gossip syncer to transition its
// sync type to a new one.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
        errChan := make(chan error, 1)
        select {
        case g.syncTransitionReqs <- &syncTransitionReq{
                newSyncType: newSyncType,
                errChan:     errChan,
        }:
        case <-time.After(syncTransitionTimeout):
                return ErrSyncTransitionTimeout
        case <-g.quit:
                return ErrGossipSyncerExiting
        }

        select {
        case err := <-errChan:
                return err
        case <-g.quit:
                return ErrGossipSyncerExiting
        }
}

// handleSyncTransition handles a new sync type transition request.
//
// NOTE: The gossip syncer might end up in a different sync state as a result
// of this transition.
func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error {
        // Return early from any NOP sync transitions.
        syncType := g.SyncType()
        if syncType == req.newSyncType {
                return nil
        }

        log.Debugf("GossipSyncer(%x): transitioning from %v to %v",
                g.cfg.peerPub, syncType, req.newSyncType)

        var (
                firstTimestamp time.Time
                timestampRange uint32
        )

        switch req.newSyncType {
        // If an active sync has been requested, then we should resume
        // receiving new graph updates from the remote peer.
        case ActiveSync, PinnedSync:
                firstTimestamp = time.Now()
                timestampRange = math.MaxUint32

        // If a PassiveSync transition has been requested, then we should no
        // longer receive any new updates from the remote peer. We can do this
        // by setting our update horizon to a range in the past, ensuring no
        // graph updates match the timestamp range.
        case PassiveSync:
                firstTimestamp = zeroTimestamp
                timestampRange = 0

        default:
                return fmt.Errorf("unhandled sync transition %v",
                        req.newSyncType)
        }

        err := g.sendGossipTimestampRange(firstTimestamp, timestampRange)
        if err != nil {
                return fmt.Errorf("unable to send local update horizon: %w",
                        err)
        }

        g.setSyncType(req.newSyncType)

        return nil
}
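
// To make the two horizons above concrete (illustrative numbers): an
// ActiveSync transition at Unix time 1,700,000,000 advertises a range
// starting at 1,700,000,000 spanning math.MaxUint32 seconds, i.e.
// "everything from now on", whereas a PassiveSync transition advertises a
// zero-length range at zeroTimestamp, a window that matches no updates.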

// setSyncType sets the gossip syncer's sync type to the given type.
func (g *GossipSyncer) setSyncType(syncType SyncerType) {
        atomic.StoreUint32(&g.syncType, uint32(syncType))
}

// SyncType returns the current SyncerType of the target GossipSyncer.
func (g *GossipSyncer) SyncType() SyncerType {
        return SyncerType(atomic.LoadUint32(&g.syncType))
}

// historicalSync sends a request to the gossip syncer to perform a historical
// sync.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) historicalSync() error {
        done := make(chan struct{})

        select {
        case g.historicalSyncReqs <- &historicalSyncReq{
                doneChan: done,
        }:
        case <-time.After(syncTransitionTimeout):
                return ErrSyncTransitionTimeout
        case <-g.quit:
                return ErrGossiperShuttingDown
        }

        select {
        case <-done:
                return nil
        case <-g.quit:
                return ErrGossiperShuttingDown
        }
}

// handleHistoricalSync handles a request to the gossip syncer to perform a
// historical sync.
func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
        // We'll go back to our initial syncingChans state in order to request
        // the remote peer to give us all of the channel IDs they know of,
        // starting from the genesis block.
        g.genHistoricalChanRangeQuery = true
        g.setSyncState(syncingChans)
        close(req.doneChan)
}