• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%
11216766535

Pull #9148

github

ProofOfKeags
lnwire: remove kickoff feerate from propose/commit
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

571 of 879 new or added lines in 16 files covered. (64.96%)

23253 existing lines in 251 files now uncovered.

99022 of 171268 relevant lines covered (57.82%)

38420.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.62
/discovery/syncer.go
1
package discovery
2

3
import (
4
        "errors"
5
        "fmt"
6
        "math"
7
        "math/rand"
8
        "sort"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/btcsuite/btcd/chaincfg/chainhash"
14
        "github.com/lightningnetwork/lnd/channeldb"
15
        "github.com/lightningnetwork/lnd/graph"
16
        "github.com/lightningnetwork/lnd/lnpeer"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
        "golang.org/x/time/rate"
19
)
20

21
// SyncerType encapsulates the different types of syncing mechanisms for a
22
// gossip syncer.
23
type SyncerType uint8
24

25
const (
26
        // ActiveSync denotes that a gossip syncer:
27
        //
28
        // 1. Should not attempt to synchronize with the remote peer for
29
        //    missing channels.
30
        // 2. Should respond to queries from the remote peer.
31
        // 3. Should receive new updates from the remote peer.
32
        //
33
        // They are started in a chansSynced state in order to accomplish their
34
        // responsibilities above.
35
        ActiveSync SyncerType = iota
36

37
        // PassiveSync denotes that a gossip syncer:
38
        //
39
        // 1. Should not attempt to synchronize with the remote peer for
40
        //    missing channels.
41
        // 2. Should respond to queries from the remote peer.
42
        // 3. Should not receive new updates from the remote peer.
43
        //
44
        // They are started in a chansSynced state in order to accomplish their
45
        // responsibilities above.
46
        PassiveSync
47

48
        // PinnedSync denotes an ActiveSync that doesn't count towards the
49
        // default active syncer limits and is always active throughout the
50
        // duration of the peer's connection. Each pinned syncer will begin by
51
        // performing a historical sync to ensure we are well synchronized with
52
        // their routing table.
53
        PinnedSync
54
)
55

56
// String returns a human readable string describing the target SyncerType.
UNCOV
57
func (t SyncerType) String() string {
×
UNCOV
58
        switch t {
×
UNCOV
59
        case ActiveSync:
×
UNCOV
60
                return "ActiveSync"
×
UNCOV
61
        case PassiveSync:
×
UNCOV
62
                return "PassiveSync"
×
UNCOV
63
        case PinnedSync:
×
UNCOV
64
                return "PinnedSync"
×
65
        default:
×
66
                return fmt.Sprintf("unknown sync type %d", t)
×
67
        }
68
}
69

70
// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange
71
// allowing new gossip messages to be received from the peer.
72
func (t SyncerType) IsActiveSync() bool {
46✔
73
        switch t {
46✔
74
        case ActiveSync, PinnedSync:
15✔
75
                return true
15✔
76
        default:
31✔
77
                return false
31✔
78
        }
79
}
80

81
// syncerState is an enum that represents the current state of the GossipSyncer.
82
// As the syncer is a state machine, we'll gate our actions based off of the
83
// current state and the next incoming message.
84
type syncerState uint32
85

86
const (
87
        // syncingChans is the default state of the GossipSyncer. We start in
88
        // this state when a new peer first connects and we don't yet know if
89
        // we're fully synchronized.
90
        syncingChans syncerState = iota
91

92
        // waitingQueryRangeReply is the second main phase of the GossipSyncer.
93
        // We enter this state after we send out our first QueryChannelRange
94
        // reply. We'll stay in this state until the remote party sends us a
95
        // ReplyShortChanIDsEnd message that indicates they've responded to our
96
        // query entirely. After this state, we'll transition to
97
        // waitingQueryChanReply after we send out requests for all the new
98
        // chan ID's to us.
99
        waitingQueryRangeReply
100

101
        // queryNewChannels is the third main phase of the GossipSyncer.  In
102
        // this phase we'll send out all of our QueryShortChanIDs messages in
103
        // response to the new channels that we don't yet know about.
104
        queryNewChannels
105

106
        // waitingQueryChanReply is the fourth main phase of the GossipSyncer.
107
        // We enter this phase once we've sent off a query chink to the remote
108
        // peer.  We'll stay in this phase until we receive a
109
        // ReplyShortChanIDsEnd message which indicates that the remote party
110
        // has responded to all of our requests.
111
        waitingQueryChanReply
112

113
        // chansSynced is the terminal stage of the GossipSyncer. Once we enter
114
        // this phase, we'll send out our update horizon, which filters out the
115
        // set of channel updates that we're interested in. In this state,
116
        // we'll be able to accept any outgoing messages from the
117
        // AuthenticatedGossiper, and decide if we should forward them to our
118
        // target peer based on its update horizon.
119
        chansSynced
120

121
        // syncerIdle is a state in which the gossip syncer can handle external
122
        // requests to transition or perform historical syncs. It is used as the
123
        // initial state for pinned syncers, as well as a fallthrough case for
124
        // chansSynced allowing fully synced peers to facilitate requests.
125
        syncerIdle
126
)
127

128
// String returns a human readable string describing the target syncerState.
UNCOV
129
func (s syncerState) String() string {
×
UNCOV
130
        switch s {
×
UNCOV
131
        case syncingChans:
×
UNCOV
132
                return "syncingChans"
×
133

UNCOV
134
        case waitingQueryRangeReply:
×
UNCOV
135
                return "waitingQueryRangeReply"
×
136

UNCOV
137
        case queryNewChannels:
×
UNCOV
138
                return "queryNewChannels"
×
139

UNCOV
140
        case waitingQueryChanReply:
×
UNCOV
141
                return "waitingQueryChanReply"
×
142

UNCOV
143
        case chansSynced:
×
UNCOV
144
                return "chansSynced"
×
145

UNCOV
146
        case syncerIdle:
×
UNCOV
147
                return "syncerIdle"
×
148

149
        default:
×
150
                return "UNKNOWN STATE"
×
151
        }
152
}
153

154
const (
155
        // DefaultMaxUndelayedQueryReplies specifies how many gossip queries we
156
        // will respond to immediately before starting to delay responses.
157
        DefaultMaxUndelayedQueryReplies = 10
158

159
        // DefaultDelayedQueryReplyInterval is the length of time we will wait
160
        // before responding to gossip queries after replying to
161
        // maxUndelayedQueryReplies queries.
162
        DefaultDelayedQueryReplyInterval = 5 * time.Second
163

164
        // maxQueryChanRangeReplies specifies the default limit of replies to
165
        // process for a single QueryChannelRange request.
166
        maxQueryChanRangeReplies = 500
167

168
        // maxQueryChanRangeRepliesZlibFactor specifies the factor applied to
169
        // the maximum number of replies allowed for zlib encoded replies.
170
        maxQueryChanRangeRepliesZlibFactor = 4
171

172
        // chanRangeQueryBuffer is the number of blocks back that we'll go when
173
        // asking the remote peer for their any channels they know of beyond
174
        // our highest known channel ID.
175
        chanRangeQueryBuffer = 144
176

177
        // syncTransitionTimeout is the default timeout in which we'll wait up
178
        // to when attempting to perform a sync transition.
179
        syncTransitionTimeout = 5 * time.Second
180

181
        // requestBatchSize is the maximum number of channels we will query the
182
        // remote peer for in a QueryShortChanIDs message.
183
        requestBatchSize = 500
184
)
185

186
var (
187
        // encodingTypeToChunkSize maps an encoding type, to the max number of
188
        // short chan ID's using the encoding type that we can fit into a
189
        // single message safely.
190
        encodingTypeToChunkSize = map[lnwire.QueryEncoding]int32{
191
                lnwire.EncodingSortedPlain: 8000,
192
        }
193

194
        // ErrGossipSyncerExiting signals that the syncer has been killed.
195
        ErrGossipSyncerExiting = errors.New("gossip syncer exiting")
196

197
        // ErrSyncTransitionTimeout is an error returned when we've timed out
198
        // attempting to perform a sync transition.
199
        ErrSyncTransitionTimeout = errors.New("timed out attempting to " +
200
                "transition sync type")
201

202
        // zeroTimestamp is the timestamp we'll use when we want to indicate to
203
        // peers that we do not want to receive any new graph updates.
204
        zeroTimestamp time.Time
205
)
206

207
// syncTransitionReq encapsulates a request for a gossip syncer sync transition.
208
type syncTransitionReq struct {
209
        newSyncType SyncerType
210
        errChan     chan error
211
}
212

213
// historicalSyncReq encapsulates a request for a gossip syncer to perform a
214
// historical sync.
215
type historicalSyncReq struct {
216
        // doneChan is a channel that serves as a signal and is closed to ensure
217
        // the historical sync is attempted by the time we return to the caller.
218
        doneChan chan struct{}
219
}
220

