lightningnetwork / lnd, build 16990665124

15 Aug 2025 01:10PM UTC coverage: 66.74% (-0.03%) from 66.765%

Pull Request #9455 (merge 035fac41d into fb1adfc21): [1/2] discovery+lnwire: add support for DNS host name in NodeAnnouncement msg

116 of 188 new or added lines in 8 files covered. (61.7%)

110 existing lines in 23 files now uncovered.

136011 of 203791 relevant lines covered (66.74%)

21482.89 hits per line

Source File: /discovery/syncer.go (84.35%)
package discovery

import (
        "context"
        "errors"
        "fmt"
        "math"
        "math/rand"
        "sort"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph"
        graphdb "github.com/lightningnetwork/lnd/graph/db"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lnwire"
)

// SyncerType encapsulates the different types of syncing mechanisms for a
// gossip syncer.
type SyncerType uint8

const (
        // ActiveSync denotes that a gossip syncer:
        //
        // 1. Should not attempt to synchronize with the remote peer for
        //    missing channels.
        // 2. Should respond to queries from the remote peer.
        // 3. Should receive new updates from the remote peer.
        //
        // They are started in a chansSynced state in order to accomplish their
        // responsibilities above.
        ActiveSync SyncerType = iota

        // PassiveSync denotes that a gossip syncer:
        //
        // 1. Should not attempt to synchronize with the remote peer for
        //    missing channels.
        // 2. Should respond to queries from the remote peer.
        // 3. Should not receive new updates from the remote peer.
        //
        // They are started in a chansSynced state in order to accomplish their
        // responsibilities above.
        PassiveSync

        // PinnedSync denotes an ActiveSync that doesn't count towards the
        // default active syncer limits and is always active throughout the
        // duration of the peer's connection. Each pinned syncer will begin by
        // performing a historical sync to ensure we are well synchronized with
        // their routing table.
        PinnedSync
)

const (
        // defaultTimestampQueueSize is the size of the timestamp range queue
        // used.
        defaultTimestampQueueSize = 1
)

// String returns a human readable string describing the target SyncerType.
func (t SyncerType) String() string {
        switch t {
        case ActiveSync:
                return "ActiveSync"
        case PassiveSync:
                return "PassiveSync"
        case PinnedSync:
                return "PinnedSync"
        default:
                return fmt.Sprintf("unknown sync type %d", t)
        }
}

// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange
// allowing new gossip messages to be received from the peer.
func (t SyncerType) IsActiveSync() bool {
        switch t {
        case ActiveSync, PinnedSync:
                return true
        default:
                return false
        }
}

// syncerState is an enum that represents the current state of the GossipSyncer.
// As the syncer is a state machine, we'll gate our actions based off of the
// current state and the next incoming message.
type syncerState uint32

const (
        // syncingChans is the default state of the GossipSyncer. We start in
        // this state when a new peer first connects and we don't yet know if
        // we're fully synchronized.
        syncingChans syncerState = iota

        // waitingQueryRangeReply is the second main phase of the GossipSyncer.
        // We enter this state after we send out our first QueryChannelRange
        // message. We'll stay in this state until the remote party sends us
        // the final ReplyChannelRange message indicating they've responded to
        // our query entirely. After this state, we'll transition to
        // waitingQueryChanReply once we send out requests for all of the new
        // chan IDs returned to us.
        waitingQueryRangeReply

        // queryNewChannels is the third main phase of the GossipSyncer. In
        // this phase we'll send out all of our QueryShortChanIDs messages in
        // response to the new channels that we don't yet know about.
        queryNewChannels

        // waitingQueryChanReply is the fourth main phase of the GossipSyncer.
        // We enter this phase once we've sent off a query chunk to the remote
        // peer. We'll stay in this phase until we receive a
        // ReplyShortChanIDsEnd message which indicates that the remote party
        // has responded to all of our requests.
        waitingQueryChanReply

        // chansSynced is the terminal stage of the GossipSyncer. Once we enter
        // this phase, we'll send out our update horizon, which filters out the
        // set of channel updates that we're interested in. In this state,
        // we'll be able to accept any outgoing messages from the
        // AuthenticatedGossiper, and decide if we should forward them to our
        // target peer based on its update horizon.
        chansSynced

        // syncerIdle is a state in which the gossip syncer can handle external
        // requests to transition or perform historical syncs. It is used as the
        // initial state for pinned syncers, as well as a fallthrough case for
        // chansSynced allowing fully synced peers to facilitate requests.
        syncerIdle
)

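// The happy-path flow through these states, as implemented by
// channelGraphSyncer below, is roughly:
//
//	syncingChans -> waitingQueryRangeReply -> queryNewChannels
//	        -> waitingQueryChanReply -> ... -> chansSynced -> syncerIdle
//
// with queryNewChannels and waitingQueryChanReply looping until every chunk
// of unknown channel IDs has been queried for. (Illustrative summary only;
// the state handlers below are the authoritative logic.)
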
// String returns a human readable string describing the target syncerState.
func (s syncerState) String() string {
        switch s {
        case syncingChans:
                return "syncingChans"

        case waitingQueryRangeReply:
                return "waitingQueryRangeReply"

        case queryNewChannels:
                return "queryNewChannels"

        case waitingQueryChanReply:
                return "waitingQueryChanReply"

        case chansSynced:
                return "chansSynced"

        case syncerIdle:
                return "syncerIdle"

        default:
                return "UNKNOWN STATE"
        }
}

const (
        // maxQueryChanRangeReplies specifies the default limit of replies to
        // process for a single QueryChannelRange request.
        maxQueryChanRangeReplies = 500

        // maxQueryChanRangeRepliesZlibFactor specifies the factor applied to
        // the maximum number of replies allowed for zlib encoded replies.
        maxQueryChanRangeRepliesZlibFactor = 4

        // chanRangeQueryBuffer is the number of blocks back that we'll go when
        // asking the remote peer for any channels they know of beyond our
        // highest known channel ID.
        chanRangeQueryBuffer = 144

        // syncTransitionTimeout is the default amount of time we'll wait when
        // attempting to perform a sync transition.
        syncTransitionTimeout = 5 * time.Second

        // requestBatchSize is the maximum number of channels we will query the
        // remote peer for in a QueryShortChanIDs message.
        requestBatchSize = 500

        // syncerBufferSize is the size of the syncer's buffers.
        syncerBufferSize = 50
)

var (
        // encodingTypeToChunkSize maps an encoding type to the max number of
        // short chan ID's using the encoding type that we can fit into a
        // single message safely.
        encodingTypeToChunkSize = map[lnwire.QueryEncoding]int32{
                lnwire.EncodingSortedPlain: 8000,
        }

        // ErrGossipSyncerExiting signals that the syncer has been killed.
        ErrGossipSyncerExiting = errors.New("gossip syncer exiting")

        // ErrSyncTransitionTimeout is an error returned when we've timed out
        // attempting to perform a sync transition.
        ErrSyncTransitionTimeout = errors.New("timed out attempting to " +
                "transition sync type")

        // zeroTimestamp is the timestamp we'll use when we want to indicate to
        // peers that we do not want to receive any new graph updates.
        zeroTimestamp time.Time
)

// syncTransitionReq encapsulates a request for a gossip syncer sync transition.
type syncTransitionReq struct {
        newSyncType SyncerType
        errChan     chan error
}

// historicalSyncReq encapsulates a request for a gossip syncer to perform a
// historical sync.
type historicalSyncReq struct {
        // doneChan is a channel that serves as a signal and is closed to ensure
        // the historical sync is attempted by the time we return to the caller.
        doneChan chan struct{}
}

// gossipSyncerCfg is a struct that packages all the information a GossipSyncer
// needs to carry out its duties.
type gossipSyncerCfg struct {
        // chainHash is the chain that this syncer is responsible for.
        chainHash chainhash.Hash

        // peerPub is the public key of the peer we're syncing with, serialized
        // in compressed format.
        peerPub [33]byte

        // channelSeries is the primary interface that we'll use to generate
        // our queries and respond to the queries of the remote peer.
        channelSeries ChannelGraphTimeSeries

        // encodingType is the current encoding type we're aware of. Requests
        // with different encoding types will be rejected.
        encodingType lnwire.QueryEncoding

        // chunkSize is the max number of short chan IDs using the syncer's
        // encoding type that we can fit into a single message safely.
        chunkSize int32

        // batchSize is the max number of channels the syncer will query from
        // the remote node in a single QueryShortChanIDs request.
        batchSize int32

        // sendToPeer sends a variadic number of messages to the remote peer.
        // This method should not block while waiting for sends to be written
        // to the wire.
        sendToPeer func(context.Context, ...lnwire.Message) error

        // sendToPeerSync sends a variadic number of messages to the remote
        // peer, blocking until all messages have been sent successfully or a
        // write error is encountered.
        sendToPeerSync func(context.Context, ...lnwire.Message) error

        // noSyncChannels will prevent the GossipSyncer from spawning a
        // channelGraphSyncer, meaning we will not try to reconcile unknown
        // channels with the remote peer.
        noSyncChannels bool

        // noReplyQueries will prevent the GossipSyncer from spawning a
        // replyHandler, meaning we will not reply to queries from our remote
        // peer.
        noReplyQueries bool

        // noTimestampQueryOption will prevent the GossipSyncer from querying
        // timestamps of announcement messages from the peer, and it will
        // prevent it from responding to timestamp queries.
        noTimestampQueryOption bool

        // ignoreHistoricalFilters will prevent syncers from replying with
        // historical data when the remote peer sets a gossip_timestamp_range.
        // This prevents ranges with old start times from causing us to dump the
        // graph on connect.
        ignoreHistoricalFilters bool

        // bestHeight returns the latest known height of the chain.
        bestHeight func() uint32

        // markGraphSynced updates the SyncManager's perception of whether we
        // have completed at least one historical sync.
        markGraphSynced func()

        // maxQueryChanRangeReplies is the maximum number of replies we'll allow
        // for a single QueryChannelRange request.
        maxQueryChanRangeReplies uint32

        // isStillZombieChannel takes the timestamps of the latest channel
        // updates for a channel and returns true if the channel should be
        // considered a zombie based on these timestamps.
        isStillZombieChannel func(time.Time, time.Time) bool

        // timestampQueueSize is the size of the timestamp range queue. If not
        // set, it defaults to the defaultTimestampQueueSize constant.
        timestampQueueSize int
}

// GossipSyncer is a struct that handles synchronizing the channel graph state
// with a remote peer. The GossipSyncer implements a state machine that will
// progressively ensure we're synchronized with the channel state of the remote
// node. Once both nodes have been synchronized, we'll use an update filter to
// filter out which messages should be sent to a remote peer based on their
// update horizon. If the update horizon isn't specified, then we won't send
// them any channel updates at all.
type GossipSyncer struct {
        started sync.Once
        stopped sync.Once

        // state is the current state of the GossipSyncer.
        //
        // NOTE: This variable MUST be used atomically.
        state uint32

        // syncType denotes the SyncerType the gossip syncer is currently
        // exercising.
        //
        // NOTE: This variable MUST be used atomically.
        syncType uint32

        // remoteUpdateHorizon is the update horizon of the remote peer. We'll
        // use this to properly filter out any messages.
        remoteUpdateHorizon *lnwire.GossipTimestampRange

        // localUpdateHorizon is our local update horizon, we'll use this to
        // determine if we've already sent out our update.
        localUpdateHorizon *lnwire.GossipTimestampRange

        // syncTransitionReqs is a channel through which new sync type
        // transition requests will be sent. These requests should only be
        // handled when the gossip syncer is in a chansSynced state to ensure
        // its state machine behaves as expected.
        syncTransitionReqs chan *syncTransitionReq

        // historicalSyncReqs is a channel that serves as a signal for the
        // gossip syncer to perform a historical sync. These can only be done
        // once the gossip syncer is in a chansSynced state to ensure its state
        // machine behaves as expected.
        historicalSyncReqs chan *historicalSyncReq

        // genHistoricalChanRangeQuery when true signals to the gossip syncer
        // that it should request the remote peer for all of its known channel
        // IDs starting from the genesis block of the chain. This can only
        // happen if the gossip syncer receives a request to attempt a
        // historical sync. It can be unset if the syncer ever transitions from
        // PassiveSync to ActiveSync.
        genHistoricalChanRangeQuery bool

        // gossipMsgs is a channel that all responses to our queries from the
        // target peer will be sent over, these will be read by the
        // channelGraphSyncer.
        gossipMsgs chan lnwire.Message

        // queryMsgs is a channel that all queries from the remote peer will be
        // received over, these will be read by the replyHandler.
        queryMsgs chan lnwire.Message

        // curQueryRangeMsg keeps track of the latest QueryChannelRange message
        // we've sent to a peer to ensure we've consumed all expected replies.
        // This field is primarily used within the waitingQueryChanReply state.
        curQueryRangeMsg *lnwire.QueryChannelRange

        // prevReplyChannelRange keeps track of the previous ReplyChannelRange
        // message we've received from a peer to ensure they've fully replied to
        // our query by ensuring they covered our requested block range. This
        // field is primarily used within the waitingQueryChanReply state.
        prevReplyChannelRange *lnwire.ReplyChannelRange

        // bufferedChanRangeReplies is used in the waitingQueryChanReply state
        // to buffer all of the chunked responses to our query.
        bufferedChanRangeReplies []graphdb.ChannelUpdateInfo

        // numChanRangeRepliesRcvd is used to track the number of replies
        // received as part of a QueryChannelRange. This field is primarily used
        // within the waitingQueryChanReply state.
        numChanRangeRepliesRcvd uint32

        // newChansToQuery is used to pass the set of channels we should query
        // for from the waitingQueryChanReply state to the queryNewChannels
        // state.
        newChansToQuery []lnwire.ShortChannelID

        cfg gossipSyncerCfg

        // syncedSignal is a channel that, if set, will be closed when the
        // GossipSyncer reaches its terminal chansSynced state.
        syncedSignal chan struct{}

        // syncerSema is used to more finely control the syncer's ability to
        // respond to gossip timestamp range messages.
        syncerSema chan struct{}

        // timestampRangeQueue is a buffered channel for queuing timestamp range
        // messages that need to be processed asynchronously. This prevents the
        // gossiper from blocking when ApplyGossipFilter is called.
        timestampRangeQueue chan *lnwire.GossipTimestampRange

        // isSendingBacklog is an atomic flag that indicates whether a goroutine
        // is currently sending the backlog of messages. This ensures only one
        // goroutine is active at a time.
        isSendingBacklog atomic.Bool

        sync.Mutex

        // cg is a helper that encapsulates a wait group and quit channel and
        // allows contexts that either block or cancel on those depending on
        // the use case.
        cg *fn.ContextGuard
}

// newGossipSyncer returns a new instance of the GossipSyncer populated using
// the passed config.
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
        // Use the configured queue size if set, otherwise use the default.
        queueSize := cfg.timestampQueueSize
        if queueSize == 0 {
                queueSize = defaultTimestampQueueSize
        }

        return &GossipSyncer{
                cfg:                cfg,
                syncTransitionReqs: make(chan *syncTransitionReq),
                historicalSyncReqs: make(chan *historicalSyncReq),
                gossipMsgs:         make(chan lnwire.Message, syncerBufferSize),
                queryMsgs:          make(chan lnwire.Message, syncerBufferSize),
                timestampRangeQueue: make(
                        chan *lnwire.GossipTimestampRange, queueSize,
                ),
                syncerSema: sema,
                cg:         fn.NewContextGuard(),
        }
}

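// Illustrative usage sketch: how a caller could wire up and run a syncer. The
// concrete values below are assumptions chosen for demonstration; in lnd the
// gossiper's SyncManager constructs and owns these syncers.
//
//	sema := make(chan struct{}, 1)
//	sema <- struct{}{}
//
//	syncer := newGossipSyncer(gossipSyncerCfg{
//	        peerPub:      peerPub, // hypothetical 33-byte peer key
//	        encodingType: lnwire.EncodingSortedPlain,
//	        chunkSize:    encodingTypeToChunkSize[lnwire.EncodingSortedPlain],
//	        batchSize:    requestBatchSize,
//	        // channelSeries, sendToPeer, bestHeight, etc. omitted here.
//	}, sema)
//
//	syncer.Start()
//	defer syncer.Stop()
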
// Start starts the GossipSyncer and any goroutines that it needs to carry out
// its duties.
func (g *GossipSyncer) Start() {
        g.started.Do(func() {
                log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])

                ctx, _ := g.cg.Create(context.Background())

                // TODO(conner): only spawn channelGraphSyncer if remote
                // supports gossip queries, and only spawn replyHandler if we
                // advertise support
                if !g.cfg.noSyncChannels {
                        g.cg.WgAdd(1)
                        go g.channelGraphSyncer(ctx)
                }
                if !g.cfg.noReplyQueries {
                        g.cg.WgAdd(1)
                        go g.replyHandler(ctx)
                }

                // Start the timestamp range queue processor to handle gossip
                // filter applications asynchronously.
                if !g.cfg.noTimestampQueryOption {
                        g.cg.WgAdd(1)
                        go g.processTimestampRangeQueue(ctx)
                }
        })
}

// Stop signals the GossipSyncer for a graceful exit, then waits until it has
// exited.
func (g *GossipSyncer) Stop() {
        g.stopped.Do(func() {
                log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
                defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])

                g.cg.Quit()
        })
}

// handleSyncingChans handles the state syncingChans for the GossipSyncer. When
// in this state, we will send a QueryChannelRange msg to our peer and advance
// the syncer's state to waitingQueryRangeReply.
func (g *GossipSyncer) handleSyncingChans(ctx context.Context) {
        // Prepare the query msg.
        queryRangeMsg, err := g.genChanRangeQuery(
                ctx, g.genHistoricalChanRangeQuery,
        )
        if err != nil {
                log.Errorf("Unable to gen chan range query: %v", err)
                return
        }

        // Acquire a lock so the following state transition is atomic.
        //
        // NOTE: We must lock the following steps as it's possible we get an
        // immediate response (ReplyChannelRange) after sending the query msg.
        // The response is handled in ProcessQueryMsg, which requires the
        // current state to be waitingQueryRangeReply.
        g.Lock()
        defer g.Unlock()

        // Send the msg to the remote peer, which is non-blocking as
        // `sendToPeer` only queues the msg in Brontide.
        err = g.cfg.sendToPeer(ctx, queryRangeMsg)
        if err != nil {
                log.Errorf("Unable to send chan range query: %v", err)
                return
        }

        // With the message sent successfully, we'll transition into the next
        // state where we wait for their reply.
        g.setSyncState(waitingQueryRangeReply)
}

// channelGraphSyncer is the main goroutine responsible for ensuring that we
// properly sync our channel graph state with the remote peer, and also that we
// only send them messages which actually pass their defined update horizon.
func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
        defer g.cg.WgDone()

        for {
                state := g.syncState()
                syncType := g.SyncType()

                log.Debugf("GossipSyncer(%x): state=%v, type=%v",
                        g.cfg.peerPub[:], state, syncType)

                switch state {
                // When we're in this state, we're trying to synchronize our
                // view of the network with the remote peer. We'll kick off
                // this sync by asking them for the set of channels they
                // understand, as well as responding to any other queries by
                // them.
                case syncingChans:
                        g.handleSyncingChans(ctx)

                // In this state, we've sent out our initial channel range
                // query and are waiting for the final response from the remote
                // peer before we perform a diff to see which channels they know
                // of that we don't.
                case waitingQueryRangeReply:
                        // We'll wait to either process a new message from the
                        // remote party, or exit due to the gossiper exiting,
                        // or us being signalled to do so.
                        select {
                        case msg := <-g.gossipMsgs:
                                // The remote peer is sending a response to our
                                // initial query, we'll collate this response,
                                // and see if it's the final one in the series.
                                // If so, we can then transition to querying
                                // for the new channels.
                                queryReply, ok := msg.(*lnwire.ReplyChannelRange)
                                if ok {
                                        err := g.processChanRangeReply(
                                                ctx, queryReply,
                                        )
                                        if err != nil {
                                                log.Errorf("Unable to "+
                                                        "process chan range "+
                                                        "query: %v", err)
                                                return
                                        }
                                        continue
                                }

                                log.Warnf("Unexpected message: %T in state=%v",
                                        msg, state)

                        case <-g.cg.Done():
                                return

                        case <-ctx.Done():
                                return
                        }

                // We'll enter this state once we've discovered which channels
                // the remote party knows of that we don't yet know of
                // ourselves.
                case queryNewChannels:
                        // First, we'll attempt to continue our channel
                        // synchronization by continuing to send off another
                        // query chunk.
                        done := g.synchronizeChanIDs(ctx)

                        // If this wasn't our last query, then we'll need to
                        // transition to our waiting state.
                        if !done {
                                continue
                        }

                        // If we're fully synchronized, then we can transition
                        // to our terminal state.
                        g.setSyncState(chansSynced)

                        // Ensure that the sync manager becomes aware that the
                        // historical sync completed so synced_to_graph is
                        // updated over rpc.
                        g.cfg.markGraphSynced()

                // In this state, we've just sent off a new query for channels
                // that we don't yet know of. We'll remain in this state until
                // the remote party signals they've responded to our query in
                // totality.
                case waitingQueryChanReply:
                        // Once we've sent off our query, we'll wait for either
                        // an ending reply, or just another query from the
                        // remote peer.
                        select {
                        case msg := <-g.gossipMsgs:
                                // If this is the final reply to one of our
                                // queries, then we'll loop back into our query
                                // state to send off the remaining query chunks.
                                _, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
                                if ok {
                                        g.setSyncState(queryNewChannels)
                                        continue
                                }

                                log.Warnf("Unexpected message: %T in state=%v",
                                        msg, state)

                        case <-g.cg.Done():
                                return

                        case <-ctx.Done():
                                return
                        }

                // This is our final terminal state where we'll only reply to
                // any further queries by the remote peer.
                case chansSynced:
                        g.Lock()
                        if g.syncedSignal != nil {
                                close(g.syncedSignal)
                                g.syncedSignal = nil
                        }
                        g.Unlock()

                        // If we haven't yet sent out our update horizon, and
                        // we want to receive real-time channel updates, we'll
                        // do so now.
                        if g.localUpdateHorizon == nil &&
                                syncType.IsActiveSync() {

                                err := g.sendGossipTimestampRange(
                                        ctx, time.Now(), math.MaxUint32,
                                )
                                if err != nil {
                                        log.Errorf("Unable to send update "+
                                                "horizon to %x: %v",
                                                g.cfg.peerPub, err)
                                }
                        }
                        // With our horizon set, we'll simply reply to any new
                        // messages or process any state transitions and exit if
                        // needed.
                        fallthrough

                // Pinned peers will begin in this state, since they will
                // immediately receive a request to perform a historical sync.
                // Otherwise, we fall through after ending in chansSynced to
                // facilitate new requests.
                case syncerIdle:
                        select {
                        case req := <-g.syncTransitionReqs:
                                req.errChan <- g.handleSyncTransition(ctx, req)

                        case req := <-g.historicalSyncReqs:
                                g.handleHistoricalSync(req)

                        case <-g.cg.Done():
                                return

                        case <-ctx.Done():
                                return
                        }
                }
        }
}

// replyHandler is an event loop whose sole purpose is to reply to the remote
// peer's queries. Our replyHandler will respond to messages generated by their
// channelGraphSyncer, and vice versa. Each party's channelGraphSyncer drives
// the other's replyHandler, allowing the replyHandler to operate independently
// from the state machine maintained on the same node.
//
// NOTE: This method MUST be run as a goroutine.
func (g *GossipSyncer) replyHandler(ctx context.Context) {
        defer g.cg.WgDone()

        for {
                select {
                case msg := <-g.queryMsgs:
                        err := g.replyPeerQueries(ctx, msg)
                        switch {
                        case err == ErrGossipSyncerExiting:
                                return

                        case err == lnpeer.ErrPeerExiting:
                                return

                        case err != nil:
                                log.Errorf("Unable to reply to peer "+
                                        "query: %v", err)
                        }

                case <-g.cg.Done():
                        return

                case <-ctx.Done():
                        return
                }
        }
}

// processTimestampRangeQueue handles timestamp range messages from the queue
// asynchronously. This prevents blocking the gossiper when rate limiting is
// active and multiple peers are trying to apply gossip filters.
func (g *GossipSyncer) processTimestampRangeQueue(ctx context.Context) {
        defer g.cg.WgDone()

        for {
                select {
                case msg := <-g.timestampRangeQueue:
                        // Process the timestamp range message. If we hit an
                        // error, log it but continue processing to avoid
                        // blocking the queue.
                        err := g.ApplyGossipFilter(ctx, msg)
                        switch {
                        case errors.Is(err, ErrGossipSyncerExiting):
                                return

                        case errors.Is(err, lnpeer.ErrPeerExiting):
                                return

                        case err != nil:
                                log.Errorf("Unable to apply gossip filter: %v",
                                        err)
                        }

                case <-g.cg.Done():
                        return

                case <-ctx.Done():
                        return
                }
        }
}

// QueueTimestampRange attempts to queue a timestamp range message for
// asynchronous processing. If the queue is full, it returns false to indicate
// the message was dropped.
func (g *GossipSyncer) QueueTimestampRange(
        msg *lnwire.GossipTimestampRange) bool {

        // If timestamp queries are disabled, don't queue the message.
        if g.cfg.noTimestampQueryOption {
                return false
        }

        select {
        case g.timestampRangeQueue <- msg:
                return true

        // Queue is full, drop the message to prevent blocking.
        default:
                log.Warnf("Timestamp range queue full for peer %x, "+
                        "dropping message", g.cfg.peerPub[:])
                return false
        }
}

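// Illustrative usage sketch for QueueTimestampRange: the enqueue is
// non-blocking, so a false return simply means the bounded queue was full and
// the peer's filter was dropped rather than stalling the caller. The handling
// shown here is hypothetical.
//
//	if !syncer.QueueTimestampRange(msg) {
//	        log.Debugf("GossipSyncer(%x): dropped gossip filter", peerPub)
//	}
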
// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
// syncer and sends it to the remote peer.
func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
        firstTimestamp time.Time, timestampRange uint32) error {

        endTimestamp := firstTimestamp.Add(
                time.Duration(timestampRange) * time.Second,
        )

        log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)",
                g.cfg.peerPub[:], firstTimestamp, endTimestamp)

        localUpdateHorizon := &lnwire.GossipTimestampRange{
                ChainHash:      g.cfg.chainHash,
                FirstTimestamp: uint32(firstTimestamp.Unix()),
                TimestampRange: timestampRange,
        }

        if err := g.cfg.sendToPeer(ctx, localUpdateHorizon); err != nil {
                return err
        }

        if firstTimestamp == zeroTimestamp && timestampRange == 0 {
                g.localUpdateHorizon = nil
        } else {
                g.localUpdateHorizon = localUpdateHorizon
        }

        return nil
}

// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
// the remote peer for its known set of channel IDs within a particular block
// range. This method will be called continually until the entire range has
// been queried for with a response received. We'll chunk our requests as
// required to ensure they fit into a single message. We may re-enter this
// state in the case that chunking is required.
func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
        // If we're in this state yet there are no more new channels to query
        // for, then we'll transition to our final synced state and return true
        // to signal that we're fully synchronized.
        if len(g.newChansToQuery) == 0 {
                log.Infof("GossipSyncer(%x): no more chans to query",
                        g.cfg.peerPub[:])

                return true
        }

        // Otherwise, we'll issue our next chunked query to receive replies
        // for.
        var queryChunk []lnwire.ShortChannelID

        // If the number of channels to query for is less than the chunk size,
        // then we can issue a single query.
        if int32(len(g.newChansToQuery)) < g.cfg.batchSize {
                queryChunk = g.newChansToQuery
                g.newChansToQuery = nil

        } else {
                // Otherwise, we'll need to only query for the next chunk.
                // We'll slice into our query chunk, then slide our main
                // pointer down by the chunk size.
                queryChunk = g.newChansToQuery[:g.cfg.batchSize]
                g.newChansToQuery = g.newChansToQuery[g.cfg.batchSize:]
        }

        log.Infof("GossipSyncer(%x): querying for %v new channels",
                g.cfg.peerPub[:], len(queryChunk))

        // Change the state before sending the query msg.
        g.setSyncState(waitingQueryChanReply)

        // With our chunk obtained, we'll send over our next query, then return
        // false indicating that we're not yet fully synced.
        err := g.cfg.sendToPeer(ctx, &lnwire.QueryShortChanIDs{
                ChainHash:    g.cfg.chainHash,
                EncodingType: lnwire.EncodingSortedPlain,
                ShortChanIDs: queryChunk,
        })
        if err != nil {
                log.Errorf("Unable to sync chan IDs: %v", err)
        }

        return false
}

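// Worked example (values assumed for illustration): with batchSize = 500 and
// 1,200 unknown channels in newChansToQuery, synchronizeChanIDs sends chunks
// of 500, 500 and then 200 across successive passes through queryNewChannels,
// and only returns true on the following call once newChansToQuery is empty.
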
// isLegacyReplyChannelRange determines where a ReplyChannelRange message is
855
// considered legacy. There was a point where lnd used to include the same query
856
// over multiple replies, rather than including the portion of the query the
857
// reply is handling. We'll use this as a way of detecting whether we are
858
// communicating with a legacy node so we can properly sync with them.
859
func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
860
        reply *lnwire.ReplyChannelRange) bool {
253✔
861

253✔
862
        return (reply.ChainHash == query.ChainHash &&
253✔
863
                reply.FirstBlockHeight == query.FirstBlockHeight &&
253✔
864
                reply.NumBlocks == query.NumBlocks)
253✔
865
}
253✔
866

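// Illustrative sketch: a legacy reply echoes the full query range, while a
// modern reply only covers the slice of the query it answers. The field
// values below are made up for demonstration (identical chain hashes assumed).
//
//	query := &lnwire.QueryChannelRange{FirstBlockHeight: 500_000, NumBlocks: 1_000}
//	legacy := &lnwire.ReplyChannelRange{FirstBlockHeight: 500_000, NumBlocks: 1_000}
//	partial := &lnwire.ReplyChannelRange{FirstBlockHeight: 500_000, NumBlocks: 250}
//
//	isLegacyReplyChannelRange(query, legacy)  // true
//	isLegacyReplyChannelRange(query, partial) // false
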
// processChanRangeReply is called each time the GossipSyncer receives a new
// reply to the initial range query to discover new channels that it didn't
// previously know of.
func (g *GossipSyncer) processChanRangeReply(_ context.Context,
        msg *lnwire.ReplyChannelRange) error {

        // isStale returns whether the timestamp is too far into the past.
        isStale := func(timestamp time.Time) bool {
                return time.Since(timestamp) > graph.DefaultChannelPruneExpiry
        }

        // isSkewed returns whether the timestamp is too far into the future.
        isSkewed := func(timestamp time.Time) bool {
                return time.Until(timestamp) > graph.DefaultChannelPruneExpiry
        }

        // If we're not communicating with a legacy node, we'll apply some
        // further constraints on their reply to ensure it satisfies our query.
        if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
                // The first block should be within our original request.
                if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight {
                        return fmt.Errorf("reply includes channels for height "+
                                "%v prior to query %v", msg.FirstBlockHeight,
                                g.curQueryRangeMsg.FirstBlockHeight)
                }

                // The last block should also be. We don't need to check the
                // intermediate ones because they should already be in sorted
                // order.
                replyLastHeight := msg.LastBlockHeight()
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
                if replyLastHeight > queryLastHeight {
                        return fmt.Errorf("reply includes channels for height "+
                                "%v after query %v", replyLastHeight,
                                queryLastHeight)
                }

                // If we've previously received a reply for this query, look at
                // its last block to ensure the current reply properly follows
                // it.
                if g.prevReplyChannelRange != nil {
                        prevReply := g.prevReplyChannelRange
                        prevReplyLastHeight := prevReply.LastBlockHeight()

                        // The current reply can either start from the previous
                        // reply's last block, if there are still more channels
                        // for the same block, or the block after.
                        if msg.FirstBlockHeight != prevReplyLastHeight &&
                                msg.FirstBlockHeight != prevReplyLastHeight+1 {

                                return fmt.Errorf("first block of reply %v "+
                                        "does not continue from last block of "+
                                        "previous %v", msg.FirstBlockHeight,
                                        prevReplyLastHeight)
                        }
                }
        }

        g.prevReplyChannelRange = msg

        for i, scid := range msg.ShortChanIDs {
                info := graphdb.NewChannelUpdateInfo(
                        scid, time.Time{}, time.Time{},
                )

                if len(msg.Timestamps) != 0 {
                        t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0)
                        info.Node1UpdateTimestamp = t1

                        t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0)
                        info.Node2UpdateTimestamp = t2

                        // Sort out all channels with outdated or skewed
                        // timestamps. Both timestamps need to be out of
                        // boundaries for us to skip the channel and not query
                        // it later on.
                        switch {
                        case isStale(info.Node1UpdateTimestamp) &&
                                isStale(info.Node2UpdateTimestamp):

                                continue

                        case isSkewed(info.Node1UpdateTimestamp) &&
                                isSkewed(info.Node2UpdateTimestamp):

                                continue

                        case isStale(info.Node1UpdateTimestamp) &&
                                isSkewed(info.Node2UpdateTimestamp):

                                continue

                        case isStale(info.Node2UpdateTimestamp) &&
                                isSkewed(info.Node1UpdateTimestamp):

                                continue
                        }
                }

                g.bufferedChanRangeReplies = append(
                        g.bufferedChanRangeReplies, info,
                )
        }

        switch g.cfg.encodingType {
        case lnwire.EncodingSortedPlain:
                g.numChanRangeRepliesRcvd++
        case lnwire.EncodingSortedZlib:
                g.numChanRangeRepliesRcvd += maxQueryChanRangeRepliesZlibFactor
        default:
                return fmt.Errorf("unhandled encoding type %v", g.cfg.encodingType)
        }

        log.Infof("GossipSyncer(%x): buffering chan range reply of size=%v",
                g.cfg.peerPub[:], len(msg.ShortChanIDs))

        // If this isn't the last response and we can continue to receive more,
        // then we can exit as we've already buffered the latest portion of the
        // streaming reply.
        maxReplies := g.cfg.maxQueryChanRangeReplies
        switch {
        // If we're communicating with a legacy node, we'll need to look at the
        // complete field.
        case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg):
                if msg.Complete == 0 && g.numChanRangeRepliesRcvd < maxReplies {
                        return nil
                }

        // Otherwise, we'll look at the reply's height range.
        default:
                replyLastHeight := msg.LastBlockHeight()
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()

                // TODO(wilmer): This might require some padding if the remote
                // node is not aware of the last height we sent them, i.e., is
                // behind a few blocks from us.
                if replyLastHeight < queryLastHeight &&
                        g.numChanRangeRepliesRcvd < maxReplies {

                        return nil
                }
        }

        log.Infof("GossipSyncer(%x): filtering through %v chans",
                g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))

        // Otherwise, this is the final response, so we'll now check to see
        // which channels they know of that we don't.
        newChans, err := g.cfg.channelSeries.FilterKnownChanIDs(
                g.cfg.chainHash, g.bufferedChanRangeReplies,
                g.cfg.isStillZombieChannel,
        )
        if err != nil {
                return fmt.Errorf("unable to filter chan ids: %w", err)
        }

        // As we've received the entirety of the reply, we no longer need to
        // hold on to the set of buffered replies or the original query that
        // prompted the replies, so we'll let that be garbage collected now.
        g.curQueryRangeMsg = nil
        g.prevReplyChannelRange = nil
        g.bufferedChanRangeReplies = nil
        g.numChanRangeRepliesRcvd = 0

        // If there aren't any channels that we don't know of, then we can
        // switch straight to our terminal state.
        if len(newChans) == 0 {
                log.Infof("GossipSyncer(%x): remote peer has no new chans",
                        g.cfg.peerPub[:])

                g.setSyncState(chansSynced)

                // Ensure that the sync manager becomes aware that the
                // historical sync completed so synced_to_graph is updated over
                // rpc.
                g.cfg.markGraphSynced()
                return nil
        }

        // Otherwise, we'll set the set of channels that we need to query for
        // the next state, and also transition our state.
        g.newChansToQuery = newChans
        g.setSyncState(queryNewChannels)

        log.Infof("GossipSyncer(%x): starting query for %v new chans",
                g.cfg.peerPub[:], len(newChans))

        return nil
}

// genChanRangeQuery generates the initial message we'll send to the remote
// party when we're kicking off the channel graph synchronization upon
// connection. The historicalQuery boolean can be used to generate a query from
// the genesis block of the chain.
func (g *GossipSyncer) genChanRangeQuery(ctx context.Context,
        historicalQuery bool) (*lnwire.QueryChannelRange, error) {

        // First, we'll query our channel graph time series for its highest
        // known channel ID.
        newestChan, err := g.cfg.channelSeries.HighestChanID(
                ctx, g.cfg.chainHash,
        )
        if err != nil {
                return nil, err
        }

        // Once we have the chan ID of the newest, we'll obtain the block height
        // of the channel, then subtract our default horizon to ensure we don't
        // miss any channels. By default, we go back 1 day from the newest
        // channel, unless we're attempting a historical sync, where we'll
        // actually start from the genesis block instead.
        var startHeight uint32
        switch {
        case historicalQuery:
                fallthrough
        case newestChan.BlockHeight <= chanRangeQueryBuffer:
                startHeight = 0
        default:
                startHeight = newestChan.BlockHeight - chanRangeQueryBuffer
        }

        // Determine the number of blocks to request based on our best height.
        // We'll take into account any potential underflows and explicitly set
        // numBlocks to its minimum value of 1 if so.
        bestHeight := g.cfg.bestHeight()
        numBlocks := bestHeight - startHeight
        if int64(numBlocks) < 1 {
                numBlocks = 1
        }

        log.Infof("GossipSyncer(%x): requesting new chans from height=%v "+
                "and %v blocks after", g.cfg.peerPub[:], startHeight, numBlocks)

        // Finally, we'll craft the channel range query, using our starting
        // height, then asking for all known channels to the foreseeable end of
        // the main chain.
        query := &lnwire.QueryChannelRange{
                ChainHash:        g.cfg.chainHash,
                FirstBlockHeight: startHeight,
                NumBlocks:        numBlocks,
        }

        if !g.cfg.noTimestampQueryOption {
                query.QueryOptions = lnwire.NewTimestampQueryOption()
        }

        g.curQueryRangeMsg = query

        return query, nil
}

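// Worked example (heights assumed for illustration): if our newest known
// channel is at block 800,000 and bestHeight reports 800,100, a regular sync
// starts at 800,000 - chanRangeQueryBuffer = 799,856 and asks for
// 800,100 - 799,856 = 244 blocks, while a historical sync starts at height 0
// and asks for the full 800,100 blocks.
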
// replyPeerQueries is called in response to any query by the remote peer.
// We'll examine our state and send back our best response.
func (g *GossipSyncer) replyPeerQueries(ctx context.Context,
        msg lnwire.Message) error {

        switch msg := msg.(type) {

        // In this state, we'll also handle any incoming channel range queries
        // from the remote peer as they're trying to sync their state as well.
        case *lnwire.QueryChannelRange:
                return g.replyChanRangeQuery(ctx, msg)

        // If the remote peer skips straight to requesting new channels that
        // they don't know of, then we'll ensure that we also handle this case.
        case *lnwire.QueryShortChanIDs:
                return g.replyShortChanIDs(ctx, msg)

        default:
                return fmt.Errorf("unknown message: %T", msg)
        }
}

// replyChanRangeQuery will be dispatched in response to a channel range query
// by the remote node. We'll query the channel time series for channels that
// meet the channel range, then chunk our responses to the remote node. We also
// ensure that our final fragment carries the "complete" bit to indicate the
// end of our streaming response.
func (g *GossipSyncer) replyChanRangeQuery(ctx context.Context,
        query *lnwire.QueryChannelRange) error {

        // Before responding, we'll check to ensure that the remote peer is
        // querying for the same chain that we're on. If not, we'll send back a
        // response with a complete value of zero to indicate we're on a
        // different chain.
        if g.cfg.chainHash != query.ChainHash {
                log.Warnf("Remote peer requested QueryChannelRange for "+
                        "chain=%v, we're on chain=%v", query.ChainHash,
                        g.cfg.chainHash)

                return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
                        ChainHash:        query.ChainHash,
                        FirstBlockHeight: query.FirstBlockHeight,
                        NumBlocks:        query.NumBlocks,
                        Complete:         0,
                        EncodingType:     g.cfg.encodingType,
                        ShortChanIDs:     nil,
                })
        }

        log.Infof("GossipSyncer(%x): filtering chan range: start_height=%v, "+
                "num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
                query.NumBlocks)

        // Check if the query asked for timestamps. We will only serve
        // timestamps if this has not been disabled with
        // noTimestampQueryOption.
        withTimestamps := query.WithTimestamps() &&
                !g.cfg.noTimestampQueryOption

        // Next, we'll consult the time series to obtain the set of known
        // channel ID's that match their query.
        startBlock := query.FirstBlockHeight
        endBlock := query.LastBlockHeight()
        channelRanges, err := g.cfg.channelSeries.FilterChannelRange(
                query.ChainHash, startBlock, endBlock, withTimestamps,
        )
        if err != nil {
                return err
        }

        // TODO(roasbeef): means can't send max uint above?
        //  * or make internal 64

        // We'll send our response in a streaming manner, chunk-by-chunk. We do
        // this as there's a transport message size limit which we'll need to
        // adhere to. We also need to make sure all of our replies cover the
        // expected range of the query.
        sendReplyForChunk := func(channelChunk []graphdb.ChannelUpdateInfo,
                firstHeight, lastHeight uint32, finalChunk bool) error {

                // The number of blocks contained in the current chunk (the
                // total span) is the difference between the last channel ID and
                // the first in the range. We add one as even if all channels
                // returned are in the same block, we need to count that.
                numBlocks := lastHeight - firstHeight + 1
                complete := uint8(0)
                if finalChunk {
                        complete = 1
                }

                var timestamps lnwire.Timestamps
                if withTimestamps {
                        timestamps = make(lnwire.Timestamps, len(channelChunk))
                }

                scids := make([]lnwire.ShortChannelID, len(channelChunk))
                for i, info := range channelChunk {
                        scids[i] = info.ShortChannelID

                        if !withTimestamps {
                                continue
                        }

                        timestamps[i].Timestamp1 = uint32(
                                info.Node1UpdateTimestamp.Unix(),
                        )

                        timestamps[i].Timestamp2 = uint32(
                                info.Node2UpdateTimestamp.Unix(),
                        )
                }

                return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
                        ChainHash:        query.ChainHash,
                        NumBlocks:        numBlocks,
                        FirstBlockHeight: firstHeight,
                        Complete:         complete,
                        EncodingType:     g.cfg.encodingType,
                        ShortChanIDs:     scids,
                        Timestamps:       timestamps,
                })
        }

        var (
                firstHeight  = query.FirstBlockHeight
                lastHeight   uint32
11✔
1244
                channelChunk []graphdb.ChannelUpdateInfo
11✔
1245
        )
11✔
1246

11✔
1247
        // chunkSize is the maximum number of SCIDs that we can safely put in a
11✔
1248
        // single message. If we also need to include timestamps though, then
11✔
1249
        // this number is halved since encoding two timestamps takes the same
11✔
1250
        // number of bytes as encoding an SCID.
11✔
1251
        chunkSize := g.cfg.chunkSize
11✔
1252
        if withTimestamps {
14✔
1253
                chunkSize /= 2
3✔
1254
        }
3✔
1255

1256
        for _, channelRange := range channelRanges {
28✔
1257
                channels := channelRange.Channels
17✔
1258
                numChannels := int32(len(channels))
17✔
1259
                numLeftToAdd := chunkSize - int32(len(channelChunk))
17✔
1260

17✔
1261
                // Include the current block in the ongoing chunk if it can fit
17✔
1262
                // and move on to the next block.
17✔
1263
                if numChannels <= numLeftToAdd {
29✔
1264
                        channelChunk = append(channelChunk, channels...)
12✔
1265
                        continue
12✔
1266
                }
1267

1268
                // Otherwise, we need to send our existing channel chunk as is
1269
                // as its own reply and start a new one for the current block.
1270
                // We'll mark the end of our current chunk as the height before
1271
                // the current block to ensure the whole query range is replied
1272
                // to.
1273
                log.Infof("GossipSyncer(%x): sending range chunk of size=%v",
5✔
1274
                        g.cfg.peerPub[:], len(channelChunk))
5✔
1275

5✔
1276
                lastHeight = channelRange.Height - 1
5✔
1277
                err := sendReplyForChunk(
5✔
1278
                        channelChunk, firstHeight, lastHeight, false,
5✔
1279
                )
5✔
1280
                if err != nil {
5✔
1281
                        return err
×
1282
                }
×
1283

1284
                // With the reply constructed, we'll start tallying channels for
1285
                // our next one keeping in mind our chunk size. This may result
1286
                // in channels for this block being left out from the reply, but
1287
                // this isn't an issue since we'll randomly shuffle them and we
1288
                // assume a historical gossip sync is performed at a later time.
1289
                firstHeight = channelRange.Height
5✔
1290
                finalChunkSize := numChannels
5✔
1291
                exceedsChunkSize := numChannels > chunkSize
5✔
1292
                if exceedsChunkSize {
5✔
1293
                        rand.Shuffle(len(channels), func(i, j int) {
×
1294
                                channels[i], channels[j] = channels[j], channels[i]
×
1295
                        })
×
1296
                        finalChunkSize = chunkSize
×
1297
                }
1298
                channelChunk = channels[:finalChunkSize]
5✔
1299

5✔
1300
                // Sort the chunk once again if we had to shuffle it.
5✔
1301
                if exceedsChunkSize {
5✔
1302
                        sort.Slice(channelChunk, func(i, j int) bool {
×
1303
                                id1 := channelChunk[i].ShortChannelID.ToUint64()
×
1304
                                id2 := channelChunk[j].ShortChannelID.ToUint64()
×
1305

×
1306
                                return id1 < id2
×
1307
                        })
×
1308
                }
1309
        }
1310

1311
        // Send the remaining chunk as the final reply.
1312
        log.Infof("GossipSyncer(%x): sending final chan range chunk, size=%v",
11✔
1313
                g.cfg.peerPub[:], len(channelChunk))
11✔
1314

11✔
1315
        return sendReplyForChunk(
11✔
1316
                channelChunk, firstHeight, query.LastBlockHeight(), true,
11✔
1317
        )
11✔
1318
}
1319
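
// Illustrative sketch of the chunk sizing used above: the transport limits
// how many SCIDs fit in one ReplyChannelRange, and since a pair of
// timestamps encodes to the same number of bytes as an SCID, serving
// timestamps halves the budget. This helper is hypothetical, not part of
// lnd.
func exampleEffectiveChunkSize(chunkSize int32, withTimestamps bool) int32 {
        if withTimestamps {
                return chunkSize / 2
        }

        return chunkSize
}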

1320
// replyShortChanIDs will be dispatched in response to a query by the remote
1321
// node for information concerning a set of short channel ID's. Our response
1322
// will be sent in a streaming chunked manner to ensure that we remain below
1323
// the current transport level message size.
1324
func (g *GossipSyncer) replyShortChanIDs(ctx context.Context,
1325
        query *lnwire.QueryShortChanIDs) error {
7✔
1326

7✔
1327
        // Before responding, we'll check to ensure that the remote peer is
7✔
1328
        // querying for the same chain that we're on. If not, we'll send back a
7✔
1329
        // response with a complete value of zero to indicate we're on a
7✔
1330
        // different chain.
7✔
1331
        if g.cfg.chainHash != query.ChainHash {
8✔
1332
                log.Warnf("Remote peer requested QueryShortChanIDs for "+
1✔
1333
                        "chain=%v, we're on chain=%v", query.ChainHash,
1✔
1334
                        g.cfg.chainHash)
1✔
1335

1✔
1336
                return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
1✔
1337
                        ChainHash: query.ChainHash,
1✔
1338
                        Complete:  0,
1✔
1339
                })
1✔
1340
        }
1✔
1341

1342
        if len(query.ShortChanIDs) == 0 {
6✔
1343
                log.Infof("GossipSyncer(%x): ignoring query for blank short chan ID's",
×
1344
                        g.cfg.peerPub[:])
×
1345
                return nil
×
1346
        }
×
1347

1348
        log.Infof("GossipSyncer(%x): fetching chan anns for %v chans",
6✔
1349
                g.cfg.peerPub[:], len(query.ShortChanIDs))
6✔
1350

6✔
1351
        // Now that we know we're on the same chain, we'll query the channel
6✔
1352
        // time series for the set of messages that we know of which satisfies
6✔
1353
        // the requirement of being a chan ann, chan update, or a node ann
6✔
1354
        // related to the set of queried channels.
6✔
1355
        replyMsgs, err := g.cfg.channelSeries.FetchChanAnns(
6✔
1356
                query.ChainHash, query.ShortChanIDs,
6✔
1357
        )
6✔
1358
        if err != nil {
6✔
1359
                return fmt.Errorf("unable to fetch chan anns for %v..., %w",
×
1360
                        query.ShortChanIDs[0].ToUint64(), err)
×
1361
        }
×
1362

1363
        // Reply with any messages related to those channel ID's. We'll write
1364
        // each one individually and synchronously to throttle the sends and
1365
        // perform buffering of responses in the syncer as opposed to the peer.
1366
        for _, msg := range replyMsgs {
12✔
1367
                err := g.cfg.sendToPeerSync(ctx, msg)
6✔
1368
                if err != nil {
6✔
1369
                        return err
×
1370
                }
×
1371
        }
1372

1373
        // Regardless of whether we had any messages to reply with, send over
1374
        // the sentinel message to signal that the stream has terminated.
1375
        return g.cfg.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
6✔
1376
                ChainHash: query.ChainHash,
6✔
1377
                Complete:  1,
6✔
1378
        })
6✔
1379
}
1380
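
// Illustrative sketch of the streaming pattern above: every reply message is
// written synchronously, and a ReplyShortChanIDsEnd with Complete=1 is
// always sent last so the remote peer knows the stream has terminated. The
// send callback stands in for the syncer's configured sendToPeerSync and is
// an assumption of this sketch.
func exampleStreamChanAnns(ctx context.Context, chain chainhash.Hash,
        msgs []lnwire.Message,
        send func(context.Context, ...lnwire.Message) error) error {

        for _, msg := range msgs {
                if err := send(ctx, msg); err != nil {
                        return err
                }
        }

        return send(ctx, &lnwire.ReplyShortChanIDsEnd{
                ChainHash: chain,
                Complete:  1,
        })
}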

1381
// ApplyGossipFilter applies a gossip filter sent by the remote node to the
1382
// state machine. Once applied, we'll ensure that we don't forward any messages
1383
// to the peer that aren't within the time range of the filter.
1384
func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
1385
        filter *lnwire.GossipTimestampRange) error {
68✔
1386

68✔
1387
        g.Lock()
68✔
1388

68✔
1389
        g.remoteUpdateHorizon = filter
68✔
1390

68✔
1391
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
68✔
1392
        endTime := startTime.Add(
68✔
1393
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
68✔
1394
        )
68✔
1395

68✔
1396
        g.Unlock()
68✔
1397

68✔
1398
        // If requested, don't reply with historical gossip data when the remote
68✔
1399
        // peer sets their gossip timestamp range.
68✔
1400
        if g.cfg.ignoreHistoricalFilters {
69✔
1401
                return nil
1✔
1402
        }
1✔
1403

1404
        // Check if a goroutine is already sending the backlog. If so, return
1405
        // early without attempting to acquire the semaphore.
1406
        if g.isSendingBacklog.Load() {
72✔
1407
                log.Debugf("GossipSyncer(%x): skipping ApplyGossipFilter, "+
5✔
1408
                        "backlog send already in progress", g.cfg.peerPub[:])
5✔
1409
                return nil
5✔
1410
        }
5✔
1411

1412
        select {
62✔
1413
        case <-g.syncerSema:
62✔
1414
        case <-g.cg.Done():
×
1415
                return ErrGossipSyncerExiting
×
1416
        case <-ctx.Done():
×
1417
                return ctx.Err()
×
1418
        }
1419

1420
        // We don't put this in a defer because if the goroutine is launched,
1421
        // it needs to be called when the goroutine is stopped.
1422
        returnSema := func() {
73✔
1423
                g.syncerSema <- struct{}{}
11✔
1424
        }
11✔
1425

1426
        // Now that the remote peer has applied their filter, we'll query the
1427
        // database for all the messages that are beyond this filter.
1428
        newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon(
62✔
1429
                g.cfg.chainHash, startTime, endTime,
62✔
1430
        )
62✔
1431
        if err != nil {
62✔
1432
                returnSema()
×
1433
                return err
×
1434
        }
×
1435

1436
        log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
11✔
1437
                "start=%v, end=%v, backlog_size=%v", g.cfg.peerPub[:],
11✔
1438
                startTime, endTime, len(newUpdatestoSend))
11✔
1439

11✔
1440
        // If we don't have any to send, then we can return early.
11✔
1441
        if len(newUpdatestoSend) == 0 {
20✔
1442
                returnSema()
9✔
1443
                return nil
9✔
1444
        }
9✔
1445

1446
        // Set the atomic flag to indicate we're starting to send the backlog.
1447
        // If the swap fails, it means another goroutine is already active, so
1448
        // we return early.
1449
        if !g.isSendingBacklog.CompareAndSwap(false, true) {
5✔
1450
                returnSema()
×
1451
                log.Debugf("GossipSyncer(%x): another goroutine already "+
×
1452
                        "sending backlog, skipping", g.cfg.peerPub[:])
×
1453

×
1454
                return nil
×
1455
        }
×
1456

1457
        // We'll conclude by launching a goroutine to send out any updates.
1458
        g.cg.WgAdd(1)
5✔
1459
        go func() {
10✔
1460
                defer g.cg.WgDone()
5✔
1461
                defer returnSema()
5✔
1462
                defer g.isSendingBacklog.Store(false)
5✔
1463

5✔
1464
                for _, msg := range newUpdatestoSend {
10✔
1465
                        err := g.cfg.sendToPeerSync(ctx, msg)
5✔
1466
                        switch {
5✔
1467
                        case err == ErrGossipSyncerExiting:
×
1468
                                return
×
1469

1470
                        case err == lnpeer.ErrPeerExiting:
×
1471
                                return
×
1472

1473
                        case err != nil:
×
1474
                                log.Errorf("Unable to send message for "+
×
1475
                                        "peer catch up: %v", err)
×
1476
                        }
1477
                }
1478
        }()
1479

1480
        return nil
5✔
1481
}
1482
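
// Illustrative sketch of how the remote update horizon above translates to
// an absolute window: FirstTimestamp is the start of the horizon and
// TimestampRange its width in seconds. This helper is hypothetical, not
// part of lnd.
func exampleHorizonWindow(filter *lnwire.GossipTimestampRange) (time.Time,
        time.Time) {

        start := time.Unix(int64(filter.FirstTimestamp), 0)
        end := start.Add(time.Duration(filter.TimestampRange) * time.Second)

        return start, end
}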

1483
// FilterGossipMsgs takes a set of gossip messages, and only sends a message to
1484
// the peer iff it is within the bounds of their set gossip filter. If the peer
1485
// doesn't have a gossip filter set, then no messages will be forwarded.
1486
func (g *GossipSyncer) FilterGossipMsgs(ctx context.Context,
1487
        msgs ...msgWithSenders) {
5✔
1488

5✔
1489
        // If the peer doesn't have an update horizon set, then we won't send
5✔
1490
        // it any new update messages.
5✔
1491
        if g.remoteUpdateHorizon == nil {
9✔
1492
                log.Tracef("GossipSyncer(%x): skipped due to nil "+
4✔
1493
                        "remoteUpdateHorizon", g.cfg.peerPub[:])
4✔
1494
                return
4✔
1495
        }
4✔
1496

1497
        // If we've been signaled to exit, or are exiting, then we'll stop
1498
        // short.
1499
        select {
4✔
1500
        case <-g.cg.Done():
×
1501
                return
×
1502
        case <-ctx.Done():
×
1503
                return
×
1504
        default:
4✔
1505
        }
1506

1507
        // TODO(roasbeef): need to ensure that peer still online...send msg to
1508
        // gossiper on peer termination to signal peer disconnect?
1509

1510
        var err error
4✔
1511

4✔
1512
        // Before we filter out the messages, we'll construct an index over the
4✔
1513
        // set of channel announcements and channel updates. This will allow us
4✔
1514
        // to quickly check if we should forward a chan ann, based on the known
4✔
1515
        // channel updates for a channel.
4✔
1516
        chanUpdateIndex := make(
4✔
1517
                map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate1,
4✔
1518
        )
4✔
1519
        for _, msg := range msgs {
17✔
1520
                chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate1)
13✔
1521
                if !ok {
23✔
1522
                        continue
10✔
1523
                }
1524

1525
                chanUpdateIndex[chanUpdate.ShortChannelID] = append(
6✔
1526
                        chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate,
6✔
1527
                )
6✔
1528
        }
1529

1530
        // We'll construct a helper function that we'll use below to determine
1531
        // if a given message passes the gossip msg filter.
1532
        g.Lock()
4✔
1533
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
4✔
1534
        endTime := startTime.Add(
4✔
1535
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
4✔
1536
        )
4✔
1537
        g.Unlock()
4✔
1538

4✔
1539
        passesFilter := func(timeStamp uint32) bool {
17✔
1540
                t := time.Unix(int64(timeStamp), 0)
13✔
1541
                return t.Equal(startTime) ||
13✔
1542
                        (t.After(startTime) && t.Before(endTime))
13✔
1543
        }
13✔
1544

1545
        msgsToSend := make([]lnwire.Message, 0, len(msgs))
4✔
1546
        for _, msg := range msgs {
17✔
1547
                // If the target peer is the peer that sent us this message,
13✔
1548
                // then we'll exit early as we don't need to filter this
13✔
1549
                // message.
13✔
1550
                if _, ok := msg.senders[g.cfg.peerPub]; ok {
16✔
1551
                        continue
3✔
1552
                }
1553

1554
                switch msg := msg.msg.(type) {
13✔
1555

1556
                // For each channel announcement message, we'll only send this
1557
                // message if the channel updates for the channel are between
1558
                // our time range.
1559
                case *lnwire.ChannelAnnouncement1:
7✔
1560
                        // First, we'll check if the channel updates are in
7✔
1561
                        // this message batch.
7✔
1562
                        chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID]
7✔
1563
                        if !ok {
11✔
1564
                                // If not, we'll attempt to query the database
4✔
1565
                                // to see if we know of the updates.
4✔
1566
                                chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates(
4✔
1567
                                        g.cfg.chainHash, msg.ShortChannelID,
4✔
1568
                                )
4✔
1569
                                if err != nil {
4✔
1570
                                        log.Warnf("no channel updates found for "+
×
1571
                                                "short_chan_id=%v",
×
1572
                                                msg.ShortChannelID)
×
1573
                                        continue
×
1574
                                }
1575
                        }
1576

1577
                        for _, chanUpdate := range chanUpdates {
14✔
1578
                                if passesFilter(chanUpdate.Timestamp) {
11✔
1579
                                        msgsToSend = append(msgsToSend, msg)
4✔
1580
                                        break
4✔
1581
                                }
1582
                        }
1583

1584
                        if len(chanUpdates) == 0 {
10✔
1585
                                msgsToSend = append(msgsToSend, msg)
3✔
1586
                        }
3✔
1587

1588
                // For each channel update, we'll only send if it the timestamp
1589
                // is between our time range.
1590
                case *lnwire.ChannelUpdate1:
6✔
1591
                        if passesFilter(msg.Timestamp) {
10✔
1592
                                msgsToSend = append(msgsToSend, msg)
4✔
1593
                        }
4✔
1594

1595
                // Similarly, we only send node announcements if the update
1596
                // timestamp is within our set gossip filter time range.
1597
                case *lnwire.NodeAnnouncement:
6✔
1598
                        if passesFilter(msg.Timestamp) {
10✔
1599
                                msgsToSend = append(msgsToSend, msg)
4✔
1600
                        }
4✔
1601
                }
1602
        }
1603

1604
        log.Tracef("GossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
4✔
1605
                g.cfg.peerPub[:], len(msgs), len(msgsToSend))
4✔
1606

4✔
1607
        if len(msgsToSend) == 0 {
7✔
1608
                return
3✔
1609
        }
3✔
1610

1611
        if err = g.cfg.sendToPeer(ctx, msgsToSend...); err != nil {
4✔
1612
                log.Errorf("unable to send gossip msgs: %v", err)
×
1613
        }
×
1614

1615
}
1616
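
// Illustrative sketch of the horizon check applied above: a timestamp
// passes if it equals the start of the window or lies strictly between the
// start and the end; anything at or beyond the end is filtered out. This
// helper is hypothetical, not part of lnd.
func examplePassesFilter(timestamp uint32, start, end time.Time) bool {
        t := time.Unix(int64(timestamp), 0)

        return t.Equal(start) || (t.After(start) && t.Before(end))
}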

1617
// ProcessQueryMsg is used by outside callers to pass new channel time series
1618
// queries to the internal processing goroutine.
1619
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) error {
115✔
1620
        var msgChan chan lnwire.Message
115✔
1621
        switch msg.(type) {
115✔
1622
        case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs:
3✔
1623
                msgChan = g.queryMsgs
3✔
1624

1625
        // Reply messages should only be expected in states where we're waiting
1626
        // for a reply.
1627
        case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
115✔
1628
                g.Lock()
115✔
1629
                syncState := g.syncState()
115✔
1630
                g.Unlock()
115✔
1631

115✔
1632
                if syncState != waitingQueryRangeReply &&
115✔
1633
                        syncState != waitingQueryChanReply {
116✔
1634

1✔
1635
                        return fmt.Errorf("unexpected msg %T received in "+
1✔
1636
                                "state %v", msg, syncState)
1✔
1637
                }
1✔
1638
                msgChan = g.gossipMsgs
114✔
1639

1640
        default:
×
1641
                msgChan = g.gossipMsgs
×
1642
        }
1643

1644
        select {
114✔
1645
        case msgChan <- msg:
114✔
1646
        case <-peerQuit:
×
1647
        case <-g.cg.Done():
×
1648
        }
1649

1650
        return nil
114✔
1651
}
1652
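
// Illustrative sketch of the hand-off used above: a message is forwarded to
// the selected internal channel unless the peer or the syncer shuts down
// first, so a slow syncer never blocks the caller indefinitely. The
// parameters here are assumptions of this sketch, not lnd APIs.
func exampleForwardMsg(msgChan chan<- lnwire.Message, msg lnwire.Message,
        peerQuit, syncerQuit <-chan struct{}) {

        select {
        case msgChan <- msg:
        case <-peerQuit:
        case <-syncerQuit:
        }
}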

1653
// setSyncState sets the gossip syncer's state to the given state.
1654
func (g *GossipSyncer) setSyncState(state syncerState) {
151✔
1655
        atomic.StoreUint32(&g.state, uint32(state))
151✔
1656
}
151✔
1657

1658
// syncState returns the current syncerState of the target GossipSyncer.
1659
func (g *GossipSyncer) syncState() syncerState {
535✔
1660
        return syncerState(atomic.LoadUint32(&g.state))
535✔
1661
}
535✔
1662

1663
// ResetSyncedSignal returns a channel that will be closed in order to serve as
1664
// a signal for when the GossipSyncer has reached its chansSynced state.
1665
func (g *GossipSyncer) ResetSyncedSignal() chan struct{} {
17✔
1666
        g.Lock()
17✔
1667
        defer g.Unlock()
17✔
1668

17✔
1669
        syncedSignal := make(chan struct{})
17✔
1670

17✔
1671
        syncState := syncerState(atomic.LoadUint32(&g.state))
17✔
1672
        if syncState == chansSynced {
22✔
1673
                close(syncedSignal)
5✔
1674
                return syncedSignal
5✔
1675
        }
5✔
1676

1677
        g.syncedSignal = syncedSignal
15✔
1678
        return g.syncedSignal
15✔
1679
}
1680
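
// Illustrative usage sketch: a caller can block on the synced signal with a
// deadline. The returned channel is closed once the syncer reaches
// chansSynced, so the receive completes immediately if it is already
// synced. The timeout handling is an assumption of this sketch, not lnd
// behaviour.
func exampleWaitForSynced(g *GossipSyncer, timeout time.Duration) bool {
        select {
        case <-g.ResetSyncedSignal():
                return true
        case <-time.After(timeout):
                return false
        }
}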

1681
// ProcessSyncTransition sends a request to the gossip syncer to transition its
1682
// sync type to a new one.
1683
//
1684
// NOTE: This can only be done once the gossip syncer has reached its final
1685
// chansSynced state.
1686
func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
17✔
1687
        errChan := make(chan error, 1)
17✔
1688
        select {
17✔
1689
        case g.syncTransitionReqs <- &syncTransitionReq{
1690
                newSyncType: newSyncType,
1691
                errChan:     errChan,
1692
        }:
17✔
1693
        case <-time.After(syncTransitionTimeout):
×
1694
                return ErrSyncTransitionTimeout
×
1695
        case <-g.cg.Done():
×
1696
                return ErrGossipSyncerExiting
×
1697
        }
1698

1699
        select {
17✔
1700
        case err := <-errChan:
17✔
1701
                return err
17✔
1702
        case <-g.cg.Done():
×
1703
                return ErrGossipSyncerExiting
×
1704
        }
1705
}
1706

1707
// handleSyncTransition handles a new sync type transition request.
1708
//
1709
// NOTE: The gossip syncer might have another sync state as a result of this
1710
// transition.
1711
func (g *GossipSyncer) handleSyncTransition(ctx context.Context,
1712
        req *syncTransitionReq) error {
17✔
1713

17✔
1714
        // Return early from any NOP sync transitions.
17✔
1715
        syncType := g.SyncType()
17✔
1716
        if syncType == req.newSyncType {
17✔
1717
                return nil
×
1718
        }
×
1719

1720
        log.Debugf("GossipSyncer(%x): transitioning from %v to %v",
17✔
1721
                g.cfg.peerPub, syncType, req.newSyncType)
17✔
1722

17✔
1723
        var (
17✔
1724
                firstTimestamp time.Time
17✔
1725
                timestampRange uint32
17✔
1726
        )
17✔
1727

17✔
1728
        switch req.newSyncType {
17✔
1729
        // If an active sync has been requested, then we should resume receiving
1730
        // new graph updates from the remote peer.
1731
        case ActiveSync, PinnedSync:
15✔
1732
                firstTimestamp = time.Now()
15✔
1733
                timestampRange = math.MaxUint32
15✔
1734

1735
        // If a PassiveSync transition has been requested, then we should no
1736
        // longer receive any new updates from the remote peer. We can do this
1737
        // by setting our update horizon to a range in the past ensuring no
1738
        // graph updates match the timestamp range.
1739
        case PassiveSync:
2✔
1740
                firstTimestamp = zeroTimestamp
2✔
1741
                timestampRange = 0
2✔
1742

1743
        default:
×
1744
                return fmt.Errorf("unhandled sync transition %v",
×
1745
                        req.newSyncType)
×
1746
        }
1747

1748
        err := g.sendGossipTimestampRange(ctx, firstTimestamp, timestampRange)
17✔
1749
        if err != nil {
17✔
1750
                return fmt.Errorf("unable to send local update horizon: %w",
×
1751
                        err)
×
1752
        }
×
1753

1754
        g.setSyncType(req.newSyncType)
17✔
1755

17✔
1756
        return nil
17✔
1757
}
1758
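
// Illustrative sketch of the horizon chosen above for each sync type:
// active and pinned syncers request everything from now onwards, while
// passive syncers pick an empty window so no new graph updates match. The
// real handler returns an error for unknown sync types; this sketch simply
// falls back to the passive horizon and is not part of lnd.
func exampleHorizonForSyncType(t SyncerType) (time.Time, uint32) {
        switch t {
        case ActiveSync, PinnedSync:
                return time.Now(), math.MaxUint32
        default:
                return zeroTimestamp, 0
        }
}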

1759
// setSyncType sets the gossip syncer's sync type to the given type.
1760
func (g *GossipSyncer) setSyncType(syncType SyncerType) {
70✔
1761
        atomic.StoreUint32(&g.syncType, uint32(syncType))
70✔
1762
}
70✔
1763

1764
// SyncType returns the current SyncerType of the target GossipSyncer.
1765
func (g *GossipSyncer) SyncType() SyncerType {
406✔
1766
        return SyncerType(atomic.LoadUint32(&g.syncType))
406✔
1767
}
406✔
1768

1769
// historicalSync sends a request to the gossip syncer to perform a historical
1770
// sync.
1771
//
1772
// NOTE: This can only be done once the gossip syncer has reached its final
1773
// chansSynced state.
1774
func (g *GossipSyncer) historicalSync() error {
19✔
1775
        done := make(chan struct{})
19✔
1776

19✔
1777
        select {
19✔
1778
        case g.historicalSyncReqs <- &historicalSyncReq{
1779
                doneChan: done,
1780
        }:
19✔
1781
        case <-time.After(syncTransitionTimeout):
×
1782
                return ErrSyncTransitionTimeout
×
1783
        case <-g.cg.Done():
×
1784
                return ErrGossiperShuttingDown
×
1785
        }
1786

1787
        select {
19✔
1788
        case <-done:
19✔
1789
                return nil
19✔
1790
        case <-g.cg.Done():
×
1791
                return ErrGossiperShuttingDown
×
1792
        }
1793
}
1794

1795
// handleHistoricalSync handles a request to the gossip syncer to perform a
1796
// historical sync.
1797
func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
19✔
1798
        // We'll go back to our initial syncingChans state in order to request
19✔
1799
        // the remote peer to give us all of the channel IDs they know of
19✔
1800
        // starting from the genesis block.
19✔
1801
        g.genHistoricalChanRangeQuery = true
19✔
1802
        g.setSyncState(syncingChans)
19✔
1803
        close(req.doneChan)
19✔
1804
}
19✔
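
// Illustrative sketch of the request/acknowledge pattern used by
// historicalSync above: the caller hands a done channel to the syncer's
// main goroutine and waits for it to be closed, bailing out if the syncer
// shuts down first. The real method additionally times out the initial
// send; that detail is omitted in this sketch, which is not part of lnd.
func exampleRequestHistoricalSync(reqChan chan<- *historicalSyncReq,
        quit <-chan struct{}) error {

        done := make(chan struct{})

        select {
        case reqChan <- &historicalSyncReq{doneChan: done}:
        case <-quit:
                return ErrGossiperShuttingDown
        }

        select {
        case <-done:
                return nil
        case <-quit:
                return ErrGossiperShuttingDown
        }
}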