lightningnetwork / lnd / 18982684365

31 Oct 2025 07:08PM UTC coverage: 66.645% (+0.006%) from 66.639%

Merge 928b32bfb into f938e40af
Pull Request #10330: discovery: fix potential infinite loop bug re context cancel error handling in gossip syncer

32 of 36 new or added lines in 1 file covered. (88.89%)

75 existing lines in 18 files now uncovered.

137262 of 205959 relevant lines covered (66.65%)

21275.47 hits per line

Source File

/discovery/syncer.go: 84.61% coverage
1
package discovery
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "iter"
8
        "math"
9
        "math/rand"
10
        "sort"
11
        "sync"
12
        "sync/atomic"
13
        "time"
14

15
        "github.com/btcsuite/btcd/chaincfg/chainhash"
16
        "github.com/lightningnetwork/lnd/fn/v2"
17
        "github.com/lightningnetwork/lnd/graph"
18
        graphdb "github.com/lightningnetwork/lnd/graph/db"
19
        "github.com/lightningnetwork/lnd/lnpeer"
20
        "github.com/lightningnetwork/lnd/lnwire"
21
        "golang.org/x/time/rate"
22
)
23

24
// SyncerType encapsulates the different types of syncing mechanisms for a
25
// gossip syncer.
26
type SyncerType uint8
27

28
const (
29
        // ActiveSync denotes that a gossip syncer:
30
        //
31
        // 1. Should not attempt to synchronize with the remote peer for
32
        //    missing channels.
33
        // 2. Should respond to queries from the remote peer.
34
        // 3. Should receive new updates from the remote peer.
35
        //
36
        // They are started in a chansSynced state in order to accomplish their
37
        // responsibilities above.
38
        ActiveSync SyncerType = iota
39

40
        // PassiveSync denotes that a gossip syncer:
41
        //
42
        // 1. Should not attempt to synchronize with the remote peer for
43
        //    missing channels.
44
        // 2. Should respond to queries from the remote peer.
45
        // 3. Should not receive new updates from the remote peer.
46
        //
47
        // They are started in a chansSynced state in order to accomplish their
48
        // responsibilities above.
49
        PassiveSync
50

51
        // PinnedSync denotes an ActiveSync that doesn't count towards the
52
        // default active syncer limits and is always active throughout the
53
        // duration of the peer's connection. Each pinned syncer will begin by
54
        // performing a historical sync to ensure we are well synchronized with
55
        // their routing table.
56
        PinnedSync
57
)
58

59
const (
60
        // defaultTimestampQueueSize is the size of the timestamp range queue
61
        // used.
62
        defaultTimestampQueueSize = 1
63
)
64

65
// String returns a human readable string describing the target SyncerType.
66
func (t SyncerType) String() string {
3✔
67
        switch t {
3✔
68
        case ActiveSync:
3✔
69
                return "ActiveSync"
3✔
70
        case PassiveSync:
3✔
71
                return "PassiveSync"
3✔
72
        case PinnedSync:
3✔
73
                return "PinnedSync"
3✔
74
        default:
×
75
                return fmt.Sprintf("unknown sync type %d", t)
×
76
        }
77
}
78

79
// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange
80
// allowing new gossip messages to be received from the peer.
81
func (t SyncerType) IsActiveSync() bool {
47✔
82
        switch t {
47✔
83
        case ActiveSync, PinnedSync:
17✔
84
                return true
17✔
85
        default:
33✔
86
                return false
33✔
87
        }
88
}
89

90
// syncerState is an enum that represents the current state of the GossipSyncer.
91
// As the syncer is a state machine, we'll gate our actions based on the
92
// current state and the next incoming message.
93
type syncerState uint32
94

95
const (
96
        // syncingChans is the default state of the GossipSyncer. We start in
97
        // this state when a new peer first connects and we don't yet know if
98
        // we're fully synchronized.
99
        syncingChans syncerState = iota
100

101
        // waitingQueryRangeReply is the second main phase of the GossipSyncer.
102
        // We enter this state after we send out our first QueryChannelRange
103
        // message. We'll stay in this state until the remote party has sent
104
        // all of the ReplyChannelRange messages covering our query in its
105
        // entirety. After this state, we'll transition to queryNewChannels,
106
        // where we send out requests for any channel IDs that are new to us,
107
        // and then to waitingQueryChanReply while we await their replies.
108
        waitingQueryRangeReply
109

110
        // queryNewChannels is the third main phase of the GossipSyncer.  In
111
        // this phase we'll send out all of our QueryShortChanIDs messages in
112
        // response to the new channels that we don't yet know about.
113
        queryNewChannels
114

115
        // waitingQueryChanReply is the fourth main phase of the GossipSyncer.
116
        // We enter this phase once we've sent off a query chunk to the remote
117
        // peer.  We'll stay in this phase until we receive a
118
        // ReplyShortChanIDsEnd message which indicates that the remote party
119
        // has responded to all of our requests.
120
        waitingQueryChanReply
121

122
        // chansSynced is the terminal stage of the GossipSyncer. Once we enter
123
        // this phase, we'll send out our update horizon, which filters out the
124
        // set of channel updates that we're interested in. In this state,
125
        // we'll be able to accept any outgoing messages from the
126
        // AuthenticatedGossiper, and decide if we should forward them to our
127
        // target peer based on its update horizon.
128
        chansSynced
129

130
        // syncerIdle is a state in which the gossip syncer can handle external
131
        // requests to transition or perform historical syncs. It is used as the
132
        // initial state for pinned syncers, as well as a fallthrough case for
133
        // chansSynced allowing fully synced peers to facilitate requests.
134
        syncerIdle
135
)
136

137
// String returns a human readable string describing the target syncerState.
138
func (s syncerState) String() string {
4✔
139
        switch s {
4✔
140
        case syncingChans:
3✔
141
                return "syncingChans"
3✔
142

143
        case waitingQueryRangeReply:
3✔
144
                return "waitingQueryRangeReply"
3✔
145

146
        case queryNewChannels:
3✔
147
                return "queryNewChannels"
3✔
148

149
        case waitingQueryChanReply:
3✔
150
                return "waitingQueryChanReply"
3✔
151

152
        case chansSynced:
4✔
153
                return "chansSynced"
4✔
154

155
        case syncerIdle:
3✔
156
                return "syncerIdle"
3✔
157

158
        default:
×
159
                return "UNKNOWN STATE"
×
160
        }
161
}
162

163
const (
164
        // maxQueryChanRangeReplies specifies the default limit of replies to
165
        // process for a single QueryChannelRange request.
166
        maxQueryChanRangeReplies = 500
167

168
        // maxQueryChanRangeRepliesZlibFactor specifies the factor applied to
169
        // the maximum number of replies allowed for zlib encoded replies.
170
        maxQueryChanRangeRepliesZlibFactor = 4
171

172
        // chanRangeQueryBuffer is the number of blocks back that we'll go when
173
        // asking the remote peer for any channels they know of beyond
174
        // our highest known channel ID.
175
        chanRangeQueryBuffer = 144
176

177
        // syncTransitionTimeout is the default timeout we'll wait for when
178
        // attempting to perform a sync transition.
179
        syncTransitionTimeout = 5 * time.Second
180

181
        // requestBatchSize is the maximum number of channels we will query the
182
        // remote peer for in a QueryShortChanIDs message.
183
        requestBatchSize = 500
184

185
        // syncerBufferSize is the size of the syncer's buffers.
186
        syncerBufferSize = 50
187
)
188

189
var (
190
        // encodingTypeToChunkSize maps an encoding type to the max number of
191
        // short chan ID's using the encoding type that we can fit into a
192
        // single message safely.
193
        encodingTypeToChunkSize = map[lnwire.QueryEncoding]int32{
194
                lnwire.EncodingSortedPlain: 8000,
195
        }
196

197
        // ErrGossipSyncerExiting signals that the syncer has been killed.
198
        ErrGossipSyncerExiting = errors.New("gossip syncer exiting")
199

200
        // ErrSyncTransitionTimeout is an error returned when we've timed out
201
        // attempting to perform a sync transition.
202
        ErrSyncTransitionTimeout = errors.New("timed out attempting to " +
203
                "transition sync type")
204

205
        // zeroTimestamp is the timestamp we'll use when we want to indicate to
206
        // peers that we do not want to receive any new graph updates.
207
        zeroTimestamp time.Time
208
)
209

210
// syncTransitionReq encapsulates a request for a gossip syncer sync transition.
211
type syncTransitionReq struct {
212
        newSyncType SyncerType
213
        errChan     chan error
214
}
215

216
// historicalSyncReq encapsulates a request for a gossip syncer to perform a
217
// historical sync.
218
type historicalSyncReq struct {
219
        // doneChan is a channel that serves as a signal and is closed to ensure
220
        // the historical sync is attempted by the time we return to the caller.
221
        doneChan chan struct{}
222
}
223

224
// gossipSyncerCfg is a struct that packages all the information a GossipSyncer
225
// needs to carry out its duties.
226
type gossipSyncerCfg struct {
227
        // chainHash is the chain that this syncer is responsible for.
228
        chainHash chainhash.Hash
229

230
        // peerPub is the public key of the peer we're syncing with, serialized
231
        // in compressed format.
232
        peerPub [33]byte
233

234
        // channelSeries is the primary interface that we'll use to generate
235
        // our queries and respond to the queries of the remote peer.
236
        channelSeries ChannelGraphTimeSeries
237

238
        // encodingType is the current encoding type we're aware of. Requests
239
        // with different encoding types will be rejected.
240
        encodingType lnwire.QueryEncoding
241

242
        // chunkSize is the max number of short chan IDs using the syncer's
243
        // encoding type that we can fit into a single message safely.
244
        chunkSize int32
245

246
        // batchSize is the max number of channels the syncer will query from
247
        // the remote node in a single QueryShortChanIDs request.
248
        batchSize int32
249

250
        // sendMsg sends a variadic number of messages to the remote peer.
251
        // The boolean indicates whether this method should block or not
252
        // while waiting for sends to be written to the wire.
253
        sendMsg func(context.Context, bool, ...lnwire.Message) error
254

255
        // noSyncChannels will prevent the GossipSyncer from spawning a
256
        // channelGraphSyncer, meaning we will not try to reconcile unknown
257
        // channels with the remote peer.
258
        noSyncChannels bool
259

260
        // noReplyQueries will prevent the GossipSyncer from spawning a
261
        // replyHandler, meaning we will not reply to queries from our remote
262
        // peer.
263
        noReplyQueries bool
264

265
        // noTimestampQueryOption will prevent the GossipSyncer from querying
266
        // timestamps of announcement messages from the peer, and it will
267
        // prevent it from responding to timestamp queries.
268
        noTimestampQueryOption bool
269

270
        // ignoreHistoricalFilters will prevent syncers from replying with
271
        // historical data when the remote peer sets a gossip_timestamp_range.
272
        // This prevents ranges with old start times from causing us to dump the
273
        // graph on connect.
274
        ignoreHistoricalFilters bool
275

276
        // bestHeight returns the latest height known of the chain.
277
        bestHeight func() uint32
278

279
        // markGraphSynced updates the SyncManager's perception of whether we
280
        // have completed at least one historical sync.
281
        markGraphSynced func()
282

283
        // maxQueryChanRangeReplies is the maximum number of replies we'll allow
284
        // for a single QueryChannelRange request.
285
        maxQueryChanRangeReplies uint32
286

287
        // isStillZombieChannel takes the timestamps of the latest channel
288
        // updates for a channel and returns true if the channel should be
289
        // considered a zombie based on these timestamps.
290
        isStillZombieChannel func(time.Time, time.Time) bool
291

292
        // timestampQueueSize is the size of the timestamp range queue. If not
293
        // set, defaults to the global timestampQueueSize constant.
294
        timestampQueueSize int
295

296
        // msgBytesPerSecond is the allotted bandwidth rate, expressed in
297
        // bytes/second that this gossip syncer can consume. Once we exceed this
298
        // rate, message sending will block until we're below the rate.
299
        msgBytesPerSecond uint64
300
}
301

302
// GossipSyncer is a struct that handles synchronizing the channel graph state
303
// with a remote peer. The GossipSyncer implements a state machine that will
304
// progressively ensure we're synchronized with the channel state of the remote
305
// node. Once both nodes have been synchronized, we'll use an update filter to
306
// filter out which messages should be sent to a remote peer based on their
307
// update horizon. If the update horizon isn't specified, then we won't send
308
// them any channel updates at all.
309
type GossipSyncer struct {
310
        started sync.Once
311
        stopped sync.Once
312

313
        // state is the current state of the GossipSyncer.
314
        //
315
        // NOTE: This variable MUST be used atomically.
316
        state uint32
317

318
        // syncType denotes the SyncerType the gossip syncer is currently
319
        // exercising.
320
        //
321
        // NOTE: This variable MUST be used atomically.
322
        syncType uint32
323

324
        // remoteUpdateHorizon is the update horizon of the remote peer. We'll
325
        // use this to properly filter out any messages.
326
        remoteUpdateHorizon *lnwire.GossipTimestampRange
327

328
        // localUpdateHorizon is our local update horizon, we'll use this to
329
        // determine if we've already sent out our update.
330
        localUpdateHorizon *lnwire.GossipTimestampRange
331

332
        // syncTransitionReqs is a channel through which new sync transition
333
        // requests will be sent. These requests should only be handled
334
        // when the gossip syncer is in a chansSynced state to ensure its state
335
        // machine behaves as expected.
336
        syncTransitionReqs chan *syncTransitionReq
337

338
        // historicalSyncReqs is a channel that serves as a signal for the
339
        // gossip syncer to perform a historical sync. These can only be done
340
        // once the gossip syncer is in a chansSynced state to ensure its state
341
        // machine behaves as expected.
342
        historicalSyncReqs chan *historicalSyncReq
343

344
        // genHistoricalChanRangeQuery when true signals to the gossip syncer
345
        // that it should request the remote peer for all of its known channel
346
        // IDs starting from the genesis block of the chain. This can only
347
        // happen if the gossip syncer receives a request to attempt a
348
        // historical sync. It can be unset if the syncer ever transitions from
349
        // PassiveSync to ActiveSync.
350
        genHistoricalChanRangeQuery bool
351

352
        // gossipMsgs is a channel that all responses to our queries from the
353
        // target peer will be sent over, these will be read by the
354
        // channelGraphSyncer.
355
        gossipMsgs chan lnwire.Message
356

357
        // queryMsgs is a channel that all queries from the remote peer will be
358
        // received over, these will be read by the replyHandler.
359
        queryMsgs chan lnwire.Message
360

361
        // curQueryRangeMsg keeps track of the latest QueryChannelRange message
362
        // we've sent to a peer to ensure we've consumed all expected replies.
363
        // This field is primarily used within the waitingQueryRangeReply state.
364
        curQueryRangeMsg *lnwire.QueryChannelRange
365

366
        // prevReplyChannelRange keeps track of the previous ReplyChannelRange
367
        // message we've received from a peer to ensure they've fully replied to
368
        // our query by ensuring they covered our requested block range. This
369
        // field is primarily used within the waitingQueryRangeReply state.
370
        prevReplyChannelRange *lnwire.ReplyChannelRange
371

372
        // bufferedChanRangeReplies is used in the waitingQueryRangeReply state
373
        // to buffer all of the chunked responses to our query.
374
        bufferedChanRangeReplies []graphdb.ChannelUpdateInfo
375

376
        // numChanRangeRepliesRcvd is used to track the number of replies
377
        // received as part of a QueryChannelRange. This field is primarily used
378
        // within the waitingQueryRangeReply state.
379
        numChanRangeRepliesRcvd uint32
380

381
        // newChansToQuery is used to pass the set of channels we should query
382
        // for from the waitingQueryChanReply state to the queryNewChannels
383
        // state.
384
        newChansToQuery []lnwire.ShortChannelID
385

386
        cfg gossipSyncerCfg
387

388
        // syncedSignal is a channel that, if set, will be closed when the
389
        // GossipSyncer reaches its terminal chansSynced state.
390
        syncedSignal chan struct{}
391

392
        // syncerSema is used to more finely control the syncer's ability to
393
        // respond to gossip timestamp range messages.
394
        syncerSema chan struct{}
395

396
        // timestampRangeQueue is a buffered channel for queuing timestamp range
397
        // messages that need to be processed asynchronously. This prevents the
398
        // gossiper from blocking when ApplyGossipFilter is called.
399
        timestampRangeQueue chan *lnwire.GossipTimestampRange
400

401
        // isSendingBacklog is an atomic flag that indicates whether a goroutine
402
        // is currently sending the backlog of messages. This ensures only one
403
        // goroutine is active at a time.
404
        isSendingBacklog atomic.Bool
405

406
        sync.Mutex
407

408
        // cg is a helper that encapsulates a wait group and quit channel and
409
        // allows contexts that either block or cancel on those depending on
410
        // the use case.
411
        cg *fn.ContextGuard
412

413
        // rateLimiter dictates the frequency with which we will reply to gossip
414
        // queries to this peer.
415
        rateLimiter *rate.Limiter
416
}
417

418
// newGossipSyncer returns a new instance of the GossipSyncer populated using
419
// the passed config.
420
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
162✔
421
        // Use the configured queue size if set, otherwise use the default.
162✔
422
        queueSize := cfg.timestampQueueSize
162✔
423
        if queueSize == 0 {
190✔
424
                queueSize = defaultTimestampQueueSize
28✔
425
        }
28✔
426

427
        bytesPerSecond := cfg.msgBytesPerSecond
162✔
428
        if bytesPerSecond == 0 {
321✔
429
                bytesPerSecond = DefaultPeerMsgBytesPerSecond
159✔
430
        }
159✔
431
        bytesBurst := 2 * bytesPerSecond
162✔
432

162✔
433
        // We'll use this rate limiter to limit this single peer.
162✔
434
        rateLimiter := rate.NewLimiter(
162✔
435
                rate.Limit(bytesPerSecond), int(bytesBurst),
162✔
436
        )
162✔
437

162✔
438
        return &GossipSyncer{
162✔
439
                cfg:                cfg,
162✔
440
                syncTransitionReqs: make(chan *syncTransitionReq),
162✔
441
                historicalSyncReqs: make(chan *historicalSyncReq),
162✔
442
                gossipMsgs:         make(chan lnwire.Message, syncerBufferSize),
162✔
443
                queryMsgs:          make(chan lnwire.Message, syncerBufferSize),
162✔
444
                timestampRangeQueue: make(
162✔
445
                        chan *lnwire.GossipTimestampRange, queueSize,
162✔
446
                ),
162✔
447
                syncerSema:  sema,
162✔
448
                cg:          fn.NewContextGuard(),
162✔
449
                rateLimiter: rateLimiter,
162✔
450
        }
162✔
451
}
452

453
// Start starts the GossipSyncer and any goroutines that it needs to carry out
454
// its duties.
455
func (g *GossipSyncer) Start() {
92✔
456
        g.started.Do(func() {
184✔
457
                log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])
92✔
458

92✔
459
                ctx, _ := g.cg.Create(context.Background())
92✔
460

92✔
461
                // TODO(conner): only spawn channelGraphSyncer if remote
92✔
462
                // supports gossip queries, and only spawn replyHandler if we
92✔
463
                // advertise support
92✔
464
                if !g.cfg.noSyncChannels {
183✔
465
                        g.cg.WgAdd(1)
91✔
466
                        go g.channelGraphSyncer(ctx)
91✔
467
                }
91✔
468
                if !g.cfg.noReplyQueries {
179✔
469
                        g.cg.WgAdd(1)
87✔
470
                        go g.replyHandler(ctx)
87✔
471
                }
87✔
472

473
                // Start the timestamp range queue processor to handle gossip
474
                // filter applications asynchronously.
475
                if !g.cfg.noTimestampQueryOption {
175✔
476
                        g.cg.WgAdd(1)
83✔
477
                        go g.processTimestampRangeQueue(ctx)
83✔
478
                }
83✔
479
        })
480
}
481

482
// Stop signals the GossipSyncer for a graceful exit, then waits until it has
483
// exited.
484
func (g *GossipSyncer) Stop() {
89✔
485
        g.stopped.Do(func() {
178✔
486
                log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
89✔
487
                defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])
89✔
488

89✔
489
                g.cg.Quit()
89✔
490
        })
89✔
491
}
492

493
// handleSyncingChans handles the state syncingChans for the GossipSyncer. When
494
// in this state, we will send a QueryChannelRange msg to our peer and advance
495
// the syncer's state to waitingQueryRangeReply. Returns an error if a fatal
496
// error occurs that should cause the goroutine to exit.
497
func (g *GossipSyncer) handleSyncingChans(ctx context.Context) error {
75✔
498
        // Prepare the query msg.
75✔
499
        queryRangeMsg, err := g.genChanRangeQuery(
75✔
500
                ctx, g.genHistoricalChanRangeQuery,
75✔
501
        )
75✔
502
        if err != nil {
75✔
503
                log.Errorf("Unable to gen chan range query: %v", err)
×
NEW
504

×
NEW
505
                // Any error here is likely fatal (context cancelled, db error,
×
NEW
506
                // etc.), so return it to exit the goroutine cleanly.
×
NEW
507
                return err
×
UNCOV
508
        }
×
509

510
        // Acquire a lock so the following state transition is atomic.
511
        //
512
        // NOTE: We must lock the following steps as it's possible we get an
513
        // immediate response (ReplyChannelRange) after sending the query msg.
514
        // The response is handled in ProcessQueryMsg, which requires the
515
        // current state to be waitingQueryRangeReply.
516
        g.Lock()
75✔
517
        defer g.Unlock()
75✔
518

75✔
519
        // Send the msg to the remote peer, which is non-blocking as
75✔
520
        // `sendToPeer` only queues the msg in Brontide.
75✔
521
        err = g.sendToPeer(ctx, queryRangeMsg)
75✔
522
        if err != nil {
77✔
523
                log.Errorf("Unable to send chan range query: %v", err)
2✔
524

2✔
525
                // Any send error (peer exiting, connection closed, rate
2✔
526
                // limiter signaling exit, etc.) is fatal, so return it to
2✔
527
                // exit the goroutine cleanly.
2✔
528
                return err
2✔
529
        }
2✔
530

531
        // With the message sent successfully, we'll transition into the next
532
        // state where we wait for their reply.
533
        g.setSyncState(waitingQueryRangeReply)
73✔
534

73✔
535
        return nil
73✔
536
}
537

538
// channelGraphSyncer is the main goroutine responsible for ensuring that we
539
// properly sync channel graph state with the remote peer, and also that we only
540
// send them messages which actually pass their defined update horizon.
541
func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
91✔
542
        defer g.cg.WgDone()
91✔
543

91✔
544
        for {
404✔
545
                state := g.syncState()
313✔
546
                syncType := g.SyncType()
313✔
547

313✔
548
                log.Debugf("GossipSyncer(%x): state=%v, type=%v",
313✔
549
                        g.cfg.peerPub[:], state, syncType)
313✔
550

313✔
551
                switch state {
313✔
552
                // When we're in this state, we're trying to synchronize our
553
                // view of the network with the remote peer. We'll kick off
554
                // this sync by asking them for the set of channels they
555
                // understand, as well as responding to any other queries by
556
                // them.
557
                case syncingChans:
75✔
558
                        err := g.handleSyncingChans(ctx)
75✔
559
                        if err != nil {
77✔
560
                                log.Debugf("GossipSyncer(%x): exiting due to "+
2✔
561
                                        "error in syncingChans: %v",
2✔
562
                                        g.cfg.peerPub[:], err)
2✔
563

2✔
564
                                return
2✔
565
                        }
2✔
566

567
                // In this state, we've sent out our initial channel range
568
                // query and are waiting for the final response from the remote
569
                // peer before we perform a diff to see which channels they know
570
                // of that we don't.
571
                case waitingQueryRangeReply:
175✔
572
                        // We'll wait to either process a new message from the
175✔
573
                        // remote party, or exit due to the gossiper exiting,
175✔
574
                        // or us being signalled to do so.
175✔
575
                        select {
175✔
576
                        case msg := <-g.gossipMsgs:
120✔
577
                                // The remote peer is sending a response to our
120✔
578
                                // initial query, we'll collate this response,
120✔
579
                                // and see if it's the final one in the series.
120✔
580
                                // If so, we can then transition to querying
120✔
581
                                // for the new channels.
120✔
582
                                queryReply, ok := msg.(*lnwire.ReplyChannelRange)
120✔
583
                                if ok {
240✔
584
                                        err := g.processChanRangeReply(
120✔
585
                                                ctx, queryReply,
120✔
586
                                        )
120✔
587
                                        if err != nil {
120✔
588
                                                log.Errorf("Unable to "+
×
589
                                                        "process chan range "+
×
590
                                                        "query: %v", err)
×
591
                                                return
×
592
                                        }
×
593
                                        continue
120✔
594
                                }
595

596
                                log.Warnf("Unexpected message: %T in state=%v",
×
597
                                        msg, state)
×
598

UNCOV
599
                        case <-g.cg.Done():
×
UNCOV
600
                                return
×
601

602
                        case <-ctx.Done():
55✔
603
                                return
55✔
604
                        }
605

606
                // We'll enter this state once we've discovered which channels
607
                // the remote party knows of that we don't yet know of
608
                // ourselves.
609
                case queryNewChannels:
8✔
610
                        // First, we'll attempt to continue our channel
8✔
611
                        // synchronization by continuing to send off another
8✔
612
                        // query chunk.
8✔
613
                        done, err := g.synchronizeChanIDs(ctx)
8✔
614
                        if err != nil {
10✔
615
                                log.Debugf("GossipSyncer(%x): exiting due to "+
2✔
616
                                        "error in queryNewChannels: %v",
2✔
617
                                        g.cfg.peerPub[:], err)
2✔
618

2✔
619
                                return
2✔
620
                        }
2✔
621

622
                        // If this wasn't our last query, then we'll need to
623
                        // transition to our waiting state.
624
                        if !done {
11✔
625
                                continue
5✔
626
                        }
627

628
                        // If we're fully synchronized, then we can transition
629
                        // to our terminal state.
630
                        g.setSyncState(chansSynced)
4✔
631

4✔
632
                        // Ensure that the sync manager becomes aware that the
4✔
633
                        // historical sync completed so synced_to_graph is
4✔
634
                        // updated over rpc.
4✔
635
                        g.cfg.markGraphSynced()
4✔
636

637
                // In this state, we've just sent off a new query for channels
638
                // that we don't yet know of. We'll remain in this state until
639
                // the remote party signals they've responded to our query in
640
                // totality.
641
                case waitingQueryChanReply:
5✔
642
                        // Once we've sent off our query, we'll wait for either
5✔
643
                        // an ending reply, or just another query from the
5✔
644
                        // remote peer.
5✔
645
                        select {
5✔
646
                        case msg := <-g.gossipMsgs:
5✔
647
                                // If this is the final reply to one of our
5✔
648
                                // queries, then we'll loop back into our query
5✔
649
                                // state to send off the remaining query chunks.
5✔
650
                                _, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
5✔
651
                                if ok {
10✔
652
                                        g.setSyncState(queryNewChannels)
5✔
653
                                        continue
5✔
654
                                }
655

656
                                log.Warnf("Unexpected message: %T in state=%v",
×
657
                                        msg, state)
×
658

659
                        case <-g.cg.Done():
×
660
                                return
×
661

662
                        case <-ctx.Done():
×
663
                                return
×
664
                        }
665

666
                // This is our final terminal state where we'll only reply to
667
                // any further queries by the remote peer.
668
                case chansSynced:
59✔
669
                        g.Lock()
59✔
670
                        if g.syncedSignal != nil {
70✔
671
                                close(g.syncedSignal)
11✔
672
                                g.syncedSignal = nil
11✔
673
                        }
11✔
674
                        g.Unlock()
59✔
675

59✔
676
                        // If we haven't yet sent out our update horizon, and
59✔
677
                        // we want to receive real-time channel updates, we'll
59✔
678
                        // do so now.
59✔
679
                        if g.localUpdateHorizon == nil &&
59✔
680
                                syncType.IsActiveSync() {
76✔
681

17✔
682
                                err := g.sendGossipTimestampRange(
17✔
683
                                        ctx, time.Now(), math.MaxUint32,
17✔
684
                                )
17✔
685
                                if err != nil {
17✔
686
                                        log.Errorf("Unable to send update "+
×
687
                                                "horizon to %x: %v",
×
688
                                                g.cfg.peerPub, err)
×
689
                                }
×
690
                        }
691
                        // With our horizon set, we'll simply reply to any new
692
                        // messages or process any state transitions and exit if
693
                        // needed.
694
                        fallthrough
59✔
695

696
                // Pinned peers will begin in this state, since they will
697
                // immediately receive a request to perform a historical sync.
698
                // Otherwise, we fall through after ending in chansSynced to
699
                // facilitate new requests.
700
                case syncerIdle:
62✔
701
                        select {
62✔
702
                        case req := <-g.syncTransitionReqs:
17✔
703
                                req.errChan <- g.handleSyncTransition(ctx, req)
17✔
704

705
                        case req := <-g.historicalSyncReqs:
19✔
706
                                g.handleHistoricalSync(req)
19✔
707

708
                        case <-g.cg.Done():
×
709
                                return
×
710

711
                        case <-ctx.Done():
29✔
712
                                return
29✔
713
                        }
714
                }
715
        }
716
}
717

718
// replyHandler is an event loop whose sole purpose is to reply to the remote
719
// peers queries. Our replyHandler will respond to messages generated by their
720
// channelGraphSyncer, and vice versa. Each party's channelGraphSyncer drives
721
// the other's replyHandler, allowing the replyHandler to operate independently
722
// from the state machine maintained on the same node.
723
//
724
// NOTE: This method MUST be run as a goroutine.
725
func (g *GossipSyncer) replyHandler(ctx context.Context) {
87✔
726
        defer g.cg.WgDone()
87✔
727

87✔
728
        for {
179✔
729
                select {
92✔
730
                case msg := <-g.queryMsgs:
8✔
731
                        err := g.replyPeerQueries(ctx, msg)
8✔
732
                        switch {
8✔
733
                        case err == ErrGossipSyncerExiting:
×
734
                                return
×
735

736
                        case err == lnpeer.ErrPeerExiting:
×
737
                                return
×
738

739
                        case err != nil:
×
740
                                log.Errorf("Unable to reply to peer "+
×
741
                                        "query: %v", err)
×
742
                        }
743

744
                case <-g.cg.Done():
1✔
745
                        return
1✔
746

747
                case <-ctx.Done():
83✔
748
                        return
83✔
749
                }
750
        }
751
}
752

753
// processTimestampRangeQueue handles timestamp range messages from the queue
754
// asynchronously. This prevents blocking the gossiper when rate limiting is
755
// active and multiple peers are trying to apply gossip filters.
756
func (g *GossipSyncer) processTimestampRangeQueue(ctx context.Context) {
83✔
757
        defer g.cg.WgDone()
83✔
758

83✔
759
        for {
171✔
760
                select {
88✔
761
                case msg := <-g.timestampRangeQueue:
55✔
762
                        // Process the timestamp range message. If we hit an
55✔
763
                        // error, log it but continue processing to avoid
55✔
764
                        // blocking the queue.
55✔
765
                        err := g.ApplyGossipFilter(ctx, msg)
55✔
766
                        switch {
55✔
767
                        case errors.Is(err, ErrGossipSyncerExiting):
×
768
                                return
×
769

770
                        case errors.Is(err, lnpeer.ErrPeerExiting):
×
771
                                return
×
772

773
                        case err != nil:
×
774
                                log.Errorf("Unable to apply gossip filter: %v",
×
775
                                        err)
×
776
                        }
777

778
                case <-g.cg.Done():
1✔
779
                        return
1✔
780

781
                case <-ctx.Done():
32✔
782
                        return
32✔
783
                }
784
        }
785
}
786

787
// QueueTimestampRange attempts to queue a timestamp range message for
788
// asynchronous processing. If the queue is full, it returns false to indicate
789
// the message was dropped.
790
func (g *GossipSyncer) QueueTimestampRange(
791
        msg *lnwire.GossipTimestampRange) bool {
2,039✔
792

2,039✔
793
        // If timestamp queries are disabled, don't queue the message.
2,039✔
794
        if g.cfg.noTimestampQueryOption {
2,039✔
795
                return false
×
796
        }
×
797

798
        select {
2,039✔
799
        case g.timestampRangeQueue <- msg:
783✔
800
                return true
783✔
801

802
        // Queue is full, drop the message to prevent blocking.
803
        default:
1,256✔
804
                log.Warnf("Timestamp range queue full for peer %x, "+
1,256✔
805
                        "dropping message", g.cfg.peerPub[:])
1,256✔
806
                return false
1,256✔
807
        }
808
}
809

810
// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
811
// syncer and sends it to the remote peer.
812
func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
813
        firstTimestamp time.Time, timestampRange uint32) error {
31✔
814

31✔
815
        endTimestamp := firstTimestamp.Add(
31✔
816
                time.Duration(timestampRange) * time.Second,
31✔
817
        )
31✔
818

31✔
819
        log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)",
31✔
820
                g.cfg.peerPub[:], firstTimestamp, endTimestamp)
31✔
821

31✔
822
        localUpdateHorizon := &lnwire.GossipTimestampRange{
31✔
823
                ChainHash:      g.cfg.chainHash,
31✔
824
                FirstTimestamp: uint32(firstTimestamp.Unix()),
31✔
825
                TimestampRange: timestampRange,
31✔
826
        }
31✔
827

31✔
828
        if err := g.sendToPeer(ctx, localUpdateHorizon); err != nil {
31✔
829
                return err
×
830
        }
×
831

832
        if firstTimestamp.Equal(zeroTimestamp) && timestampRange == 0 {
33✔
833
                g.localUpdateHorizon = nil
2✔
834
        } else {
31✔
835
                g.localUpdateHorizon = localUpdateHorizon
29✔
836
        }
29✔
837

838
        return nil
31✔
839
}
840

841
// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
842
// the remote peer for its known set of channel IDs within a particular block
843
// range. This method will be called continually until the entire range has
844
// been queried for with a response received. We'll chunk our requests as
845
// required to ensure they fit into a single message. We may re-enter this
846
// state in the case that chunking is required. Returns true if synchronization
847
// is complete, and an error if a fatal error occurs that should cause the
848
// goroutine to exit.
849
func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) (bool, error) {
11✔
850
        // If we're in this state yet there are no more new channels to query
11✔
851
        // for, then we'll transition to our final synced state and return true
11✔
852
        // to signal that we're fully synchronized.
11✔
853
        if len(g.newChansToQuery) == 0 {
15✔
854
                log.Infof("GossipSyncer(%x): no more chans to query",
4✔
855
                        g.cfg.peerPub[:])
4✔
856

4✔
857
                return true, nil
4✔
858
        }
4✔
859

860
        // Otherwise, we'll issue our next chunked query to receive replies
861
        // for.
862
        var queryChunk []lnwire.ShortChannelID
10✔
863

10✔
864
        // If the number of channels to query for is less than the chunk size,
10✔
865
        // then we can issue a single query.
10✔
866
        if int32(len(g.newChansToQuery)) < g.cfg.batchSize {
15✔
867
                queryChunk = g.newChansToQuery
5✔
868
                g.newChansToQuery = nil
5✔
869

5✔
870
        } else {
10✔
871
                // Otherwise, we'll need to only query for the next chunk.
5✔
872
                // We'll slice into our query chunk, then slide down our main
5✔
873
                // pointer down by the chunk size.
5✔
874
                queryChunk = g.newChansToQuery[:g.cfg.batchSize]
5✔
875
                g.newChansToQuery = g.newChansToQuery[g.cfg.batchSize:]
5✔
876
        }
5✔
877

878
        log.Infof("GossipSyncer(%x): querying for %v new channels",
10✔
879
                g.cfg.peerPub[:], len(queryChunk))
10✔
880

10✔
881
        // Change the state before sending the query msg.
10✔
882
        g.setSyncState(waitingQueryChanReply)
10✔
883

10✔
884
        // With our chunk obtained, we'll send over our next query, then return
10✔
885
        // false indicating that we're not yet fully synced.
10✔
886
        err := g.sendToPeer(ctx, &lnwire.QueryShortChanIDs{
10✔
887
                ChainHash:    g.cfg.chainHash,
10✔
888
                EncodingType: lnwire.EncodingSortedPlain,
10✔
889
                ShortChanIDs: queryChunk,
10✔
890
        })
10✔
891
        if err != nil {
12✔
892
                log.Errorf("Unable to sync chan IDs: %v", err)
2✔
893

2✔
894
                // Any send error (peer exiting, connection closed, rate
2✔
895
                // limiter signaling exit, etc.) is fatal, so return it to
2✔
896
                // exit the goroutine cleanly.
2✔
897
                return false, err
2✔
898
        }
2✔
899

900
        return false, nil
8✔
901
}
902

903
// isLegacyReplyChannelRange determines where a ReplyChannelRange message is
904
// considered legacy. There was a point where lnd used to include the same query
905
// over multiple replies, rather than including the portion of the query the
906
// reply is handling. We'll use this as a way of detecting whether we are
907
// communicating with a legacy node so we can properly sync with them.
908
func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
909
        reply *lnwire.ReplyChannelRange) bool {
253✔
910

253✔
911
        return (reply.ChainHash == query.ChainHash &&
253✔
912
                reply.FirstBlockHeight == query.FirstBlockHeight &&
253✔
913
                reply.NumBlocks == query.NumBlocks)
253✔
914
}
253✔
915

916
// processChanRangeReply is called each time the GossipSyncer receives a new
917
// reply to the initial range query to discover new channels that it didn't
918
// previously know of.
919
func (g *GossipSyncer) processChanRangeReply(_ context.Context,
920
        msg *lnwire.ReplyChannelRange) error {
128✔
921

128✔
922
        // isStale returns whether the timestamp is too far into the past.
128✔
923
        isStale := func(timestamp time.Time) bool {
161✔
924
                return time.Since(timestamp) > graph.DefaultChannelPruneExpiry
33✔
925
        }
33✔
926

927
        // isSkewed returns whether the timestamp is too far into the future.
928
        isSkewed := func(timestamp time.Time) bool {
151✔
929
                return time.Until(timestamp) > graph.DefaultChannelPruneExpiry
23✔
930
        }
23✔
931

932
        // If we're not communicating with a legacy node, we'll apply some
933
        // further constraints on their reply to ensure it satisfies our query.
934
        if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
238✔
935
                // The first block should be within our original request.
110✔
936
                if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight {
110✔
937
                        return fmt.Errorf("reply includes channels for height "+
×
938
                                "%v prior to query %v", msg.FirstBlockHeight,
×
939
                                g.curQueryRangeMsg.FirstBlockHeight)
×
940
                }
×
941

942
                // The last block should also be. We don't need to check the
943
                // intermediate ones because they should already be in sorted
944
                // order.
945
                replyLastHeight := msg.LastBlockHeight()
110✔
946
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
110✔
947
                if replyLastHeight > queryLastHeight {
110✔
948
                        return fmt.Errorf("reply includes channels for height "+
×
949
                                "%v after query %v", replyLastHeight,
×
950
                                queryLastHeight)
×
951
                }
×
952

953
                // If we've previously received a reply for this query, look at
954
                // its last block to ensure the current reply properly follows
955
                // it.
956
                if g.prevReplyChannelRange != nil {
215✔
957
                        prevReply := g.prevReplyChannelRange
105✔
958
                        prevReplyLastHeight := prevReply.LastBlockHeight()
105✔
959

105✔
960
                        // The current reply can either start from the previous
105✔
961
                        // reply's last block, if there are still more channels
105✔
962
                        // for the same block, or the block after.
105✔
963
                        if msg.FirstBlockHeight != prevReplyLastHeight &&
105✔
964
                                msg.FirstBlockHeight != prevReplyLastHeight+1 {
105✔
965

×
966
                                return fmt.Errorf("first block of reply %v "+
×
967
                                        "does not continue from last block of "+
×
968
                                        "previous %v", msg.FirstBlockHeight,
×
969
                                        prevReplyLastHeight)
×
970
                        }
×
971
                }
972
        }
973

974
        g.prevReplyChannelRange = msg
128✔
975

128✔
976
        for i, scid := range msg.ShortChanIDs {
258✔
977
                info := graphdb.NewChannelUpdateInfo(
130✔
978
                        scid, time.Time{}, time.Time{},
130✔
979
                )
130✔
980

130✔
981
                if len(msg.Timestamps) != 0 {
145✔
982
                        t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0)
15✔
983
                        info.Node1UpdateTimestamp = t1
15✔
984

15✔
985
                        t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0)
15✔
986
                        info.Node2UpdateTimestamp = t2
15✔
987

15✔
988
                        // Sort out all channels with outdated or skewed
15✔
989
                        // timestamps. Both timestamps need to be out of
15✔
990
                        // bounds for us to skip the channel and not query
15✔
991
                        // it later on.
15✔
992
                        switch {
15✔
993
                        case isStale(info.Node1UpdateTimestamp) &&
994
                                isStale(info.Node2UpdateTimestamp):
2✔
995

2✔
996
                                continue
2✔
997

998
                        case isSkewed(info.Node1UpdateTimestamp) &&
999
                                isSkewed(info.Node2UpdateTimestamp):
2✔
1000

2✔
1001
                                continue
2✔
1002

1003
                        case isStale(info.Node1UpdateTimestamp) &&
1004
                                isSkewed(info.Node2UpdateTimestamp):
2✔
1005

2✔
1006
                                continue
2✔
1007

1008
                        case isStale(info.Node2UpdateTimestamp) &&
1009
                                isSkewed(info.Node1UpdateTimestamp):
2✔
1010

2✔
1011
                                continue
2✔
1012
                        }
1013
                }
1014

1015
                g.bufferedChanRangeReplies = append(
122✔
1016
                        g.bufferedChanRangeReplies, info,
122✔
1017
                )
122✔
1018
        }
1019

1020
        switch g.cfg.encodingType {
128✔
1021
        case lnwire.EncodingSortedPlain:
128✔
1022
                g.numChanRangeRepliesRcvd++
128✔
1023
        case lnwire.EncodingSortedZlib:
×
1024
                g.numChanRangeRepliesRcvd += maxQueryChanRangeRepliesZlibFactor
×
1025
        default:
×
1026
                return fmt.Errorf("unhandled encoding type %v", g.cfg.encodingType)
×
1027
        }
1028

1029
        log.Infof("GossipSyncer(%x): buffering chan range reply of size=%v",
128✔
1030
                g.cfg.peerPub[:], len(msg.ShortChanIDs))
128✔
1031

128✔
1032
        // If this isn't the last response and we can continue to receive more,
128✔
1033
        // then we can exit as we've already buffered the latest portion of the
128✔
1034
        // streaming reply.
128✔
1035
        maxReplies := g.cfg.maxQueryChanRangeReplies
128✔
1036
        switch {
128✔
1037
        // If we're communicating with a legacy node, we'll need to look at the
1038
        // complete field.
1039
        case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg):
18✔
1040
                if msg.Complete == 0 && g.numChanRangeRepliesRcvd < maxReplies {
21✔
1041
                        return nil
3✔
1042
                }
3✔
1043

1044
        // Otherwise, we'll look at the reply's height range.
1045
        default:
110✔
1046
                replyLastHeight := msg.LastBlockHeight()
110✔
1047
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
110✔
1048

110✔
1049
                // TODO(wilmer): This might require some padding if the remote
110✔
1050
                // node is not aware of the last height we sent them, i.e., is
110✔
1051
                // behind a few blocks from us.
110✔
1052
                if replyLastHeight < queryLastHeight &&
110✔
1053
                        g.numChanRangeRepliesRcvd < maxReplies {
215✔
1054

105✔
1055
                        return nil
105✔
1056
                }
105✔
1057
        }
1058

1059
        log.Infof("GossipSyncer(%x): filtering through %v chans",
20✔
1060
                g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))
20✔
1061

20✔
1062
        // Otherwise, this is the final response, so we'll now check to see
20✔
1063
        // which channels they know of that we don't.
20✔
1064
        newChans, err := g.cfg.channelSeries.FilterKnownChanIDs(
20✔
1065
                g.cfg.chainHash, g.bufferedChanRangeReplies,
20✔
1066
                g.cfg.isStillZombieChannel,
20✔
1067
        )
20✔
1068
        if err != nil {
20✔
1069
                return fmt.Errorf("unable to filter chan ids: %w", err)
×
1070
        }
×
1071

1072
        // As we've received the entirety of the reply, we no longer need to
1073
        // hold on to the set of buffered replies or the original query that
1074
        // prompted the replies, so we'll let that be garbage collected now.
1075
        g.curQueryRangeMsg = nil
20✔
1076
        g.prevReplyChannelRange = nil
20✔
1077
        g.bufferedChanRangeReplies = nil
20✔
1078
        g.numChanRangeRepliesRcvd = 0
20✔
1079

20✔
1080
        // If there aren't any channels that we don't know of, then we can
20✔
1081
        // switch straight to our terminal state.
20✔
1082
        if len(newChans) == 0 {
37✔
1083
                log.Infof("GossipSyncer(%x): remote peer has no new chans",
17✔
1084
                        g.cfg.peerPub[:])
17✔
1085

17✔
1086
                g.setSyncState(chansSynced)
17✔
1087

17✔
1088
                // Ensure that the sync manager becomes aware that the
17✔
1089
                // historical sync completed so synced_to_graph is updated over
17✔
1090
                // rpc.
17✔
1091
                g.cfg.markGraphSynced()
17✔
1092
                return nil
17✔
1093
        }
17✔
1094

1095
        // Otherwise, we'll set the set of channels that we need to query for
1096
        // the next state, and also transition our state.
1097
        g.newChansToQuery = newChans
6✔
1098
        g.setSyncState(queryNewChannels)
6✔
1099

6✔
1100
        log.Infof("GossipSyncer(%x): starting query for %v new chans",
6✔
1101
                g.cfg.peerPub[:], len(newChans))
6✔
1102

6✔
1103
        return nil
6✔
1104
}
1105

1106
// genChanRangeQuery generates the initial message we'll send to the remote
1107
// party when we're kicking off the channel graph synchronization upon
1108
// connection. The historicalQuery boolean can be used to generate a query from
1109
// the genesis block of the chain.
1110
func (g *GossipSyncer) genChanRangeQuery(ctx context.Context,
1111
        historicalQuery bool) (*lnwire.QueryChannelRange, error) {
79✔
1112

79✔
1113
        // First, we'll query our channel graph time series for its highest
79✔
1114
        // known channel ID.
79✔
1115
        newestChan, err := g.cfg.channelSeries.HighestChanID(
79✔
1116
                ctx, g.cfg.chainHash,
79✔
1117
        )
79✔
1118
        if err != nil {
79✔
1119
                return nil, err
×
1120
        }
×
1121

1122
        // Once we have the chan ID of the newest, we'll obtain the block height
1123
        // of the channel, then subtract our default horizon to ensure we don't
1124
        // miss any channels. By default, we go back 1 day from the newest
1125
        // channel, unless we're attempting a historical sync, where we'll
1126
        // actually start from the genesis block instead.
1127
        var startHeight uint32
79✔
1128
        switch {
79✔
1129
        case historicalQuery:
22✔
1130
                fallthrough
22✔
1131
        case newestChan.BlockHeight <= chanRangeQueryBuffer:
25✔
1132
                startHeight = 0
25✔
1133
        default:
54✔
1134
                startHeight = newestChan.BlockHeight - chanRangeQueryBuffer
54✔
1135
        }
1136

1137
        // Determine the number of blocks to request based on our best height.
1138
        // We'll take into account any potential underflows and explicitly set
1139
        // numBlocks to its minimum value of 1 if so.
1140
        bestHeight := g.cfg.bestHeight()
79✔
1141
        numBlocks := bestHeight - startHeight
79✔
1142
        if int64(numBlocks) < 1 {
79✔
1143
                numBlocks = 1
×
1144
        }
×
1145

1146
        log.Infof("GossipSyncer(%x): requesting new chans from height=%v "+
79✔
1147
                "and %v blocks after", g.cfg.peerPub[:], startHeight, numBlocks)
79✔
1148

79✔
1149
        // Finally, we'll craft the channel range query, using our starting
79✔
1150
        // height, then asking for all known channels to the foreseeable end of
79✔
1151
        // the main chain.
79✔
1152
        query := &lnwire.QueryChannelRange{
79✔
1153
                ChainHash:        g.cfg.chainHash,
79✔
1154
                FirstBlockHeight: startHeight,
79✔
1155
                NumBlocks:        numBlocks,
79✔
1156
        }
79✔
1157

79✔
1158
        if !g.cfg.noTimestampQueryOption {
150✔
1159
                query.QueryOptions = lnwire.NewTimestampQueryOption()
71✔
1160
        }
71✔
1161

1162
        g.curQueryRangeMsg = query
79✔
1163

79✔
1164
        return query, nil
79✔
1165
}
1166

1167
// replyPeerQueries is called in response to any query by the remote peer.
// We'll examine our state and send back our best response.
func (g *GossipSyncer) replyPeerQueries(ctx context.Context,
        msg lnwire.Message) error {

        switch msg := msg.(type) {

        // In this state, we'll also handle any incoming channel range queries
        // from the remote peer as they're trying to sync their state as well.
        case *lnwire.QueryChannelRange:
                return g.replyChanRangeQuery(ctx, msg)

        // If the remote peer skips straight to requesting new channels that
        // they don't know of, then we'll ensure that we also handle this case.
        case *lnwire.QueryShortChanIDs:
                return g.replyShortChanIDs(ctx, msg)

        default:
                return fmt.Errorf("unknown message: %T", msg)
        }
}

// replyChanRangeQuery will be dispatched in response to a channel range query
// by the remote node. We'll query the channel time series for channels that
// meet the channel range, then chunk our responses to the remote node. We also
// ensure that our final fragment carries the "complete" bit to indicate the
// end of our streaming response.
func (g *GossipSyncer) replyChanRangeQuery(ctx context.Context,
        query *lnwire.QueryChannelRange) error {

        // Before responding, we'll check to ensure that the remote peer is
        // querying for the same chain that we're on. If not, we'll send back a
        // response with a complete value of zero to indicate we're on a
        // different chain.
        if g.cfg.chainHash != query.ChainHash {
                log.Warnf("Remote peer requested QueryChannelRange for "+
                        "chain=%v, we're on chain=%v", query.ChainHash,
                        g.cfg.chainHash)

                return g.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
                        ChainHash:        query.ChainHash,
                        FirstBlockHeight: query.FirstBlockHeight,
                        NumBlocks:        query.NumBlocks,
                        Complete:         0,
                        EncodingType:     g.cfg.encodingType,
                        ShortChanIDs:     nil,
                })
        }

        log.Infof("GossipSyncer(%x): filtering chan range: start_height=%v, "+
                "num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
                query.NumBlocks)

        // Check if the query asked for timestamps. We will only serve
        // timestamps if this has not been disabled with
        // noTimestampQueryOption.
        withTimestamps := query.WithTimestamps() &&
                !g.cfg.noTimestampQueryOption

        // Next, we'll consult the time series to obtain the set of known
        // channel ID's that match their query.
        startBlock := query.FirstBlockHeight
        endBlock := query.LastBlockHeight()
        channelRanges, err := g.cfg.channelSeries.FilterChannelRange(
                query.ChainHash, startBlock, endBlock, withTimestamps,
        )
        if err != nil {
                return err
        }

        // TODO(roasbeef): means can't send max uint above?
        //  * or make internal 64

        // We'll send our response in a streaming manner, chunk-by-chunk. We do
        // this as there's a transport message size limit which we'll need to
        // adhere to. We also need to make sure all of our replies cover the
        // expected range of the query.
        sendReplyForChunk := func(channelChunk []graphdb.ChannelUpdateInfo,
                firstHeight, lastHeight uint32, finalChunk bool) error {

                // The number of blocks contained in the current chunk (the
                // total span) is the difference between the last height and
                // the first in the range. We add one as even if all channels
                // returned are in the same block, we need to count that.
                numBlocks := lastHeight - firstHeight + 1
                complete := uint8(0)
                if finalChunk {
                        complete = 1
                }

                var timestamps lnwire.Timestamps
                if withTimestamps {
                        timestamps = make(lnwire.Timestamps, len(channelChunk))
                }

                scids := make([]lnwire.ShortChannelID, len(channelChunk))
                for i, info := range channelChunk {
                        scids[i] = info.ShortChannelID

                        if !withTimestamps {
                                continue
                        }

                        timestamps[i].Timestamp1 = uint32(
                                info.Node1UpdateTimestamp.Unix(),
                        )

                        timestamps[i].Timestamp2 = uint32(
                                info.Node2UpdateTimestamp.Unix(),
                        )
                }

                return g.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
                        ChainHash:        query.ChainHash,
                        NumBlocks:        numBlocks,
                        FirstBlockHeight: firstHeight,
                        Complete:         complete,
                        EncodingType:     g.cfg.encodingType,
                        ShortChanIDs:     scids,
                        Timestamps:       timestamps,
                })
        }

        var (
                firstHeight  = query.FirstBlockHeight
                lastHeight   uint32
                channelChunk []graphdb.ChannelUpdateInfo
        )

        // chunkSize is the maximum number of SCIDs that we can safely put in a
        // single message. If we also need to include timestamps though, then
        // this number is halved since encoding two timestamps takes the same
        // number of bytes as encoding an SCID.
        chunkSize := g.cfg.chunkSize
        if withTimestamps {
                chunkSize /= 2
        }

        for _, channelRange := range channelRanges {
                channels := channelRange.Channels
                numChannels := int32(len(channels))
                numLeftToAdd := chunkSize - int32(len(channelChunk))

                // Include the current block in the ongoing chunk if it can fit
                // and move on to the next block.
                if numChannels <= numLeftToAdd {
                        channelChunk = append(channelChunk, channels...)
                        continue
                }

                // Otherwise, we need to send our existing channel chunk as is
                // as its own reply and start a new one for the current block.
                // We'll mark the end of our current chunk as the height before
                // the current block to ensure the whole query range is replied
                // to.
                log.Infof("GossipSyncer(%x): sending range chunk of size=%v",
                        g.cfg.peerPub[:], len(channelChunk))

                lastHeight = channelRange.Height - 1
                err := sendReplyForChunk(
                        channelChunk, firstHeight, lastHeight, false,
                )
                if err != nil {
                        return err
                }

                // With the reply constructed, we'll start tallying channels for
                // our next one, keeping in mind our chunk size. This may result
                // in channels for this block being left out from the reply, but
                // this isn't an issue since we'll randomly shuffle them and we
                // assume a historical gossip sync is performed at a later time.
                firstHeight = channelRange.Height
                finalChunkSize := numChannels
                exceedsChunkSize := numChannels > chunkSize
                if exceedsChunkSize {
                        rand.Shuffle(len(channels), func(i, j int) {
                                channels[i], channels[j] = channels[j], channels[i]
                        })
                        finalChunkSize = chunkSize
                }
                channelChunk = channels[:finalChunkSize]

                // Sort the chunk once again if we had to shuffle it.
                if exceedsChunkSize {
                        sort.Slice(channelChunk, func(i, j int) bool {
                                id1 := channelChunk[i].ShortChannelID.ToUint64()
                                id2 := channelChunk[j].ShortChannelID.ToUint64()

                                return id1 < id2
                        })
                }
        }

        // Send the remaining chunk as the final reply.
        log.Infof("GossipSyncer(%x): sending final chan range chunk, size=%v",
                g.cfg.peerPub[:], len(channelChunk))

        return sendReplyForChunk(
                channelChunk, firstHeight, query.LastBlockHeight(), true,
        )
}

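// The following hypothetical helper is not part of lnd; it is a minimal sketch
// of the block-aligned chunking used above when streaming ReplyChannelRange
// messages. Each inner slice holds the SCIDs found in one block (standing in
// for channelRange.Channels); whole blocks are appended to the pending chunk
// while they fit under chunkSize, and a block that would overflow forces the
// pending chunk to be flushed first. Truncating an oversized block, which the
// real code handles by shuffling and slicing, is omitted here.
func exampleChunkSCIDs(scidsByBlock [][]uint64, chunkSize int) [][]uint64 {
        var (
                chunks  [][]uint64
                current []uint64
        )

        for _, blockSCIDs := range scidsByBlock {
                // Flush the pending chunk if this block would overflow it.
                if len(blockSCIDs) > chunkSize-len(current) {
                        chunks = append(chunks, current)
                        current = nil
                }

                current = append(current, blockSCIDs...)
        }

        // The final chunk is always sent, even if empty, so the reply stream
        // covers the full query range and carries the complete bit.
        return append(chunks, current)
}
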
// replyShortChanIDs will be dispatched in response to a query by the remote
// node for information concerning a set of short channel ID's. Our response
// will be sent in a streaming chunked manner to ensure that we remain below
// the current transport level message size.
func (g *GossipSyncer) replyShortChanIDs(ctx context.Context,
        query *lnwire.QueryShortChanIDs) error {

        // Before responding, we'll check to ensure that the remote peer is
        // querying for the same chain that we're on. If not, we'll send back a
        // response with a complete value of zero to indicate we're on a
        // different chain.
        if g.cfg.chainHash != query.ChainHash {
                log.Warnf("Remote peer requested QueryShortChanIDs for "+
                        "chain=%v, we're on chain=%v", query.ChainHash,
                        g.cfg.chainHash)

                return g.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
                        ChainHash: query.ChainHash,
                        Complete:  0,
                })
        }

        if len(query.ShortChanIDs) == 0 {
                log.Infof("GossipSyncer(%x): ignoring query for blank short chan ID's",
                        g.cfg.peerPub[:])
                return nil
        }

        log.Infof("GossipSyncer(%x): fetching chan anns for %v chans",
                g.cfg.peerPub[:], len(query.ShortChanIDs))

        // Now that we know we're on the same chain, we'll query the channel
        // time series for the set of messages that we know of which satisfy
        // the requirement of being a chan ann, chan update, or a node ann
        // related to the set of queried channels.
        replyMsgs, err := g.cfg.channelSeries.FetchChanAnns(
                query.ChainHash, query.ShortChanIDs,
        )
        if err != nil {
                return fmt.Errorf("unable to fetch chan anns for %v..., %w",
                        query.ShortChanIDs[0].ToUint64(), err)
        }

        // Reply with any messages related to those channel ID's. We'll write
        // each one individually and synchronously to throttle the sends and
        // perform buffering of responses in the syncer as opposed to the peer.
        for _, msg := range replyMsgs {
                err := g.sendToPeerSync(ctx, msg)
                if err != nil {
                        return err
                }
        }

        // Regardless of whether we had any messages to reply with, send over
        // the sentinel message to signal that the stream has terminated.
        return g.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
                ChainHash: query.ChainHash,
                Complete:  1,
        })
}

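// The following hypothetical helper is not part of lnd; it reduces the reply
// pattern above to its essentials. Every known message is written
// synchronously, and a terminating sentinel is sent regardless of whether any
// messages were found, so the querier always sees the end of the stream. The
// send callback stands in for sendToPeerSync.
func exampleStreamThenSentinel(ctx context.Context, msgs []lnwire.Message,
        sentinel lnwire.Message,
        send func(context.Context, lnwire.Message) error) error {

        for _, msg := range msgs {
                if err := send(ctx, msg); err != nil {
                        return err
                }
        }

        // Always terminate the stream, even when msgs was empty.
        return send(ctx, sentinel)
}
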
// ApplyGossipFilter applies a gossiper filter sent by the remote node to the
// state machine. Once applied, we'll ensure that we don't forward any messages
// to the peer that aren't within the time range of the filter.
func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
        filter *lnwire.GossipTimestampRange) error {

        g.Lock()

        g.remoteUpdateHorizon = filter

        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
        endTime := startTime.Add(
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
        )

        g.Unlock()

        // If requested, don't reply with historical gossip data when the remote
        // peer sets their gossip timestamp range.
        if g.cfg.ignoreHistoricalFilters {
                return nil
        }

        // Check if a goroutine is already sending the backlog. If so, return
        // early without attempting to acquire the semaphore.
        if g.isSendingBacklog.Load() {
                log.Debugf("GossipSyncer(%x): skipping ApplyGossipFilter, "+
                        "backlog send already in progress", g.cfg.peerPub[:])
                return nil
        }

        select {
        case <-g.syncerSema:
        case <-g.cg.Done():
                return ErrGossipSyncerExiting
        case <-ctx.Done():
                return ctx.Err()
        }

        // We don't put this in a defer because if the goroutine is launched,
        // it needs to be called when the goroutine is stopped.
        returnSema := func() {
                g.syncerSema <- struct{}{}
        }

        // Now that the remote peer has applied their filter, we'll query the
        // database for all the messages that are beyond this filter.
        newUpdatestoSend := g.cfg.channelSeries.UpdatesInHorizon(
                g.cfg.chainHash, startTime, endTime,
        )

        // Create a pull-based iterator so we can check if there are any
        // updates before launching the goroutine.
        next, stop := iter.Pull2(newUpdatestoSend)

        // Check if we have any updates to send by attempting to get the first
        // message.
        firstMsg, firstErr, ok := next()
        if firstErr != nil {
                stop()
                returnSema()
                return firstErr
        }

        log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
                "start=%v, end=%v, has_updates=%v", g.cfg.peerPub[:],
                startTime, endTime, ok)

        // If we don't have any to send, then we can return early.
        if !ok {
                stop()
                returnSema()
                return nil
        }

        // Set the atomic flag to indicate we're starting to send the backlog.
        // If the swap fails, it means another goroutine is already active, so
        // we return early.
        if !g.isSendingBacklog.CompareAndSwap(false, true) {
                returnSema()
                log.Debugf("GossipSyncer(%x): another goroutine already "+
                        "sending backlog, skipping", g.cfg.peerPub[:])

                return nil
        }

        // We'll conclude by launching a goroutine to send out any updates.
        // The goroutine takes ownership of the iterator.
        g.cg.WgAdd(1)
        go func() {
                defer g.cg.WgDone()
                defer returnSema()
                defer g.isSendingBacklog.Store(false)
                defer stop()

                // Send the first message we already pulled.
                err := g.sendToPeerSync(ctx, firstMsg)
                switch {
                case errors.Is(err, ErrGossipSyncerExiting):
                        return

                case errors.Is(err, lnpeer.ErrPeerExiting):
                        return

                case err != nil:
                        log.Errorf("Unable to send message for "+
                                "peer catch up: %v", err)
                }

                // Continue with the rest of the messages using the same pull
                // iterator.
                for {
                        msg, err, ok := next()
                        if !ok {
                                return
                        }

                        // If the iterator yielded an error, log it and
                        // continue.
                        if err != nil {
                                log.Errorf("Error fetching update for peer "+
                                        "catch up: %v", err)
                                continue
                        }

                        err = g.sendToPeerSync(ctx, msg)
                        switch {
                        case err == ErrGossipSyncerExiting:
                                return

                        case err == lnpeer.ErrPeerExiting:
                                return

                        case err != nil:
                                log.Errorf("Unable to send message for "+
                                        "peer catch up: %v", err)
                        }
                }
        }()

        return nil
}

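// The following hypothetical helper is not part of lnd; it is a minimal sketch
// of the "peek before spawning" pattern used above. Pulling the first element
// of the sequence lets the caller return early (and release any semaphore)
// when the horizon matches nothing, and only pays for a goroutine when there
// is a backlog to stream. The consume callback stands in for the send loop,
// and the element type is an int purely for illustration.
func examplePeekThenStream(seq iter.Seq2[int, error],
        consume func(int) error) (started bool, err error) {

        next, stop := iter.Pull2(seq)

        // Peek at the first element before committing to a goroutine.
        first, firstErr, ok := next()
        if firstErr != nil {
                stop()
                return false, firstErr
        }
        if !ok {
                stop()
                return false, nil
        }

        // The goroutine takes ownership of the iterator from here on.
        go func() {
                defer stop()

                if err := consume(first); err != nil {
                        return
                }

                for {
                        v, err, ok := next()
                        if !ok {
                                return
                        }
                        if err != nil {
                                // Skip elements that failed to load.
                                continue
                        }
                        if err := consume(v); err != nil {
                                return
                        }
                }
        }()

        return true, nil
}
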
// FilterGossipMsgs takes a set of gossip messages, and only sends a message to
// the peer iff it is within the bounds of their set gossip filter. If the peer
// doesn't have a gossip filter set, then no messages will be forwarded.
func (g *GossipSyncer) FilterGossipMsgs(ctx context.Context,
        msgs ...msgWithSenders) {

        // If the peer doesn't have an update horizon set, then we won't send
        // it any new update messages.
        if g.remoteUpdateHorizon == nil {
                log.Tracef("GossipSyncer(%x): skipped due to nil "+
                        "remoteUpdateHorizon", g.cfg.peerPub[:])
                return
        }

        // If we've been signaled to exit, or are exiting, then we'll stop
        // short.
        select {
        case <-g.cg.Done():
                return
        case <-ctx.Done():
                return
        default:
        }

        // TODO(roasbeef): need to ensure that peer still online...send msg to
        // gossiper on peer termination to signal peer disconnect?

        var err error

        // Before we filter out the messages, we'll construct an index over the
        // set of channel announcements and channel updates. This will allow us
        // to quickly check if we should forward a chan ann, based on the known
        // channel updates for a channel.
        chanUpdateIndex := make(
                map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate1,
        )
        for _, msg := range msgs {
                chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate1)
                if !ok {
                        continue
                }

                chanUpdateIndex[chanUpdate.ShortChannelID] = append(
                        chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate,
                )
        }

        // We'll construct a helper function that we'll use below to determine
        // if a given message passes the gossip msg filter.
        g.Lock()
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
        endTime := startTime.Add(
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
        )
        g.Unlock()

        passesFilter := func(timeStamp uint32) bool {
                t := time.Unix(int64(timeStamp), 0)
                return t.Equal(startTime) ||
                        (t.After(startTime) && t.Before(endTime))
        }

        msgsToSend := make([]lnwire.Message, 0, len(msgs))
        for _, msg := range msgs {
                // If the target peer is the peer that sent us this message,
                // then we'll exit early as we don't need to filter this
                // message.
                if _, ok := msg.senders[g.cfg.peerPub]; ok {
                        continue
                }

                switch msg := msg.msg.(type) {

                // For each channel announcement message, we'll only send this
                // message if the channel updates for the channel are within
                // our time range.
                case *lnwire.ChannelAnnouncement1:
                        // First, we'll check if the channel updates are in
                        // this message batch.
                        chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID]
                        if !ok {
                                // If not, we'll attempt to query the database
                                // to see if we know of the updates.
                                chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates(
                                        g.cfg.chainHash, msg.ShortChannelID,
                                )
                                if err != nil {
                                        log.Warnf("no channel updates found for "+
                                                "short_chan_id=%v",
                                                msg.ShortChannelID)
                                        continue
                                }
                        }

                        for _, chanUpdate := range chanUpdates {
                                if passesFilter(chanUpdate.Timestamp) {
                                        msgsToSend = append(msgsToSend, msg)
                                        break
                                }
                        }

                        if len(chanUpdates) == 0 {
                                msgsToSend = append(msgsToSend, msg)
                        }

                // For each channel update, we'll only send it if the timestamp
                // is within our time range.
                case *lnwire.ChannelUpdate1:
                        if passesFilter(msg.Timestamp) {
                                msgsToSend = append(msgsToSend, msg)
                        }

                // Similarly, we only send node announcements if the update
                // timestamp is within our set gossip filter time range.
                case *lnwire.NodeAnnouncement1:
                        if passesFilter(msg.Timestamp) {
                                msgsToSend = append(msgsToSend, msg)
                        }
                }
        }

        log.Tracef("GossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
                g.cfg.peerPub[:], len(msgs), len(msgsToSend))

        if len(msgsToSend) == 0 {
                return
        }

        if err = g.sendToPeer(ctx, msgsToSend...); err != nil {
                log.Errorf("unable to send gossip msgs: %v", err)
        }
}

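// The following hypothetical helper is not part of lnd; it writes out, on its
// own, the horizon check that passesFilter performs above. A timestamp passes
// if it equals the start of the remote peer's gossip filter or falls strictly
// between the start and the end, i.e. the interval is closed at the start and
// open at the end.
func examplePassesHorizon(ts uint32, start time.Time, rng time.Duration) bool {
        end := start.Add(rng)
        t := time.Unix(int64(ts), 0)

        return t.Equal(start) || (t.After(start) && t.Before(end))
}
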
// ProcessQueryMsg is used by outside callers to pass new channel time series
// queries to the internal processing goroutine.
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) error {
        var msgChan chan lnwire.Message
        switch msg.(type) {
        case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs:
                msgChan = g.queryMsgs

        // Reply messages should only be expected in states where we're waiting
        // for a reply.
        case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
                g.Lock()
                syncState := g.syncState()
                g.Unlock()

                if syncState != waitingQueryRangeReply &&
                        syncState != waitingQueryChanReply {

                        return fmt.Errorf("unexpected msg %T received in "+
                                "state %v", msg, syncState)
                }
                msgChan = g.gossipMsgs

        default:
                msgChan = g.gossipMsgs
        }

        select {
        case msgChan <- msg:
        case <-peerQuit:
        case <-g.cg.Done():
        }

        return nil
}

// setSyncState sets the gossip syncer's state to the given state.
func (g *GossipSyncer) setSyncState(state syncerState) {
        atomic.StoreUint32(&g.state, uint32(state))
}

// syncState returns the current syncerState of the target GossipSyncer.
func (g *GossipSyncer) syncState() syncerState {
        return syncerState(atomic.LoadUint32(&g.state))
}

// ResetSyncedSignal returns a channel that will be closed in order to serve as
// a signal for when the GossipSyncer has reached its chansSynced state.
func (g *GossipSyncer) ResetSyncedSignal() chan struct{} {
        g.Lock()
        defer g.Unlock()

        syncedSignal := make(chan struct{})

        syncState := syncerState(atomic.LoadUint32(&g.state))
        if syncState == chansSynced {
                close(syncedSignal)
                return syncedSignal
        }

        g.syncedSignal = syncedSignal
        return g.syncedSignal
}

// ProcessSyncTransition sends a request to the gossip syncer to transition its
// sync type to a new one.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
        errChan := make(chan error, 1)
        select {
        case g.syncTransitionReqs <- &syncTransitionReq{
                newSyncType: newSyncType,
                errChan:     errChan,
        }:
        case <-time.After(syncTransitionTimeout):
                return ErrSyncTransitionTimeout
        case <-g.cg.Done():
                return ErrGossipSyncerExiting
        }

        select {
        case err := <-errChan:
                return err
        case <-g.cg.Done():
                return ErrGossipSyncerExiting
        }
}

// handleSyncTransition handles a new sync type transition request.
//
// NOTE: The gossip syncer might have another sync state as a result of this
// transition.
func (g *GossipSyncer) handleSyncTransition(ctx context.Context,
        req *syncTransitionReq) error {

        // Return early from any NOP sync transitions.
        syncType := g.SyncType()
        if syncType == req.newSyncType {
                return nil
        }

        log.Debugf("GossipSyncer(%x): transitioning from %v to %v",
                g.cfg.peerPub, syncType, req.newSyncType)

        var (
                firstTimestamp time.Time
                timestampRange uint32
        )

        switch req.newSyncType {
        // If an active sync has been requested, then we should resume receiving
        // new graph updates from the remote peer.
        case ActiveSync, PinnedSync:
                firstTimestamp = time.Now()
                timestampRange = math.MaxUint32

        // If a PassiveSync transition has been requested, then we should no
        // longer receive any new updates from the remote peer. We can do this
        // by setting our update horizon to a range in the past ensuring no
        // graph updates match the timestamp range.
        case PassiveSync:
                firstTimestamp = zeroTimestamp
                timestampRange = 0

        default:
                return fmt.Errorf("unhandled sync transition %v",
                        req.newSyncType)
        }

        err := g.sendGossipTimestampRange(ctx, firstTimestamp, timestampRange)
        if err != nil {
                return fmt.Errorf("unable to send local update horizon: %w",
                        err)
        }

        g.setSyncType(req.newSyncType)

        return nil
}

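// The following hypothetical helper is not part of lnd; it isolates the
// horizon chosen for each sync type in handleSyncTransition above. Active and
// pinned syncers ask for everything from "now" onwards, while passive syncers
// use a zero timestamp with a zero range so no live updates match. The zero
// time.Time value stands in for the package's zeroTimestamp.
func exampleHorizonForSyncType(t SyncerType) (time.Time, uint32) {
        switch t {
        case ActiveSync, PinnedSync:
                // Receive all gossip from the present moment onwards.
                return time.Now(), math.MaxUint32

        default:
                // PassiveSync and any unknown type: match nothing.
                return time.Time{}, 0
        }
}
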
// setSyncType sets the gossip syncer's sync type to the given type.
func (g *GossipSyncer) setSyncType(syncType SyncerType) {
        atomic.StoreUint32(&g.syncType, uint32(syncType))
}

// SyncType returns the current SyncerType of the target GossipSyncer.
func (g *GossipSyncer) SyncType() SyncerType {
        return SyncerType(atomic.LoadUint32(&g.syncType))
}

// historicalSync sends a request to the gossip syncer to perform a historical
// sync.
//
// NOTE: This can only be done once the gossip syncer has reached its final
// chansSynced state.
func (g *GossipSyncer) historicalSync() error {
        done := make(chan struct{})

        select {
        case g.historicalSyncReqs <- &historicalSyncReq{
                doneChan: done,
        }:
        case <-time.After(syncTransitionTimeout):
                return ErrSyncTransitionTimeout
        case <-g.cg.Done():
                return ErrGossiperShuttingDown
        }

        select {
        case <-done:
                return nil
        case <-g.cg.Done():
                return ErrGossiperShuttingDown
        }
}

// handleHistoricalSync handles a request to the gossip syncer to perform a
// historical sync.
func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
        // We'll go back to our initial syncingChans state in order to request
        // the remote peer to give us all of the channel IDs they know of
        // starting from the genesis block.
        g.genHistoricalChanRangeQuery = true
        g.setSyncState(syncingChans)
        close(req.doneChan)
}

// sendToPeer sends a variadic number of messages to the remote peer. This
// method should not block while waiting for sends to be written to the wire.
func (g *GossipSyncer) sendToPeer(ctx context.Context,
        msgs ...lnwire.Message) error {

        return g.sendMsgRateLimited(ctx, false, msgs...)
}

// sendToPeerSync sends a variadic number of messages to the remote peer,
// blocking until all messages have been sent successfully or a write error is
// encountered.
func (g *GossipSyncer) sendToPeerSync(ctx context.Context,
        msgs ...lnwire.Message) error {

        return g.sendMsgRateLimited(ctx, true, msgs...)
}

// sendMsgRateLimited sends a variadic number of messages to the remote peer,
// applying our per-peer rate limit before each send. The sync boolean
// determines if the send is blocking or not.
func (g *GossipSyncer) sendMsgRateLimited(ctx context.Context, sync bool,
        msgs ...lnwire.Message) error {

        for _, msg := range msgs {
                err := maybeRateLimitMsg(
                        ctx, g.rateLimiter, g.cfg.peerPub, msg, g.cg.Done(),
                )
                if err != nil {
                        return err
                }

                err = g.cfg.sendMsg(ctx, sync, msg)
                if err != nil {
                        return err
                }
        }

        return nil
}
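
// The following hypothetical helper is not part of lnd; it sketches the same
// shape as sendMsgRateLimited above using a plain token-bucket limiter from
// golang.org/x/time/rate rather than the syncer's maybeRateLimitMsg helper.
// Wait blocks until a token is available or the context is cancelled, so a
// slow limiter backpressures the sender instead of dropping messages. The
// send callback stands in for the peer write.
func exampleRateLimitedSend(ctx context.Context, limiter *rate.Limiter,
        msgs []lnwire.Message,
        send func(context.Context, lnwire.Message) error) error {

        for _, msg := range msgs {
                // Block until the limiter grants a token or the context is
                // cancelled.
                if err := limiter.Wait(ctx); err != nil {
                        return err
                }

                if err := send(ctx, msg); err != nil {
                        return err
                }
        }

        return nil
}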