221
// gossipSyncerCfg is a struct that packages all the information a GossipSyncer
222
// needs to carry out its duties.
223
type gossipSyncerCfg struct {
224
        // chainHash is the chain that this syncer is responsible for.
225
        chainHash chainhash.Hash
226

227
        // peerPub is the public key of the peer we're syncing with, serialized
228
        // in compressed format.
229
        peerPub [33]byte
230

231
        // channelSeries is the primary interface that we'll use to generate
232
        // our queries and respond to the queries of the remote peer.
233
        channelSeries ChannelGraphTimeSeries
234

235
        // encodingType is the current encoding type we're aware of. Requests
236
        // with different encoding types will be rejected.
237
        encodingType lnwire.QueryEncoding
238

239
        // chunkSize is the max number of short chan IDs using the syncer's
240
        // encoding type that we can fit into a single message safely.
241
        chunkSize int32
242

243
        // batchSize is the max number of channels the syncer will query from
244
        // the remote node in a single QueryShortChanIDs request.
245
        batchSize int32
246

247
        // sendToPeer sends a variadic number of messages to the remote peer.
248
        // This method should not block while waiting for sends to be written
249
        // to the wire.
250
        sendToPeer func(...lnwire.Message) error
251

252
        // sendToPeerSync sends a variadic number of messages to the remote
253
        // peer, blocking until all messages have been sent successfully or a
254
        // write error is encountered.
255
        sendToPeerSync func(...lnwire.Message) error
256

257
        // maxUndelayedQueryReplies specifies how many gossip queries we will
258
        // respond to immediately before starting to delay responses.
259
        maxUndelayedQueryReplies int
260

261
        // delayedQueryReplyInterval is the length of time we will wait before
262
        // responding to gossip queries after replying to
263
        // maxUndelayedQueryReplies queries.
264
        delayedQueryReplyInterval time.Duration
265

266
        // noSyncChannels will prevent the GossipSyncer from spawning a
267
        // channelGraphSyncer, meaning we will not try to reconcile unknown
268
        // channels with the remote peer.
269
        noSyncChannels bool
270

271
        // noReplyQueries will prevent the GossipSyncer from spawning a
272
        // replyHandler, meaning we will not reply to queries from our remote
273
        // peer.
274
        noReplyQueries bool
275

276
        // noTimestampQueryOption will prevent the GossipSyncer from querying
277
        // timestamps of announcement messages from the peer, and it will
278
        // prevent it from responding to timestamp queries.
279
        noTimestampQueryOption bool
280

281
        // ignoreHistoricalFilters will prevent syncers from replying with
282
        // historical data when the remote peer sets a gossip_timestamp_range.
283
        // This prevents ranges with old start times from causing us to dump the
284
        // graph on connect.
285
        ignoreHistoricalFilters bool
286

287
        // bestHeight returns the latest height known of the chain.
288
        bestHeight func() uint32
289

290
        // markGraphSynced updates the SyncManager's perception of whether we
291
        // have completed at least one historical sync.
292
        markGraphSynced func()
293

294
        // maxQueryChanRangeReplies is the maximum number of replies we'll allow
295
        // for a single QueryChannelRange request.
296
        maxQueryChanRangeReplies uint32
297

298
        // isStillZombieChannel takes the timestamps of the latest channel
299
        // updates for a channel and returns true if the channel should be
300
        // considered a zombie based on these timestamps.
301
        isStillZombieChannel func(time.Time, time.Time) bool
302
}
303

304
// GossipSyncer is a struct that handles synchronizing the channel graph state
305
// with a remote peer. The GossipSyncer implements a state machine that will
306
// progressively ensure we're synchronized with the channel state of the remote
307
// node. Once both nodes have been synchronized, we'll use an update filter to
308
// filter out which messages should be sent to a remote peer based on their
309
// update horizon. If the update horizon isn't specified, then we won't send
310
// them any channel updates at all.
311
type GossipSyncer struct {
312
        started sync.Once
313
        stopped sync.Once
314

315
        // state is the current state of the GossipSyncer.
316
        //
317
        // NOTE: This variable MUST be used atomically.
318
        state uint32
319

320
        // syncType denotes the SyncerType the gossip syncer is currently
321
        // exercising.
322
        //
323
        // NOTE: This variable MUST be used atomically.
324
        syncType uint32
325

326
        // remoteUpdateHorizon is the update horizon of the remote peer. We'll
327
        // use this to properly filter out any messages.
328
        remoteUpdateHorizon *lnwire.GossipTimestampRange
329

330
        // localUpdateHorizon is our local update horizon, we'll use this to
331
        // determine if we've already sent out our update.
332
        localUpdateHorizon *lnwire.GossipTimestampRange
333

334
        // syncTransitions is a channel through which new sync type transition
335
        // requests will be sent through. These requests should only be handled
336
        // when the gossip syncer is in a chansSynced state to ensure its state
337
        // machine behaves as expected.
338
        syncTransitionReqs chan *syncTransitionReq
339

340
        // historicalSyncReqs is a channel that serves as a signal for the
341
        // gossip syncer to perform a historical sync. These can only be done
342
        // once the gossip syncer is in a chansSynced state to ensure its state
343
        // machine behaves as expected.
344
        historicalSyncReqs chan *historicalSyncReq
345

346
        // genHistoricalChanRangeQuery when true signals to the gossip syncer
347
        // that it should request the remote peer for all of its known channel
348
        // IDs starting from the genesis block of the chain. This can only
349
        // happen if the gossip syncer receives a request to attempt a
350
        // historical sync. It can be unset if the syncer ever transitions from
351
        // PassiveSync to ActiveSync.
352
        genHistoricalChanRangeQuery bool
353

354
        // gossipMsgs is a channel that all responses to our queries from the
355
        // target peer will be sent over, these will be read by the
356
        // channelGraphSyncer.
357
        gossipMsgs chan lnwire.Message
358

359
        // queryMsgs is a channel that all queries from the remote peer will be
360
        // received over, these will be read by the replyHandler.
361
        queryMsgs chan lnwire.Message
362

363
        // curQueryRangeMsg keeps track of the latest QueryChannelRange message
364
        // we've sent to a peer to ensure we've consumed all expected replies.
365
        // This field is primarily used within the waitingQueryChanReply state.
366
        curQueryRangeMsg *lnwire.QueryChannelRange
367

368
        // prevReplyChannelRange keeps track of the previous ReplyChannelRange
369
        // message we've received from a peer to ensure they've fully replied to
370
        // our query by ensuring they covered our requested block range. This
371
        // field is primarily used within the waitingQueryChanReply state.
372
        prevReplyChannelRange *lnwire.ReplyChannelRange
373

374
        // bufferedChanRangeReplies is used in the waitingQueryChanReply to
375
        // buffer all the chunked response to our query.
376
        bufferedChanRangeReplies []channeldb.ChannelUpdateInfo
377

378
        // numChanRangeRepliesRcvd is used to track the number of replies
379
        // received as part of a QueryChannelRange. This field is primarily used
380
        // within the waitingQueryChanReply state.
381
        numChanRangeRepliesRcvd uint32
382

383
        // newChansToQuery is used to pass the set of channels we should query
384
        // for from the waitingQueryChanReply state to the queryNewChannels
385
        // state.
386
        newChansToQuery []lnwire.ShortChannelID
387

388
        cfg gossipSyncerCfg
389

390
        // rateLimiter dictates the frequency with which we will reply to gossip
391
        // queries from a peer. This is used to delay responses to peers to
392
        // prevent DOS vulnerabilities if they are spamming with an unreasonable
393
        // number of queries.
394
        rateLimiter *rate.Limiter
395

396
        // syncedSignal is a channel that, if set, will be closed when the
397
        // GossipSyncer reaches its terminal chansSynced state.
398
        syncedSignal chan struct{}
399

400
        // syncerSema is used to more finely control the syncer's ability to
401
        // respond to gossip timestamp range messages.
402
        syncerSema chan struct{}
403

404
        sync.Mutex
405

406
        quit chan struct{}
407
        wg   sync.WaitGroup
408
}
409

410
// newGossipSyncer returns a new instance of the GossipSyncer populated using
411
// the passed config.
412
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
51✔
413
        // If no parameter was specified for max undelayed query replies, set it
51✔
414
        // to the default of 5 queries.
51✔
415
        if cfg.maxUndelayedQueryReplies <= 0 {
77✔
416
                cfg.maxUndelayedQueryReplies = DefaultMaxUndelayedQueryReplies
26✔
417
        }
26✔
418

419
        // If no parameter was specified for delayed query reply interval, set
420
        // to the default of 5 seconds.
421
        if cfg.delayedQueryReplyInterval <= 0 {
51✔
422
                cfg.delayedQueryReplyInterval = DefaultDelayedQueryReplyInterval
×
423
        }
×
424

425
        // Construct a rate limiter that will govern how frequently we reply to
426
        // gossip queries from this peer. The limiter will automatically adjust
427
        // during periods of quiescence, and increase the reply interval under
428
        // load.
429
        interval := rate.Every(cfg.delayedQueryReplyInterval)
51✔
430
        rateLimiter := rate.NewLimiter(
51✔
431
                interval, cfg.maxUndelayedQueryReplies,
51✔
432
        )
51✔
433

