lightningnetwork / lnd / 18915193695

29 Oct 2025 04:31PM UTC coverage: 66.644% (+0.005%) from 66.639%
Pull Request #10329: Gossiper - Bugfix context check
Merge 1c46ace26 into f938e40af

15 of 25 new or added lines in 1 file covered. (60.0%)

63 existing lines in 23 files now uncovered.

137259 of 205959 relevant lines covered (66.64%)

21286.83 hits per line

Source File: /discovery/syncer.go (83.12% covered)
1
package discovery
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "iter"
8
        "math"
9
        "math/rand"
10
        "sort"
11
        "sync"
12
        "sync/atomic"
13
        "time"
14

15
        "github.com/btcsuite/btcd/chaincfg/chainhash"
16
        "github.com/lightningnetwork/lnd/fn/v2"
17
        "github.com/lightningnetwork/lnd/graph"
18
        graphdb "github.com/lightningnetwork/lnd/graph/db"
19
        "github.com/lightningnetwork/lnd/lnpeer"
20
        "github.com/lightningnetwork/lnd/lnwire"
21
        "golang.org/x/time/rate"
22
)
23

24
// SyncerType encapsulates the different types of syncing mechanisms for a
25
// gossip syncer.
26
type SyncerType uint8
27

28
const (
29
        // ActiveSync denotes that a gossip syncer:
30
        //
31
        // 1. Should not attempt to synchronize with the remote peer for
32
        //    missing channels.
33
        // 2. Should respond to queries from the remote peer.
34
        // 3. Should receive new updates from the remote peer.
35
        //
36
        // They are started in a chansSynced state in order to accomplish their
37
        // responsibilities above.
38
        ActiveSync SyncerType = iota
39

40
        // PassiveSync denotes that a gossip syncer:
41
        //
42
        // 1. Should not attempt to synchronize with the remote peer for
43
        //    missing channels.
44
        // 2. Should respond to queries from the remote peer.
45
        // 3. Should not receive new updates from the remote peer.
46
        //
47
        // They are started in a chansSynced state in order to accomplish their
48
        // responsibilities above.
49
        PassiveSync
50

51
        // PinnedSync denotes an ActiveSync that doesn't count towards the
52
        // default active syncer limits and is always active throughout the
53
        // duration of the peer's connection. Each pinned syncer will begin by
54
        // performing a historical sync to ensure we are well synchronized with
55
        // their routing table.
56
        PinnedSync
57
)
58

59
const (
60
        // defaultTimestampQueueSize is the size of the timestamp range queue
61
        // used.
62
        defaultTimestampQueueSize = 1
63
)
64

65
// String returns a human readable string describing the target SyncerType.
66
func (t SyncerType) String() string {
3✔
67
        switch t {
3✔
68
        case ActiveSync:
3✔
69
                return "ActiveSync"
3✔
70
        case PassiveSync:
3✔
71
                return "PassiveSync"
3✔
72
        case PinnedSync:
3✔
73
                return "PinnedSync"
3✔
74
        default:
×
75
                return fmt.Sprintf("unknown sync type %d", t)
×
76
        }
77
}
78

79
// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange
80
// allowing new gossip messages to be received from the peer.
81
func (t SyncerType) IsActiveSync() bool {
45✔
82
        switch t {
45✔
83
        case ActiveSync, PinnedSync:
15✔
84
                return true
15✔
85
        default:
33✔
86
                return false
33✔
87
        }
88
}
89

90
// syncerState is an enum that represents the current state of the GossipSyncer.
91
// As the syncer is a state machine, we'll gate our actions based off of the
92
// current state and the next incoming message.
93
type syncerState uint32
94

95
const (
96
        // syncingChans is the default state of the GossipSyncer. We start in
97
        // this state when a new peer first connects and we don't yet know if
98
        // we're fully synchronized.
99
        syncingChans syncerState = iota
100

101
        // waitingQueryRangeReply is the second main phase of the GossipSyncer.
102
        // We enter this state after we send out our first QueryChannelRange
103
        // message. We'll stay in this state until the remote party has
104
        // finished replying to our query with its final ReplyChannelRange
105
        // message. Once that happens, we'll transition to
106
        // waitingQueryChanReply after we send out requests for all the new
107
        // channel IDs that we don't yet know of.
108
        waitingQueryRangeReply
109

110
        // queryNewChannels is the third main phase of the GossipSyncer.  In
111
        // this phase we'll send out all of our QueryShortChanIDs messages in
112
        // response to the new channels that we don't yet know about.
113
        queryNewChannels
114

115
        // waitingQueryChanReply is the fourth main phase of the GossipSyncer.
116
        // We enter this phase once we've sent off a query chunk to the remote
117
        // peer.  We'll stay in this phase until we receive a
118
        // ReplyShortChanIDsEnd message which indicates that the remote party
119
        // has responded to all of our requests.
120
        waitingQueryChanReply
121

122
        // chansSynced is the terminal stage of the GossipSyncer. Once we enter
123
        // this phase, we'll send out our update horizon, which filters out the
124
        // set of channel updates that we're interested in. In this state,
125
        // we'll be able to accept any outgoing messages from the
126
        // AuthenticatedGossiper, and decide if we should forward them to our
127
        // target peer based on its update horizon.
128
        chansSynced
129

130
        // syncerIdle is a state in which the gossip syncer can handle external
131
        // requests to transition or perform historical syncs. It is used as the
132
        // initial state for pinned syncers, as well as a fallthrough case for
133
        // chansSynced allowing fully synced peers to facilitate requests.
134
        syncerIdle
135
)
136

137
// String returns a human readable string describing the target syncerState.
138
func (s syncerState) String() string {
4✔
139
        switch s {
4✔
140
        case syncingChans:
3✔
141
                return "syncingChans"
3✔
142

143
        case waitingQueryRangeReply:
3✔
144
                return "waitingQueryRangeReply"
3✔
145

146
        case queryNewChannels:
3✔
147
                return "queryNewChannels"
3✔
148

149
        case waitingQueryChanReply:
3✔
150
                return "waitingQueryChanReply"
3✔
151

152
        case chansSynced:
4✔
153
                return "chansSynced"
4✔
154

155
        case syncerIdle:
3✔
156
                return "syncerIdle"
3✔
157

158
        default:
×
159
                return "UNKNOWN STATE"
×
160
        }
161
}
162

163
const (
164
        // maxQueryChanRangeReplies specifies the default limit of replies to
165
        // process for a single QueryChannelRange request.
166
        maxQueryChanRangeReplies = 500
167

168
        // maxQueryChanRangeRepliesZlibFactor specifies the factor applied to
169
        // the maximum number of replies allowed for zlib encoded replies.
170
        maxQueryChanRangeRepliesZlibFactor = 4
171

172
        // chanRangeQueryBuffer is the number of blocks back that we'll go when
173
        // asking the remote peer for any channels they know of beyond
174
        // our highest known channel ID.
175
        chanRangeQueryBuffer = 144
176

177
        // syncTransitionTimeout is the default timeout in which we'll wait up
178
        // to when attempting to perform a sync transition.
179
        syncTransitionTimeout = 5 * time.Second
180

181
        // requestBatchSize is the maximum number of channels we will query the
182
        // remote peer for in a QueryShortChanIDs message.
183
        requestBatchSize = 500
184

185
        // syncerBufferSize is the size of the syncer's buffers.
186
        syncerBufferSize = 50
187
)
188

189
var (
190
        // encodingTypeToChunkSize maps an encoding type, to the max number of
191
        // short chan ID's using the encoding type that we can fit into a
192
        // single message safely.
193
        encodingTypeToChunkSize = map[lnwire.QueryEncoding]int32{
194
                lnwire.EncodingSortedPlain: 8000,
195
        }
196

197
        // ErrGossipSyncerExiting signals that the syncer has been killed.
198
        ErrGossipSyncerExiting = errors.New("gossip syncer exiting")
199

200
        // ErrSyncTransitionTimeout is an error returned when we've timed out
201
        // attempting to perform a sync transition.
202
        ErrSyncTransitionTimeout = errors.New("timed out attempting to " +
203
                "transition sync type")
204

205
        // zeroTimestamp is the timestamp we'll use when we want to indicate to
206
        // peers that we do not want to receive any new graph updates.
207
        zeroTimestamp time.Time
208
)
209

210
// syncTransitionReq encapsulates a request for a gossip syncer sync transition.
211
type syncTransitionReq struct {
212
        newSyncType SyncerType
213
        errChan     chan error
214
}
215

216
// historicalSyncReq encapsulates a request for a gossip syncer to perform a
217
// historical sync.
218
type historicalSyncReq struct {
219
        // doneChan is a channel that serves as a signal and is closed to ensure
220
        // the historical sync is attempted by the time we return to the caller.
221
        doneChan chan struct{}
222
}
223

224
// gossipSyncerCfg is a struct that packages all the information a GossipSyncer
225
// needs to carry out its duties.
226
type gossipSyncerCfg struct {
227
        // chainHash is the chain that this syncer is responsible for.
228
        chainHash chainhash.Hash
229

230
        // peerPub is the public key of the peer we're syncing with, serialized
231
        // in compressed format.
232
        peerPub [33]byte
233

234
        // channelSeries is the primary interface that we'll use to generate
235
        // our queries and respond to the queries of the remote peer.
236
        channelSeries ChannelGraphTimeSeries
237

238
        // encodingType is the current encoding type we're aware of. Requests
239
        // with different encoding types will be rejected.
240
        encodingType lnwire.QueryEncoding
241

242
        // chunkSize is the max number of short chan IDs using the syncer's
243
        // encoding type that we can fit into a single message safely.
244
        chunkSize int32
245

246
        // batchSize is the max number of channels the syncer will query from
247
        // the remote node in a single QueryShortChanIDs request.
248
        batchSize int32
249

250
        // sendMsg sends a variadic number of messages to the remote peer.
251
        // The boolean indicates whether this method should block while
252
        // waiting for sends to be written to the wire.
253
        sendMsg func(context.Context, bool, ...lnwire.Message) error
254

255
        // noSyncChannels will prevent the GossipSyncer from spawning a
256
        // channelGraphSyncer, meaning we will not try to reconcile unknown
257
        // channels with the remote peer.
258
        noSyncChannels bool
259

260
        // noReplyQueries will prevent the GossipSyncer from spawning a
261
        // replyHandler, meaning we will not reply to queries from our remote
262
        // peer.
263
        noReplyQueries bool
264

265
        // noTimestampQueryOption will prevent the GossipSyncer from querying
266
        // timestamps of announcement messages from the peer, and it will
267
        // prevent it from responding to timestamp queries.
268
        noTimestampQueryOption bool
269

270
        // ignoreHistoricalFilters will prevent syncers from replying with
271
        // historical data when the remote peer sets a gossip_timestamp_range.
272
        // This prevents ranges with old start times from causing us to dump the
273
        // graph on connect.
274
        ignoreHistoricalFilters bool
275

276
        // bestHeight returns the latest height known of the chain.
277
        bestHeight func() uint32
278

279
        // markGraphSynced updates the SyncManager's perception of whether we
280
        // have completed at least one historical sync.
281
        markGraphSynced func()
282

283
        // maxQueryChanRangeReplies is the maximum number of replies we'll allow
284
        // for a single QueryChannelRange request.
285
        maxQueryChanRangeReplies uint32
286

287
        // isStillZombieChannel takes the timestamps of the latest channel
288
        // updates for a channel and returns true if the channel should be
289
        // considered a zombie based on these timestamps.
290
        isStillZombieChannel func(time.Time, time.Time) bool
291

292
        // timestampQueueSize is the size of the timestamp range queue. If not
293
        // set, this defaults to the defaultTimestampQueueSize constant.
294
        timestampQueueSize int
295

296
        // msgBytesPerSecond is the allotted bandwidth rate, expressed in
297
        // bytes/second that this gossip syncer can consume. Once we exceed this
298
        // rate, message sending will block until we're below the rate.
299
        msgBytesPerSecond uint64
300
}
301

302
// GossipSyncer is a struct that handles synchronizing the channel graph state
303
// with a remote peer. The GossipSyncer implements a state machine that will
304
// progressively ensure we're synchronized with the channel state of the remote
305
// node. Once both nodes have been synchronized, we'll use an update filter to
306
// filter out which messages should be sent to a remote peer based on their
307
// update horizon. If the update horizon isn't specified, then we won't send
308
// them any channel updates at all.
309
type GossipSyncer struct {
310
        started sync.Once
311
        stopped sync.Once
312

313
        // state is the current state of the GossipSyncer.
314
        //
315
        // NOTE: This variable MUST be used atomically.
316
        state uint32
317

318
        // syncType denotes the SyncerType the gossip syncer is currently
319
        // exercising.
320
        //
321
        // NOTE: This variable MUST be used atomically.
322
        syncType uint32
323

324
        // remoteUpdateHorizon is the update horizon of the remote peer. We'll
325
        // use this to properly filter out any messages.
326
        remoteUpdateHorizon *lnwire.GossipTimestampRange
327

328
        // localUpdateHorizon is our local update horizon, we'll use this to
329
        // determine if we've already sent out our update.
330
        localUpdateHorizon *lnwire.GossipTimestampRange
331

332
        // syncTransitionReqs is a channel through which new sync type
333
        // transition requests will be sent. These should only be handled
334
        // when the gossip syncer is in a chansSynced state to ensure its state
335
        // machine behaves as expected.
336
        syncTransitionReqs chan *syncTransitionReq
337

338
        // historicalSyncReqs is a channel that serves as a signal for the
339
        // gossip syncer to perform a historical sync. These can only be done
340
        // once the gossip syncer is in a chansSynced state to ensure its state
341
        // machine behaves as expected.
342
        historicalSyncReqs chan *historicalSyncReq
343

344
        // genHistoricalChanRangeQuery when true signals to the gossip syncer
345
        // that it should request the remote peer for all of its known channel
346
        // IDs starting from the genesis block of the chain. This can only
347
        // happen if the gossip syncer receives a request to attempt a
348
        // historical sync. It can be unset if the syncer ever transitions from
349
        // PassiveSync to ActiveSync.
350
        genHistoricalChanRangeQuery bool
351

352
        // gossipMsgs is a channel that all responses to our queries from the
353
        // target peer will be sent over, these will be read by the
354
        // channelGraphSyncer.
355
        gossipMsgs chan lnwire.Message
356

357
        // queryMsgs is a channel that all queries from the remote peer will be
358
        // received over, these will be read by the replyHandler.
359
        queryMsgs chan lnwire.Message
360

361
        // curQueryRangeMsg keeps track of the latest QueryChannelRange message
362
        // we've sent to a peer to ensure we've consumed all expected replies.
363
        // This field is primarily used within the waitingQueryRangeReply state.
364
        curQueryRangeMsg *lnwire.QueryChannelRange
365

366
        // prevReplyChannelRange keeps track of the previous ReplyChannelRange
367
        // message we've received from a peer to ensure they've fully replied to
368
        // our query by ensuring they covered our requested block range. This
369
        // field is primarily used within the waitingQueryRangeReply state.
370
        prevReplyChannelRange *lnwire.ReplyChannelRange
371

372
        // bufferedChanRangeReplies is used in the waitingQueryRangeReply state
373
        // to buffer all of the chunked responses to our query.
374
        bufferedChanRangeReplies []graphdb.ChannelUpdateInfo
375

376
        // numChanRangeRepliesRcvd is used to track the number of replies
377
        // received as part of a QueryChannelRange. This field is primarily used
378
        // within the waitingQueryRangeReply state.
379
        numChanRangeRepliesRcvd uint32
380

381
        // newChansToQuery is used to pass the set of channels we should query
382
        // for from the waitingQueryChanReply state to the queryNewChannels
383
        // state.
384
        newChansToQuery []lnwire.ShortChannelID
385

386
        cfg gossipSyncerCfg
387

388
        // syncedSignal is a channel that, if set, will be closed when the
389
        // GossipSyncer reaches its terminal chansSynced state.
390
        syncedSignal chan struct{}
391

392
        // syncerSema is used to more finely control the syncer's ability to
393
        // respond to gossip timestamp range messages.
394
        syncerSema chan struct{}
395

396
        // timestampRangeQueue is a buffered channel for queuing timestamp range
397
        // messages that need to be processed asynchronously. This prevents the
398
        // gossiper from blocking when ApplyGossipFilter is called.
399
        timestampRangeQueue chan *lnwire.GossipTimestampRange
400

401
        // isSendingBacklog is an atomic flag that indicates whether a goroutine
402
        // is currently sending the backlog of messages. This ensures only one
403
        // goroutine is active at a time.
404
        isSendingBacklog atomic.Bool
405

406
        sync.Mutex
407

408
        // cg is a helper that encapsulates a wait group and quit channel and
409
        // allows contexts that either block or cancel on those depending on
410
        // the use case.
411
        cg *fn.ContextGuard
412

413
        // rateLimiter dictates the frequency with which we will reply to gossip
414
        // queries to this peer.
415
        rateLimiter *rate.Limiter
416
}
417

418
// newGossipSyncer returns a new instance of the GossipSyncer populated using
419
// the passed config.
420
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
158✔
421
        // Use the configured queue size if set, otherwise use the default.
158✔
422
        queueSize := cfg.timestampQueueSize
158✔
423
        if queueSize == 0 {
186✔
424
                queueSize = defaultTimestampQueueSize
28✔
425
        }
28✔
426

427
        bytesPerSecond := cfg.msgBytesPerSecond
158✔
428
        if bytesPerSecond == 0 {
313✔
429
                bytesPerSecond = DefaultPeerMsgBytesPerSecond
155✔
430
        }
155✔
431
        bytesBurst := 2 * bytesPerSecond
158✔
432

158✔
433
        // We'll use this rate limiter to limit this single peer.
158✔
434
        rateLimiter := rate.NewLimiter(
158✔
435
                rate.Limit(bytesPerSecond), int(bytesBurst),
158✔
436
        )
158✔
437

158✔
438
        return &GossipSyncer{
158✔
439
                cfg:                cfg,
158✔
440
                syncTransitionReqs: make(chan *syncTransitionReq),
158✔
441
                historicalSyncReqs: make(chan *historicalSyncReq),
158✔
442
                gossipMsgs:         make(chan lnwire.Message, syncerBufferSize),
158✔
443
                queryMsgs:          make(chan lnwire.Message, syncerBufferSize),
158✔
444
                timestampRangeQueue: make(
158✔
445
                        chan *lnwire.GossipTimestampRange, queueSize,
158✔
446
                ),
158✔
447
                syncerSema:  sema,
158✔
448
                cg:          fn.NewContextGuard(),
158✔
449
                rateLimiter: rateLimiter,
158✔
450
        }
158✔
451
}
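
The constructor above sizes a token-bucket limiter in bytes per second, with a burst of twice the sustained rate. As a rough, standalone illustration of how such a limiter paces outbound writes with golang.org/x/time/rate (this sketch is not part of syncer.go and the message sizes are invented):

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	const bytesPerSecond = 1024

	// Allow bursts of up to twice the sustained rate, mirroring the
	// 2*bytesPerSecond burst used by newGossipSyncer.
	limiter := rate.NewLimiter(rate.Limit(bytesPerSecond), 2*bytesPerSecond)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	for _, msgSize := range []int{512, 1024, 2048} {
		// WaitN blocks until the limiter can admit msgSize bytes,
		// which is how sending naturally slows to the allotted rate.
		if err := limiter.WaitN(ctx, msgSize); err != nil {
			fmt.Println("rate limit wait aborted:", err)
			return
		}
		fmt.Println("sent message of", msgSize, "bytes")
	}
}
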
452

453
// Start starts the GossipSyncer and any goroutines that it needs to carry out
454
// its duties.
455
func (g *GossipSyncer) Start() {
103✔
456
        g.started.Do(func() {
206✔
457
                log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])
103✔
458

103✔
459
                ctx, _ := g.cg.Create(context.Background())
103✔
460

103✔
461
                // TODO(conner): only spawn channelGraphSyncer if remote
103✔
462
                // supports gossip queries, and only spawn replyHandler if we
103✔
463
                // advertise support
103✔
464
                if !g.cfg.noSyncChannels {
205✔
465
                        g.cg.WgAdd(1)
102✔
466
                        go g.channelGraphSyncer(ctx)
102✔
467
                }
102✔
468
                if !g.cfg.noReplyQueries {
205✔
469
                        g.cg.WgAdd(1)
102✔
470
                        go g.replyHandler(ctx)
102✔
471
                }
102✔
472

473
                // Start the timestamp range queue processor to handle gossip
474
                // filter applications asynchronously.
475
                if !g.cfg.noTimestampQueryOption {
197✔
476
                        g.cg.WgAdd(1)
94✔
477
                        go g.processTimestampRangeQueue(ctx)
94✔
478
                }
94✔
479
        })
480
}
481

482
// Stop signals the GossipSyncer for a graceful exit, then waits until it has
483
// exited.
484
func (g *GossipSyncer) Stop() {
100✔
485
        g.stopped.Do(func() {
200✔
486
                log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
100✔
487
                defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])
100✔
488

100✔
489
                g.cg.Quit()
100✔
490
        })
100✔
491
}
492

493
// handleSyncingChans handles the state syncingChans for the GossipSyncer. When
494
// in this state, we will send a QueryChannelRange msg to our peer and advance
495
// the syncer's state to waitingQueryRangeReply.
496
func (g *GossipSyncer) handleSyncingChans(ctx context.Context) {
87✔
497
        // Prepare the query msg.
87✔
498
        queryRangeMsg, err := g.genChanRangeQuery(
87✔
499
                ctx, g.genHistoricalChanRangeQuery,
87✔
500
        )
87✔
501
        if err != nil {
87✔
502
                log.Errorf("Unable to gen chan range query: %v", err)
×
503
                return
×
504
        }
×
505

506
        // Acquire a lock so the following state transition is atomic.
507
        //
508
        // NOTE: We must lock the following steps as it's possible we get an
509
        // immediate response (ReplyChannelRange) after sending the query msg.
510
        // The response is handled in ProcessQueryMsg, which requires the
511
        // current state to be waitingQueryRangeReply.
512
        g.Lock()
87✔
513
        defer g.Unlock()
87✔
514

87✔
515
        // Send the msg to the remote peer, which is non-blocking as
87✔
516
        // `sendToPeer` only queues the msg in Brontide.
87✔
517
        err = g.sendToPeer(ctx, queryRangeMsg)
87✔
518
        if err != nil {
87✔
519
                log.Errorf("Unable to send chan range query: %v", err)
×
520
                return
×
521
        }
×
522

523
        // With the message sent successfully, we'll transition into the next
524
        // state where we wait for their reply.
525
        g.setSyncState(waitingQueryRangeReply)
87✔
526
}
527

528
// channelGraphSyncer is the main goroutine responsible for ensuring that we
529
// properly channel graph state with the remote peer, and also that we only
530
// send them messages which actually pass their defined update horizon.
531
func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
102✔
532
        defer g.cg.WgDone()
102✔
533

102✔
534
        for {
440✔
535
                // Check cancellation before processing any state.
338✔
536
                select {
338✔
537
                case <-ctx.Done():
1✔
538
                        log.Debugf("GossipSyncer(%x): context canceled, "+
1✔
539
                                "exiting", g.cfg.peerPub[:])
1✔
540

1✔
541
                        return
1✔
542
                case <-g.cg.Done():
2✔
543
                        log.Debugf("GossipSyncer(%x): shutting down",
2✔
544
                                g.cfg.peerPub[:])
2✔
545

2✔
546
                        return
2✔
547
                default:
335✔
548
                }
549

550
                state := g.syncState()
335✔
551
                syncType := g.SyncType()
335✔
552

335✔
553
                log.Debugf("GossipSyncer(%x): state=%v, type=%v",
335✔
554
                        g.cfg.peerPub[:], state, syncType)
335✔
555

335✔
556
                switch state {
335✔
557
                // When we're in this state, we're trying to synchronize our
558
                // view of the network with the remote peer. We'll kick off
559
                // this sync by asking them for the set of channels they
560
                // understand, as well as responding to any other queries by
561
                // them.
562
                case syncingChans:
87✔
563
                        g.handleSyncingChans(ctx)
87✔
564

565
                // In this state, we've sent out our initial channel range
566
                // query and are waiting for the final response from the remote
567
                // peer before we perform a diff to see which channels they know
568
                // of that we don't.
569
                case waitingQueryRangeReply:
189✔
570
                        // We'll wait to either process a new message from the
189✔
571
                        // remote party, or exit due to the gossiper exiting,
189✔
572
                        // or us being signalled to do so.
189✔
573
                        select {
189✔
574
                        case msg := <-g.gossipMsgs:
120✔
575
                                // The remote peer is sending a response to our
120✔
576
                                // initial query, we'll collate this response,
120✔
577
                                // and see if it's the final one in the series.
120✔
578
                                // If so, we can then transition to querying
120✔
579
                                // for the new channels.
120✔
580
                                queryReply, ok := msg.(*lnwire.ReplyChannelRange)
120✔
581
                                if ok {
240✔
582
                                        err := g.processChanRangeReply(
120✔
583
                                                ctx, queryReply,
120✔
584
                                        )
120✔
585
                                        if err != nil {
120✔
586
                                                log.Errorf("Unable to "+
×
587
                                                        "process chan range "+
×
588
                                                        "query: %v", err)
×
589
                                                return
×
590
                                        }
×
591
                                        continue
120✔
592
                                }
593

594
                                log.Warnf("Unexpected message: %T in state=%v",
×
595
                                        msg, state)
×
596

UNCOV
597
                        case <-g.cg.Done():
×
UNCOV
598
                                return
×
599

600
                        case <-ctx.Done():
69✔
601
                                return
69✔
602
                        }
603

604
                // We'll enter this state once we've discovered which channels
605
                // the remote party knows of that we don't yet know of
606
                // ourselves.
607
                case queryNewChannels:
6✔
608
                        // First, we'll attempt to continue our channel
6✔
609
                        // synchronization by continuing to send off another
6✔
610
                        // query chunk.
6✔
611
                        done := g.synchronizeChanIDs(ctx)
6✔
612

6✔
613
                        // If this wasn't our last query, then we'll need to
6✔
614
                        // transition to our waiting state.
6✔
615
                        if !done {
11✔
616
                                continue
5✔
617
                        }
618

619
                        // If we're fully synchronized, then we can transition
620
                        // to our terminal state.
621
                        g.setSyncState(chansSynced)
4✔
622

4✔
623
                        // Ensure that the sync manager becomes aware that the
4✔
624
                        // historical sync completed so synced_to_graph is
4✔
625
                        // updated over rpc.
4✔
626
                        g.cfg.markGraphSynced()
4✔
627

628
                // In this state, we've just sent off a new query for channels
629
                // that we don't yet know of. We'll remain in this state until
630
                // the remote party signals they've responded to our query in
631
                // totality.
632
                case waitingQueryChanReply:
5✔
633
                        // Once we've sent off our query, we'll wait for either
5✔
634
                        // an ending reply, or just another query from the
5✔
635
                        // remote peer.
5✔
636
                        select {
5✔
637
                        case msg := <-g.gossipMsgs:
5✔
638
                                // If this is the final reply to one of our
5✔
639
                                // queries, then we'll loop back into our query
5✔
640
                                // state to send off the remaining query chunks.
5✔
641
                                _, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
5✔
642
                                if ok {
10✔
643
                                        g.setSyncState(queryNewChannels)
5✔
644
                                        continue
5✔
645
                                }
646

647
                                log.Warnf("Unexpected message: %T in state=%v",
×
648
                                        msg, state)
×
649

650
                        case <-g.cg.Done():
×
651
                                return
×
652

653
                        case <-ctx.Done():
×
654
                                return
×
655
                        }
656

657
                // This is our final terminal state where we'll only reply to
658
                // any further queries by the remote peer.
659
                case chansSynced:
57✔
660
                        g.Lock()
57✔
661
                        if g.syncedSignal != nil {
68✔
662
                                close(g.syncedSignal)
11✔
663
                                g.syncedSignal = nil
11✔
664
                        }
11✔
665
                        g.Unlock()
57✔
666

57✔
667
                        // If we haven't yet sent out our update horizon, and
57✔
668
                        // we want to receive real-time channel updates, we'll
57✔
669
                        // do so now.
57✔
670
                        if g.localUpdateHorizon == nil &&
57✔
671
                                syncType.IsActiveSync() {
72✔
672

15✔
673
                                err := g.sendGossipTimestampRange(
15✔
674
                                        ctx, time.Now(), math.MaxUint32,
15✔
675
                                )
15✔
676
                                if err != nil {
15✔
677
                                        log.Errorf("Unable to send update "+
×
678
                                                "horizon to %x: %v",
×
679
                                                g.cfg.peerPub, err)
×
680
                                }
×
681
                        }
682
                        // With our horizon set, we'll simply reply to any new
683
                        // messages or process any state transitions and exit if
684
                        // needed.
685
                        fallthrough
57✔
686

687
                // Pinned peers will begin in this state, since they will
688
                // immediately receive a request to perform a historical sync.
689
                // Otherwise, we fall through after ending in chansSynced to
690
                // facilitate new requests.
691
                case syncerIdle:
60✔
692
                        select {
60✔
693
                        case req := <-g.syncTransitionReqs:
17✔
694
                                req.errChan <- g.handleSyncTransition(ctx, req)
17✔
695

696
                        case req := <-g.historicalSyncReqs:
19✔
697
                                g.handleHistoricalSync(req)
19✔
698

699
                        case <-g.cg.Done():
×
700
                                return
×
701

702
                        case <-ctx.Done():
27✔
703
                                return
27✔
704
                        }
705
                }
706
        }
707
}
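
Each pass of this loop first performs a non-blocking check for cancellation or shutdown before dispatching on the current state, so a busy state machine still notices a context cancel promptly. A standalone sketch of that shape (not part of syncer.go; the quit channel and step function are invented):

package main

import (
	"context"
	"fmt"
	"time"
)

func run(ctx context.Context, quit <-chan struct{}, step func() bool) {
	for {
		// Check for shutdown first so a busy state machine cannot
		// starve cancellation, then fall through to the state logic.
		select {
		case <-ctx.Done():
			fmt.Println("context canceled, exiting")
			return
		case <-quit:
			fmt.Println("quit signal, exiting")
			return
		default:
		}

		if done := step(); done {
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	quit := make(chan struct{})

	go func() {
		time.Sleep(10 * time.Millisecond)
		cancel()
	}()

	run(ctx, quit, func() bool {
		time.Sleep(time.Millisecond) // pretend to process one state
		return false
	})
}
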
708

709
// replyHandler is an event loop whose sole purpose is to reply to the remote
710
// peers queries. Our replyHandler will respond to messages generated by their
711
// channelGraphSyncer, and vice versa. Each party's channelGraphSyncer drives
712
// the other's replyHandler, allowing the replyHandler to operate independently
713
// from the state machine maintained on the same node.
714
//
715
// NOTE: This method MUST be run as a goroutine.
716
func (g *GossipSyncer) replyHandler(ctx context.Context) {
102✔
717
        defer g.cg.WgDone()
102✔
718

102✔
719
        for {
209✔
720
                select {
107✔
721
                case msg := <-g.queryMsgs:
8✔
722
                        err := g.replyPeerQueries(ctx, msg)
8✔
723
                        switch {
8✔
724
                        case err == ErrGossipSyncerExiting:
×
725
                                return
×
726

727
                        case err == lnpeer.ErrPeerExiting:
×
728
                                return
×
729

730
                        case err != nil:
×
731
                                log.Errorf("Unable to reply to peer "+
×
732
                                        "query: %v", err)
×
733
                        }
734

735
                case <-g.cg.Done():
3✔
736
                        return
3✔
737

738
                case <-ctx.Done():
96✔
739
                        return
96✔
740
                }
741
        }
742
}
743

744
// processTimestampRangeQueue handles timestamp range messages from the queue
745
// asynchronously. This prevents blocking the gossiper when rate limiting is
746
// active and multiple peers are trying to apply gossip filters.
747
func (g *GossipSyncer) processTimestampRangeQueue(ctx context.Context) {
94✔
748
        defer g.cg.WgDone()
94✔
749

94✔
750
        for {
193✔
751
                select {
99✔
752
                case msg := <-g.timestampRangeQueue:
71✔
753
                        // Process the timestamp range message. If we hit an
71✔
754
                        // error, log it but continue processing to avoid
71✔
755
                        // blocking the queue.
71✔
756
                        err := g.ApplyGossipFilter(ctx, msg)
71✔
757
                        switch {
71✔
758
                        case errors.Is(err, ErrGossipSyncerExiting):
×
759
                                return
×
760

761
                        case errors.Is(err, lnpeer.ErrPeerExiting):
×
762
                                return
×
763

764
                        case err != nil:
×
765
                                log.Errorf("Unable to apply gossip filter: %v",
×
766
                                        err)
×
767
                        }
768

UNCOV
769
                case <-g.cg.Done():
×
UNCOV
770
                        return
×
771

772
                case <-ctx.Done():
28✔
773
                        return
28✔
774
                }
775
        }
776
}
777

778
// QueueTimestampRange attempts to queue a timestamp range message for
779
// asynchronous processing. If the queue is full, it returns false to indicate
780
// the message was dropped.
781
func (g *GossipSyncer) QueueTimestampRange(
782
        msg *lnwire.GossipTimestampRange) bool {
2,224✔
783

2,224✔
784
        // If timestamp queries are disabled, don't queue the message.
2,224✔
785
        if g.cfg.noTimestampQueryOption {
2,224✔
786
                return false
×
787
        }
×
788

789
        select {
2,224✔
790
        case g.timestampRangeQueue <- msg:
832✔
791
                return true
832✔
792

793
        // Queue is full, drop the message to prevent blocking.
794
        default:
1,392✔
795
                log.Warnf("Timestamp range queue full for peer %x, "+
1,392✔
796
                        "dropping message", g.cfg.peerPub[:])
1,392✔
797
                return false
1,392✔
798
        }
799
}
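
QueueTimestampRange avoids blocking the gossiper by attempting the send in a select with a default case, dropping the message when the buffer is full. A minimal standalone sketch of that pattern (illustrative only; the string queue and its payloads are invented, though the size of 1 mirrors defaultTimestampQueueSize):

package main

import "fmt"

// tryEnqueue attempts a non-blocking send. It returns false if the buffered
// channel is already full, so the caller never blocks.
func tryEnqueue(queue chan string, msg string) bool {
	select {
	case queue <- msg:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan string, 1) // mirrors defaultTimestampQueueSize = 1

	fmt.Println(tryEnqueue(queue, "filter-1")) // true: buffered
	fmt.Println(tryEnqueue(queue, "filter-2")) // false: queue full, dropped
}
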
800

801
// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
802
// syncer and sends it to the remote peer.
803
func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
804
        firstTimestamp time.Time, timestampRange uint32) error {
29✔
805

29✔
806
        endTimestamp := firstTimestamp.Add(
29✔
807
                time.Duration(timestampRange) * time.Second,
29✔
808
        )
29✔
809

29✔
810
        log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)",
29✔
811
                g.cfg.peerPub[:], firstTimestamp, endTimestamp)
29✔
812

29✔
813
        localUpdateHorizon := &lnwire.GossipTimestampRange{
29✔
814
                ChainHash:      g.cfg.chainHash,
29✔
815
                FirstTimestamp: uint32(firstTimestamp.Unix()),
29✔
816
                TimestampRange: timestampRange,
29✔
817
        }
29✔
818

29✔
819
        if err := g.sendToPeer(ctx, localUpdateHorizon); err != nil {
29✔
820
                return err
×
821
        }
×
822

823
        if firstTimestamp.Equal(zeroTimestamp) && timestampRange == 0 {
31✔
824
                g.localUpdateHorizon = nil
2✔
825
        } else {
29✔
826
                g.localUpdateHorizon = localUpdateHorizon
27✔
827
        }
27✔
828

829
        return nil
29✔
830
}
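
For context, the two horizon shapes this method is called with elsewhere in the file are (time.Now(), math.MaxUint32) to request all gossip from now on, and (zeroTimestamp, 0) to request none, which is why the local horizon is cleared in the latter case. A small standalone sketch of those two parameter pairs (illustrative only, using plain integers rather than lnwire types):

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// "Send me everything from now on": first timestamp is the current
	// time and the range extends effectively forever.
	activeFirst := uint32(time.Now().Unix())
	activeRange := uint32(math.MaxUint32)

	// "Send me nothing": a zero start time with a zero-length range.
	var muteFirst, muteRange uint32

	fmt.Printf("active horizon: first=%d range=%d\n", activeFirst, activeRange)
	fmt.Printf("mute horizon:   first=%d range=%d\n", muteFirst, muteRange)
}
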
831

832
// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
833
// the remote peer for its known set of channel IDs within a particular block
834
// range. This method will be called continually until the entire range has
835
// been queried for with a response received. We'll chunk our requests as
836
// required to ensure they fit into a single message. We may re-renter this
837
// state in the case that chunking is required.
838
func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
9✔
839
        // If we're in this state yet there are no more new channels to query
9✔
840
        // for, then we'll transition to our final synced state and return true
9✔
841
        // to signal that we're fully synchronized.
9✔
842
        if len(g.newChansToQuery) == 0 {
13✔
843
                log.Infof("GossipSyncer(%x): no more chans to query",
4✔
844
                        g.cfg.peerPub[:])
4✔
845

4✔
846
                return true
4✔
847
        }
4✔
848

849
        // Otherwise, we'll issue our next chunked query to receive replies
850
        // for.
851
        var queryChunk []lnwire.ShortChannelID
8✔
852

8✔
853
        // If the number of channels to query for is less than the chunk size,
8✔
854
        // then we can issue a single query.
8✔
855
        if int32(len(g.newChansToQuery)) < g.cfg.batchSize {
13✔
856
                queryChunk = g.newChansToQuery
5✔
857
                g.newChansToQuery = nil
5✔
858

5✔
859
        } else {
8✔
860
                // Otherwise, we'll need to only query for the next chunk.
3✔
861
                // We'll slice into our query chunk, then slide down our main
3✔
862
                // pointer down by the chunk size.
3✔
863
                queryChunk = g.newChansToQuery[:g.cfg.batchSize]
3✔
864
                g.newChansToQuery = g.newChansToQuery[g.cfg.batchSize:]
3✔
865
        }
3✔
866

867
        log.Infof("GossipSyncer(%x): querying for %v new channels",
8✔
868
                g.cfg.peerPub[:], len(queryChunk))
8✔
869

8✔
870
        // Change the state before sending the query msg.
8✔
871
        g.setSyncState(waitingQueryChanReply)
8✔
872

8✔
873
        // With our chunk obtained, we'll send over our next query, then return
8✔
874
        // false indicating that we're not yet fully synced.
8✔
875
        err := g.sendToPeer(ctx, &lnwire.QueryShortChanIDs{
8✔
876
                ChainHash:    g.cfg.chainHash,
8✔
877
                EncodingType: lnwire.EncodingSortedPlain,
8✔
878
                ShortChanIDs: queryChunk,
8✔
879
        })
8✔
880
        if err != nil {
8✔
881
                log.Errorf("Unable to sync chan IDs: %v", err)
×
882
        }
×
883

884
        return false
8✔
885
}
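
As an aside, the batching performed above can be shown in isolation: each pass peels at most batchSize channel IDs off the backlog and leaves the rest for the next pass. A standalone sketch (not part of syncer.go; the IDs and batch size are invented):

package main

import "fmt"

// nextChunk returns the next batch to query and the remaining backlog,
// mirroring how newChansToQuery is consumed batchSize entries at a time.
func nextChunk(pending []uint64, batchSize int) (chunk, rest []uint64) {
	if len(pending) <= batchSize {
		return pending, nil
	}
	return pending[:batchSize], pending[batchSize:]
}

func main() {
	pending := []uint64{1, 2, 3, 4, 5, 6, 7}

	for len(pending) > 0 {
		var chunk []uint64
		chunk, pending = nextChunk(pending, 3)
		fmt.Println("querying chunk:", chunk)
	}
}
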
886

887
// isLegacyReplyChannelRange determines where a ReplyChannelRange message is
888
// considered legacy. There was a point where lnd used to include the same query
889
// over multiple replies, rather than including the portion of the query the
890
// reply is handling. We'll use this as a way of detecting whether we are
891
// communicating with a legacy node so we can properly sync with them.
892
func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
893
        reply *lnwire.ReplyChannelRange) bool {
253✔
894

253✔
895
        return (reply.ChainHash == query.ChainHash &&
253✔
896
                reply.FirstBlockHeight == query.FirstBlockHeight &&
253✔
897
                reply.NumBlocks == query.NumBlocks)
253✔
898
}
253✔
899

900
// processChanRangeReply is called each time the GossipSyncer receives a new
901
// reply to the initial range query to discover new channels that it didn't
902
// previously know of.
903
func (g *GossipSyncer) processChanRangeReply(_ context.Context,
904
        msg *lnwire.ReplyChannelRange) error {
128✔
905

128✔
906
        // isStale returns whether the timestamp is too far into the past.
128✔
907
        isStale := func(timestamp time.Time) bool {
161✔
908
                return time.Since(timestamp) > graph.DefaultChannelPruneExpiry
33✔
909
        }
33✔
910

911
        // isSkewed returns whether the timestamp is too far into the future.
912
        isSkewed := func(timestamp time.Time) bool {
151✔
913
                return time.Until(timestamp) > graph.DefaultChannelPruneExpiry
23✔
914
        }
23✔
915

916
        // If we're not communicating with a legacy node, we'll apply some
917
        // further constraints on their reply to ensure it satisfies our query.
918
        if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
238✔
919
                // The first block should be within our original request.
110✔
920
                if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight {
110✔
921
                        return fmt.Errorf("reply includes channels for height "+
×
922
                                "%v prior to query %v", msg.FirstBlockHeight,
×
923
                                g.curQueryRangeMsg.FirstBlockHeight)
×
924
                }
×
925

926
                // The last block should also be. We don't need to check the
927
                // intermediate ones because they should already be in sorted
928
                // order.
929
                replyLastHeight := msg.LastBlockHeight()
110✔
930
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
110✔
931
                if replyLastHeight > queryLastHeight {
110✔
932
                        return fmt.Errorf("reply includes channels for height "+
×
933
                                "%v after query %v", replyLastHeight,
×
934
                                queryLastHeight)
×
935
                }
×
936

937
                // If we've previously received a reply for this query, look at
938
                // its last block to ensure the current reply properly follows
939
                // it.
940
                if g.prevReplyChannelRange != nil {
215✔
941
                        prevReply := g.prevReplyChannelRange
105✔
942
                        prevReplyLastHeight := prevReply.LastBlockHeight()
105✔
943

105✔
944
                        // The current reply can either start from the previous
105✔
945
                        // reply's last block, if there are still more channels
105✔
946
                        // for the same block, or the block after.
105✔
947
                        if msg.FirstBlockHeight != prevReplyLastHeight &&
105✔
948
                                msg.FirstBlockHeight != prevReplyLastHeight+1 {
105✔
949

×
950
                                return fmt.Errorf("first block of reply %v "+
×
951
                                        "does not continue from last block of "+
×
952
                                        "previous %v", msg.FirstBlockHeight,
×
953
                                        prevReplyLastHeight)
×
954
                        }
×
955
                }
956
        }
957

958
        g.prevReplyChannelRange = msg
128✔
959

128✔
960
        for i, scid := range msg.ShortChanIDs {
258✔
961
                info := graphdb.NewChannelUpdateInfo(
130✔
962
                        scid, time.Time{}, time.Time{},
130✔
963
                )
130✔
964

130✔
965
                if len(msg.Timestamps) != 0 {
145✔
966
                        t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0)
15✔
967
                        info.Node1UpdateTimestamp = t1
15✔
968

15✔
969
                        t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0)
15✔
970
                        info.Node2UpdateTimestamp = t2
15✔
971

15✔
972
                        // Sort out all channels with outdated or skewed
15✔
973
                        // timestamps. Both timestamps need to be out of
15✔
974
                        // boundaries for us to skip the channel and not query
15✔
975
                        // it later on.
15✔
976
                        switch {
15✔
977
                        case isStale(info.Node1UpdateTimestamp) &&
978
                                isStale(info.Node2UpdateTimestamp):
2✔
979

2✔
980
                                continue
2✔
981

982
                        case isSkewed(info.Node1UpdateTimestamp) &&
983
                                isSkewed(info.Node2UpdateTimestamp):
2✔
984

2✔
985
                                continue
2✔
986

987
                        case isStale(info.Node1UpdateTimestamp) &&
988
                                isSkewed(info.Node2UpdateTimestamp):
2✔
989

2✔
990
                                continue
2✔
991

992
                        case isStale(info.Node2UpdateTimestamp) &&
993
                                isSkewed(info.Node1UpdateTimestamp):
2✔
994

2✔
995
                                continue
2✔
996
                        }
997
                }
998

999
                g.bufferedChanRangeReplies = append(
122✔
1000
                        g.bufferedChanRangeReplies, info,
122✔
1001
                )
122✔
1002
        }
1003

1004
        switch g.cfg.encodingType {
128✔
1005
        case lnwire.EncodingSortedPlain:
128✔
1006
                g.numChanRangeRepliesRcvd++
128✔
1007
        case lnwire.EncodingSortedZlib:
×
1008
                g.numChanRangeRepliesRcvd += maxQueryChanRangeRepliesZlibFactor
×
1009
        default:
×
1010
                return fmt.Errorf("unhandled encoding type %v", g.cfg.encodingType)
×
1011
        }
1012

1013
        log.Infof("GossipSyncer(%x): buffering chan range reply of size=%v",
128✔
1014
                g.cfg.peerPub[:], len(msg.ShortChanIDs))
128✔
1015

128✔
1016
        // If this isn't the last response and we can continue to receive more,
128✔
1017
        // then we can exit as we've already buffered the latest portion of the
128✔
1018
        // streaming reply.
128✔
1019
        maxReplies := g.cfg.maxQueryChanRangeReplies
128✔
1020
        switch {
128✔
1021
        // If we're communicating with a legacy node, we'll need to look at the
1022
        // complete field.
1023
        case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg):
18✔
1024
                if msg.Complete == 0 && g.numChanRangeRepliesRcvd < maxReplies {
21✔
1025
                        return nil
3✔
1026
                }
3✔
1027

1028
        // Otherwise, we'll look at the reply's height range.
1029
        default:
110✔
1030
                replyLastHeight := msg.LastBlockHeight()
110✔
1031
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
110✔
1032

110✔
1033
                // TODO(wilmer): This might require some padding if the remote
110✔
1034
                // node is not aware of the last height we sent them, i.e., is
110✔
1035
                // behind a few blocks from us.
110✔
1036
                if replyLastHeight < queryLastHeight &&
110✔
1037
                        g.numChanRangeRepliesRcvd < maxReplies {
215✔
1038

105✔
1039
                        return nil
105✔
1040
                }
105✔
1041
        }
1042

1043
        log.Infof("GossipSyncer(%x): filtering through %v chans",
20✔
1044
                g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))
20✔
1045

20✔
1046
        // Otherwise, this is the final response, so we'll now check to see
20✔
1047
        // which channels they know of that we don't.
20✔
1048
        newChans, err := g.cfg.channelSeries.FilterKnownChanIDs(
20✔
1049
                g.cfg.chainHash, g.bufferedChanRangeReplies,
20✔
1050
                g.cfg.isStillZombieChannel,
20✔
1051
        )
20✔
1052
        if err != nil {
20✔
1053
                return fmt.Errorf("unable to filter chan ids: %w", err)
×
1054
        }
×
1055

1056
        // As we've received the entirety of the reply, we no longer need to
1057
        // hold on to the set of buffered replies or the original query that
1058
        // prompted the replies, so we'll let that be garbage collected now.
1059
        g.curQueryRangeMsg = nil
20✔
1060
        g.prevReplyChannelRange = nil
20✔
1061
        g.bufferedChanRangeReplies = nil
20✔
1062
        g.numChanRangeRepliesRcvd = 0
20✔
1063

20✔
1064
        // If there aren't any channels that we don't know of, then we can
20✔
1065
        // switch straight to our terminal state.
20✔
1066
        if len(newChans) == 0 {
37✔
1067
                log.Infof("GossipSyncer(%x): remote peer has no new chans",
17✔
1068
                        g.cfg.peerPub[:])
17✔
1069

17✔
1070
                g.setSyncState(chansSynced)
17✔
1071

17✔
1072
                // Ensure that the sync manager becomes aware that the
17✔
1073
                // historical sync completed so synced_to_graph is updated over
17✔
1074
                // rpc.
17✔
1075
                g.cfg.markGraphSynced()
17✔
1076
                return nil
17✔
1077
        }
17✔
1078

1079
        // Otherwise, we'll set the set of channels that we need to query for
1080
        // the next state, and also transition our state.
1081
        g.newChansToQuery = newChans
6✔
1082
        g.setSyncState(queryNewChannels)
6✔
1083

6✔
1084
        log.Infof("GossipSyncer(%x): starting query for %v new chans",
6✔
1085
                g.cfg.peerPub[:], len(newChans))
6✔
1086

6✔
1087
        return nil
6✔
1088
}
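
The stale/skewed filtering above skips a channel only when both endpoints' update timestamps fall outside the prune window, on either side. A standalone sketch of that check (not part of syncer.go; the two-week expiry is a stand-in for graph.DefaultChannelPruneExpiry):

package main

import (
	"fmt"
	"time"
)

const pruneExpiry = 14 * 24 * time.Hour

// isStale reports whether a timestamp lies too far in the past, and isSkewed
// whether it lies too far in the future, mirroring the closures above.
func isStale(t time.Time) bool  { return time.Since(t) > pruneExpiry }
func isSkewed(t time.Time) bool { return time.Until(t) > pruneExpiry }

// skipChannel reports whether both endpoints' updates are out of bounds, in
// which case the channel is not worth querying.
func skipChannel(node1, node2 time.Time) bool {
	outOfBounds := func(t time.Time) bool { return isStale(t) || isSkewed(t) }
	return outOfBounds(node1) && outOfBounds(node2)
}

func main() {
	now := time.Now()
	stale := now.Add(-30 * 24 * time.Hour)
	skewed := now.Add(365 * 24 * time.Hour)

	fmt.Println(skipChannel(now, now))      // false: both in bounds
	fmt.Println(skipChannel(stale, now))    // false: only one side is out
	fmt.Println(skipChannel(stale, skewed)) // true: both sides out of bounds
}
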
1089

1090
// genChanRangeQuery generates the initial message we'll send to the remote
1091
// party when we're kicking off the channel graph synchronization upon
1092
// connection. The historicalQuery boolean can be used to generate a query from
1093
// the genesis block of the chain.
1094
func (g *GossipSyncer) genChanRangeQuery(ctx context.Context,
1095
        historicalQuery bool) (*lnwire.QueryChannelRange, error) {
91✔
1096

91✔
1097
        // First, we'll query our channel graph time series for its highest
91✔
1098
        // known channel ID.
91✔
1099
        newestChan, err := g.cfg.channelSeries.HighestChanID(
91✔
1100
                ctx, g.cfg.chainHash,
91✔
1101
        )
91✔
1102
        if err != nil {
91✔
1103
                return nil, err
×
1104
        }
×
1105

1106
        // Once we have the chan ID of the newest, we'll obtain the block height
1107
        // of the channel, then subtract our default horizon to ensure we don't
1108
        // miss any channels. By default, we go back 1 day from the newest
1109
        // channel, unless we're attempting a historical sync, where we'll
1110
        // actually start from the genesis block instead.
1111
        var startHeight uint32
91✔
1112
        switch {
91✔
1113
        case historicalQuery:
22✔
1114
                fallthrough
22✔
1115
        case newestChan.BlockHeight <= chanRangeQueryBuffer:
23✔
1116
                startHeight = 0
23✔
1117
        default:
68✔
1118
                startHeight = newestChan.BlockHeight - chanRangeQueryBuffer
68✔
1119
        }
1120

1121
        // Determine the number of blocks to request based on our best height.
1122
        // We'll take into account any potential underflows and explicitly set
1123
        // numBlocks to its minimum value of 1 if so.
1124
        bestHeight := g.cfg.bestHeight()
91✔
1125
        numBlocks := bestHeight - startHeight
91✔
1126
        if int64(numBlocks) < 1 {
91✔
1127
                numBlocks = 1
×
1128
        }
×
1129

1130
        log.Infof("GossipSyncer(%x): requesting new chans from height=%v "+
91✔
1131
                "and %v blocks after", g.cfg.peerPub[:], startHeight, numBlocks)
91✔
1132

91✔
1133
        // Finally, we'll craft the channel range query, using our starting
91✔
1134
        // height, then asking for all known channels to the foreseeable end of
91✔
1135
        // the main chain.
91✔
1136
        query := &lnwire.QueryChannelRange{
91✔
1137
                ChainHash:        g.cfg.chainHash,
91✔
1138
                FirstBlockHeight: startHeight,
91✔
1139
                NumBlocks:        numBlocks,
91✔
1140
        }
91✔
1141

91✔
1142
        if !g.cfg.noTimestampQueryOption {
174✔
1143
                query.QueryOptions = lnwire.NewTimestampQueryOption()
83✔
1144
        }
83✔
1145

1146
        g.curQueryRangeMsg = query
91✔
1147

91✔
1148
        return query, nil
91✔
1149
}
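
The start height and block count computed above can be illustrated with a small standalone sketch (not part of syncer.go; the heights are invented). It mirrors the three cases: historical syncs start at genesis, heights at or below the buffer also start at genesis, and everything else backs off by chanRangeQueryBuffer, while the block count is clamped to at least one:

package main

import "fmt"

const chanRangeQueryBuffer = 144 // same buffer as in syncer.go

func chanRangeParams(newestChanHeight, bestHeight uint32,
	historical bool) (startHeight, numBlocks uint32) {

	switch {
	case historical:
		// A historical sync always starts from the genesis block.
		startHeight = 0
	case newestChanHeight <= chanRangeQueryBuffer:
		// Too close to genesis to subtract the buffer safely.
		startHeight = 0
	default:
		// Back off by the buffer so a slightly stale view doesn't
		// cause us to miss recent channels.
		startHeight = newestChanHeight - chanRangeQueryBuffer
	}

	// Ask for at least one block, guarding against underflow when our
	// best height is not ahead of the start height.
	numBlocks = 1
	if bestHeight > startHeight {
		numBlocks = bestHeight - startHeight
	}

	return startHeight, numBlocks
}

func main() {
	fmt.Println(chanRangeParams(800_000, 800_010, false)) // 799856 154
	fmt.Println(chanRangeParams(100, 120, false))         // 0 120
	fmt.Println(chanRangeParams(800_000, 800_010, true))  // 0 800010
}
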
1150

1151
// replyPeerQueries is called in response to any query by the remote peer.
1152
// We'll examine our state and send back our best response.
1153
func (g *GossipSyncer) replyPeerQueries(ctx context.Context,
1154
        msg lnwire.Message) error {
8✔
1155

8✔
1156
        switch msg := msg.(type) {
8✔
1157

1158
        // In this state, we'll also handle any incoming channel range queries
1159
        // from the remote peer as they're trying to sync their state as well.
1160
        case *lnwire.QueryChannelRange:
6✔
1161
                return g.replyChanRangeQuery(ctx, msg)
6✔
1162

1163
        // If the remote peer skips straight to requesting new channels that
1164
        // they don't know of, then we'll ensure that we also handle this case.
1165
        case *lnwire.QueryShortChanIDs:
5✔
1166
                return g.replyShortChanIDs(ctx, msg)
5✔
1167

1168
        default:
×
1169
                return fmt.Errorf("unknown message: %T", msg)
×
1170
        }
1171
}
1172

1173
// replyChanRangeQuery will be dispatched in response to a channel range query
1174
// by the remote node. We'll query the channel time series for channels that
1175
// meet the channel range, then chunk our responses to the remote node. We also
1176
// ensure that our final fragment carries the "complete" bit to indicate the
1177
// end of our streaming response.
1178
func (g *GossipSyncer) replyChanRangeQuery(ctx context.Context,
1179
        query *lnwire.QueryChannelRange) error {
12✔
1180

12✔
1181
        // Before responding, we'll check to ensure that the remote peer is
12✔
1182
        // querying for the same chain that we're on. If not, we'll send back a
12✔
1183
        // response with a complete value of zero to indicate we're on a
12✔
1184
        // different chain.
12✔
1185
        if g.cfg.chainHash != query.ChainHash {
13✔
1186
                log.Warnf("Remote peer requested QueryChannelRange for "+
1✔
1187
                        "chain=%v, we're on chain=%v", query.ChainHash,
1✔
1188
                        g.cfg.chainHash)
1✔
1189

1✔
1190
                return g.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
1✔
1191
                        ChainHash:        query.ChainHash,
1✔
1192
                        FirstBlockHeight: query.FirstBlockHeight,
1✔
1193
                        NumBlocks:        query.NumBlocks,
1✔
1194
                        Complete:         0,
1✔
1195
                        EncodingType:     g.cfg.encodingType,
1✔
1196
                        ShortChanIDs:     nil,
1✔
1197
                })
1✔
1198
        }
1✔
1199

1200
        log.Infof("GossipSyncer(%x): filtering chan range: start_height=%v, "+
11✔
1201
                "num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
11✔
1202
                query.NumBlocks)
11✔
1203

11✔
1204
        // Check if the query asked for timestamps. We will only serve
11✔
1205
        // timestamps if this has not been disabled with
11✔
1206
        // noTimestampQueryOption.
11✔
1207
        withTimestamps := query.WithTimestamps() &&
11✔
1208
                !g.cfg.noTimestampQueryOption
11✔
1209

11✔
1210
        // Next, we'll consult the time series to obtain the set of known
11✔
1211
        // channel IDs that match their query.
11✔
1212
        startBlock := query.FirstBlockHeight
11✔
1213
        endBlock := query.LastBlockHeight()
11✔
1214
        channelRanges, err := g.cfg.channelSeries.FilterChannelRange(
11✔
1215
                query.ChainHash, startBlock, endBlock, withTimestamps,
11✔
1216
        )
11✔
1217
        if err != nil {
11✔
1218
                return err
×
1219
        }
×
1220

1221
        // TODO(roasbeef): means can't send max uint above?
1222
        //  * or make internal 64
1223

1224
        // We'll send our response in a streaming manner, chunk-by-chunk. We do
1225
        // this as there's a transport message size limit which we'll need to
1226
        // adhere to. We also need to make sure all of our replies cover the
1227
        // expected range of the query.
1228
        sendReplyForChunk := func(channelChunk []graphdb.ChannelUpdateInfo,
11✔
1229
                firstHeight, lastHeight uint32, finalChunk bool) error {
27✔
1230

16✔
1231
                // The number of blocks contained in the current chunk (the
16✔
1232
                // total span) is the difference between the last channel ID and
16✔
1233
                // the first in the range. We add one as even if all channels
16✔
1234
                // returned are in the same block, we need to count that.
16✔
1235
                numBlocks := lastHeight - firstHeight + 1
16✔
1236
                complete := uint8(0)
16✔
1237
                if finalChunk {
27✔
1238
                        complete = 1
11✔
1239
                }
11✔
1240

1241
                var timestamps lnwire.Timestamps
16✔
1242
                if withTimestamps {
19✔
1243
                        timestamps = make(lnwire.Timestamps, len(channelChunk))
3✔
1244
                }
3✔
1245

1246
                scids := make([]lnwire.ShortChannelID, len(channelChunk))
16✔
1247
                for i, info := range channelChunk {
33✔
1248
                        scids[i] = info.ShortChannelID
17✔
1249

17✔
1250
                        if !withTimestamps {
31✔
1251
                                continue
14✔
1252
                        }
1253

1254
                        timestamps[i].Timestamp1 = uint32(
3✔
1255
                                info.Node1UpdateTimestamp.Unix(),
3✔
1256
                        )
3✔
1257

3✔
1258
                        timestamps[i].Timestamp2 = uint32(
3✔
1259
                                info.Node2UpdateTimestamp.Unix(),
3✔
1260
                        )
3✔
1261
                }
1262

1263
                return g.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
16✔
1264
                        ChainHash:        query.ChainHash,
16✔
1265
                        NumBlocks:        numBlocks,
16✔
1266
                        FirstBlockHeight: firstHeight,
16✔
1267
                        Complete:         complete,
16✔
1268
                        EncodingType:     g.cfg.encodingType,
16✔
1269
                        ShortChanIDs:     scids,
16✔
1270
                        Timestamps:       timestamps,
16✔
1271
                })
16✔
1272
        }
1273

1274
        var (
11✔
1275
                firstHeight  = query.FirstBlockHeight
11✔
1276
                lastHeight   uint32
11✔
1277
                channelChunk []graphdb.ChannelUpdateInfo
11✔
1278
        )
11✔
1279

11✔
1280
        // chunkSize is the maximum number of SCIDs that we can safely put in a
11✔
1281
        // single message. If we also need to include timestamps though, then
11✔
1282
        // this number is halved since encoding two timestamps takes the same
11✔
1283
        // number of bytes as encoding an SCID.
11✔
1284
        chunkSize := g.cfg.chunkSize
11✔
1285
        if withTimestamps {
14✔
1286
                chunkSize /= 2
3✔
1287
        }
3✔
1288

1289
        for _, channelRange := range channelRanges {
28✔
1290
                channels := channelRange.Channels
17✔
1291
                numChannels := int32(len(channels))
17✔
1292
                numLeftToAdd := chunkSize - int32(len(channelChunk))
17✔
1293

17✔
1294
                // Include the current block in the ongoing chunk if it can fit
17✔
1295
                // and move on to the next block.
17✔
1296
                if numChannels <= numLeftToAdd {
29✔
1297
                        channelChunk = append(channelChunk, channels...)
12✔
1298
                        continue
12✔
1299
                }
1300

1301
                // Otherwise, we need to send our existing channel chunk as is
1302
                // as its own reply and start a new one for the current block.
1303
                // We'll mark the end of our current chunk as the height before
1304
                // the current block to ensure the whole query range is replied
1305
                // to.
1306
                log.Infof("GossipSyncer(%x): sending range chunk of size=%v",
5✔
1307
                        g.cfg.peerPub[:], len(channelChunk))
5✔
1308

5✔
1309
                lastHeight = channelRange.Height - 1
5✔
1310
                err := sendReplyForChunk(
5✔
1311
                        channelChunk, firstHeight, lastHeight, false,
5✔
1312
                )
5✔
1313
                if err != nil {
5✔
1314
                        return err
×
1315
                }
×
1316

1317
                // With the reply constructed, we'll start tallying channels for
1318
                // our next one keeping in mind our chunk size. This may result
1319
                // in channels for this block being left out from the reply, but
1320
                // this isn't an issue since we'll randomly shuffle them and we
1321
                // assume a historical gossip sync is performed at a later time.
1322
                firstHeight = channelRange.Height
5✔
1323
                finalChunkSize := numChannels
5✔
1324
                exceedsChunkSize := numChannels > chunkSize
5✔
1325
                if exceedsChunkSize {
5✔
1326
                        rand.Shuffle(len(channels), func(i, j int) {
×
1327
                                channels[i], channels[j] = channels[j], channels[i]
×
1328
                        })
×
1329
                        finalChunkSize = chunkSize
×
1330
                }
1331
                channelChunk = channels[:finalChunkSize]
5✔
1332

5✔
1333
                // Sort the chunk once again if we had to shuffle it.
5✔
1334
                if exceedsChunkSize {
5✔
1335
                        sort.Slice(channelChunk, func(i, j int) bool {
×
1336
                                id1 := channelChunk[i].ShortChannelID.ToUint64()
×
1337
                                id2 := channelChunk[j].ShortChannelID.ToUint64()
×
1338

×
1339
                                return id1 < id2
×
1340
                        })
×
1341
                }
1342
        }
1343

1344
        // Send the remaining chunk as the final reply.
1345
        log.Infof("GossipSyncer(%x): sending final chan range chunk, size=%v",
11✔
1346
                g.cfg.peerPub[:], len(channelChunk))
11✔
1347

11✔
1348
        return sendReplyForChunk(
11✔
1349
                channelChunk, firstHeight, query.LastBlockHeight(), true,
11✔
1350
        )
11✔
1351
}
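
The chunking loop above never splits a block's channels across two replies unless a single block alone exceeds the chunk size, and it always flushes a final chunk covering the remainder of the queried range. A stand-alone sketch of that control flow, with hypothetical types in place of graphdb.ChannelUpdateInfo, might look like this (recall that the effective chunk size is halved when timestamps are requested):

// blockChans groups the short channel IDs confirmed at one height.
type blockChans struct {
	height uint32
	scids  []uint64
}

// chunkSCIDs splits per-block channel lists into chunks of at most
// chunkSize IDs. A block that alone exceeds chunkSize is truncated,
// as in the syncer, and the final partial chunk is always emitted.
func chunkSCIDs(blocks []blockChans, chunkSize int) [][]uint64 {
	var (
		chunks [][]uint64
		cur    []uint64
	)
	for _, b := range blocks {
		// Include the block in the current chunk if it fits.
		if len(b.scids) <= chunkSize-len(cur) {
			cur = append(cur, b.scids...)
			continue
		}

		// Otherwise flush the current chunk and start a new one
		// with (at most chunkSize of) this block's channels.
		chunks = append(chunks, cur)
		n := len(b.scids)
		if n > chunkSize {
			n = chunkSize
		}
		cur = append([]uint64(nil), b.scids[:n]...)
	}

	return append(chunks, cur)
}
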
1352

1353
// replyShortChanIDs will be dispatched in response to a query by the remote
1354
// node for information concerning a set of short channel IDs. Our response
1355
// will be sent in a streaming chunked manner to ensure that we remain below
1356
// the current transport level message size.
1357
func (g *GossipSyncer) replyShortChanIDs(ctx context.Context,
1358
        query *lnwire.QueryShortChanIDs) error {
7✔
1359

7✔
1360
        // Before responding, we'll check to ensure that the remote peer is
7✔
1361
        // querying for the same chain that we're on. If not, we'll send back a
7✔
1362
        // response with a complete value of zero to indicate we're on a
7✔
1363
        // different chain.
7✔
1364
        if g.cfg.chainHash != query.ChainHash {
8✔
1365
                log.Warnf("Remote peer requested QueryShortChanIDs for "+
1✔
1366
                        "chain=%v, we're on chain=%v", query.ChainHash,
1✔
1367
                        g.cfg.chainHash)
1✔
1368

1✔
1369
                return g.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
1✔
1370
                        ChainHash: query.ChainHash,
1✔
1371
                        Complete:  0,
1✔
1372
                })
1✔
1373
        }
1✔
1374

1375
        if len(query.ShortChanIDs) == 0 {
6✔
1376
                log.Infof("GossipSyncer(%x): ignoring query for blank short chan ID's",
×
1377
                        g.cfg.peerPub[:])
×
1378
                return nil
×
1379
        }
×
1380

1381
        log.Infof("GossipSyncer(%x): fetching chan anns for %v chans",
6✔
1382
                g.cfg.peerPub[:], len(query.ShortChanIDs))
6✔
1383

6✔
1384
        // Now that we know we're on the same chain, we'll query the channel
6✔
1385
        // time series for the set of messages that we know of which satisfies
6✔
1386
        // the requirement of being a chan ann, chan update, or a node ann
6✔
1387
        // related to the set of queried channels.
6✔
1388
        replyMsgs, err := g.cfg.channelSeries.FetchChanAnns(
6✔
1389
                query.ChainHash, query.ShortChanIDs,
6✔
1390
        )
6✔
1391
        if err != nil {
6✔
1392
                return fmt.Errorf("unable to fetch chan anns for %v..., %w",
×
1393
                        query.ShortChanIDs[0].ToUint64(), err)
×
1394
        }
×
1395

1396
        // Reply with any messages related to those channel IDs. We'll write
1397
        // each one individually and synchronously to throttle the sends and
1398
        // perform buffering of responses in the syncer as opposed to the peer.
1399
        for _, msg := range replyMsgs {
12✔
1400
                err := g.sendToPeerSync(ctx, msg)
6✔
1401
                if err != nil {
6✔
1402
                        return err
×
1403
                }
×
1404
        }
1405

1406
        // Regardless of whether we had any messages to reply with, send over
1407
        // the sentinel message to signal that the stream has terminated.
1408
        return g.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
6✔
1409
                ChainHash: query.ChainHash,
6✔
1410
                Complete:  1,
6✔
1411
        })
6✔
1412
}
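
The reply pattern above, writing each message synchronously and always terminating with a sentinel so the peer knows the stream is complete, can be expressed as a small helper. The send callback and message type here are placeholders rather than lnd APIs:

// streamWithSentinel sends every reply in order and finishes with the
// terminating sentinel, mirroring replyShortChanIDs and its
// ReplyShortChanIDsEnd message.
func streamWithSentinel(send func(msg any) error, replies []any,
	sentinel any) error {

	for _, msg := range replies {
		if err := send(msg); err != nil {
			return err
		}
	}

	// The sentinel is sent regardless of whether there were replies.
	return send(sentinel)
}
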
1413

1414
// ApplyGossipFilter applies a gossiper filter sent by the remote node to the
1415
// state machine. Once applied, we'll ensure that we don't forward any messages
1416
// to the peer that aren't within the time range of the filter.
1417
func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
1418
        filter *lnwire.GossipTimestampRange) error {
80✔
1419

80✔
1420
        g.Lock()
80✔
1421

80✔
1422
        g.remoteUpdateHorizon = filter
80✔
1423

80✔
1424
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
80✔
1425
        endTime := startTime.Add(
80✔
1426
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
80✔
1427
        )
80✔
1428

80✔
1429
        g.Unlock()
80✔
1430

80✔
1431
        // If requested, don't reply with historical gossip data when the remote
80✔
1432
        // peer sets their gossip timestamp range.
80✔
1433
        if g.cfg.ignoreHistoricalFilters {
81✔
1434
                return nil
1✔
1435
        }
1✔
1436

1437
        // Check if a goroutine is already sending the backlog. If so, return
1438
        // early without attempting to acquire the semaphore.
1439
        if g.isSendingBacklog.Load() {
84✔
1440
                log.Debugf("GossipSyncer(%x): skipping ApplyGossipFilter, "+
5✔
1441
                        "backlog send already in progress", g.cfg.peerPub[:])
5✔
1442
                return nil
5✔
1443
        }
5✔
1444

1445
        select {
74✔
1446
        case <-g.syncerSema:
74✔
1447
        case <-g.cg.Done():
×
1448
                return ErrGossipSyncerExiting
×
1449
        case <-ctx.Done():
×
1450
                return ctx.Err()
×
1451
        }
1452

1453
        // We don't put this in a defer because if the goroutine is launched,
1454
        // it needs to be called when the goroutine is stopped.
1455
        returnSema := func() {
85✔
1456
                g.syncerSema <- struct{}{}
11✔
1457
        }
11✔
1458

1459
        // Now that the remote peer has applied their filter, we'll query the
1460
        // database for all the messages that are beyond this filter.
1461
        newUpdatestoSend := g.cfg.channelSeries.UpdatesInHorizon(
74✔
1462
                g.cfg.chainHash, startTime, endTime,
74✔
1463
        )
74✔
1464

74✔
1465
        // Create a pull-based iterator so we can check if there are any
74✔
1466
        // updates before launching the goroutine.
74✔
1467
        next, stop := iter.Pull2(newUpdatestoSend)
74✔
1468

74✔
1469
        // Check if we have any updates to send by attempting to get the first
74✔
1470
        // message.
74✔
1471
        firstMsg, firstErr, ok := next()
74✔
1472
        if firstErr != nil {
74✔
1473
                stop()
×
1474
                returnSema()
×
1475
                return firstErr
×
1476
        }
×
1477

1478
        log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
11✔
1479
                "start=%v, end=%v, has_updates=%v", g.cfg.peerPub[:],
11✔
1480
                startTime, endTime, ok)
11✔
1481

11✔
1482
        // If we don't have any to send, then we can return early.
11✔
1483
        if !ok {
20✔
1484
                stop()
9✔
1485
                returnSema()
9✔
1486
                return nil
9✔
1487
        }
9✔
1488

1489
        // Set the atomic flag to indicate we're starting to send the backlog.
1490
        // If the swap fails, it means another goroutine is already active, so
1491
        // we return early.
1492
        if !g.isSendingBacklog.CompareAndSwap(false, true) {
5✔
1493
                returnSema()
×
1494
                log.Debugf("GossipSyncer(%x): another goroutine already "+
×
1495
                        "sending backlog, skipping", g.cfg.peerPub[:])
×
1496

×
1497
                return nil
×
1498
        }
×
1499

1500
        // We'll conclude by launching a goroutine to send out any updates.
1501
        // The goroutine takes ownership of the iterator.
1502
        g.cg.WgAdd(1)
5✔
1503
        go func() {
10✔
1504
                defer g.cg.WgDone()
5✔
1505
                defer returnSema()
5✔
1506
                defer g.isSendingBacklog.Store(false)
5✔
1507
                defer stop()
5✔
1508

5✔
1509
                // Send the first message we already pulled.
5✔
1510
                err := g.sendToPeerSync(ctx, firstMsg)
5✔
1511
                switch {
5✔
1512
                case errors.Is(err, ErrGossipSyncerExiting):
×
1513
                        return
×
1514

1515
                case errors.Is(err, lnpeer.ErrPeerExiting):
×
1516
                        return
×
1517

1518
                case err != nil:
×
1519
                        log.Errorf("Unable to send message for "+
×
1520
                                "peer catch up: %v", err)
×
1521
                }
1522

1523
                // Continue with the rest of the messages using the same pull
1524
                // iterator.
1525
                for {
10✔
1526
                        select {
5✔
NEW
1527
                        case <-ctx.Done():
×
NEW
1528
                                log.Debugf("GossipSyncer(%x): context "+
×
NEW
1529
                                        "canceled, exiting", g.cfg.peerPub[:])
×
NEW
1530

×
NEW
1531
                                return
×
NEW
1532
                        case <-g.cg.Done():
×
NEW
1533
                                log.Debugf("GossipSyncer(%x): shutting down, "+
×
NEW
1534
                                        "exiting", g.cfg.peerPub[:])
×
NEW
1535

×
NEW
1536
                                return
×
1537
                        default:
5✔
1538
                        }
1539

1540
                        msg, err, ok := next()
5✔
1541
                        if !ok {
10✔
1542
                                return
5✔
1543
                        }
5✔
1544

1545
                        // If the iterator yielded an error, log it and
1546
                        // continue.
1547
                        if err != nil {
3✔
1548
                                log.Errorf("Error fetching update for peer "+
×
1549
                                        "catch up: %v", err)
×
1550
                                continue
×
1551
                        }
1552

1553
                        err = g.sendToPeerSync(ctx, msg)
3✔
1554
                        switch {
3✔
1555
                        case err == ErrGossipSyncerExiting:
×
1556
                                return
×
1557

1558
                        case err == lnpeer.ErrPeerExiting:
×
1559
                                return
×
1560

1561
                        case err != nil:
×
1562
                                log.Errorf("Unable to send message for "+
×
1563
                                        "peer catch up: %v", err)
×
1564
                        }
1565
                }
1566
        }()
1567

1568
        return nil
5✔
1569
}
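
Two pieces of the logic above are worth isolating: the conversion of the remote filter into a concrete time window, and the single-flight guard that ensures only one backlog-sending goroutine runs at a time. Below is a minimal sketch of both, using only the time and sync/atomic packages this file already imports; the helper names are invented, and the real code additionally coordinates with a semaphore and the syncer's quit signals.

// horizonWindow converts a gossip filter's first timestamp and range
// into the [start, end) window used to query for backlog updates.
func horizonWindow(firstTimestamp, timestampRange uint32) (time.Time, time.Time) {
	start := time.Unix(int64(firstTimestamp), 0)
	end := start.Add(time.Duration(timestampRange) * time.Second)

	return start, end
}

// trySendBacklog starts fn in a goroutine only if no backlog send is
// already in flight, mirroring the isSendingBacklog CompareAndSwap
// guard above. It reports whether fn was started.
func trySendBacklog(inFlight *atomic.Bool, fn func()) bool {
	if !inFlight.CompareAndSwap(false, true) {
		return false
	}

	go func() {
		defer inFlight.Store(false)
		fn()
	}()

	return true
}
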
1570

1571
// FilterGossipMsgs takes a set of gossip messages, and only sends a message to the peer
1572
// iff the message is within the bounds of their set gossip filter. If the peer
1573
// doesn't have a gossip filter set, then no messages will be forwarded.
1574
func (g *GossipSyncer) FilterGossipMsgs(ctx context.Context,
1575
        msgs ...msgWithSenders) {
5✔
1576

5✔
1577
        // If the peer doesn't have an update horizon set, then we won't send
5✔
1578
        // it any new update messages.
5✔
1579
        if g.remoteUpdateHorizon == nil {
9✔
1580
                log.Tracef("GossipSyncer(%x): skipped due to nil "+
4✔
1581
                        "remoteUpdateHorizon", g.cfg.peerPub[:])
4✔
1582
                return
4✔
1583
        }
4✔
1584

1585
        // If we've been signaled to exit, or are exiting, then we'll stop
1586
        // short.
1587
        select {
4✔
1588
        case <-g.cg.Done():
×
1589
                return
×
1590
        case <-ctx.Done():
×
1591
                return
×
1592
        default:
4✔
1593
        }
1594

1595
        // TODO(roasbeef): need to ensure that peer still online...send msg to
1596
        // gossiper on peer termination to signal peer disconnect?
1597

1598
        var err error
4✔
1599

4✔
1600
        // Before we filter out the messages, we'll construct an index over the
4✔
1601
        // set of channel announcements and channel updates. This will allow us
4✔
1602
        // to quickly check if we should forward a chan ann, based on the known
4✔
1603
        // channel updates for a channel.
4✔
1604
        chanUpdateIndex := make(
4✔
1605
                map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate1,
4✔
1606
        )
4✔
1607
        for _, msg := range msgs {
17✔
1608
                chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate1)
13✔
1609
                if !ok {
23✔
1610
                        continue
10✔
1611
                }
1612

1613
                chanUpdateIndex[chanUpdate.ShortChannelID] = append(
6✔
1614
                        chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate,
6✔
1615
                )
6✔
1616
        }
1617

1618
        // We'll construct a helper function that we'll use below to determine
1619
        // if a given message passes the gossip msg filter.
1620
        g.Lock()
4✔
1621
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
4✔
1622
        endTime := startTime.Add(
4✔
1623
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
4✔
1624
        )
4✔
1625
        g.Unlock()
4✔
1626

4✔
1627
        passesFilter := func(timeStamp uint32) bool {
17✔
1628
                t := time.Unix(int64(timeStamp), 0)
13✔
1629
                return t.Equal(startTime) ||
13✔
1630
                        (t.After(startTime) && t.Before(endTime))
13✔
1631
        }
13✔
1632

1633
        msgsToSend := make([]lnwire.Message, 0, len(msgs))
4✔
1634
        for _, msg := range msgs {
17✔
1635
                // If the target peer is the peer that sent us this message,
13✔
1636
                // then we'll exit early as we don't need to filter this
13✔
1637
                // message.
13✔
1638
                if _, ok := msg.senders[g.cfg.peerPub]; ok {
16✔
1639
                        continue
3✔
1640
                }
1641

1642
                switch msg := msg.msg.(type) {
13✔
1643

1644
                // For each channel announcement message, we'll only send this
1645
                // message if the channel updates for the channel are between
1646
                // our time range.
1647
                case *lnwire.ChannelAnnouncement1:
7✔
1648
                        // First, we'll check if the channel updates are in
7✔
1649
                        // this message batch.
7✔
1650
                        chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID]
7✔
1651
                        if !ok {
11✔
1652
                                // If not, we'll attempt to query the database
4✔
1653
                                // to see if we know of the updates.
4✔
1654
                                chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates(
4✔
1655
                                        g.cfg.chainHash, msg.ShortChannelID,
4✔
1656
                                )
4✔
1657
                                if err != nil {
4✔
1658
                                        log.Warnf("no channel updates found for "+
×
1659
                                                "short_chan_id=%v",
×
1660
                                                msg.ShortChannelID)
×
1661
                                        continue
×
1662
                                }
1663
                        }
1664

1665
                        for _, chanUpdate := range chanUpdates {
14✔
1666
                                if passesFilter(chanUpdate.Timestamp) {
11✔
1667
                                        msgsToSend = append(msgsToSend, msg)
4✔
1668
                                        break
4✔
1669
                                }
1670
                        }
1671

1672
                        if len(chanUpdates) == 0 {
10✔
1673
                                msgsToSend = append(msgsToSend, msg)
3✔
1674
                        }
3✔
1675

1676
                // For each channel update, we'll only send it if the timestamp
1677
                // is within our time range.
1678
                case *lnwire.ChannelUpdate1:
6✔
1679
                        if passesFilter(msg.Timestamp) {
10✔
1680
                                msgsToSend = append(msgsToSend, msg)
4✔
1681
                        }
4✔
1682

1683
                // Similarly, we only send node announcements if the update
1684
                // timestamp is within our set gossip filter time range.
1685
                case *lnwire.NodeAnnouncement1:
6✔
1686
                        if passesFilter(msg.Timestamp) {
10✔
1687
                                msgsToSend = append(msgsToSend, msg)
4✔
1688
                        }
4✔
1689
                }
1690
        }
1691

1692
        log.Tracef("GossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
4✔
1693
                g.cfg.peerPub[:], len(msgs), len(msgsToSend))
4✔
1694

4✔
1695
        if len(msgsToSend) == 0 {
7✔
1696
                return
3✔
1697
        }
3✔
1698

1699
        if err = g.sendToPeer(ctx, msgsToSend...); err != nil {
4✔
1700
                log.Errorf("unable to send gossip msgs: %v", err)
×
1701
        }
×
1702

1703
}
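
The timestamp check used above treats the horizon as the half-open interval [start, end): a timestamp equal to the start passes, one equal to the end does not. As a stand-alone function (the name is illustrative; in the code it is a closure):

// withinHorizon reports whether a gossip timestamp falls inside the
// remote peer's update horizon [start, end).
func withinHorizon(timestamp uint32, start, end time.Time) bool {
	t := time.Unix(int64(timestamp), 0)

	return t.Equal(start) || (t.After(start) && t.Before(end))
}
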
1704

1705
// ProcessQueryMsg is used by outside callers to pass new channel time series
1706
// queries to the internal processing goroutine.
1707
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) error {
115✔
1708
        var msgChan chan lnwire.Message
115✔
1709
        switch msg.(type) {
115✔
1710
        case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs:
3✔
1711
                msgChan = g.queryMsgs
3✔
1712

1713
        // Reply messages should only be expected in states where we're waiting
1714
        // for a reply.
1715
        case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
115✔
1716
                g.Lock()
115✔
1717
                syncState := g.syncState()
115✔
1718
                g.Unlock()
115✔
1719

115✔
1720
                if syncState != waitingQueryRangeReply &&
115✔
1721
                        syncState != waitingQueryChanReply {
116✔
1722

1✔
1723
                        return fmt.Errorf("unexpected msg %T received in "+
1✔
1724
                                "state %v", msg, syncState)
1✔
1725
                }
1✔
1726
                msgChan = g.gossipMsgs
114✔
1727

1728
        default:
×
1729
                msgChan = g.gossipMsgs
×
1730
        }
1731

1732
        select {
114✔
1733
        case msgChan <- msg:
114✔
1734
        case <-peerQuit:
×
1735
        case <-g.cg.Done():
×
1736
        }
1737

1738
        return nil
114✔
1739
}
1740

1741
// setSyncState sets the gossip syncer's state to the given state.
1742
func (g *GossipSyncer) setSyncState(state syncerState) {
161✔
1743
        atomic.StoreUint32(&g.state, uint32(state))
161✔
1744
}
161✔
1745

1746
// syncState returns the current syncerState of the target GossipSyncer.
1747
func (g *GossipSyncer) syncState() syncerState {
553✔
1748
        return syncerState(atomic.LoadUint32(&g.state))
553✔
1749
}
553✔
1750

1751
// ResetSyncedSignal returns a channel that will be closed in order to serve as
1752
// a signal for when the GossipSyncer has reached its chansSynced state.
1753
func (g *GossipSyncer) ResetSyncedSignal() chan struct{} {
17✔
1754
        g.Lock()
17✔
1755
        defer g.Unlock()
17✔
1756

17✔
1757
        syncedSignal := make(chan struct{})
17✔
1758

17✔
1759
        syncState := syncerState(atomic.LoadUint32(&g.state))
17✔
1760
        if syncState == chansSynced {
21✔
1761
                close(syncedSignal)
4✔
1762
                return syncedSignal
4✔
1763
        }
4✔
1764

1765
        g.syncedSignal = syncedSignal
15✔
1766
        return g.syncedSignal
15✔
1767
}
1768

1769
// ProcessSyncTransition sends a request to the gossip syncer to transition its
1770
// sync type to a new one.
1771
//
1772
// NOTE: This can only be done once the gossip syncer has reached its final
1773
// chansSynced state.
1774
func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
17✔
1775
        errChan := make(chan error, 1)
17✔
1776
        select {
17✔
1777
        case g.syncTransitionReqs <- &syncTransitionReq{
1778
                newSyncType: newSyncType,
1779
                errChan:     errChan,
1780
        }:
17✔
1781
        case <-time.After(syncTransitionTimeout):
×
1782
                return ErrSyncTransitionTimeout
×
1783
        case <-g.cg.Done():
×
1784
                return ErrGossipSyncerExiting
×
1785
        }
1786

1787
        select {
17✔
1788
        case err := <-errChan:
17✔
1789
                return err
17✔
1790
        case <-g.cg.Done():
×
1791
                return ErrGossipSyncerExiting
×
1792
        }
1793
}
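
ProcessSyncTransition (and historicalSync below) follow the same request/response shape: push a request onto an internal channel, bounded by a timeout and the syncer's quit signal, then wait for the reply or shutdown. A generic sketch of that pattern, with invented names and error values, might look like:

// requestWithTimeout sends req on reqChan and waits for a response,
// giving up on timeout or when quit is closed.
func requestWithTimeout[Req, Resp any](reqChan chan<- Req, req Req,
	respChan <-chan Resp, timeout time.Duration,
	quit <-chan struct{}) (Resp, error) {

	var zero Resp

	select {
	case reqChan <- req:
	case <-time.After(timeout):
		return zero, errors.New("request timed out")
	case <-quit:
		return zero, errors.New("shutting down")
	}

	select {
	case resp := <-respChan:
		return resp, nil
	case <-quit:
		return zero, errors.New("shutting down")
	}
}
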
1794

1795
// handleSyncTransition handles a new sync type transition request.
1796
//
1797
// NOTE: The gossip syncer might have another sync state as a result of this
1798
// transition.
1799
func (g *GossipSyncer) handleSyncTransition(ctx context.Context,
1800
        req *syncTransitionReq) error {
17✔
1801

17✔
1802
        // Return early from any NOP sync transitions.
17✔
1803
        syncType := g.SyncType()
17✔
1804
        if syncType == req.newSyncType {
17✔
1805
                return nil
×
1806
        }
×
1807

1808
        log.Debugf("GossipSyncer(%x): transitioning from %v to %v",
17✔
1809
                g.cfg.peerPub, syncType, req.newSyncType)
17✔
1810

17✔
1811
        var (
17✔
1812
                firstTimestamp time.Time
17✔
1813
                timestampRange uint32
17✔
1814
        )
17✔
1815

17✔
1816
        switch req.newSyncType {
17✔
1817
        // If an active sync has been requested, then we should resume receiving
1818
        // new graph updates from the remote peer.
1819
        case ActiveSync, PinnedSync:
15✔
1820
                firstTimestamp = time.Now()
15✔
1821
                timestampRange = math.MaxUint32
15✔
1822

1823
        // If a PassiveSync transition has been requested, then we should no
1824
        // longer receive any new updates from the remote peer. We can do this
1825
        // by setting our update horizon to a range in the past ensuring no
1826
        // graph updates match the timestamp range.
1827
        case PassiveSync:
2✔
1828
                firstTimestamp = zeroTimestamp
2✔
1829
                timestampRange = 0
2✔
1830

1831
        default:
×
1832
                return fmt.Errorf("unhandled sync transition %v",
×
1833
                        req.newSyncType)
×
1834
        }
1835

1836
        err := g.sendGossipTimestampRange(ctx, firstTimestamp, timestampRange)
17✔
1837
        if err != nil {
17✔
1838
                return fmt.Errorf("unable to send local update horizon: %w",
×
1839
                        err)
×
1840
        }
×
1841

1842
        g.setSyncType(req.newSyncType)
17✔
1843

17✔
1844
        return nil
17✔
1845
}
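
The transition above boils down to choosing one of two gossip timestamp ranges: an open-ended horizon starting now for active and pinned syncs, or an empty window for passive syncs. A small sketch of that choice follows; zeroTime stands in for the package's zeroTimestamp value and the helper itself is hypothetical.

// horizonForSyncType returns the first timestamp and range to advertise
// when transitioning sync types: an open-ended horizon for active and
// pinned syncs, and an empty one for passive syncs.
func horizonForSyncType(active bool, zeroTime time.Time) (time.Time, uint32) {
	if active {
		return time.Now(), math.MaxUint32
	}

	return zeroTime, 0
}
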
1846

1847
// setSyncType sets the gossip syncer's sync type to the given type.
1848
func (g *GossipSyncer) setSyncType(syncType SyncerType) {
70✔
1849
        atomic.StoreUint32(&g.syncType, uint32(syncType))
70✔
1850
}
70✔
1851

1852
// SyncType returns the current SyncerType of the target GossipSyncer.
1853
func (g *GossipSyncer) SyncType() SyncerType {
424✔
1854
        return SyncerType(atomic.LoadUint32(&g.syncType))
424✔
1855
}
424✔
1856

1857
// historicalSync sends a request to the gossip syncer to perform a historical
1858
// sync.
1859
//
1860
// NOTE: This can only be done once the gossip syncer has reached its final
1861
// chansSynced state.
1862
func (g *GossipSyncer) historicalSync() error {
19✔
1863
        done := make(chan struct{})
19✔
1864

19✔
1865
        select {
19✔
1866
        case g.historicalSyncReqs <- &historicalSyncReq{
1867
                doneChan: done,
1868
        }:
19✔
1869
        case <-time.After(syncTransitionTimeout):
×
1870
                return ErrSyncTransitionTimeout
×
1871
        case <-g.cg.Done():
×
1872
                return ErrGossiperShuttingDown
×
1873
        }
1874

1875
        select {
19✔
1876
        case <-done:
19✔
1877
                return nil
19✔
1878
        case <-g.cg.Done():
×
1879
                return ErrGossiperShuttingDown
×
1880
        }
1881
}
1882

1883
// handleHistoricalSync handles a request to the gossip syncer to perform a
1884
// historical sync.
1885
func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
19✔
1886
        // We'll go back to our initial syncingChans state in order to request
19✔
1887
        // the remote peer to give us all of the channel IDs they know of
19✔
1888
        // starting from the genesis block.
19✔
1889
        g.genHistoricalChanRangeQuery = true
19✔
1890
        g.setSyncState(syncingChans)
19✔
1891
        close(req.doneChan)
19✔
1892
}
19✔
1893

1894
// sendToPeer sends a variadic number of messages to the remote peer. This
1895
// method should not block while waiting for sends to be written to the wire.
1896
func (g *GossipSyncer) sendToPeer(ctx context.Context,
1897
        msgs ...lnwire.Message) error {
119✔
1898

119✔
1899
        return g.sendMsgRateLimited(ctx, false, msgs...)
119✔
1900
}
119✔
1901

1902
// sendToPeerSync sends a variadic number of messages to the remote peer,
1903
// blocking until all messages have been sent successfully or a write error is
1904
// encountered.
1905
func (g *GossipSyncer) sendToPeerSync(ctx context.Context,
1906
        msgs ...lnwire.Message) error {
26✔
1907

26✔
1908
        return g.sendMsgRateLimited(ctx, true, msgs...)
26✔
1909
}
26✔
1910

1911
// sendMsgRateLimited sends a variadic number of messages to the remote peer,
1912
// applying our per-peer rate limit before each send. The sync boolean
1913
// determines if the send is blocking or not.
1914
func (g *GossipSyncer) sendMsgRateLimited(ctx context.Context, sync bool,
1915
        msgs ...lnwire.Message) error {
142✔
1916

142✔
1917
        for _, msg := range msgs {
286✔
1918
                err := maybeRateLimitMsg(
144✔
1919
                        ctx, g.rateLimiter, g.cfg.peerPub, msg, g.cg.Done(),
144✔
1920
                )
144✔
1921
                if err != nil {
144✔
1922
                        return err
×
1923
                }
×
1924

1925
                err = g.cfg.sendMsg(ctx, sync, msg)
144✔
1926
                if err != nil {
144✔
1927
                        return err
×
1928
                }
×
1929
        }
1930

1931
        return nil
142✔
1932
}
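
The per-message rate limiting above can be illustrated with a sketch built directly on golang.org/x/time/rate, which this file already imports. The limiter and send callback here stand in for the syncer's own maybeRateLimitMsg and sendMsg plumbing:

// sendAllRateLimited waits on the limiter before every send so that a
// burst of gossip replies cannot exceed the configured rate, returning
// early if the context is canceled or a send fails.
func sendAllRateLimited(ctx context.Context, limiter *rate.Limiter,
	send func(context.Context, lnwire.Message) error,
	msgs ...lnwire.Message) error {

	for _, msg := range msgs {
		// Wait blocks until the limiter allows another event or the
		// context is canceled.
		if err := limiter.Wait(ctx); err != nil {
			return err
		}

		if err := send(ctx, msg); err != nil {
			return err
		}
	}

	return nil
}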