lightningnetwork / lnd / build 15707922856

17 Jun 2025 12:59PM UTC coverage: 68.521% (+0.01%) from 68.507%

Pull Request #9957: Re-send AnnouncementSignature not more than once per connection.
Merge 6b7c85cd4 into a5c4a7c54

69 of 81 new or added lines in 4 files covered. (85.19%)
48 existing lines in 13 files now uncovered.
134580 of 196406 relevant lines covered (68.52%)
22264.31 hits per line

Source File: /discovery/syncer.go (84.96% covered)
package discovery

import (
        "context"
        "errors"
        "fmt"
        "math"
        "math/rand"
        "sort"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph"
        graphdb "github.com/lightningnetwork/lnd/graph/db"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lnwire"
)

// SyncerType encapsulates the different types of syncing mechanisms for a
// gossip syncer.
type SyncerType uint8

const (
        // ActiveSync denotes that a gossip syncer:
        //
        // 1. Should not attempt to synchronize with the remote peer for
        //    missing channels.
        // 2. Should respond to queries from the remote peer.
        // 3. Should receive new updates from the remote peer.
        //
        // They are started in a chansSynced state in order to accomplish their
        // responsibilities above.
        ActiveSync SyncerType = iota

        // PassiveSync denotes that a gossip syncer:
        //
        // 1. Should not attempt to synchronize with the remote peer for
        //    missing channels.
        // 2. Should respond to queries from the remote peer.
        // 3. Should not receive new updates from the remote peer.
        //
        // They are started in a chansSynced state in order to accomplish their
        // responsibilities above.
        PassiveSync

        // PinnedSync denotes an ActiveSync that doesn't count towards the
        // default active syncer limits and is always active throughout the
        // duration of the peer's connection. Each pinned syncer will begin by
        // performing a historical sync to ensure we are well synchronized with
        // their routing table.
        PinnedSync
)

// String returns a human-readable string describing the target SyncerType.
func (t SyncerType) String() string {
        switch t {
        case ActiveSync:
                return "ActiveSync"
        case PassiveSync:
                return "PassiveSync"
        case PinnedSync:
                return "PinnedSync"
        default:
                return fmt.Sprintf("unknown sync type %d", t)
        }
}

// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange
// allowing new gossip messages to be received from the peer.
func (t SyncerType) IsActiveSync() bool {
        switch t {
        case ActiveSync, PinnedSync:
                return true
        default:
                return false
        }
}

// syncerState is an enum that represents the current state of the GossipSyncer.
// As the syncer is a state machine, we'll gate our actions based off of the
// current state and the next incoming message.
type syncerState uint32

const (
        // syncingChans is the default state of the GossipSyncer. We start in
        // this state when a new peer first connects and we don't yet know if
        // we're fully synchronized.
        syncingChans syncerState = iota

        // waitingQueryRangeReply is the second main phase of the GossipSyncer.
        // We enter this state after we send out our first QueryChannelRange
        // message. We'll stay in this state until the remote party has sent us
        // ReplyChannelRange messages covering the entirety of our query. After
        // this state, we'll transition to queryNewChannels so we can send out
        // requests for all the channel IDs that are new to us.
        waitingQueryRangeReply

        // queryNewChannels is the third main phase of the GossipSyncer. In
        // this phase we'll send out all of our QueryShortChanIDs messages in
        // response to the new channels that we don't yet know about.
        queryNewChannels

        // waitingQueryChanReply is the fourth main phase of the GossipSyncer.
        // We enter this phase once we've sent off a query chunk to the remote
        // peer. We'll stay in this phase until we receive a
        // ReplyShortChanIDsEnd message which indicates that the remote party
        // has responded to all of our requests.
        waitingQueryChanReply

        // chansSynced is the terminal stage of the GossipSyncer. Once we enter
        // this phase, we'll send out our update horizon, which filters out the
        // set of channel updates that we're interested in. In this state,
        // we'll be able to accept any outgoing messages from the
        // AuthenticatedGossiper, and decide if we should forward them to our
        // target peer based on its update horizon.
        chansSynced

        // syncerIdle is a state in which the gossip syncer can handle external
        // requests to transition or perform historical syncs. It is used as the
        // initial state for pinned syncers, as well as a fallthrough case for
        // chansSynced allowing fully synced peers to facilitate requests.
        syncerIdle
)

// String returns a human-readable string describing the target syncerState.
func (s syncerState) String() string {
        switch s {
        case syncingChans:
                return "syncingChans"

        case waitingQueryRangeReply:
                return "waitingQueryRangeReply"

        case queryNewChannels:
                return "queryNewChannels"

        case waitingQueryChanReply:
                return "waitingQueryChanReply"

        case chansSynced:
                return "chansSynced"

        case syncerIdle:
                return "syncerIdle"

        default:
                return "UNKNOWN STATE"
        }
}

const (
        // maxQueryChanRangeReplies specifies the default limit of replies to
        // process for a single QueryChannelRange request.
        maxQueryChanRangeReplies = 500

        // maxQueryChanRangeRepliesZlibFactor specifies the factor applied to
        // the maximum number of replies allowed for zlib encoded replies.
        maxQueryChanRangeRepliesZlibFactor = 4

        // chanRangeQueryBuffer is the number of blocks back that we'll go when
        // asking the remote peer for any channels they know of beyond our
        // highest known channel ID.
        chanRangeQueryBuffer = 144

        // syncTransitionTimeout is the default timeout we'll wait when
        // attempting to perform a sync transition.
        syncTransitionTimeout = 5 * time.Second

        // requestBatchSize is the maximum number of channels we will query the
        // remote peer for in a QueryShortChanIDs message.
        requestBatchSize = 500

        // syncerBufferSize is the size of the syncer's buffers.
        syncerBufferSize = 50
)

var (
        // encodingTypeToChunkSize maps an encoding type to the max number of
        // short chan IDs using the encoding type that we can fit into a
        // single message safely.
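        // For EncodingSortedPlain, each short channel ID encodes to 8 bytes,
        // so 8000 IDs come to 64,000 bytes, which (assuming the standard
        // 65535-byte cap on a single wire message) leaves ample headroom.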
        encodingTypeToChunkSize = map[lnwire.QueryEncoding]int32{
                lnwire.EncodingSortedPlain: 8000,
        }

        // ErrGossipSyncerExiting signals that the syncer has been killed.
        ErrGossipSyncerExiting = errors.New("gossip syncer exiting")

        // ErrSyncTransitionTimeout is an error returned when we've timed out
        // attempting to perform a sync transition.
        ErrSyncTransitionTimeout = errors.New("timed out attempting to " +
                "transition sync type")

        // zeroTimestamp is the timestamp we'll use when we want to indicate to
        // peers that we do not want to receive any new graph updates.
        zeroTimestamp time.Time
)

// syncTransitionReq encapsulates a request for a gossip syncer sync transition.
type syncTransitionReq struct {
        newSyncType SyncerType
        errChan     chan error
}

// historicalSyncReq encapsulates a request for a gossip syncer to perform a
// historical sync.
type historicalSyncReq struct {
        // doneChan is a channel that serves as a signal and is closed to ensure
        // the historical sync is attempted by the time we return to the caller.
        doneChan chan struct{}
}

// gossipSyncerCfg is a struct that packages all the information a GossipSyncer
// needs to carry out its duties.
type gossipSyncerCfg struct {
        // chainHash is the chain that this syncer is responsible for.
        chainHash chainhash.Hash

        // peerPub is the public key of the peer we're syncing with, serialized
        // in compressed format.
        peerPub [33]byte

        // channelSeries is the primary interface that we'll use to generate
        // our queries and respond to the queries of the remote peer.
        channelSeries ChannelGraphTimeSeries

        // encodingType is the current encoding type we're aware of. Requests
        // with different encoding types will be rejected.
        encodingType lnwire.QueryEncoding

        // chunkSize is the max number of short chan IDs using the syncer's
        // encoding type that we can fit into a single message safely.
        chunkSize int32

        // batchSize is the max number of channels the syncer will query from
        // the remote node in a single QueryShortChanIDs request.
        batchSize int32

        // sendToPeer sends a variadic number of messages to the remote peer.
        // This method should not block while waiting for sends to be written
        // to the wire.
        sendToPeer func(context.Context, ...lnwire.Message) error

        // sendToPeerSync sends a variadic number of messages to the remote
        // peer, blocking until all messages have been sent successfully or a
        // write error is encountered.
        sendToPeerSync func(context.Context, ...lnwire.Message) error

        // noSyncChannels will prevent the GossipSyncer from spawning a
        // channelGraphSyncer, meaning we will not try to reconcile unknown
        // channels with the remote peer.
        noSyncChannels bool

        // noReplyQueries will prevent the GossipSyncer from spawning a
        // replyHandler, meaning we will not reply to queries from our remote
        // peer.
        noReplyQueries bool

        // noTimestampQueryOption will prevent the GossipSyncer from querying
        // timestamps of announcement messages from the peer, and it will
        // prevent it from responding to timestamp queries.
        noTimestampQueryOption bool

        // ignoreHistoricalFilters will prevent syncers from replying with
        // historical data when the remote peer sets a gossip_timestamp_range.
        // This prevents ranges with old start times from causing us to dump the
        // graph on connect.
        ignoreHistoricalFilters bool

        // bestHeight returns the latest height known of the chain.
        bestHeight func() uint32

        // markGraphSynced updates the SyncManager's perception of whether we
        // have completed at least one historical sync.
        markGraphSynced func()

        // maxQueryChanRangeReplies is the maximum number of replies we'll allow
        // for a single QueryChannelRange request.
        maxQueryChanRangeReplies uint32

        // isStillZombieChannel takes the timestamps of the latest channel
        // updates for a channel and returns true if the channel should be
        // considered a zombie based on these timestamps.
        isStillZombieChannel func(time.Time, time.Time) bool
}

// GossipSyncer is a struct that handles synchronizing the channel graph state
// with a remote peer. The GossipSyncer implements a state machine that will
// progressively ensure we're synchronized with the channel state of the remote
// node. Once both nodes have been synchronized, we'll use an update filter to
// filter out which messages should be sent to a remote peer based on their
// update horizon. If the update horizon isn't specified, then we won't send
// them any channel updates at all.
type GossipSyncer struct {
        started sync.Once
        stopped sync.Once

        // state is the current state of the GossipSyncer.
        //
        // NOTE: This variable MUST be used atomically.
        state uint32

        // syncType denotes the SyncerType the gossip syncer is currently
        // exercising.
        //
        // NOTE: This variable MUST be used atomically.
        syncType uint32

        // remoteUpdateHorizon is the update horizon of the remote peer. We'll
        // use this to properly filter out any messages.
        remoteUpdateHorizon *lnwire.GossipTimestampRange

        // localUpdateHorizon is our local update horizon, we'll use this to
        // determine if we've already sent out our update.
        localUpdateHorizon *lnwire.GossipTimestampRange

        // syncTransitionReqs is a channel through which new sync type
        // transition requests will be sent. These requests should only be
        // handled when the gossip syncer is in a chansSynced state to ensure
        // its state machine behaves as expected.
        syncTransitionReqs chan *syncTransitionReq

        // historicalSyncReqs is a channel that serves as a signal for the
        // gossip syncer to perform a historical sync. These can only be done
        // once the gossip syncer is in a chansSynced state to ensure its state
        // machine behaves as expected.
        historicalSyncReqs chan *historicalSyncReq

        // genHistoricalChanRangeQuery when true signals to the gossip syncer
        // that it should request the remote peer for all of its known channel
        // IDs starting from the genesis block of the chain. This can only
        // happen if the gossip syncer receives a request to attempt a
        // historical sync. It can be unset if the syncer ever transitions from
        // PassiveSync to ActiveSync.
        genHistoricalChanRangeQuery bool

        // gossipMsgs is a channel that all responses to our queries from the
        // target peer will be sent over; these will be read by the
        // channelGraphSyncer.
        gossipMsgs chan lnwire.Message

        // queryMsgs is a channel that all queries from the remote peer will be
        // received over; these will be read by the replyHandler.
        queryMsgs chan lnwire.Message

        // curQueryRangeMsg keeps track of the latest QueryChannelRange message
        // we've sent to a peer to ensure we've consumed all expected replies.
        // This field is primarily used within the waitingQueryRangeReply state.
        curQueryRangeMsg *lnwire.QueryChannelRange

        // prevReplyChannelRange keeps track of the previous ReplyChannelRange
        // message we've received from a peer to ensure they've fully replied to
        // our query by ensuring they covered our requested block range. This
        // field is primarily used within the waitingQueryRangeReply state.
        prevReplyChannelRange *lnwire.ReplyChannelRange

        // bufferedChanRangeReplies is used in the waitingQueryRangeReply state
        // to buffer all the chunked responses to our query.
        bufferedChanRangeReplies []graphdb.ChannelUpdateInfo

        // numChanRangeRepliesRcvd is used to track the number of replies
        // received as part of a QueryChannelRange. This field is primarily used
        // within the waitingQueryRangeReply state.
        numChanRangeRepliesRcvd uint32

        // newChansToQuery is used to pass the set of channels we should query
        // for from the waitingQueryRangeReply state to the queryNewChannels
        // state.
        newChansToQuery []lnwire.ShortChannelID

        // proofSentToChan is used when we already have the fully assembled
        // proof for a channel and the peer sending us their proof has probably
        // not received our local proof yet. So we are kind and send them our
        // proof, but only if we haven't done so since (re)connecting. We keep
        // track of sends with this map, so we don't send it twice.
        proofSentToChan fn.Set[lnwire.ChannelID]

        cfg gossipSyncerCfg

        // syncedSignal is a channel that, if set, will be closed when the
        // GossipSyncer reaches its terminal chansSynced state.
        syncedSignal chan struct{}

        // syncerSema is used to more finely control the syncer's ability to
        // respond to gossip timestamp range messages.
        syncerSema chan struct{}

        sync.Mutex

        // cg is a helper that encapsulates a wait group and quit channel and
        // allows contexts that either block or cancel on those depending on
        // the use case.
        cg *fn.ContextGuard
}

// newGossipSyncer returns a new instance of the GossipSyncer populated using
// the passed config.
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
        return &GossipSyncer{
                cfg:                cfg,
                syncTransitionReqs: make(chan *syncTransitionReq),
                historicalSyncReqs: make(chan *historicalSyncReq),
                gossipMsgs:         make(chan lnwire.Message, syncerBufferSize),
                queryMsgs:          make(chan lnwire.Message, syncerBufferSize),
                syncerSema:         sema,
                proofSentToChan:    fn.NewSet[lnwire.ChannelID](),
                cg:                 fn.NewContextGuard(),
        }
}

// Start starts the GossipSyncer and any goroutines that it needs to carry out
// its duties.
func (g *GossipSyncer) Start() {
        g.started.Do(func() {
                log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])

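                // Derive a context from the syncer's context guard so the
                // goroutines below are cancelled once the syncer is told to
                // quit.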
                ctx, _ := g.cg.Create(context.Background())

                // TODO(conner): only spawn channelGraphSyncer if remote
                // supports gossip queries, and only spawn replyHandler if we
                // advertise support
                if !g.cfg.noSyncChannels {
                        g.cg.WgAdd(1)
                        go g.channelGraphSyncer(ctx)
                }
                if !g.cfg.noReplyQueries {
                        g.cg.WgAdd(1)
                        go g.replyHandler(ctx)
                }
        })
}

// Stop signals the GossipSyncer for a graceful exit, then waits until it has
// exited.
func (g *GossipSyncer) Stop() {
        g.stopped.Do(func() {
                log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
                defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])

                g.cg.Quit()
        })
}

// handleSyncingChans handles the state syncingChans for the GossipSyncer. When
// in this state, we will send a QueryChannelRange msg to our peer and advance
// the syncer's state to waitingQueryRangeReply.
func (g *GossipSyncer) handleSyncingChans(ctx context.Context) {
        // Prepare the query msg.
        queryRangeMsg, err := g.genChanRangeQuery(
                ctx, g.genHistoricalChanRangeQuery,
        )
        if err != nil {
                log.Errorf("Unable to gen chan range query: %v", err)
                return
        }

        // Acquire a lock so the following state transition is atomic.
        //
        // NOTE: We must lock the following steps as it's possible we get an
        // immediate response (ReplyChannelRange) after sending the query msg.
        // The response is handled in ProcessQueryMsg, which requires the
        // current state to be waitingQueryRangeReply.
        g.Lock()
        defer g.Unlock()

        // Send the msg to the remote peer, which is non-blocking as
        // `sendToPeer` only queues the msg in Brontide.
        err = g.cfg.sendToPeer(ctx, queryRangeMsg)
        if err != nil {
                log.Errorf("Unable to send chan range query: %v", err)
                return
        }

        // With the message sent successfully, we'll transition into the next
        // state where we wait for their reply.
        g.setSyncState(waitingQueryRangeReply)
}

// channelGraphSyncer is the main goroutine responsible for ensuring that we
// properly sync our channel graph state with the remote peer, and also that
// we only send them messages which actually pass their defined update horizon.
func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
        defer g.cg.WgDone()

        for {
                state := g.syncState()
                syncType := g.SyncType()

                log.Debugf("GossipSyncer(%x): state=%v, type=%v",
                        g.cfg.peerPub[:], state, syncType)

                switch state {
                // When we're in this state, we're trying to synchronize our
                // view of the network with the remote peer. We'll kick off
                // this sync by asking them for the set of channels they
                // understand, as well as responding to any other queries by
                // them.
                case syncingChans:
                        g.handleSyncingChans(ctx)

                // In this state, we've sent out our initial channel range
                // query and are waiting for the final response from the remote
                // peer before we perform a diff to see which channels they
                // know of that we don't.
                case waitingQueryRangeReply:
                        // We'll wait to either process a new message from the
                        // remote party, or exit due to the gossiper exiting,
                        // or us being signalled to do so.
                        select {
                        case msg := <-g.gossipMsgs:
                                // The remote peer is sending a response to our
                                // initial query, we'll collate this response,
                                // and see if it's the final one in the series.
                                // If so, we can then transition to querying
                                // for the new channels.
                                queryReply, ok := msg.(*lnwire.ReplyChannelRange)
                                if ok {
                                        err := g.processChanRangeReply(
                                                ctx, queryReply,
                                        )
                                        if err != nil {
                                                log.Errorf("Unable to "+
                                                        "process chan range "+
                                                        "query: %v", err)
                                                return
                                        }
                                        continue
                                }

                                log.Warnf("Unexpected message: %T in state=%v",
                                        msg, state)

                        case <-g.cg.Done():
                                return

                        case <-ctx.Done():
                                return
                        }

                // We'll enter this state once we've discovered which channels
                // the remote party knows of that we don't yet know of
                // ourselves.
                case queryNewChannels:
                        // First, we'll attempt to continue our channel
                        // synchronization by continuing to send off another
                        // query chunk.
                        done := g.synchronizeChanIDs(ctx)

                        // If this wasn't our last query, then we'll need to
                        // transition to our waiting state.
                        if !done {
                                continue
                        }

                        // If we're fully synchronized, then we can transition
                        // to our terminal state.
                        g.setSyncState(chansSynced)

                        // Ensure that the sync manager becomes aware that the
                        // historical sync completed so synced_to_graph is
                        // updated over rpc.
                        g.cfg.markGraphSynced()

                // In this state, we've just sent off a new query for channels
                // that we don't yet know of. We'll remain in this state until
                // the remote party signals they've responded to our query in
                // totality.
                case waitingQueryChanReply:
                        // Once we've sent off our query, we'll wait for either
                        // an ending reply, or just another query from the
                        // remote peer.
                        select {
                        case msg := <-g.gossipMsgs:
                                // If this is the final reply to one of our
                                // queries, then we'll loop back into our query
                                // state to send off the remaining query chunks.
                                _, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
                                if ok {
                                        g.setSyncState(queryNewChannels)
                                        continue
                                }

                                log.Warnf("Unexpected message: %T in state=%v",
                                        msg, state)

                        case <-g.cg.Done():
                                return

                        case <-ctx.Done():
                                return
                        }

                // This is our final terminal state where we'll only reply to
                // any further queries by the remote peer.
                case chansSynced:
                        g.Lock()
                        if g.syncedSignal != nil {
                                close(g.syncedSignal)
                                g.syncedSignal = nil
                        }
                        g.Unlock()

                        // If we haven't yet sent out our update horizon, and
                        // we want to receive real-time channel updates, we'll
                        // do so now.
                        if g.localUpdateHorizon == nil &&
                                syncType.IsActiveSync() {

                                err := g.sendGossipTimestampRange(
                                        ctx, time.Now(), math.MaxUint32,
                                )
                                if err != nil {
                                        log.Errorf("Unable to send update "+
                                                "horizon to %x: %v",
                                                g.cfg.peerPub, err)
                                }
                        }
                        // With our horizon set, we'll simply reply to any new
                        // messages or process any state transitions and exit if
                        // needed.
                        fallthrough

                // Pinned peers will begin in this state, since they will
                // immediately receive a request to perform a historical sync.
                // Otherwise, we fall through after ending in chansSynced to
                // facilitate new requests.
                case syncerIdle:
                        select {
                        case req := <-g.syncTransitionReqs:
                                req.errChan <- g.handleSyncTransition(ctx, req)

                        case req := <-g.historicalSyncReqs:
                                g.handleHistoricalSync(req)

                        case <-g.cg.Done():
                                return

                        case <-ctx.Done():
                                return
                        }
                }
        }
}

// replyHandler is an event loop whose sole purpose is to reply to the remote
// peer's queries. Our replyHandler will respond to messages generated by their
// channelGraphSyncer, and vice versa. Each party's channelGraphSyncer drives
// the other's replyHandler, allowing the replyHandler to operate independently
// from the state machine maintained on the same node.
//
// NOTE: This method MUST be run as a goroutine.
func (g *GossipSyncer) replyHandler(ctx context.Context) {
        defer g.cg.WgDone()

        for {
                select {
                case msg := <-g.queryMsgs:
                        err := g.replyPeerQueries(ctx, msg)
                        switch {
                        case err == ErrGossipSyncerExiting:
                                return

                        case err == lnpeer.ErrPeerExiting:
                                return

                        case err != nil:
                                log.Errorf("Unable to reply to peer "+
                                        "query: %v", err)
                        }

                case <-g.cg.Done():
                        return

                case <-ctx.Done():
                        return
                }
        }
}

// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
// syncer and sends it to the remote peer.
func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
        firstTimestamp time.Time, timestampRange uint32) error {

        endTimestamp := firstTimestamp.Add(
                time.Duration(timestampRange) * time.Second,
        )

        log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)",
                g.cfg.peerPub[:], firstTimestamp, endTimestamp)

        localUpdateHorizon := &lnwire.GossipTimestampRange{
                ChainHash:      g.cfg.chainHash,
                FirstTimestamp: uint32(firstTimestamp.Unix()),
                TimestampRange: timestampRange,
        }

        if err := g.cfg.sendToPeer(ctx, localUpdateHorizon); err != nil {
                return err
        }

35✔
706
                g.localUpdateHorizon = nil
2✔
707
        } else {
33✔
708
                g.localUpdateHorizon = localUpdateHorizon
31✔
709
        }
31✔
710

711
        return nil
33✔
712
}
713

714
// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
715
// the remote peer for its known set of channel IDs within a particular block
716
// range. This method will be called continually until the entire range has
717
// been queried for with a response received. We'll chunk our requests as
718
// required to ensure they fit into a single message. We may re-renter this
719
// state in the case that chunking is required.
720
func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
9✔
721
        // If we're in this state yet there are no more new channels to query
9✔
722
        // for, then we'll transition to our final synced state and return true
9✔
723
        // to signal that we're fully synchronized.
9✔
724
        if len(g.newChansToQuery) == 0 {
13✔
725
                log.Infof("GossipSyncer(%x): no more chans to query",
4✔
726
                        g.cfg.peerPub[:])
4✔
727

4✔
728
                return true
4✔
729
        }
4✔
730

731
        // Otherwise, we'll issue our next chunked query to receive replies
732
        // for.
733
        var queryChunk []lnwire.ShortChannelID
8✔
734

8✔
735
        // If the number of channels to query for is less than the chunk size,
8✔
736
        // then we can issue a single query.
8✔
737
        if int32(len(g.newChansToQuery)) < g.cfg.batchSize {
13✔
738
                queryChunk = g.newChansToQuery
5✔
739
                g.newChansToQuery = nil
5✔
740

5✔
741
        } else {
8✔
742
                // Otherwise, we'll need to only query for the next chunk.
3✔
743
                // We'll slice into our query chunk, then slide down our main
3✔
744
                // pointer down by the chunk size.
3✔
745
                queryChunk = g.newChansToQuery[:g.cfg.batchSize]
3✔
746
                g.newChansToQuery = g.newChansToQuery[g.cfg.batchSize:]
3✔
747
        }
3✔
748

749
        log.Infof("GossipSyncer(%x): querying for %v new channels",
8✔
750
                g.cfg.peerPub[:], len(queryChunk))
8✔
751

8✔
752
        // Change the state before sending the query msg.
8✔
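        // This ordering matters: the peer's ReplyShortChanIDsEnd could arrive
        // right away, and it must find the syncer already in the
        // waitingQueryChanReply state.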
        g.setSyncState(waitingQueryChanReply)

        // With our chunk obtained, we'll send over our next query, then return
        // false indicating that we're not yet fully synced.
        err := g.cfg.sendToPeer(ctx, &lnwire.QueryShortChanIDs{
                ChainHash:    g.cfg.chainHash,
                EncodingType: lnwire.EncodingSortedPlain,
                ShortChanIDs: queryChunk,
        })
        if err != nil {
                log.Errorf("Unable to sync chan IDs: %v", err)
        }

        return false
}

// isLegacyReplyChannelRange determines whether a ReplyChannelRange message is
// considered legacy. There was a point where lnd used to include the same query
// over multiple replies, rather than including the portion of the query the
// reply is handling. We'll use this as a way of detecting whether we are
// communicating with a legacy node so we can properly sync with them.
func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
        reply *lnwire.ReplyChannelRange) bool {

        return (reply.ChainHash == query.ChainHash &&
                reply.FirstBlockHeight == query.FirstBlockHeight &&
                reply.NumBlocks == query.NumBlocks)
}

// processChanRangeReply is called each time the GossipSyncer receives a new
// reply to the initial range query to discover new channels that it didn't
// previously know of.
func (g *GossipSyncer) processChanRangeReply(_ context.Context,
        msg *lnwire.ReplyChannelRange) error {

        // isStale returns whether the timestamp is too far into the past.
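        // Note that both helpers measure against
        // graph.DefaultChannelPruneExpiry, the same horizon the graph uses
        // when deciding whether a channel is a zombie.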
        isStale := func(timestamp time.Time) bool {
                return time.Since(timestamp) > graph.DefaultChannelPruneExpiry
        }

        // isSkewed returns whether the timestamp is too far into the future.
        isSkewed := func(timestamp time.Time) bool {
                return time.Until(timestamp) > graph.DefaultChannelPruneExpiry
        }

        // If we're not communicating with a legacy node, we'll apply some
        // further constraints on their reply to ensure it satisfies our query.
        if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
                // The first block should be within our original request.
                if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight {
                        return fmt.Errorf("reply includes channels for height "+
                                "%v prior to query %v", msg.FirstBlockHeight,
                                g.curQueryRangeMsg.FirstBlockHeight)
                }

                // The last block should also be. We don't need to check the
                // intermediate ones because they should already be in sorted
                // order.
                replyLastHeight := msg.LastBlockHeight()
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
                if replyLastHeight > queryLastHeight {
                        return fmt.Errorf("reply includes channels for height "+
                                "%v after query %v", replyLastHeight,
                                queryLastHeight)
                }

                // If we've previously received a reply for this query, look at
                // its last block to ensure the current reply properly follows
                // it.
                if g.prevReplyChannelRange != nil {
                        prevReply := g.prevReplyChannelRange
                        prevReplyLastHeight := prevReply.LastBlockHeight()

                        // The current reply can either start from the previous
                        // reply's last block, if there are still more channels
                        // for the same block, or the block after.
                        if msg.FirstBlockHeight != prevReplyLastHeight &&
                                msg.FirstBlockHeight != prevReplyLastHeight+1 {

                                return fmt.Errorf("first block of reply %v "+
                                        "does not continue from last block of "+
                                        "previous %v", msg.FirstBlockHeight,
                                        prevReplyLastHeight)
                        }
                }
        }

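        // Stash this reply so that the continuity of the next chunk can be
        // verified against it.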
        g.prevReplyChannelRange = msg

        for i, scid := range msg.ShortChanIDs {
                info := graphdb.NewChannelUpdateInfo(
                        scid, time.Time{}, time.Time{},
                )

                if len(msg.Timestamps) != 0 {
                        t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0)
                        info.Node1UpdateTimestamp = t1

                        t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0)
                        info.Node2UpdateTimestamp = t2

                        // Sort out all channels with outdated or skewed
                        // timestamps. Both timestamps need to be out of
                        // boundaries for us to skip the channel and not query
                        // it later on.
                        switch {
                        case isStale(info.Node1UpdateTimestamp) &&
                                isStale(info.Node2UpdateTimestamp):

                                continue

                        case isSkewed(info.Node1UpdateTimestamp) &&
                                isSkewed(info.Node2UpdateTimestamp):

                                continue

                        case isStale(info.Node1UpdateTimestamp) &&
                                isSkewed(info.Node2UpdateTimestamp):

                                continue

                        case isStale(info.Node2UpdateTimestamp) &&
                                isSkewed(info.Node1UpdateTimestamp):

                                continue
                        }
                }

                g.bufferedChanRangeReplies = append(
                        g.bufferedChanRangeReplies, info,
                )
        }

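        // Count this reply against the maximum number we'll accept for a
        // single query. A zlib-encoded reply can pack in more SCIDs than a
        // plain one, so it is weighted more heavily against that limit.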
        switch g.cfg.encodingType {
        case lnwire.EncodingSortedPlain:
                g.numChanRangeRepliesRcvd++
        case lnwire.EncodingSortedZlib:
                g.numChanRangeRepliesRcvd += maxQueryChanRangeRepliesZlibFactor
        default:
                return fmt.Errorf("unhandled encoding type %v", g.cfg.encodingType)
        }

        log.Infof("GossipSyncer(%x): buffering chan range reply of size=%v",
                g.cfg.peerPub[:], len(msg.ShortChanIDs))

        // If this isn't the last response and we can continue to receive more,
        // then we can exit as we've already buffered the latest portion of the
        // streaming reply.
        maxReplies := g.cfg.maxQueryChanRangeReplies
        switch {
        // If we're communicating with a legacy node, we'll need to look at the
        // complete field.
        case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg):
                if msg.Complete == 0 && g.numChanRangeRepliesRcvd < maxReplies {
                        return nil
                }

        // Otherwise, we'll look at the reply's height range.
        default:
                replyLastHeight := msg.LastBlockHeight()
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()

                // TODO(wilmer): This might require some padding if the remote
                // node is not aware of the last height we sent them, i.e., is
                // behind a few blocks from us.
                if replyLastHeight < queryLastHeight &&
                        g.numChanRangeRepliesRcvd < maxReplies {

                        return nil
                }
        }

        log.Infof("GossipSyncer(%x): filtering through %v chans",
                g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))

        // Otherwise, this is the final response, so we'll now check to see
        // which channels they know of that we don't.
        newChans, err := g.cfg.channelSeries.FilterKnownChanIDs(
                g.cfg.chainHash, g.bufferedChanRangeReplies,
                g.cfg.isStillZombieChannel,
        )
        if err != nil {
                return fmt.Errorf("unable to filter chan ids: %w", err)
        }

        // As we've received the entirety of the reply, we no longer need to
        // hold on to the set of buffered replies or the original query that
        // prompted the replies, so we'll let that be garbage collected now.
        g.curQueryRangeMsg = nil
        g.prevReplyChannelRange = nil
        g.bufferedChanRangeReplies = nil
        g.numChanRangeRepliesRcvd = 0

        // If there aren't any channels that we don't know of, then we can
        // switch straight to our terminal state.
        if len(newChans) == 0 {
                log.Infof("GossipSyncer(%x): remote peer has no new chans",
                        g.cfg.peerPub[:])

                g.setSyncState(chansSynced)

                // Ensure that the sync manager becomes aware that the
                // historical sync completed so synced_to_graph is updated over
                // rpc.
                g.cfg.markGraphSynced()
                return nil
        }

        // Otherwise, we'll set the set of channels that we need to query for
        // the next state, and also transition our state.
        g.newChansToQuery = newChans
        g.setSyncState(queryNewChannels)

        log.Infof("GossipSyncer(%x): starting query for %v new chans",
                g.cfg.peerPub[:], len(newChans))

        return nil
}

// genChanRangeQuery generates the initial message we'll send to the remote
// party when we're kicking off the channel graph synchronization upon
// connection. The historicalQuery boolean can be used to generate a query from
// the genesis block of the chain.
func (g *GossipSyncer) genChanRangeQuery(_ context.Context,
        historicalQuery bool) (*lnwire.QueryChannelRange, error) {

        // First, we'll query our channel graph time series for its highest
        // known channel ID.
        newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash)
        if err != nil {
                return nil, err
        }

        // Once we have the chan ID of the newest, we'll obtain the block height
        // of the channel, then subtract our default horizon to ensure we don't
        // miss any channels. By default, we go back 1 day from the newest
        // channel, unless we're attempting a historical sync, where we'll
        // actually start from the genesis block instead.
        var startHeight uint32
        switch {
        case historicalQuery:
                fallthrough
        case newestChan.BlockHeight <= chanRangeQueryBuffer:
                startHeight = 0
        default:
                startHeight = newestChan.BlockHeight - chanRangeQueryBuffer
        }

        // Determine the number of blocks to request based on our best height.
        // We'll take into account any potential underflows and explicitly set
        // numBlocks to its minimum value of 1 if so.
        bestHeight := g.cfg.bestHeight()
        numBlocks := bestHeight - startHeight
        if int64(numBlocks) < 1 {
                numBlocks = 1
        }

        log.Infof("GossipSyncer(%x): requesting new chans from height=%v "+
                "and %v blocks after", g.cfg.peerPub[:], startHeight, numBlocks)

        // Finally, we'll craft the channel range query, using our starting
        // height, then asking for all known channels to the foreseeable end of
        // the main chain.
        query := &lnwire.QueryChannelRange{
                ChainHash:        g.cfg.chainHash,
                FirstBlockHeight: startHeight,
                NumBlocks:        numBlocks,
        }

        if !g.cfg.noTimestampQueryOption {
                query.QueryOptions = lnwire.NewTimestampQueryOption()
        }

        g.curQueryRangeMsg = query

        return query, nil
}

// replyPeerQueries is called in response to any query by the remote peer.
// We'll examine our state and send back our best response.
func (g *GossipSyncer) replyPeerQueries(ctx context.Context,
        msg lnwire.Message) error {

        switch msg := msg.(type) {

        // In this state, we'll also handle any incoming channel range queries
        // from the remote peer as they're trying to sync their state as well.
        case *lnwire.QueryChannelRange:
                return g.replyChanRangeQuery(ctx, msg)

        // If the remote peer skips straight to requesting new channels that
        // they don't know of, then we'll ensure that we also handle this case.
        case *lnwire.QueryShortChanIDs:
                return g.replyShortChanIDs(ctx, msg)

        default:
                return fmt.Errorf("unknown message: %T", msg)
        }
}

// replyChanRangeQuery will be dispatched in response to a channel range query
// by the remote node. We'll query the channel time series for channels that
// meet the channel range, then chunk our responses to the remote node. We also
// ensure that our final fragment carries the "complete" bit to indicate the
// end of our streaming response.
func (g *GossipSyncer) replyChanRangeQuery(ctx context.Context,
        query *lnwire.QueryChannelRange) error {

        // Before responding, we'll check to ensure that the remote peer is
        // querying for the same chain that we're on. If not, we'll send back a
        // response with a complete value of zero to indicate we're on a
        // different chain.
        if g.cfg.chainHash != query.ChainHash {
                log.Warnf("Remote peer requested QueryChannelRange for "+
                        "chain=%v, we're on chain=%v", query.ChainHash,
                        g.cfg.chainHash)

                return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
                        ChainHash:        query.ChainHash,
                        FirstBlockHeight: query.FirstBlockHeight,
                        NumBlocks:        query.NumBlocks,
                        Complete:         0,
                        EncodingType:     g.cfg.encodingType,
                        ShortChanIDs:     nil,
                })
        }

        log.Infof("GossipSyncer(%x): filtering chan range: start_height=%v, "+
                "num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
                query.NumBlocks)

        // Check if the query asked for timestamps. We will only serve
        // timestamps if this has not been disabled with
        // noTimestampQueryOption.
        withTimestamps := query.WithTimestamps() &&
                !g.cfg.noTimestampQueryOption

        // Next, we'll consult the time series to obtain the set of known
        // channel IDs that match their query.
        startBlock := query.FirstBlockHeight
        endBlock := query.LastBlockHeight()
        channelRanges, err := g.cfg.channelSeries.FilterChannelRange(
                query.ChainHash, startBlock, endBlock, withTimestamps,
        )
        if err != nil {
                return err
        }

        // TODO(roasbeef): means can't send max uint above?
        //  * or make internal 64

        // We'll send our response in a streaming manner, chunk-by-chunk. We do
        // this as there's a transport message size limit which we'll need to
        // adhere to. We also need to make sure all of our replies cover the
        // expected range of the query.
        sendReplyForChunk := func(channelChunk []graphdb.ChannelUpdateInfo,
                firstHeight, lastHeight uint32, finalChunk bool) error {

                // The number of blocks contained in the current chunk (the
                // total span) is the difference between the last channel ID and
                // the first in the range. We add one as even if all channels
                // returned are in the same block, we need to count that.
                numBlocks := lastHeight - firstHeight + 1
                complete := uint8(0)
                if finalChunk {
                        complete = 1
                }

                var timestamps lnwire.Timestamps
                if withTimestamps {
                        timestamps = make(lnwire.Timestamps, len(channelChunk))
                }

                scids := make([]lnwire.ShortChannelID, len(channelChunk))
                for i, info := range channelChunk {
                        scids[i] = info.ShortChannelID

                        if !withTimestamps {
                                continue
                        }

                        timestamps[i].Timestamp1 = uint32(
                                info.Node1UpdateTimestamp.Unix(),
                        )

                        timestamps[i].Timestamp2 = uint32(
                                info.Node2UpdateTimestamp.Unix(),
                        )
                }

                return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
                        ChainHash:        query.ChainHash,
                        NumBlocks:        numBlocks,
                        FirstBlockHeight: firstHeight,
                        Complete:         complete,
                        EncodingType:     g.cfg.encodingType,
                        ShortChanIDs:     scids,
                        Timestamps:       timestamps,
                })
        }

        var (
                firstHeight  = query.FirstBlockHeight
                lastHeight   uint32
                channelChunk []graphdb.ChannelUpdateInfo
        )

        // chunkSize is the maximum number of SCIDs that we can safely put in a
        // single message. If we also need to include timestamps though, then
        // this number is halved since encoding two timestamps takes the same
        // number of bytes as encoding an SCID.
        chunkSize := g.cfg.chunkSize
        if withTimestamps {
                chunkSize /= 2
        }

        for _, channelRange := range channelRanges {
                channels := channelRange.Channels
                numChannels := int32(len(channels))
                numLeftToAdd := chunkSize - int32(len(channelChunk))

                // Include the current block in the ongoing chunk if it can fit
                // and move on to the next block.
                if numChannels <= numLeftToAdd {
                        channelChunk = append(channelChunk, channels...)
                        continue
                }

                // Otherwise, we need to send our existing channel chunk as is
                // as its own reply and start a new one for the current block.
                // We'll mark the end of our current chunk as the height before
                // the current block to ensure the whole query range is replied
                // to.
                log.Infof("GossipSyncer(%x): sending range chunk of size=%v",
                        g.cfg.peerPub[:], len(channelChunk))

                lastHeight = channelRange.Height - 1
                err := sendReplyForChunk(
                        channelChunk, firstHeight, lastHeight, false,
                )
                if err != nil {
                        return err
                }

                // With the reply constructed, we'll start tallying channels for
                // our next one keeping in mind our chunk size. This may result
                // in channels for this block being left out from the reply, but
                // this isn't an issue since we'll randomly shuffle them and we
                // assume a historical gossip sync is performed at a later time.
                firstHeight = channelRange.Height
                finalChunkSize := numChannels
                exceedsChunkSize := numChannels > chunkSize
                if exceedsChunkSize {
                        rand.Shuffle(len(channels), func(i, j int) {
                                channels[i], channels[j] = channels[j], channels[i]
                        })
                        finalChunkSize = chunkSize
                }
                channelChunk = channels[:finalChunkSize]

                // Sort the chunk once again if we had to shuffle it.
                if exceedsChunkSize {
                        sort.Slice(channelChunk, func(i, j int) bool {
                                id1 := channelChunk[i].ShortChannelID.ToUint64()
                                id2 := channelChunk[j].ShortChannelID.ToUint64()

                                return id1 < id2
                        })
                }
        }

        // Send the remaining chunk as the final reply.
        log.Infof("GossipSyncer(%x): sending final chan range chunk, size=%v",
11✔
1226
                g.cfg.peerPub[:], len(channelChunk))
11✔
1227

11✔
1228
        return sendReplyForChunk(
11✔
1229
                channelChunk, firstHeight, query.LastBlockHeight(), true,
11✔
1230
        )
11✔
1231
}
1232

1233
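
// [Editor's example] A minimal sketch (not part of lnd) of the chunk-size
// rule applied above: a ReplyChannelRange that carries timestamps can only
// hold half as many SCIDs, since each pair of per-channel timestamps
// (2 x 4 bytes) costs the same wire space as one more 8-byte SCID. The
// baseChunkSize parameter is a hypothetical stand-in for g.cfg.chunkSize.
func exampleReplyChunkSize(baseChunkSize int32, withTimestamps bool) int32 {
        if withTimestamps {
                return baseChunkSize / 2
        }
        return baseChunkSize
}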

// replyShortChanIDs will be dispatched in response to a query by the remote
// node for information concerning a set of short channel ID's. Our response
// will be sent in a streaming chunked manner to ensure that we remain below
// the current transport level message size.
func (g *GossipSyncer) replyShortChanIDs(ctx context.Context,
        query *lnwire.QueryShortChanIDs) error {

        // Before responding, we'll check to ensure that the remote peer is
        // querying for the same chain that we're on. If not, we'll send back a
        // response with a complete value of zero to indicate we're on a
        // different chain.
        if g.cfg.chainHash != query.ChainHash {
                log.Warnf("Remote peer requested QueryShortChanIDs for "+
                        "chain=%v, we're on chain=%v", query.ChainHash,
                        g.cfg.chainHash)

                return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
                        ChainHash: query.ChainHash,
                        Complete:  0,
                })
        }

        if len(query.ShortChanIDs) == 0 {
                log.Infof("GossipSyncer(%x): ignoring query for blank short chan ID's",
                        g.cfg.peerPub[:])
                return nil
        }

        log.Infof("GossipSyncer(%x): fetching chan anns for %v chans",
                g.cfg.peerPub[:], len(query.ShortChanIDs))

        // Now that we know we're on the same chain, we'll query the channel
        // time series for the set of messages that we know of which satisfy
        // the requirement of being a chan ann, chan update, or a node ann
        // related to the set of queried channels.
        replyMsgs, err := g.cfg.channelSeries.FetchChanAnns(
                query.ChainHash, query.ShortChanIDs,
        )
        if err != nil {
                return fmt.Errorf("unable to fetch chan anns for %v..., %w",
                        query.ShortChanIDs[0].ToUint64(), err)
        }

        // Reply with any messages related to those channel ID's. We'll write
        // each one individually and synchronously to throttle the sends and
        // perform buffering of responses in the syncer as opposed to the peer.
        for _, msg := range replyMsgs {
                err := g.cfg.sendToPeerSync(ctx, msg)
                if err != nil {
                        return err
                }
        }

        // Regardless of whether we had any messages to reply with, send over
        // the sentinel message to signal that the stream has terminated.
        return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
                ChainHash: query.ChainHash,
                Complete:  1,
        })
}
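
// [Editor's example] A minimal sketch (not part of lnd) of the query that
// replyShortChanIDs answers. The SCID value below is arbitrary; a real
// caller would use IDs learned from a prior ReplyChannelRange. The handler
// streams back the matching announcements and terminates the stream with a
// ReplyShortChanIDsEnd whose Complete field is set to 1.
func exampleShortChanIDsQuery(chain chainhash.Hash) *lnwire.QueryShortChanIDs {
        return &lnwire.QueryShortChanIDs{
                ChainHash:    chain,
                EncodingType: lnwire.EncodingSortedPlain,
                ShortChanIDs: []lnwire.ShortChannelID{
                        lnwire.NewShortChanIDFromInt(0x0a000001000100),
                },
        }
}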

// ApplyGossipFilter applies a gossip filter sent by the remote node to the
// state machine. Once applied, we'll ensure that we don't forward any messages
// to the peer that aren't within the time range of the filter.
func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
        filter *lnwire.GossipTimestampRange) error {

        g.Lock()

        g.remoteUpdateHorizon = filter

        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
        endTime := startTime.Add(
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
        )

        g.Unlock()

        // If requested, don't reply with historical gossip data when the remote
        // peer sets their gossip timestamp range.
        if g.cfg.ignoreHistoricalFilters {
                return nil
        }

        select {
        case <-g.syncerSema:
        case <-g.cg.Done():
                return ErrGossipSyncerExiting
        case <-ctx.Done():
                return ctx.Err()
        }

        // We don't put this in a defer because if the goroutine is launched,
        // it needs to be called when the goroutine is stopped.
        returnSema := func() {
                g.syncerSema <- struct{}{}
        }

        // Now that the remote peer has applied their filter, we'll query the
        // database for all the messages that are beyond this filter.
        newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon(
                g.cfg.chainHash, startTime, endTime,
        )
        if err != nil {
                returnSema()
                return err
        }

        log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
                "start=%v, end=%v, backlog_size=%v", g.cfg.peerPub[:],
                startTime, endTime, len(newUpdatestoSend))

        // If we don't have any to send, then we can return early.
        if len(newUpdatestoSend) == 0 {
                returnSema()
                return nil
        }

        // We'll conclude by launching a goroutine to send out any updates.
        g.cg.WgAdd(1)
        go func() {
                defer g.cg.WgDone()
                defer returnSema()

                for _, msg := range newUpdatestoSend {
                        err := g.cfg.sendToPeerSync(ctx, msg)
                        switch {
                        case err == ErrGossipSyncerExiting:
                                return

                        case err == lnpeer.ErrPeerExiting:
                                return

                        case err != nil:
                                log.Errorf("Unable to send message for "+
                                        "peer catch up: %v", err)
                        }
                }
        }()

        return nil
}
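
// [Editor's example] A minimal sketch (not part of lnd) of how the filter
// above maps to an absolute time window: FirstTimestamp is an absolute unix
// time and TimestampRange is a span in seconds, so the horizon covers
// [start, start+range).
func exampleHorizon(f *lnwire.GossipTimestampRange) (time.Time, time.Time) {
        start := time.Unix(int64(f.FirstTimestamp), 0)
        end := start.Add(time.Duration(f.TimestampRange) * time.Second)
        return start, end
}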

// FilterGossipMsgs takes a set of gossip messages, and only sends a message
// to a peer iff it is within the bounds of their set gossip filter. If the
// peer doesn't have a gossip filter set, then no messages will be forwarded.
func (g *GossipSyncer) FilterGossipMsgs(ctx context.Context,
        msgs ...msgWithSenders) {

        // If the peer doesn't have an update horizon set, then we won't send
        // it any new update messages.
        if g.remoteUpdateHorizon == nil {
                log.Tracef("GossipSyncer(%x): skipped due to nil "+
                        "remoteUpdateHorizon", g.cfg.peerPub[:])
                return
        }

        // If we've been signaled to exit, or are exiting, then we'll stop
        // short.
        select {
        case <-g.cg.Done():
                return
        case <-ctx.Done():
                return
        default:
        }

        // TODO(roasbeef): need to ensure that peer still online...send msg to
        // gossiper on peer termination to signal peer disconnect?

        var err error

        // Before we filter out the messages, we'll construct an index over the
        // set of channel announcements and channel updates. This will allow us
        // to quickly check if we should forward a chan ann, based on the known
        // channel updates for a channel.
        chanUpdateIndex := make(
                map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate1,
        )
        for _, msg := range msgs {
                chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate1)
                if !ok {
                        continue
                }

                chanUpdateIndex[chanUpdate.ShortChannelID] = append(
                        chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate,
                )
        }

        // We'll construct a helper function that we'll use below to determine
        // if a given message passes the gossip msg filter.
        g.Lock()
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
        endTime := startTime.Add(
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
        )
        g.Unlock()

        passesFilter := func(timeStamp uint32) bool {
                t := time.Unix(int64(timeStamp), 0)
                return t.Equal(startTime) ||
                        (t.After(startTime) && t.Before(endTime))
        }

        msgsToSend := make([]lnwire.Message, 0, len(msgs))
        for _, msg := range msgs {
                // If the target peer is the peer that sent us this message,
                // then we'll exit early as we don't need to filter this
                // message.
                if _, ok := msg.senders[g.cfg.peerPub]; ok {
                        continue
                }

                switch msg := msg.msg.(type) {

                // For each channel announcement message, we'll only send this
                // message if the channel updates for the channel are between
                // our time range.
                case *lnwire.ChannelAnnouncement1:
                        // First, we'll check if the channel updates are in
                        // this message batch.
                        chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID]
                        if !ok {
                                // If not, we'll attempt to query the database
                                // to see if we know of the updates.
                                chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates(
                                        g.cfg.chainHash, msg.ShortChannelID,
                                )
                                if err != nil {
                                        log.Warnf("no channel updates found for "+
                                                "short_chan_id=%v",
                                                msg.ShortChannelID)
                                        continue
                                }
                        }

                        for _, chanUpdate := range chanUpdates {
                                if passesFilter(chanUpdate.Timestamp) {
                                        msgsToSend = append(msgsToSend, msg)
                                        break
                                }
                        }

                        if len(chanUpdates) == 0 {
                                msgsToSend = append(msgsToSend, msg)
                        }

                // For each channel update, we'll only send it if its timestamp
                // is within our time range.
                case *lnwire.ChannelUpdate1:
                        if passesFilter(msg.Timestamp) {
                                msgsToSend = append(msgsToSend, msg)
                        }

                // Similarly, we only send node announcements if the update
                // timestamp is within our set gossip filter time range.
                case *lnwire.NodeAnnouncement:
                        if passesFilter(msg.Timestamp) {
                                msgsToSend = append(msgsToSend, msg)
                        }
                }
        }

        log.Tracef("GossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
                g.cfg.peerPub[:], len(msgs), len(msgsToSend))

        if len(msgsToSend) == 0 {
                return
        }

        if err = g.cfg.sendToPeer(ctx, msgsToSend...); err != nil {
                log.Errorf("unable to send gossip msgs: %v", err)
        }
}
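
// [Editor's example] A standalone restatement (not part of lnd) of the
// passesFilter predicate above: the start of the horizon is inclusive and
// the end is exclusive, so a timestamp exactly at startTime passes while one
// exactly at endTime does not.
func examplePassesFilter(ts uint32, startTime, endTime time.Time) bool {
        t := time.Unix(int64(ts), 0)
        return t.Equal(startTime) || (t.After(startTime) && t.Before(endTime))
}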

// ProcessQueryMsg is used by outside callers to pass new channel time series
// queries to the internal processing goroutine.
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) error {
        var msgChan chan lnwire.Message
        switch msg.(type) {
        case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs:
                msgChan = g.queryMsgs

        // Reply messages should only be expected in states where we're waiting
        // for a reply.
        case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
                g.Lock()
                syncState := g.syncState()
                g.Unlock()

                if syncState != waitingQueryRangeReply &&
                        syncState != waitingQueryChanReply {

                        return fmt.Errorf("unexpected msg %T received in "+
                                "state %v", msg, syncState)
                }
                msgChan = g.gossipMsgs

        default:
                msgChan = g.gossipMsgs
        }

        select {
        case msgChan <- msg:
        case <-peerQuit:
        case <-g.cg.Done():
        }

        return nil
}

// setSyncState sets the gossip syncer's state to the given state.
func (g *GossipSyncer) setSyncState(state syncerState) {
        atomic.StoreUint32(&g.state, uint32(state))
}

// syncState returns the current syncerState of the target GossipSyncer.
func (g *GossipSyncer) syncState() syncerState {
        return syncerState(atomic.LoadUint32(&g.state))
}

// ResetSyncedSignal returns a channel that will be closed in order to serve as
// a signal for when the GossipSyncer has reached its chansSynced state.
func (g *GossipSyncer) ResetSyncedSignal() chan struct{} {
        g.Lock()
        defer g.Unlock()

        syncedSignal := make(chan struct{})

        syncState := syncerState(atomic.LoadUint32(&g.state))
        if syncState == chansSynced {
                close(syncedSignal)
                return syncedSignal
        }

        g.syncedSignal = syncedSignal
        return g.syncedSignal
}
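
// [Editor's example] A minimal sketch (not part of lnd) of waiting on the
// signal returned by ResetSyncedSignal, with a caller-chosen timeout as a
// guard. It returns true once the syncer has reached chansSynced.
func exampleWaitUntilSynced(g *GossipSyncer, timeout time.Duration) bool {
        select {
        case <-g.ResetSyncedSignal():
                return true
        case <-time.After(timeout):
                return false
        }
}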

// ProcessSyncTransition sends a request to the gossip syncer to transition its
// sync type to a new one.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
        errChan := make(chan error, 1)
        select {
        case g.syncTransitionReqs <- &syncTransitionReq{
                newSyncType: newSyncType,
                errChan:     errChan,
        }:
        case <-time.After(syncTransitionTimeout):
                return ErrSyncTransitionTimeout
        case <-g.cg.Done():
                return ErrGossipSyncerExiting
        }

        select {
        case err := <-errChan:
                return err
        case <-g.cg.Done():
                return ErrGossipSyncerExiting
        }
}
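
// [Editor's example] A minimal sketch (not part of lnd) of demoting a
// syncer to passive mode once some other peer has been picked for active
// syncing. The returned error covers both rejected transitions and the
// syncTransitionTimeout case above.
func exampleDemoteSyncer(g *GossipSyncer) {
        if err := g.ProcessSyncTransition(PassiveSync); err != nil {
                log.Errorf("unable to demote syncer: %v", err)
        }
}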

// handleSyncTransition handles a new sync type transition request.
//
// NOTE: The gossip syncer may end up in a different sync state as a result of
// this transition.
func (g *GossipSyncer) handleSyncTransition(ctx context.Context,
        req *syncTransitionReq) error {

        // Return early from any NOP sync transitions.
        syncType := g.SyncType()
        if syncType == req.newSyncType {
                return nil
        }

        log.Debugf("GossipSyncer(%x): transitioning from %v to %v",
                g.cfg.peerPub, syncType, req.newSyncType)

        var (
                firstTimestamp time.Time
                timestampRange uint32
        )

        switch req.newSyncType {
        // If an active sync has been requested, then we should resume receiving
        // new graph updates from the remote peer.
        case ActiveSync, PinnedSync:
                firstTimestamp = time.Now()
                timestampRange = math.MaxUint32

        // If a PassiveSync transition has been requested, then we should no
        // longer receive any new updates from the remote peer. We can do this
        // by setting our update horizon to a range in the past ensuring no
        // graph updates match the timestamp range.
        case PassiveSync:
                firstTimestamp = zeroTimestamp
                timestampRange = 0

        default:
                return fmt.Errorf("unhandled sync transition %v",
                        req.newSyncType)
        }

        err := g.sendGossipTimestampRange(ctx, firstTimestamp, timestampRange)
        if err != nil {
                return fmt.Errorf("unable to send local update horizon: %w",
                        err)
        }

        g.setSyncType(req.newSyncType)

        return nil
}
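
// [Editor's example] A minimal sketch (not part of lnd) of the two update
// horizons chosen in handleSyncTransition above: active and pinned syncs
// open the horizon from "now" onwards, while a passive sync pins it to the
// zero timestamp with a zero range so that no update can ever match. The
// zero time.Time here mirrors the zeroTimestamp value used above.
func exampleHorizonFor(t SyncerType) (time.Time, uint32) {
        if t.IsActiveSync() {
                return time.Now(), math.MaxUint32
        }
        return time.Time{}, 0
}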

// setSyncType sets the gossip syncer's sync type to the given type.
func (g *GossipSyncer) setSyncType(syncType SyncerType) {
        atomic.StoreUint32(&g.syncType, uint32(syncType))
}

// SyncType returns the current SyncerType of the target GossipSyncer.
func (g *GossipSyncer) SyncType() SyncerType {
        return SyncerType(atomic.LoadUint32(&g.syncType))
}

// historicalSync sends a request to the gossip syncer to perform a historical
// sync.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) historicalSync() error {
        done := make(chan struct{})

        select {
        case g.historicalSyncReqs <- &historicalSyncReq{
                doneChan: done,
        }:
        case <-time.After(syncTransitionTimeout):
                return ErrSyncTransitionTimeout
        case <-g.cg.Done():
                return ErrGossiperShuttingDown
        }

        select {
        case <-done:
                return nil
        case <-g.cg.Done():
                return ErrGossiperShuttingDown
        }
}
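
// [Editor's example] A minimal, generic sketch (not part of lnd) of the
// request/acknowledge pattern historicalSync uses: hand the processing
// goroutine a request carrying a done channel, then block until that channel
// is closed or a quit signal fires. Both names here are hypothetical.
type exampleReq struct {
        done chan struct{}
}

func exampleRequest(reqs chan<- *exampleReq, quit <-chan struct{}) error {
        r := &exampleReq{done: make(chan struct{})}

        select {
        case reqs <- r:
        case <-quit:
                return errors.New("shutting down")
        }

        select {
        case <-r.done:
                return nil
        case <-quit:
                return errors.New("shutting down")
        }
}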

// handleHistoricalSync handles a request to the gossip syncer to perform a
// historical sync.
func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
        // We'll go back to our initial syncingChans state in order to request
        // the remote peer to give us all of the channel IDs they know of
        // starting from the genesis block.
        g.genHistoricalChanRangeQuery = true
        g.setSyncState(syncingChans)
        close(req.doneChan)
}