51✔
434
        return &GossipSyncer{
51✔
435
                cfg:                cfg,
51✔
436
                rateLimiter:        rateLimiter,
51✔
437
                syncTransitionReqs: make(chan *syncTransitionReq),
51✔
438
                historicalSyncReqs: make(chan *historicalSyncReq),
51✔
439
                gossipMsgs:         make(chan lnwire.Message, 100),
51✔
440
                queryMsgs:          make(chan lnwire.Message, 100),
51✔
441
                syncerSema:         sema,
51✔
442
                quit:               make(chan struct{}),
51✔
443
        }
51✔
444
}
445

446
// Start starts the GossipSyncer and any goroutines that it needs to carry out
447
// its duties.
448
func (g *GossipSyncer) Start() {
37✔
449
        g.started.Do(func() {
74✔
450
                log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])
37✔
451

37✔
452
                // TODO(conner): only spawn channelGraphSyncer if remote
37✔
453
                // supports gossip queries, and only spawn replyHandler if we
37✔
454
                // advertise support
37✔
455
                if !g.cfg.noSyncChannels {
72✔
456
                        g.wg.Add(1)
35✔
457
                        go g.channelGraphSyncer()
35✔
458
                }
35✔
459
                if !g.cfg.noReplyQueries {
72✔
460
                        g.wg.Add(1)
35✔
461
                        go g.replyHandler()
35✔
462
                }
35✔
463
        })
464
}
465

466
// Stop signals the GossipSyncer for a graceful exit, then waits until it has
467
// exited.
468
func (g *GossipSyncer) Stop() {
34✔
469
        g.stopped.Do(func() {
68✔
470
                log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
34✔
471
                defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])
34✔
472

34✔
473
                close(g.quit)
34✔
474
                g.wg.Wait()
34✔
475
        })
34✔
476
}
477

478
// channelGraphSyncer is the main goroutine responsible for ensuring that we
479
// properly channel graph state with the remote peer, and also that we only
480
// send them messages which actually pass their defined update horizon.
481
func (g *GossipSyncer) channelGraphSyncer() {
35✔
482
        defer g.wg.Done()
35✔
483

35✔
484
        for {
277✔
485
                state := g.syncState()
242✔
486
                syncType := g.SyncType()
242✔
487

242✔
488
                log.Debugf("GossipSyncer(%x): state=%v, type=%v",
242✔
489
                        g.cfg.peerPub[:], state, syncType)
242✔
490

242✔
491
                switch state {
242✔
492
                // When we're in this state, we're trying to synchronize our
493
                // view of the network with the remote peer. We'll kick off
494
                // this sync by asking them for the set of channels they
495
                // understand, as we'll as responding to any other queries by
496
                // them.
497
                case syncingChans:
21✔
498
                        // If we're in this state, then we'll send the remote
21✔
499
                        // peer our opening QueryChannelRange message.
21✔
500
                        queryRangeMsg, err := g.genChanRangeQuery(
21✔
501
                                g.genHistoricalChanRangeQuery,
21✔
502
                        )
21✔
503
                        if err != nil {
21✔
504
                                log.Errorf("Unable to gen chan range "+
×
505
                                        "query: %v", err)
×
506
                                return
×
507
                        }
×
508

509
                        err = g.cfg.sendToPeer(queryRangeMsg)
21✔
510
                        if err != nil {
21✔
511
                                log.Errorf("Unable to send chan range "+
×
512
                                        "query: %v", err)
×
513
                                return
×
514
                        }
×
515

516
                        // With the message sent successfully, we'll transition
517
                        // into the next state where we wait for their reply.
518
                        g.setSyncState(waitingQueryRangeReply)
21✔
519

520
                // In this state, we've sent out our initial channel range
521
                // query and are waiting for the final response from the remote
522
                // peer before we perform a diff to see with channels they know
523
                // of that we don't.
524
                case waitingQueryRangeReply:
133✔
525
                        // We'll wait to either process a new message from the
133✔
526
                        // remote party, or exit due to the gossiper exiting,
133✔
527
                        // or us being signalled to do so.
133✔
528
                        select {
133✔
529
                        case msg := <-g.gossipMsgs:
128✔
530
                                // The remote peer is sending a response to our
128✔
531
                                // initial query, we'll collate this response,
128✔
532
                                // and see if it's the final one in the series.
128✔
533
                                // If so, we can then transition to querying
128✔
534
                                // for the new channels.
128✔
535
                                queryReply, ok := msg.(*lnwire.ReplyChannelRange)
128✔
536
                                if ok {
256✔
537
                                        err := g.processChanRangeReply(queryReply)
128✔
538
                                        if err != nil {
128✔
539
                                                log.Errorf("Unable to "+
×
540
                                                        "process chan range "+
×
541
                                                        "query: %v", err)
×
542
                                                return
×
543
                                        }
×
544
                                        continue
128✔
545
                                }
546

547
                                log.Warnf("Unexpected message: %T in state=%v",
×
548
                                        msg, state)
×
549

550
                        case <-g.quit:
5✔
551
                                return
5✔
552
                        }
553

554
                // We'll enter this state once we've discovered which channels
555
                // the remote party knows of that we don't yet know of
556
                // ourselves.
557
                case queryNewChannels:
15✔
558
                        // First, we'll attempt to continue our channel
15✔
559
                        // synchronization by continuing to send off another
15✔
560
                        // query chunk.
15✔
561
                        done, err := g.synchronizeChanIDs()
15✔
562
                        if err != nil {
15✔
563
                                log.Errorf("Unable to sync chan IDs: %v", err)
×
564
                        }
×
565

566
                        // If this wasn't our last query, then we'll need to
567
                        // transition to our waiting state.
568
                        if !done {
28✔
569
                                g.setSyncState(waitingQueryChanReply)
13✔
570
                                continue
13✔
571
                        }
572

573
                        // If we're fully synchronized, then we can transition
574
                        // to our terminal state.
575
                        g.setSyncState(chansSynced)
2✔
576

2✔
577
                        // Ensure that the sync manager becomes aware that the
2✔
578
                        // historical sync completed so synced_to_graph is
2✔
579
                        // updated over rpc.
2✔
580
                        g.cfg.markGraphSynced()
2✔
581

582
                // In this state, we've just sent off a new query for channels
583
                // that we don't yet know of. We'll remain in this state until
584
                // the remote party signals they've responded to our query in
585
                // totality.
586
                case waitingQueryChanReply:
13✔
587
                        // Once we've sent off our query, we'll wait for either
13✔
588
                        // an ending reply, or just another query from the
13✔
589
                        // remote peer.
13✔
590
                        select {
13✔
591
                        case msg := <-g.gossipMsgs:
13✔
592
                                // If this is the final reply to one of our
13✔
593
                                // queries, then we'll loop back into our query
13✔
594
                                // state to send of the remaining query chunks.
13✔
595
                                _, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
13✔
596
                                if ok {
26✔
597
                                        g.setSyncState(queryNewChannels)
13✔
598
                                        continue
13✔
599
                                }
600

601
                                log.Warnf("Unexpected message: %T in state=%v",
×
602
                                        msg, state)
×
603

604
                        case <-g.quit:
×
605
                                return
×
606
                        }
607

608
                // This is our final terminal state where we'll only reply to
609
                // any further queries by the remote peer.
610
                case chansSynced:
57✔
611
                        g.Lock()
57✔
612
                        if g.syncedSignal != nil {
65✔
613
                                close(g.syncedSignal)
8✔
614
                                g.syncedSignal = nil
8✔
615
                        }
8✔
616
                        g.Unlock()
57✔
617

57✔
618
                        // If we haven't yet sent out our update horizon, and
57✔
619
                        // we want to receive real-time channel updates, we'll
57✔
620
                        // do so now.
57✔
621
                        if g.localUpdateHorizon == nil &&
57✔
622
                                syncType.IsActiveSync() {
72✔
623

15✔
624
                                err := g.sendGossipTimestampRange(
15✔
625
                                        time.Now(), math.MaxUint32,
15✔
626
                                )
15✔
627
                                if err != nil {
15✔
628
                                        log.Errorf("Unable to send update "+
×
629
                                                "horizon to %x: %v",
×
630
                                                g.cfg.peerPub, err)
×
631
                                }
×
632
                        }
633
                        // With our horizon set, we'll simply reply to any new
634
                        // messages or process any state transitions and exit if
635
                        // needed.
636
                        fallthrough
57✔
637

638
                // Pinned peers will begin in this state, since they will
639
                // immediately receive a request to perform a historical sync.
640
                // Otherwise, we fall through after ending in chansSynced to
641
                // facilitate new requests.
642
                case syncerIdle:
60✔
643
                        select {
60✔
644
                        case req := <-g.syncTransitionReqs:
14✔
645
                                req.errChan <- g.handleSyncTransition(req)
14✔
646

647
                        case req := <-g.historicalSyncReqs:
16✔
648
                                g.handleHistoricalSync(req)
16✔
649

650
                        case <-g.quit:
27✔
651
                                return
27✔
652
                        }
653
                }
654
        }
655
}
656

657
// replyHandler is an event loop whose sole purpose is to reply to the remote
658
// peers queries. Our replyHandler will respond to messages generated by their
659
// channelGraphSyncer, and vice versa. Each party's channelGraphSyncer drives
660
// the other's replyHandler, allowing the replyHandler to operate independently
661
// from the state machine maintained on the same node.
662
//
663
// NOTE: This method MUST be run as a goroutine.
664
func (g *GossipSyncer) replyHandler() {
35✔
665
        defer g.wg.Done()
35✔
666

35✔
667
        for {
87✔
668
                select {
52✔
669
                case msg := <-g.queryMsgs:
17✔
670
                        err := g.replyPeerQueries(msg)
17✔
671
                        switch {
17✔
672
                        case err == ErrGossipSyncerExiting:
×
673
                                return
×
674

UNCOV
675
                        case err == lnpeer.ErrPeerExiting:
×
UNCOV
676
                                return
×
677

678
                        case err != nil:
×
679
                                log.Errorf("Unable to reply to peer "+
×
680
                                        "query: %v", err)
×
681
                        }
682

683
                case <-g.quit:
32✔
684
                        return
32✔
685
                }
686
        }
687
}
688

689
// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
690
// syncer and sends it to the remote peer.
691
func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time,
692
        timestampRange uint32) error {
29✔
693

29✔
694
        endTimestamp := firstTimestamp.Add(
29✔
695
                time.Duration(timestampRange) * time.Second,
29✔
696
        )
29✔
697

29✔
698
        log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)",
29✔
699
                g.cfg.peerPub[:], firstTimestamp, endTimestamp)
29✔
700

29✔
701
        localUpdateHorizon := &lnwire.GossipTimestampRange{
29✔
702
                ChainHash:      g.cfg.chainHash,
29✔
703
                FirstTimestamp: uint32(firstTimestamp.Unix()),
29✔
704
                TimestampRange: timestampRange,
29✔
705
        }
29✔
706

29✔
707
        if err := g.cfg.sendToPeer(localUpdateHorizon); err != nil {
30✔
708
                return err
1✔
709
        }
1✔
710

711
        if firstTimestamp == zeroTimestamp && timestampRange == 0 {
30✔
712
                g.localUpdateHorizon = nil
2✔
713
        } else {
28✔
714
                g.localUpdateHorizon = localUpdateHorizon
26✔
715
        }
26✔
716

717
        return nil
28✔
718
}
719

720
// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
721
// the remote peer for its known set of channel IDs within a particular block
722
// range. This method will be called continually until the entire range has
723
// been queried for with a response received. We'll chunk our requests as
724
// required to ensure they fit into a single message. We may re-renter this
725
// state in the case that chunking is required.
726
func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
18✔
727
        // If we're in this state yet there are no more new channels to query
18✔
728
        // for, then we'll transition to our final synced state and return true
18✔
729
        // to signal that we're fully synchronized.
18✔
730
        if len(g.newChansToQuery) == 0 {
20✔
731
                log.Infof("GossipSyncer(%x): no more chans to query",
2✔
732
                        g.cfg.peerPub[:])
2✔
733
                return true, nil
2✔
734
        }
2✔
735

736
        // Otherwise, we'll issue our next chunked query to receive replies
737
        // for.
738
        var queryChunk []lnwire.ShortChannelID
16✔
739

16✔
740
        // If the number of channels to query for is less than the chunk size,
16✔
741
        // then we can issue a single query.
16✔
742
        if int32(len(g.newChansToQuery)) < g.cfg.batchSize {
18✔
743
                queryChunk = g.newChansToQuery
2✔
744
                g.newChansToQuery = nil
2✔
745

2✔
746
        } else {
16✔
747
                // Otherwise, we'll need to only query for the next chunk.
14✔
748
                // We'll slice into our query chunk, then slide down our main
14✔
749
                // pointer down by the chunk size.
14✔
750
                queryChunk = g.newChansToQuery[:g.cfg.batchSize]
14✔
751
                g.newChansToQuery = g.newChansToQuery[g.cfg.batchSize:]
14✔
752
        }
14✔
753

754
        log.Infof("GossipSyncer(%x): querying for %v new channels",
16✔
755
                g.cfg.peerPub[:], len(queryChunk))
16✔
756

16✔
757
        // With our chunk obtained, we'll send over our next query, then return
16✔
758
        // false indicating that we're net yet fully synced.
16✔
759
        err := g.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
16✔
760
                ChainHash:    g.cfg.chainHash,
16✔
761
                EncodingType: lnwire.EncodingSortedPlain,
16✔
762
                ShortChanIDs: queryChunk,
16✔
763
        })
16✔
764

16✔
765
        return false, err
16✔
766
}
767

768
// isLegacyReplyChannelRange determines where a ReplyChannelRange message is
769
// considered legacy. There was a point where lnd used to include the same query
770
// over multiple replies, rather than including the portion of the query the
771
// reply is handling. We'll use this as a way of detecting whether we are
772
// communicating with a legacy node so we can properly sync with them.
773
func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
774
        reply *lnwire.ReplyChannelRange) bool {
272✔
775

272✔
776
        return (reply.ChainHash == query.ChainHash &&
272✔
777
                reply.FirstBlockHeight == query.FirstBlockHeight &&
272✔
778
                reply.NumBlocks == query.NumBlocks)
272✔
779
}
272✔
780

781
// processChanRangeReply is called each time the GossipSyncer receives a new
782
// reply to the initial range query to discover new channels that it didn't
783
// previously know of.
784
func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error {
136✔
785
        // isStale returns whether the timestamp is too far into the past.
136✔
786
        isStale := func(timestamp time.Time) bool {
166✔
787
                return time.Since(timestamp) > graph.DefaultChannelPruneExpiry
30✔
788
        }
30✔
789

790
        // isSkewed returns whether the timestamp is too far into the future.
791
        isSkewed := func(timestamp time.Time) bool {
156✔
792
                return time.Until(timestamp) > graph.DefaultChannelPruneExpiry
20✔
793
        }
20✔
794

795
        // If we're not communicating with a legacy node, we'll apply some
796
        // further constraints on their reply to ensure it satisfies our query.
797
        if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
257✔
798
                // The first block should be within our original request.
121✔
799
                if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight {
121✔
800
                        return fmt.Errorf("reply includes channels for height "+
×
801
                                "%v prior to query %v", msg.FirstBlockHeight,
×
802
                                g.curQueryRangeMsg.FirstBlockHeight)
×
803
                }
×
804

805
                // The last block should also be. We don't need to check the
806
                // intermediate ones because they should already be in sorted
807
                // order.
808
                replyLastHeight := msg.LastBlockHeight()
121✔
809
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
121✔
810
                if replyLastHeight > queryLastHeight {
121✔
811
                        return fmt.Errorf("reply includes channels for height "+
×
812
                                "%v after query %v", replyLastHeight,
×
813
                                queryLastHeight)
×
814
                }
×
815

816
                // If we've previously received a reply for this query, look at
817
                // its last block to ensure the current reply properly follows
818
                // it.
819
                if g.prevReplyChannelRange != nil {
236✔
820
                        prevReply := g.prevReplyChannelRange
115✔
821
                        prevReplyLastHeight := prevReply.LastBlockHeight()
115✔
822

115✔
823
                        // The current reply can either start from the previous
115✔
824
                        // reply's last block, if there are still more channels
115✔
825
                        // for the same block, or the block after.
115✔
826
                        if msg.FirstBlockHeight != prevReplyLastHeight &&
115✔
827
                                msg.FirstBlockHeight != prevReplyLastHeight+1 {
115✔
828

×
829
                                return fmt.Errorf("first block of reply %v "+
×
830
                                        "does not continue from last block of "+
×
831
                                        "previous %v", msg.FirstBlockHeight,
×
832
                                        prevReplyLastHeight)
×
833
                        }
×
834
                }
835
        }
836

837
        g.prevReplyChannelRange = msg
136✔
838

136✔
839
        for i, scid := range msg.ShortChanIDs {
285✔
840
                info := channeldb.NewChannelUpdateInfo(
149✔
841
                        scid, time.Time{}, time.Time{},
149✔
842
                )
149✔
843

149✔
844
                if len(msg.Timestamps) != 0 {
161✔
845
                        t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0)
12✔
846
                        info.Node1UpdateTimestamp = t1
12✔
847

12✔
848
                        t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0)
12✔
849
                        info.Node2UpdateTimestamp = t2
12✔
850

12✔
851
                        // Sort out all channels with outdated or skewed
12✔
852
                        // timestamps. Both timestamps need to be out of
12✔
853
                        // boundaries for us to skip the channel and not query
12✔
854
                        // it later on.
12✔
855
                        switch {
12✔
856
                        case isStale(info.Node1UpdateTimestamp) &&
857
                                isStale(info.Node2UpdateTimestamp):
2✔
858

2✔
859
                                continue
2✔
860

861
                        case isSkewed(info.Node1UpdateTimestamp) &&
862
                                isSkewed(info.Node2UpdateTimestamp):
2✔
863

2✔
864
                                continue
2✔
865

866
                        case isStale(info.Node1UpdateTimestamp) &&
867
                                isSkewed(info.Node2UpdateTimestamp):
2✔
868

2✔
869
                                continue
2✔
870

871
                        case isStale(info.Node2UpdateTimestamp) &&
872
                                isSkewed(info.Node1UpdateTimestamp):
2✔
873

2✔
874
                                continue
2✔
875
                        }
876
                }
877

878
                g.bufferedChanRangeReplies = append(
141✔
879
                        g.bufferedChanRangeReplies, info,
141✔
880
                )
141✔
881
        }
882

883
        switch g.cfg.encodingType {
136✔
884
        case lnwire.EncodingSortedPlain:
136✔
885
                g.numChanRangeRepliesRcvd++
136✔
886
        case lnwire.EncodingSortedZlib:
×
887
                g.numChanRangeRepliesRcvd += maxQueryChanRangeRepliesZlibFactor
×
888
        default:
×
889
                return fmt.Errorf("unhandled encoding type %v", g.cfg.encodingType)
×
890
        }
891

892
        log.Infof("GossipSyncer(%x): buffering chan range reply of size=%v",
136✔
893
                g.cfg.peerPub[:], len(msg.ShortChanIDs))
136✔
894

136✔
895
        // If this isn't the last response and we can continue to receive more,
136✔
896
        // then we can exit as we've already buffered the latest portion of the
136✔
897
        // streaming reply.
136✔
898
        maxReplies := g.cfg.maxQueryChanRangeReplies
136✔
899
        switch {
136✔
900
        // If we're communicating with a legacy node, we'll need to look at the
901
        // complete field.
902
        case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg):
15✔
903
                if msg.Complete == 0 && g.numChanRangeRepliesRcvd < maxReplies {
18✔
904
                        return nil
3✔
905
                }
3✔
906

907
        // Otherwise, we'll look at the reply's height range.
908
        default:
121✔
909
                replyLastHeight := msg.LastBlockHeight()
121✔
910
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
121✔
911

121✔
912
                // TODO(wilmer): This might require some padding if the remote
121✔
913
                // node is not aware of the last height we sent them, i.e., is
121✔
914
                // behind a few blocks from us.
121✔
915
                if replyLastHeight < queryLastHeight &&
121✔
916
                        g.numChanRangeRepliesRcvd < maxReplies {
236✔
917

115✔
918
                        return nil
115✔
919
                }
115✔
920
        }
921

922
        log.Infof("GossipSyncer(%x): filtering through %v chans",
18✔
923
                g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))
18✔
924

18✔
925
        // Otherwise, this is the final response, so we'll now check to see
18✔
926
        // which channels they know of that we don't.
18✔
927
        newChans, err := g.cfg.channelSeries.FilterKnownChanIDs(
18✔
928
                g.cfg.chainHash, g.bufferedChanRangeReplies,
18✔
929
                g.cfg.isStillZombieChannel,
18✔
930
        )
18✔
931
        if err != nil {
18✔
932
                return fmt.Errorf("unable to filter chan ids: %w", err)
×
933
        }
×
934

935
        // As we've received the entirety of the reply, we no longer need to
936
        // hold on to the set of buffered replies or the original query that
937
        // prompted the replies, so we'll let that be garbage collected now.
938
        g.curQueryRangeMsg = nil
18✔
939
        g.prevReplyChannelRange = nil
18✔
940
        g.bufferedChanRangeReplies = nil
18✔
941
        g.numChanRangeRepliesRcvd = 0
18✔
942

18✔
943
        // If there aren't any channels that we don't know of, then we can
18✔
944
        // switch straight to our terminal state.
18✔
945
        if len(newChans) == 0 {
32✔
946
                log.Infof("GossipSyncer(%x): remote peer has no new chans",
14✔
947
                        g.cfg.peerPub[:])
14✔
948

14✔
949
                g.setSyncState(chansSynced)
14✔
950

14✔
951
                // Ensure that the sync manager becomes aware that the
14✔
952
                // historical sync completed so synced_to_graph is updated over
14✔
953
                // rpc.
14✔
954
                g.cfg.markGraphSynced()
14✔
955
                return nil
14✔
956
        }
14✔
957

958
        // Otherwise, we'll set the set of channels that we need to query for
959
        // the next state, and also transition our state.
960
        g.newChansToQuery = newChans
4✔
961
        g.setSyncState(queryNewChannels)
4✔
962

4✔
963
        log.Infof("GossipSyncer(%x): starting query for %v new chans",
4✔
964
                g.cfg.peerPub[:], len(newChans))
4✔
965

4✔
966
        return nil
4✔
967
}
968

969
// genChanRangeQuery generates the initial message we'll send to the remote
970
// party when we're kicking off the channel graph synchronization upon
971
// connection. The historicalQuery boolean can be used to generate a query from
972
// the genesis block of the chain.
973
func (g *GossipSyncer) genChanRangeQuery(
974
        historicalQuery bool) (*lnwire.QueryChannelRange, error) {
25✔
975

25✔
976
        // First, we'll query our channel graph time series for its highest
25✔
977
        // known channel ID.
25✔
978
        newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash)
25✔
979
        if err != nil {
25✔
980
                return nil, err
×
981
        }
×
982

983
        // Once we have the chan ID of the newest, we'll obtain the block height
984
        // of the channel, then subtract our default horizon to ensure we don't
985
        // miss any channels. By default, we go back 1 day from the newest
986
        // channel, unless we're attempting a historical sync, where we'll
987
        // actually start from the genesis block instead.
988
        var startHeight uint32
25✔
989
        switch {
25✔
990
        case historicalQuery:
19✔
991
                fallthrough
19✔
992
        case newestChan.BlockHeight <= chanRangeQueryBuffer:
19✔
993
                startHeight = 0
19✔
994
        default:
6✔
995
                startHeight = newestChan.BlockHeight - chanRangeQueryBuffer
6✔
996
        }
997

998
        // Determine the number of blocks to request based on our best height.
999
        // We'll take into account any potential underflows and explicitly set
1000
        // numBlocks to its minimum value of 1 if so.
1001
        bestHeight := g.cfg.bestHeight()
25✔
1002
        numBlocks := bestHeight - startHeight
25✔
1003
        if int64(numBlocks) < 1 {
25✔
1004
                numBlocks = 1
×
1005
        }
×
1006

1007
        log.Infof("GossipSyncer(%x): requesting new chans from height=%v "+
25✔
1008
                "and %v blocks after", g.cfg.peerPub[:], startHeight, numBlocks)
25✔
1009

25✔
1010
        // Finally, we'll craft the channel range query, using our starting
25✔
1011
        // height, then asking for all known channels to the foreseeable end of
25✔
1012
        // the main chain.
25✔
1013
        query := &lnwire.QueryChannelRange{
25✔
1014
                ChainHash:        g.cfg.chainHash,
25✔
1015
                FirstBlockHeight: startHeight,
25✔
1016
                NumBlocks:        numBlocks,
25✔
1017
        }
25✔
1018

25✔
1019
        if !g.cfg.noTimestampQueryOption {
41✔
1020
                query.QueryOptions = lnwire.NewTimestampQueryOption()
16✔
1021
        }
16✔
1022

1023
        g.curQueryRangeMsg = query
25✔
1024

25✔
1025
        return query, nil
25✔
1026
}
1027

1028
// replyPeerQueries is called in response to any query by the remote peer.
1029
// We'll examine our state and send back our best response.
1030
func (g *GossipSyncer) replyPeerQueries(msg lnwire.Message) error {
17✔
1031
        reservation := g.rateLimiter.Reserve()
17✔
1032
        delay := reservation.Delay()
17✔
1033

17✔
1034
        // If we've already replied a handful of times, we will start to delay
17✔
1035
        // responses back to the remote peer. This can help prevent DOS attacks
17✔
1036
        // where the remote peer spams us endlessly.
17✔
1037
        if delay > 0 {
19✔
1038
                log.Infof("GossipSyncer(%x): rate limiting gossip replies, "+
2✔
1039
                        "responding in %s", g.cfg.peerPub[:], delay)
2✔
1040

2✔
1041
                select {
2✔
1042
                case <-time.After(delay):
2✔
1043
                case <-g.quit:
×
1044
                        return ErrGossipSyncerExiting
×
1045
                }
1046
        }
1047

1048
        switch msg := msg.(type) {
17✔
1049

1050
        // In this state, we'll also handle any incoming channel range queries
1051
        // from the remote peer as they're trying to sync their state as well.
1052
        case *lnwire.QueryChannelRange:
4✔
1053
                return g.replyChanRangeQuery(msg)
4✔
1054

1055
        // If the remote peer skips straight to requesting new channels that
1056
        // they don't know of, then we'll ensure that we also handle this case.
1057
        case *lnwire.QueryShortChanIDs:
13✔
1058
                return g.replyShortChanIDs(msg)
13✔
1059

1060
        default:
×
1061
                return fmt.Errorf("unknown message: %T", msg)
×
1062
        }
1063
}
1064

1065
// replyChanRangeQuery will be dispatched in response to a channel range query
1066
// by the remote node. We'll query the channel time series for channels that
1067
// meet the channel range, then chunk our responses to the remote node. We also
1068
// ensure that our final fragment carries the "complete" bit to indicate the
1069
// end of our streaming response.
1070
func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error {
10✔
1071
        // Before responding, we'll check to ensure that the remote peer is
10✔
1072
        // querying for the same chain that we're on. If not, we'll send back a
10✔
1073
        // response with a complete value of zero to indicate we're on a
10✔
1074
        // different chain.
10✔
1075
        if g.cfg.chainHash != query.ChainHash {
11✔
1076
                log.Warnf("Remote peer requested QueryChannelRange for "+
1✔
1077
                        "chain=%v, we're on chain=%v", query.ChainHash,
1✔
1078
                        g.cfg.chainHash)
1✔
1079

1✔
1080
                return g.cfg.sendToPeerSync(&lnwire.ReplyChannelRange{
1✔
1081
                        ChainHash:        query.ChainHash,
1✔
1082
                        FirstBlockHeight: query.FirstBlockHeight,
1✔
1083
                        NumBlocks:        query.NumBlocks,
1✔
1084
                        Complete:         0,
1✔
1085
                        EncodingType:     g.cfg.encodingType,
1✔
1086
                        ShortChanIDs:     nil,
1✔
1087
                })
1✔
1088
        }
1✔
1089

1090
        log.Infof("GossipSyncer(%x): filtering chan range: start_height=%v, "+
9✔
1091
                "num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
9✔
1092
                query.NumBlocks)
9✔
1093

9✔
1094
        // Check if the query asked for timestamps. We will only serve
9✔
1095
        // timestamps if this has not been disabled with
9✔
1096
        // noTimestampQueryOption.
9✔
1097
        withTimestamps := query.WithTimestamps() &&
9✔
1098
                !g.cfg.noTimestampQueryOption
9✔
1099

9✔
1100
        // Next, we'll consult the time series to obtain the set of known
9✔
1101
        // channel ID's that match their query.
9✔
1102
        startBlock := query.FirstBlockHeight
9✔
1103
        endBlock := query.LastBlockHeight()
9✔
1104
        channelRanges, err := g.cfg.channelSeries.FilterChannelRange(
9✔
1105
                query.ChainHash, startBlock, endBlock, withTimestamps,
9✔
1106
        )
9✔
1107
        if err != nil {
9✔
1108
                return err
×
1109
        }
×
1110

1111
        // TODO(roasbeef): means can't send max uint above?
1112
        //  * or make internal 64
1113

1114
        // We'll send our response in a streaming manner, chunk-by-chunk. We do
1115
        // this as there's a transport message size limit which we'll need to
1116
        // adhere to. We also need to make sure all of our replies cover the
1117
        // expected range of the query.
1118
        sendReplyForChunk := func(channelChunk []channeldb.ChannelUpdateInfo,
9✔
1119
                firstHeight, lastHeight uint32, finalChunk bool) error {
33✔
1120

24✔
1121
                // The number of blocks contained in the current chunk (the
24✔
1122
                // total span) is the difference between the last channel ID and
24✔
1123
                // the first in the range. We add one as even if all channels
24✔
1124
                // returned are in the same block, we need to count that.
24✔
1125
                numBlocks := lastHeight - firstHeight + 1
24✔
1126
                complete := uint8(0)
24✔
1127
                if finalChunk {
33✔
1128
                        complete = 1
9✔
1129
                }
9✔
1130

1131
                var timestamps lnwire.Timestamps
24✔
1132
                if withTimestamps {
24✔
UNCOV
1133
                        timestamps = make(lnwire.Timestamps, len(channelChunk))
×
UNCOV
1134
                }
×
1135

1136
                scids := make([]lnwire.ShortChannelID, len(channelChunk))
24✔
1137
                for i, info := range channelChunk {
60✔
1138
                        scids[i] = info.ShortChannelID
36✔
1139

36✔
1140
                        if !withTimestamps {
72✔
1141
                                continue
36✔
1142
                        }
1143

UNCOV
1144
                        timestamps[i].Timestamp1 = uint32(
×
UNCOV
1145
                                info.Node1UpdateTimestamp.Unix(),
×
UNCOV
1146
                        )
×
UNCOV
1147

×
UNCOV
1148
                        timestamps[i].Timestamp2 = uint32(
×
UNCOV
1149
                                info.Node2UpdateTimestamp.Unix(),
×
UNCOV
1150
                        )
×
1151
                }
1152

1153
                return g.cfg.sendToPeerSync(&lnwire.ReplyChannelRange{
24✔
1154
                        ChainHash:        query.ChainHash,
24✔
1155
                        NumBlocks:        numBlocks,
24✔
1156
                        FirstBlockHeight: firstHeight,
24✔
1157
                        Complete:         complete,
24✔
1158
                        EncodingType:     g.cfg.encodingType,
24✔
1159
                        ShortChanIDs:     scids,
24✔
1160
                        Timestamps:       timestamps,
24✔
1161
                })
24✔
1162
        }
1163

1164
        var (
9✔
1165
                firstHeight  = query.FirstBlockHeight
9✔
1166
                lastHeight   uint32
9✔
1167
                channelChunk []channeldb.ChannelUpdateInfo
9✔
1168
        )
9✔
1169

9✔
1170
        // chunkSize is the maximum number of SCIDs that we can safely put in a
9✔
1171
        // single message. If we also need to include timestamps though, then
9✔
1172
        // this number is halved since encoding two timestamps takes the same
9✔
1173
        // number of bytes as encoding an SCID.
9✔
1174
        chunkSize := g.cfg.chunkSize
9✔
1175
        if withTimestamps {
9✔
UNCOV
1176
                chunkSize /= 2
×
UNCOV
1177
        }
×
1178

1179
        for _, channelRange := range channelRanges {
45✔
1180
                channels := channelRange.Channels
36✔
1181
                numChannels := int32(len(channels))
36✔
1182
                numLeftToAdd := chunkSize - int32(len(channelChunk))
36✔
1183

36✔
1184
                // Include the current block in the ongoing chunk if it can fit
36✔
1185
                // and move on to the next block.
36✔
1186
                if numChannels <= numLeftToAdd {
57✔
1187
                        channelChunk = append(channelChunk, channels...)
21✔
1188
                        continue
21✔
1189
                }
1190

1191
                // Otherwise, we need to send our existing channel chunk as is
1192
                // as its own reply and start a new one for the current block.
1193
                // We'll mark the end of our current chunk as the height before
1194
                // the current block to ensure the whole query range is replied
1195
                // to.
1196
                log.Infof("GossipSyncer(%x): sending range chunk of size=%v",
15✔
1197
                        g.cfg.peerPub[:], len(channelChunk))
15✔
1198

15✔
1199
                lastHeight = channelRange.Height - 1
15✔
1200
                err := sendReplyForChunk(
15✔
1201
                        channelChunk, firstHeight, lastHeight, false,
15✔
1202
                )
15✔
1203
                if err != nil {
15✔
1204
                        return err
×
1205
                }
×
1206

1207
                // With the reply constructed, we'll start tallying channels for
1208
                // our next one keeping in mind our chunk size. This may result
1209
                // in channels for this block being left out from the reply, but
1210
                // this isn't an issue since we'll randomly shuffle them and we
1211
                // assume a historical gossip sync is performed at a later time.
1212
                firstHeight = channelRange.Height
15✔
1213
                finalChunkSize := numChannels
15✔
1214
                exceedsChunkSize := numChannels > chunkSize
15✔
1215
                if exceedsChunkSize {
15✔
1216
                        rand.Shuffle(len(channels), func(i, j int) {
×
1217
                                channels[i], channels[j] = channels[j], channels[i]
×
1218
                        })
×
1219
                        finalChunkSize = chunkSize
×
1220
                }
1221
                channelChunk = channels[:finalChunkSize]
15✔
1222

15✔
1223
                // Sort the chunk once again if we had to shuffle it.
15✔
1224
                if exceedsChunkSize {
15✔
1225
                        sort.Slice(channelChunk, func(i, j int) bool {
×
1226
                                id1 := channelChunk[i].ShortChannelID.ToUint64()
×
1227
                                id2 := channelChunk[j].ShortChannelID.ToUint64()
×
1228

×
1229
                                return id1 < id2
×
1230
                        })
×
1231
                }
1232
        }
1233

1234
        // Send the remaining chunk as the final reply.
1235
        log.Infof("GossipSyncer(%x): sending final chan range chunk, size=%v",
9✔
1236
                g.cfg.peerPub[:], len(channelChunk))
9✔
1237

9✔
1238
        return sendReplyForChunk(
9✔
1239
                channelChunk, firstHeight, query.LastBlockHeight(), true,
9✔
1240
        )
9✔
1241
}
1242

1243
// replyShortChanIDs will be dispatched in response to a query by the remote
1244
// node for information concerning a set of short channel ID's. Our response
1245
// will be sent in a streaming chunked manner to ensure that we remain below
1246
// the current transport level message size.
1247
func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error {
15✔
1248
        // Before responding, we'll check to ensure that the remote peer is
15✔
1249
        // querying for the same chain that we're on. If not, we'll send back a
15✔
1250
        // response with a complete value of zero to indicate we're on a
15✔
1251
        // different chain.
15✔
1252
        if g.cfg.chainHash != query.ChainHash {
16✔
1253
                log.Warnf("Remote peer requested QueryShortChanIDs for "+
1✔
1254
                        "chain=%v, we're on chain=%v", query.ChainHash,
1✔
1255
                        g.cfg.chainHash)
1✔
1256

1✔
1257
                return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
1✔
1258
                        ChainHash: query.ChainHash,
1✔
1259
                        Complete:  0,
1✔
1260
                })
1✔
1261
        }
1✔
1262

1263
        if len(query.ShortChanIDs) == 0 {
14✔
1264
                log.Infof("GossipSyncer(%x): ignoring query for blank short chan ID's",
×
1265
                        g.cfg.peerPub[:])
×
1266
                return nil
×
1267
        }
×
1268

1269
        log.Infof("GossipSyncer(%x): fetching chan anns for %v chans",
14✔
1270
                g.cfg.peerPub[:], len(query.ShortChanIDs))
14✔
1271

14✔
1272
        // Now that we know we're on the same chain, we'll query the channel
14✔
1273
        // time series for the set of messages that we know of which satisfies
14✔
1274
        // the requirement of being a chan ann, chan update, or a node ann
14✔
1275
        // related to the set of queried channels.
14✔
1276
        replyMsgs, err := g.cfg.channelSeries.FetchChanAnns(
14✔
1277
                query.ChainHash, query.ShortChanIDs,
14✔
1278
        )
14✔
1279
        if err != nil {
14✔
1280
                return fmt.Errorf("unable to fetch chan anns for %v..., %w",
×
1281
                        query.ShortChanIDs[0].ToUint64(), err)
×
1282
        }
×
1283

1284
        // Reply with any messages related to those channel ID's, we'll write
1285
        // each one individually and synchronously to throttle the sends and
1286
        // perform buffering of responses in the syncer as opposed to the peer.
1287
        for _, msg := range replyMsgs {
17✔
1288
                err := g.cfg.sendToPeerSync(msg)
3✔
1289
                if err != nil {
3✔
1290
                        return err
×
1291
                }
×
1292
        }
1293

1294
        // Regardless of whether we had any messages to reply with, send over
1295
        // the sentinel message to signal that the stream has terminated.
1296
        return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
14✔
1297
                ChainHash: query.ChainHash,
14✔
1298
                Complete:  1,
14✔
1299
        })
14✔
1300
}
1301

1302
// ApplyGossipFilter applies a gossiper filter sent by the remote node to the
1303
// state machine. Once applied, we'll ensure that we don't forward any messages
1304
// to the peer that aren't within the time range of the filter.
1305
func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error {
3✔
1306
        g.Lock()
3✔
1307

3✔
1308
        g.remoteUpdateHorizon = filter
3✔
1309

3✔
1310
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
3✔
1311
        endTime := startTime.Add(
3✔
1312
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
3✔
1313
        )
3✔
1314

3✔
1315
        g.Unlock()
3✔
1316

3✔
1317
        // If requested, don't reply with historical gossip data when the remote
3✔
1318
        // peer sets their gossip timestamp range.
3✔
1319
        if g.cfg.ignoreHistoricalFilters {
4✔
1320
                return nil
1✔
1321
        }
1✔
1322

1323
        select {
2✔
1324
        case <-g.syncerSema:
2✔
1325
        case <-g.quit:
×
1326
                return ErrGossipSyncerExiting
×
1327
        }
1328

1329
        // We don't put this in a defer because if the goroutine is launched,
1330
        // it needs to be called when the goroutine is stopped.
1331
        returnSema := func() {
4✔
1332
                g.syncerSema <- struct{}{}
2✔
1333
        }
2✔
1334

1335
        // Now that the remote peer has applied their filter, we'll query the
1336
        // database for all the messages that are beyond this filter.
1337
        newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon(
2✔
1338
                g.cfg.chainHash, startTime, endTime,
2✔
1339
        )
2✔
1340
        if err != nil {
2✔
1341
                returnSema()
×
1342
                return err
×
1343
        }
×
1344

1345
        log.Infof("GossipSyncer(%x): applying new update horizon: start=%v, "+
2✔
1346
                "end=%v, backlog_size=%v", g.cfg.peerPub[:], startTime, endTime,
2✔
1347
                len(newUpdatestoSend))
2✔
1348

2✔
1349
        // If we don't have any to send, then we can return early.
2✔
1350
        if len(newUpdatestoSend) == 0 {
3✔
1351
                returnSema()
1✔
1352
                return nil
1✔
1353
        }
1✔
1354

1355
        // We'll conclude by launching a goroutine to send out any updates.
1356
        g.wg.Add(1)
1✔
1357
        go func() {
2✔
1358
                defer g.wg.Done()
1✔
1359
                defer returnSema()
1✔
1360

1✔
1361
                for _, msg := range newUpdatestoSend {
2✔
1362
                        err := g.cfg.sendToPeerSync(msg)
1✔
1363
                        switch {
1✔
1364
                        case err == ErrGossipSyncerExiting:
×
1365
                                return
×
1366

1367
                        case err == lnpeer.ErrPeerExiting:
×
1368
                                return
×
1369

1370
                        case err != nil:
×
1371
                                log.Errorf("Unable to send message for "+
×
1372
                                        "peer catch up: %v", err)
×
1373
                        }
1374
                }
1375
        }()
1376

1377
        return nil
1✔
1378
}
1379

1380
// FilterGossipMsgs takes a set of gossip messages, and only send it to a peer
1381
// iff the message is within the bounds of their set gossip filter. If the peer
1382
// doesn't have a gossip filter set, then no messages will be forwarded.
1383
func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
2✔
1384
        // If the peer doesn't have an update horizon set, then we won't send
2✔
1385
        // it any new update messages.
2✔
1386
        if g.remoteUpdateHorizon == nil {
3✔
1387
                log.Tracef("GossipSyncer(%x): skipped due to nil "+
1✔
1388
                        "remoteUpdateHorizon", g.cfg.peerPub[:])
1✔
1389
                return
1✔
1390
        }
1✔
1391

1392
        // If we've been signaled to exit, or are exiting, then we'll stop
1393
        // short.
1394
        select {
1✔
1395
        case <-g.quit:
×
1396
                return
×
1397
        default:
1✔
1398
        }
1399

1400
        // TODO(roasbeef): need to ensure that peer still online...send msg to
1401
        // gossiper on peer termination to signal peer disconnect?
1402

1403
        var err error
1✔
1404

1✔
1405
        // Before we filter out the messages, we'll construct an index over the
1✔
1406
        // set of channel announcements and channel updates. This will allow us
1✔
1407
        // to quickly check if we should forward a chan ann, based on the known
1✔
1408
        // channel updates for a channel.
1✔
1409
        chanUpdateIndex := make(
1✔
1410
                map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate1,
1✔
1411
        )
1✔
1412
        for _, msg := range msgs {
11✔
1413
                chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate1)
10✔
1414
                if !ok {
17✔
1415
                        continue
7✔
1416
                }
1417

1418
                chanUpdateIndex[chanUpdate.ShortChannelID] = append(
3✔
1419
                        chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate,
3✔
1420
                )
3✔
1421
        }
1422

1423
        // We'll construct a helper function that we'll us below to determine
1424
        // if a given messages passes the gossip msg filter.
1425
        g.Lock()
1✔
1426
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
1✔
1427
        endTime := startTime.Add(
1✔
1428
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
1✔
1429
        )
1✔
1430
        g.Unlock()
1✔
1431

1✔
1432
        passesFilter := func(timeStamp uint32) bool {
11✔
1433
                t := time.Unix(int64(timeStamp), 0)
10✔
1434
                return t.Equal(startTime) ||
10✔
1435
                        (t.After(startTime) && t.Before(endTime))
10✔
1436
        }
10✔
1437

1438
        msgsToSend := make([]lnwire.Message, 0, len(msgs))
1✔
1439
        for _, msg := range msgs {
11✔
1440
                // If the target peer is the peer that sent us this message,
10✔
1441
                // then we'll exit early as we don't need to filter this
10✔
1442
                // message.
10✔
1443
                if _, ok := msg.senders[g.cfg.peerPub]; ok {
10✔
UNCOV
1444
                        continue
×
1445
                }
1446

1447
                switch msg := msg.msg.(type) {
10✔
1448

1449
                // For each channel announcement message, we'll only send this
1450
                // message if the channel updates for the channel are between
1451
                // our time range.
1452
                case *lnwire.ChannelAnnouncement1:
4✔
1453
                        // First, we'll check if the channel updates are in
4✔
1454
                        // this message batch.
4✔
1455
                        chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID]
4✔
1456
                        if !ok {
5✔
1457
                                // If not, we'll attempt to query the database
1✔
1458
                                // to see if we know of the updates.
1✔
1459
                                chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates(
1✔
1460
                                        g.cfg.chainHash, msg.ShortChannelID,
1✔
1461
                                )
1✔
1462
                                if err != nil {
1✔
1463
                                        log.Warnf("no channel updates found for "+
×
1464
                                                "short_chan_id=%v",
×
1465
                                                msg.ShortChannelID)
×
1466
                                        continue
×
1467
                                }
1468
                        }
1469

1470
                        for _, chanUpdate := range chanUpdates {
8✔
1471
                                if passesFilter(chanUpdate.Timestamp) {
5✔
1472
                                        msgsToSend = append(msgsToSend, msg)
1✔
1473
                                        break
1✔
1474
                                }
1475
                        }
1476

1477
                        if len(chanUpdates) == 0 {
4✔
UNCOV
1478
                                msgsToSend = append(msgsToSend, msg)
×
UNCOV
1479
                        }
×
1480

1481
                // For each channel update, we'll only send if it the timestamp
1482
                // is between our time range.
1483
                case *lnwire.ChannelUpdate1:
3✔
1484
                        if passesFilter(msg.Timestamp) {
4✔
1485
                                msgsToSend = append(msgsToSend, msg)
1✔
1486
                        }
1✔
1487

1488
                // Similarly, we only send node announcements if the update
1489
                // timestamp ifs between our set gossip filter time range.
1490
                case *lnwire.NodeAnnouncement:
3✔
1491
                        if passesFilter(msg.Timestamp) {
4✔
1492
                                msgsToSend = append(msgsToSend, msg)
1✔
1493
                        }
1✔
1494
                }
1495
        }
1496

1497
        log.Tracef("GossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
1✔
1498
                g.cfg.peerPub[:], len(msgs), len(msgsToSend))
1✔
1499

1✔
1500
        if len(msgsToSend) == 0 {
1✔
UNCOV
1501
                return
×
UNCOV
1502
        }
×
1503

1504
        g.cfg.sendToPeer(msgsToSend...)
1✔
1505
}
1506

1507
// ProcessQueryMsg is used by outside callers to pass new channel time series
1508
// queries to the internal processing goroutine.
1509
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) error {
112✔
1510
        var msgChan chan lnwire.Message
112✔
1511
        switch msg.(type) {
112✔
UNCOV
1512
        case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs:
×
UNCOV
1513
                msgChan = g.queryMsgs
×
1514

1515
        // Reply messages should only be expected in states where we're waiting
1516
        // for a reply.
1517
        case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
112✔
1518
                syncState := g.syncState()
112✔
1519
                if syncState != waitingQueryRangeReply &&
112✔
1520
                        syncState != waitingQueryChanReply {
113✔
1521

1✔
1522
                        return fmt.Errorf("received unexpected query reply "+
1✔
1523
                                "message %T", msg)
1✔
1524
                }
1✔
1525
                msgChan = g.gossipMsgs
111✔
1526

1527
        default:
×
1528
                msgChan = g.gossipMsgs
×
1529
        }
1530

1531
        select {
111✔
1532
        case msgChan <- msg:
111✔
1533
        case <-peerQuit:
×
1534
        case <-g.quit:
×
1535
        }
1536

1537
        return nil
111✔
1538
}
1539

1540
// setSyncState sets the gossip syncer's state to the given state.
1541
func (g *GossipSyncer) setSyncState(state syncerState) {
116✔
1542
        atomic.StoreUint32(&g.state, uint32(state))
116✔
1543
}
116✔
1544

1545
// syncState returns the current syncerState of the target GossipSyncer.
1546
func (g *GossipSyncer) syncState() syncerState {
460✔
1547
        return syncerState(atomic.LoadUint32(&g.state))
460✔
1548
}
460✔
1549

1550
// ResetSyncedSignal returns a channel that will be closed in order to serve as
1551
// a signal for when the GossipSyncer has reached its chansSynced state.
1552
func (g *GossipSyncer) ResetSyncedSignal() chan struct{} {
14✔
1553
        g.Lock()
14✔
1554
        defer g.Unlock()
14✔
1555

14✔
1556
        syncedSignal := make(chan struct{})
14✔
1557

14✔
1558
        syncState := syncerState(atomic.LoadUint32(&g.state))
14✔
1559
        if syncState == chansSynced {
16✔
1560
                close(syncedSignal)
2✔
1561
                return syncedSignal
2✔
1562
        }
2✔
1563

1564
        g.syncedSignal = syncedSignal
12✔
1565
        return g.syncedSignal
12✔
1566
}
1567

1568
// ProcessSyncTransition sends a request to the gossip syncer to transition its
1569
// sync type to a new one.
1570
//
1571
// NOTE: This can only be done once the gossip syncer has reached its final
1572
// chansSynced state.
1573
func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
14✔
1574
        errChan := make(chan error, 1)
14✔
1575
        select {
14✔
1576
        case g.syncTransitionReqs <- &syncTransitionReq{
1577
                newSyncType: newSyncType,
1578
                errChan:     errChan,
1579
        }:
14✔
1580
        case <-time.After(syncTransitionTimeout):
×
1581
                return ErrSyncTransitionTimeout
×
1582
        case <-g.quit:
×
1583
                return ErrGossipSyncerExiting
×
1584
        }
1585

1586
        select {
14✔
1587
        case err := <-errChan:
14✔
1588
                return err
14✔
1589
        case <-g.quit:
×
1590
                return ErrGossipSyncerExiting
×
1591
        }
1592
}
1593

1594
// handleSyncTransition handles a new sync type transition request.
1595
//
1596
// NOTE: The gossip syncer might have another sync state as a result of this
1597
// transition.
1598
func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error {
14✔
1599
        // Return early from any NOP sync transitions.
14✔
1600
        syncType := g.SyncType()
14✔
1601
        if syncType == req.newSyncType {
14✔
1602
                return nil
×
1603
        }
×
1604

1605
        log.Debugf("GossipSyncer(%x): transitioning from %v to %v",
14✔
1606
                g.cfg.peerPub, syncType, req.newSyncType)
14✔
1607

14✔
1608
        var (
14✔
1609
                firstTimestamp time.Time
14✔
1610
                timestampRange uint32
14✔
1611
        )
14✔
1612

14✔
1613
        switch req.newSyncType {
14✔
1614
        // If an active sync has been requested, then we should resume receiving
1615
        // new graph updates from the remote peer.
1616
        case ActiveSync, PinnedSync:
12✔
1617
                firstTimestamp = time.Now()
12✔
1618
                timestampRange = math.MaxUint32
12✔
1619

1620
        // If a PassiveSync transition has been requested, then we should no
1621
        // longer receive any new updates from the remote peer. We can do this
1622
        // by setting our update horizon to a range in the past ensuring no
1623
        // graph updates match the timestamp range.
1624
        case PassiveSync:
2✔
1625
                firstTimestamp = zeroTimestamp
2✔
1626
                timestampRange = 0
2✔
1627

1628
        default:
×
1629
                return fmt.Errorf("unhandled sync transition %v",
×
1630
                        req.newSyncType)
×
1631
        }
1632

1633
        err := g.sendGossipTimestampRange(firstTimestamp, timestampRange)
14✔
1634
        if err != nil {
15✔
1635
                return fmt.Errorf("unable to send local update horizon: %w",
1✔
1636
                        err)
1✔
1637
        }
1✔
1638

1639
        g.setSyncType(req.newSyncType)
13✔
1640

13✔
1641
        return nil
13✔
1642
}
1643

1644
// setSyncType sets the gossip syncer's sync type to the given type.
1645
func (g *GossipSyncer) setSyncType(syncType SyncerType) {
66✔
1646
        atomic.StoreUint32(&g.syncType, uint32(syncType))
66✔
1647
}
66✔
1648

1649
// SyncType returns the current SyncerType of the target GossipSyncer.
1650
func (g *GossipSyncer) SyncType() SyncerType {
331✔
1651
        return SyncerType(atomic.LoadUint32(&g.syncType))
331✔
1652
}
331✔
1653

1654
// historicalSync sends a request to the gossip syncer to perofmr a historical
1655
// sync.
1656
//
1657
// NOTE: This can only be done once the gossip syncer has reached its final
1658
// chansSynced state.
1659
func (g *GossipSyncer) historicalSync() error {
16✔
1660
        done := make(chan struct{})
16✔
1661

16✔
1662
        select {
16✔
1663
        case g.historicalSyncReqs <- &historicalSyncReq{
1664
                doneChan: done,
1665
        }:
16✔
1666
        case <-time.After(syncTransitionTimeout):
×
1667
                return ErrSyncTransitionTimeout
×
1668
        case <-g.quit:
×
1669
                return ErrGossiperShuttingDown
×
1670
        }
1671

1672
        select {
16✔
1673
        case <-done:
16✔
1674
                return nil
16✔
1675
        case <-g.quit:
×
1676
                return ErrGossiperShuttingDown
×
1677
        }
1678
}
1679

1680
// handleHistoricalSync handles a request to the gossip syncer to perform a
1681
// historical sync.
1682
func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
16✔
1683
        // We'll go back to our initial syncingChans state in order to request
16✔
1684
        // the remote peer to give us all of the channel IDs they know of
16✔
1685
        // starting from the genesis block.
16✔
1686
        g.genHistoricalChanRangeQuery = true
16✔
1687
        g.setSyncState(syncingChans)
16✔
1688
        close(req.doneChan)
16✔
1689
}
16✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc