lightningnetwork / lnd / 18352382351

08 Oct 2025 05:06PM UTC coverage: 66.659% (+0.02%) from 66.641%

Pull Request #10277: fix channel disable/enable race condition
Merge 380f79f9e into 87aed739e

10 of 10 new or added lines in 1 file covered. (100.0%)

61 existing lines in 12 files now uncovered.

137262 of 205916 relevant lines covered (66.66%)

21246.98 hits per line

Source File: /discovery/syncer.go (coverage: 84.05%)

1
package discovery
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "iter"
8
        "math"
9
        "math/rand"
10
        "sort"
11
        "sync"
12
        "sync/atomic"
13
        "time"
14

15
        "github.com/btcsuite/btcd/chaincfg/chainhash"
16
        "github.com/lightningnetwork/lnd/fn/v2"
17
        "github.com/lightningnetwork/lnd/graph"
18
        graphdb "github.com/lightningnetwork/lnd/graph/db"
19
        "github.com/lightningnetwork/lnd/lnpeer"
20
        "github.com/lightningnetwork/lnd/lnwire"
21
        "golang.org/x/time/rate"
22
)
23

24
// SyncerType encapsulates the different types of syncing mechanisms for a
25
// gossip syncer.
26
type SyncerType uint8
27

28
const (
29
        // ActiveSync denotes that a gossip syncer:
30
        //
31
        // 1. Should not attempt to synchronize with the remote peer for
32
        //    missing channels.
33
        // 2. Should respond to queries from the remote peer.
34
        // 3. Should receive new updates from the remote peer.
35
        //
36
        // They are started in a chansSynced state in order to accomplish their
37
        // responsibilities above.
38
        ActiveSync SyncerType = iota
39

40
        // PassiveSync denotes that a gossip syncer:
41
        //
42
        // 1. Should not attempt to synchronize with the remote peer for
43
        //    missing channels.
44
        // 2. Should respond to queries from the remote peer.
45
        // 3. Should not receive new updates from the remote peer.
46
        //
47
        // They are started in a chansSynced state in order to accomplish their
48
        // responsibilities above.
49
        PassiveSync
50

51
        // PinnedSync denotes an ActiveSync that doesn't count towards the
52
        // default active syncer limits and is always active throughout the
53
        // duration of the peer's connection. Each pinned syncer will begin by
54
        // performing a historical sync to ensure we are well synchronized with
55
        // their routing table.
56
        PinnedSync
57
)
58

59
const (
60
        // defaultTimestampQueueSize is the size of the timestamp range queue
61
        // used.
62
        defaultTimestampQueueSize = 1
63
)
64

65
// String returns a human readable string describing the target SyncerType.
66
func (t SyncerType) String() string {
3✔
67
        switch t {
3✔
68
        case ActiveSync:
3✔
69
                return "ActiveSync"
3✔
70
        case PassiveSync:
3✔
71
                return "PassiveSync"
3✔
72
        case PinnedSync:
3✔
73
                return "PinnedSync"
3✔
74
        default:
×
75
                return fmt.Sprintf("unknown sync type %d", t)
×
76
        }
77
}
78

79
// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange
80
// allowing new gossip messages to be received from the peer.
81
func (t SyncerType) IsActiveSync() bool {
47✔
82
        switch t {
47✔
83
        case ActiveSync, PinnedSync:
17✔
84
                return true
17✔
85
        default:
33✔
86
                return false
33✔
87
        }
88
}
89

90
// syncerState is an enum that represents the current state of the GossipSyncer.
91
// As the syncer is a state machine, we'll gate our actions based off of the
92
// current state and the next incoming message.
93
type syncerState uint32
94

95
const (
96
        // syncingChans is the default state of the GossipSyncer. We start in
97
        // this state when a new peer first connects and we don't yet know if
98
        // we're fully synchronized.
99
        syncingChans syncerState = iota
100

101
        // waitingQueryRangeReply is the second main phase of the GossipSyncer.
102
        // We enter this state after we send out our first QueryChannelRange
103
        // message. We'll stay in this state until the remote party sends us
104
        // the final ReplyChannelRange message indicating they've responded to
105
        // our query entirely. After this state, we'll transition to
106
        // waitingQueryChanReply after we send out requests for all the new
107
        // chan IDs given to us.
108
        waitingQueryRangeReply
109

110
        // queryNewChannels is the third main phase of the GossipSyncer.  In
111
        // this phase we'll send out all of our QueryShortChanIDs messages in
112
        // response to the new channels that we don't yet know about.
113
        queryNewChannels
114

115
        // waitingQueryChanReply is the fourth main phase of the GossipSyncer.
116
        // We enter this phase once we've sent off a query chunk to the remote
117
        // peer.  We'll stay in this phase until we receive a
118
        // ReplyShortChanIDsEnd message which indicates that the remote party
119
        // has responded to all of our requests.
120
        waitingQueryChanReply
121

122
        // chansSynced is the terminal stage of the GossipSyncer. Once we enter
123
        // this phase, we'll send out our update horizon, which filters out the
124
        // set of channel updates that we're interested in. In this state,
125
        // we'll be able to accept any outgoing messages from the
126
        // AuthenticatedGossiper, and decide if we should forward them to our
127
        // target peer based on its update horizon.
128
        chansSynced
129

130
        // syncerIdle is a state in which the gossip syncer can handle external
131
        // requests to transition or perform historical syncs. It is used as the
132
        // initial state for pinned syncers, as well as a fallthrough case for
133
        // chansSynced allowing fully synced peers to facilitate requests.
134
        syncerIdle
135
)
136

137
// String returns a human readable string describing the target syncerState.
138
func (s syncerState) String() string {
4✔
139
        switch s {
4✔
140
        case syncingChans:
3✔
141
                return "syncingChans"
3✔
142

143
        case waitingQueryRangeReply:
3✔
144
                return "waitingQueryRangeReply"
3✔
145

146
        case queryNewChannels:
3✔
147
                return "queryNewChannels"
3✔
148

149
        case waitingQueryChanReply:
3✔
150
                return "waitingQueryChanReply"
3✔
151

152
        case chansSynced:
4✔
153
                return "chansSynced"
4✔
154

155
        case syncerIdle:
3✔
156
                return "syncerIdle"
3✔
157

158
        default:
×
159
                return "UNKNOWN STATE"
×
160
        }
161
}
162

163
const (
164
        // maxQueryChanRangeReplies specifies the default limit of replies to
165
        // process for a single QueryChannelRange request.
166
        maxQueryChanRangeReplies = 500
167

168
        // maxQueryChanRangeRepliesZlibFactor specifies the factor applied to
169
        // the maximum number of replies allowed for zlib encoded replies.
170
        maxQueryChanRangeRepliesZlibFactor = 4
171

172
        // chanRangeQueryBuffer is the number of blocks back that we'll go when
173
        // asking the remote peer for any channels they know of beyond our
174
        // highest known channel ID.
175
        chanRangeQueryBuffer = 144
176

177
        // syncTransitionTimeout is the default amount of time we'll wait when
178
        // attempting to perform a sync transition.
179
        syncTransitionTimeout = 5 * time.Second
180

181
        // requestBatchSize is the maximum number of channels we will query the
182
        // remote peer for in a QueryShortChanIDs message.
183
        requestBatchSize = 500
184

185
        // syncerBufferSize is the size of the syncer's buffers.
186
        syncerBufferSize = 50
187
)
188

189
var (
190
        // encodingTypeToChunkSize maps an encoding type to the max number of
191
        // short chan ID's using the encoding type that we can fit into a
192
        // single message safely.
193
        encodingTypeToChunkSize = map[lnwire.QueryEncoding]int32{
194
                lnwire.EncodingSortedPlain: 8000,
195
        }
196

197
        // ErrGossipSyncerExiting signals that the syncer has been killed.
198
        ErrGossipSyncerExiting = errors.New("gossip syncer exiting")
199

200
        // ErrSyncTransitionTimeout is an error returned when we've timed out
201
        // attempting to perform a sync transition.
202
        ErrSyncTransitionTimeout = errors.New("timed out attempting to " +
203
                "transition sync type")
204

205
        // zeroTimestamp is the timestamp we'll use when we want to indicate to
206
        // peers that we do not want to receive any new graph updates.
207
        zeroTimestamp time.Time
208
)
209

210
// syncTransitionReq encapsulates a request for a gossip syncer sync transition.
211
type syncTransitionReq struct {
212
        newSyncType SyncerType
213
        errChan     chan error
214
}
215

216
// historicalSyncReq encapsulates a request for a gossip syncer to perform a
217
// historical sync.
218
type historicalSyncReq struct {
219
        // doneChan is a channel that serves as a signal and is closed to ensure
220
        // the historical sync is attempted by the time we return to the caller.
221
        doneChan chan struct{}
222
}
223

224
// gossipSyncerCfg is a struct that packages all the information a GossipSyncer
225
// needs to carry out its duties.
226
type gossipSyncerCfg struct {
227
        // chainHash is the chain that this syncer is responsible for.
228
        chainHash chainhash.Hash
229

230
        // peerPub is the public key of the peer we're syncing with, serialized
231
        // in compressed format.
232
        peerPub [33]byte
233

234
        // channelSeries is the primary interface that we'll use to generate
235
        // our queries and respond to the queries of the remote peer.
236
        channelSeries ChannelGraphTimeSeries
237

238
        // encodingType is the current encoding type we're aware of. Requests
239
        // with different encoding types will be rejected.
240
        encodingType lnwire.QueryEncoding
241

242
        // chunkSize is the max number of short chan IDs using the syncer's
243
        // encoding type that we can fit into a single message safely.
244
        chunkSize int32
245

246
        // batchSize is the max number of channels the syncer will query from
247
        // the remote node in a single QueryShortChanIDs request.
248
        batchSize int32
249

250
        // sendMsg sends a variadic number of messages to the remote peer.
251
        // The boolean indicates whether this method should block while
252
        // waiting for sends to be written to the wire.
253
        sendMsg func(context.Context, bool, ...lnwire.Message) error
254

255
        // noSyncChannels will prevent the GossipSyncer from spawning a
256
        // channelGraphSyncer, meaning we will not try to reconcile unknown
257
        // channels with the remote peer.
258
        noSyncChannels bool
259

260
        // noReplyQueries will prevent the GossipSyncer from spawning a
261
        // replyHandler, meaning we will not reply to queries from our remote
262
        // peer.
263
        noReplyQueries bool
264

265
        // noTimestampQueryOption will prevent the GossipSyncer from querying
266
        // timestamps of announcement messages from the peer, and it will
267
        // prevent it from responding to timestamp queries.
268
        noTimestampQueryOption bool
269

270
        // ignoreHistoricalFilters will prevent syncers from replying with
271
        // historical data when the remote peer sets a gossip_timestamp_range.
272
        // This prevents ranges with old start times from causing us to dump the
273
        // graph on connect.
274
        ignoreHistoricalFilters bool
275

276
        // bestHeight returns the latest height known of the chain.
277
        bestHeight func() uint32
278

279
        // markGraphSynced updates the SyncManager's perception of whether we
280
        // have completed at least one historical sync.
281
        markGraphSynced func()
282

283
        // maxQueryChanRangeReplies is the maximum number of replies we'll allow
284
        // for a single QueryChannelRange request.
285
        maxQueryChanRangeReplies uint32
286

287
        // isStillZombieChannel takes the timestamps of the latest channel
288
        // updates for a channel and returns true if the channel should be
289
        // considered a zombie based on these timestamps.
290
        isStillZombieChannel func(time.Time, time.Time) bool
291

292
        // timestampQueueSize is the size of the timestamp range queue. If not
293
        // set, defaults to the global defaultTimestampQueueSize constant.
294
        timestampQueueSize int
295

296
        // msgBytesPerSecond is the allotted bandwidth rate, expressed in
297
        // bytes/second that this gossip syncer can consume. Once we exceed this
298
        // rate, message sending will block until we're below the rate.
299
        msgBytesPerSecond uint64
300
}
301

302
// GossipSyncer is a struct that handles synchronizing the channel graph state
303
// with a remote peer. The GossipSyncer implements a state machine that will
304
// progressively ensure we're synchronized with the channel state of the remote
305
// node. Once both nodes have been synchronized, we'll use an update filter to
306
// filter out which messages should be sent to a remote peer based on their
307
// update horizon. If the update horizon isn't specified, then we won't send
308
// them any channel updates at all.
309
type GossipSyncer struct {
310
        started sync.Once
311
        stopped sync.Once
312

313
        // state is the current state of the GossipSyncer.
314
        //
315
        // NOTE: This variable MUST be used atomically.
316
        state uint32
317

318
        // syncType denotes the SyncerType the gossip syncer is currently
319
        // exercising.
320
        //
321
        // NOTE: This variable MUST be used atomically.
322
        syncType uint32
323

324
        // remoteUpdateHorizon is the update horizon of the remote peer. We'll
325
        // use this to properly filter out any messages.
326
        remoteUpdateHorizon *lnwire.GossipTimestampRange
327

328
        // localUpdateHorizon is our local update horizon, we'll use this to
329
        // determine if we've already sent out our update.
330
        localUpdateHorizon *lnwire.GossipTimestampRange
331

332
        // syncTransitionReqs is a channel through which new sync type
333
        // transition requests will be sent. These requests should only be
334
        // handled when the gossip syncer is in a chansSynced state to ensure
335
        // its state machine behaves as expected.
336
        syncTransitionReqs chan *syncTransitionReq
337

338
        // historicalSyncReqs is a channel that serves as a signal for the
339
        // gossip syncer to perform a historical sync. These can only be done
340
        // once the gossip syncer is in a chansSynced state to ensure its state
341
        // machine behaves as expected.
342
        historicalSyncReqs chan *historicalSyncReq
343

344
        // genHistoricalChanRangeQuery when true signals to the gossip syncer
345
        // that it should request the remote peer for all of its known channel
346
        // IDs starting from the genesis block of the chain. This can only
347
        // happen if the gossip syncer receives a request to attempt a
348
        // historical sync. It can be unset if the syncer ever transitions from
349
        // PassiveSync to ActiveSync.
350
        genHistoricalChanRangeQuery bool
351

352
        // gossipMsgs is a channel that all responses to our queries from the
353
        // target peer will be sent over; these will be read by the
354
        // channelGraphSyncer.
355
        gossipMsgs chan lnwire.Message
356

357
        // queryMsgs is a channel that all queries from the remote peer will be
358
        // received over; these will be read by the replyHandler.
359
        queryMsgs chan lnwire.Message
360

361
        // curQueryRangeMsg keeps track of the latest QueryChannelRange message
362
        // we've sent to a peer to ensure we've consumed all expected replies.
363
        // This field is primarily used within the waitingQueryChanReply state.
364
        curQueryRangeMsg *lnwire.QueryChannelRange
365

366
        // prevReplyChannelRange keeps track of the previous ReplyChannelRange
367
        // message we've received from a peer to ensure they've fully replied to
368
        // our query by ensuring they covered our requested block range. This
369
        // field is primarily used within the waitingQueryChanReply state.
370
        prevReplyChannelRange *lnwire.ReplyChannelRange
371

372
        // bufferedChanRangeReplies is used in the waitingQueryChanReply state
373
        // to buffer all of the chunked responses to our query.
374
        bufferedChanRangeReplies []graphdb.ChannelUpdateInfo
375

376
        // numChanRangeRepliesRcvd is used to track the number of replies
377
        // received as part of a QueryChannelRange. This field is primarily used
378
        // within the waitingQueryChanReply state.
379
        numChanRangeRepliesRcvd uint32
380

381
        // newChansToQuery is used to pass the set of channels we should query
382
        // for from the waitingQueryChanReply state to the queryNewChannels
383
        // state.
384
        newChansToQuery []lnwire.ShortChannelID
385

386
        cfg gossipSyncerCfg
387

388
        // syncedSignal is a channel that, if set, will be closed when the
389
        // GossipSyncer reaches its terminal chansSynced state.
390
        syncedSignal chan struct{}
391

392
        // syncerSema is used to more finely control the syncer's ability to
393
        // respond to gossip timestamp range messages.
394
        syncerSema chan struct{}
395

396
        // timestampRangeQueue is a buffered channel for queuing timestamp range
397
        // messages that need to be processed asynchronously. This prevents the
398
        // gossiper from blocking when ApplyGossipFilter is called.
399
        timestampRangeQueue chan *lnwire.GossipTimestampRange
400

401
        // isSendingBacklog is an atomic flag that indicates whether a goroutine
402
        // is currently sending the backlog of messages. This ensures only one
403
        // goroutine is active at a time.
404
        isSendingBacklog atomic.Bool
405

406
        sync.Mutex
407

408
        // cg is a helper that encapsulates a wait group and quit channel and
409
        // allows contexts that either block or cancel on those depending on
410
        // the use case.
411
        cg *fn.ContextGuard
412

413
        // rateLimiter dictates the frequency with which we will reply to gossip
414
        // queries to this peer.
415
        rateLimiter *rate.Limiter
416
}
417

418
// newGossipSyncer returns a new instance of the GossipSyncer populated using
419
// the passed config.
420
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
158✔
421
        // Use the configured queue size if set, otherwise use the default.
158✔
422
        queueSize := cfg.timestampQueueSize
158✔
423
        if queueSize == 0 {
186✔
424
                queueSize = defaultTimestampQueueSize
28✔
425
        }
28✔
426

427
        bytesPerSecond := cfg.msgBytesPerSecond
158✔
428
        if bytesPerSecond == 0 {
313✔
429
                bytesPerSecond = DefaultPeerMsgBytesPerSecond
155✔
430
        }
155✔
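        // Allow short bursts of up to twice the sustained byte rate.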
431
        bytesBurst := 2 * bytesPerSecond
158✔
432

158✔
433
        // We'll use this rate limiter to limit this single peer.
158✔
434
        rateLimiter := rate.NewLimiter(
158✔
435
                rate.Limit(bytesPerSecond), int(bytesBurst),
158✔
436
        )
158✔
437

158✔
438
        return &GossipSyncer{
158✔
439
                cfg:                cfg,
158✔
440
                syncTransitionReqs: make(chan *syncTransitionReq),
158✔
441
                historicalSyncReqs: make(chan *historicalSyncReq),
158✔
442
                gossipMsgs:         make(chan lnwire.Message, syncerBufferSize),
158✔
443
                queryMsgs:          make(chan lnwire.Message, syncerBufferSize),
158✔
444
                timestampRangeQueue: make(
158✔
445
                        chan *lnwire.GossipTimestampRange, queueSize,
158✔
446
                ),
158✔
447
                syncerSema:  sema,
158✔
448
                cg:          fn.NewContextGuard(),
158✔
449
                rateLimiter: rateLimiter,
158✔
450
        }
158✔
451
}
452

453
// Start starts the GossipSyncer and any goroutines that it needs to carry out
454
// its duties.
455
func (g *GossipSyncer) Start() {
102✔
456
        g.started.Do(func() {
204✔
457
                log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])
102✔
458

102✔
459
                ctx, _ := g.cg.Create(context.Background())
102✔
460

102✔
461
                // TODO(conner): only spawn channelGraphSyncer if remote
102✔
462
                // supports gossip queries, and only spawn replyHandler if we
102✔
463
                // advertise support
102✔
464
                if !g.cfg.noSyncChannels {
203✔
465
                        g.cg.WgAdd(1)
101✔
466
                        go g.channelGraphSyncer(ctx)
101✔
467
                }
101✔
468
                if !g.cfg.noReplyQueries {
203✔
469
                        g.cg.WgAdd(1)
101✔
470
                        go g.replyHandler(ctx)
101✔
471
                }
101✔
472

473
                // Start the timestamp range queue processor to handle gossip
474
                // filter applications asynchronously.
475
                if !g.cfg.noTimestampQueryOption {
195✔
476
                        g.cg.WgAdd(1)
93✔
477
                        go g.processTimestampRangeQueue(ctx)
93✔
478
                }
93✔
479
        })
480
}
481

482
// Stop signals the GossipSyncer for a graceful exit, then waits until it has
483
// exited.
484
func (g *GossipSyncer) Stop() {
99✔
485
        g.stopped.Do(func() {
198✔
486
                log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
99✔
487
                defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])
99✔
488

99✔
489
                g.cg.Quit()
99✔
490
        })
99✔
491
}
492

493
// handleSyncingChans handles the state syncingChans for the GossipSyncer. When
494
// in this state, we will send a QueryChannelRange msg to our peer and advance
495
// the syncer's state to waitingQueryRangeReply.
496
func (g *GossipSyncer) handleSyncingChans(ctx context.Context) {
87✔
497
        // Prepare the query msg.
87✔
498
        queryRangeMsg, err := g.genChanRangeQuery(
87✔
499
                ctx, g.genHistoricalChanRangeQuery,
87✔
500
        )
87✔
501
        if err != nil {
87✔
502
                log.Errorf("Unable to gen chan range query: %v", err)
×
503
                return
×
504
        }
×
505

506
        // Acquire a lock so the following state transition is atomic.
507
        //
508
        // NOTE: We must lock the following steps as it's possible we get an
509
        // immediate response (ReplyChannelRange) after sending the query msg.
510
        // The response is handled in ProcessQueryMsg, which requires the
511
        // current state to be waitingQueryRangeReply.
512
        g.Lock()
87✔
513
        defer g.Unlock()
87✔
514

87✔
515
        // Send the msg to the remote peer, which is non-blocking as
87✔
516
        // `sendToPeer` only queues the msg in Brontide.
87✔
517
        err = g.sendToPeer(ctx, queryRangeMsg)
87✔
518
        if err != nil {
87✔
519
                log.Errorf("Unable to send chan range query: %v", err)
×
520
                return
×
521
        }
×
522

523
        // With the message sent successfully, we'll transition into the next
524
        // state where we wait for their reply.
525
        g.setSyncState(waitingQueryRangeReply)
87✔
526
}
527

528
// channelGraphSyncer is the main goroutine responsible for ensuring that we
529
// properly sync channel graph state with the remote peer, and also that we only
530
// send them messages which actually pass their defined update horizon.
531
func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
101✔
532
        defer g.cg.WgDone()
101✔
533

101✔
534
        for {
438✔
535
                state := g.syncState()
337✔
536
                syncType := g.SyncType()
337✔
537

337✔
538
                log.Debugf("GossipSyncer(%x): state=%v, type=%v",
337✔
539
                        g.cfg.peerPub[:], state, syncType)
337✔
540

337✔
541
                switch state {
337✔
542
                // When we're in this state, we're trying to synchronize our
543
                // view of the network with the remote peer. We'll kick off
544
                // this sync by asking them for the set of channels they
545
                // understand, as we'll as responding to any other queries by
546
                // them.
547
                case syncingChans:
87✔
548
                        g.handleSyncingChans(ctx)
87✔
549

550
                // In this state, we've sent out our initial channel range
551
                // query and are waiting for the final response from the remote
552
                // peer before we perform a diff to see with channels they know
553
                // of that we don't.
554
                case waitingQueryRangeReply:
189✔
555
                        // We'll wait to either process a new message from the
189✔
556
                        // remote party, or exit due to the gossiper exiting,
189✔
557
                        // or us being signalled to do so.
189✔
558
                        select {
189✔
559
                        case msg := <-g.gossipMsgs:
120✔
560
                                // The remote peer is sending a response to our
120✔
561
                                // initial query, we'll collate this response,
120✔
562
                                // and see if it's the final one in the series.
120✔
563
                                // If so, we can then transition to querying
120✔
564
                                // for the new channels.
120✔
565
                                queryReply, ok := msg.(*lnwire.ReplyChannelRange)
120✔
566
                                if ok {
240✔
567
                                        err := g.processChanRangeReply(
120✔
568
                                                ctx, queryReply,
120✔
569
                                        )
120✔
570
                                        if err != nil {
120✔
571
                                                log.Errorf("Unable to "+
×
572
                                                        "process chan range "+
×
573
                                                        "query: %v", err)
×
574
                                                return
×
575
                                        }
×
576
                                        continue
120✔
577
                                }
578

579
                                log.Warnf("Unexpected message: %T in state=%v",
×
580
                                        msg, state)
×
581

582
                        case <-g.cg.Done():
1✔
583
                                return
1✔
584

585
                        case <-ctx.Done():
68✔
586
                                return
68✔
587
                        }
588

589
                // We'll enter this state once we've discovered which channels
590
                // the remote party knows of that we don't yet know of
591
                // ourselves.
592
                case queryNewChannels:
6✔
593
                        // First, we'll attempt to continue our channel
6✔
594
                        // synchronization by continuing to send off another
6✔
595
                        // query chunk.
6✔
596
                        done := g.synchronizeChanIDs(ctx)
6✔
597

6✔
598
                        // If this wasn't our last query, then we'll need to
6✔
599
                        // transition to our waiting state.
6✔
600
                        if !done {
11✔
601
                                continue
5✔
602
                        }
603

604
                        // If we're fully synchronized, then we can transition
605
                        // to our terminal state.
606
                        g.setSyncState(chansSynced)
4✔
607

4✔
608
                        // Ensure that the sync manager becomes aware that the
4✔
609
                        // historical sync completed so synced_to_graph is
4✔
610
                        // updated over rpc.
4✔
611
                        g.cfg.markGraphSynced()
4✔
612

613
                // In this state, we've just sent off a new query for channels
614
                // that we don't yet know of. We'll remain in this state until
615
                // the remote party signals they've responded to our query in
616
                // totality.
617
                case waitingQueryChanReply:
5✔
618
                        // Once we've sent off our query, we'll wait for either
5✔
619
                        // an ending reply, or just another query from the
5✔
620
                        // remote peer.
5✔
621
                        select {
5✔
622
                        case msg := <-g.gossipMsgs:
5✔
623
                                // If this is the final reply to one of our
5✔
624
                                // queries, then we'll loop back into our query
5✔
625
                                // state to send off the remaining query chunks.
5✔
626
                                _, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
5✔
627
                                if ok {
10✔
628
                                        g.setSyncState(queryNewChannels)
5✔
629
                                        continue
5✔
630
                                }
631

632
                                log.Warnf("Unexpected message: %T in state=%v",
×
633
                                        msg, state)
×
634

635
                        case <-g.cg.Done():
×
636
                                return
×
637

638
                        case <-ctx.Done():
×
639
                                return
×
640
                        }
641

642
                // This is our final terminal state where we'll only reply to
643
                // any further queries by the remote peer.
644
                case chansSynced:
59✔
645
                        g.Lock()
59✔
646
                        if g.syncedSignal != nil {
70✔
647
                                close(g.syncedSignal)
11✔
648
                                g.syncedSignal = nil
11✔
649
                        }
11✔
650
                        g.Unlock()
59✔
651

59✔
652
                        // If we haven't yet sent out our update horizon, and
59✔
653
                        // we want to receive real-time channel updates, we'll
59✔
654
                        // do so now.
59✔
655
                        if g.localUpdateHorizon == nil &&
59✔
656
                                syncType.IsActiveSync() {
76✔
657

17✔
658
                                err := g.sendGossipTimestampRange(
17✔
659
                                        ctx, time.Now(), math.MaxUint32,
17✔
660
                                )
17✔
661
                                if err != nil {
17✔
662
                                        log.Errorf("Unable to send update "+
×
663
                                                "horizon to %x: %v",
×
664
                                                g.cfg.peerPub, err)
×
665
                                }
×
666
                        }
667
                        // With our horizon set, we'll simply reply to any new
668
                        // messages or process any state transitions and exit if
669
                        // needed.
670
                        fallthrough
59✔
671

672
                // Pinned peers will begin in this state, since they will
673
                // immediately receive a request to perform a historical sync.
674
                // Otherwise, we fall through after ending in chansSynced to
675
                // facilitate new requests.
676
                case syncerIdle:
62✔
677
                        select {
62✔
678
                        case req := <-g.syncTransitionReqs:
17✔
679
                                req.errChan <- g.handleSyncTransition(ctx, req)
17✔
680

681
                        case req := <-g.historicalSyncReqs:
19✔
682
                                g.handleHistoricalSync(req)
19✔
683

UNCOV
684
                        case <-g.cg.Done():
×
UNCOV
685
                                return
×
686

687
                        case <-ctx.Done():
29✔
688
                                return
29✔
689
                        }
690
                }
691
        }
692
}
693

694
// replyHandler is an event loop whose sole purpose is to reply to the remote
695
// peers queries. Our replyHandler will respond to messages generated by their
696
// channelGraphSyncer, and vice versa. Each party's channelGraphSyncer drives
697
// the other's replyHandler, allowing the replyHandler to operate independently
698
// from the state machine maintained on the same node.
699
//
700
// NOTE: This method MUST be run as a goroutine.
701
func (g *GossipSyncer) replyHandler(ctx context.Context) {
101✔
702
        defer g.cg.WgDone()
101✔
703

101✔
704
        for {
207✔
705
                select {
106✔
706
                case msg := <-g.queryMsgs:
8✔
707
                        err := g.replyPeerQueries(ctx, msg)
8✔
708
                        switch {
8✔
709
                        case err == ErrGossipSyncerExiting:
×
710
                                return
×
711

712
                        case err == lnpeer.ErrPeerExiting:
×
713
                                return
×
714

715
                        case err != nil:
×
716
                                log.Errorf("Unable to reply to peer "+
×
717
                                        "query: %v", err)
×
718
                        }
719

720
                case <-g.cg.Done():
2✔
721
                        return
2✔
722

723
                case <-ctx.Done():
96✔
724
                        return
96✔
725
                }
726
        }
727
}
728

729
// processTimestampRangeQueue handles timestamp range messages from the queue
730
// asynchronously. This prevents blocking the gossiper when rate limiting is
731
// active and multiple peers are trying to apply gossip filters.
732
func (g *GossipSyncer) processTimestampRangeQueue(ctx context.Context) {
93✔
733
        defer g.cg.WgDone()
93✔
734

93✔
735
        for {
191✔
736
                select {
98✔
737
                case msg := <-g.timestampRangeQueue:
69✔
738
                        // Process the timestamp range message. If we hit an
69✔
739
                        // error, log it but continue processing to avoid
69✔
740
                        // blocking the queue.
69✔
741
                        err := g.ApplyGossipFilter(ctx, msg)
69✔
742
                        switch {
69✔
743
                        case errors.Is(err, ErrGossipSyncerExiting):
×
744
                                return
×
745

746
                        case errors.Is(err, lnpeer.ErrPeerExiting):
×
747
                                return
×
748

749
                        case err != nil:
×
750
                                log.Errorf("Unable to apply gossip filter: %v",
×
751
                                        err)
×
752
                        }
753

754
                case <-g.cg.Done():
1✔
755
                        return
1✔
756

757
                case <-ctx.Done():
28✔
758
                        return
28✔
759
                }
760
        }
761
}
762

763
// QueueTimestampRange attempts to queue a timestamp range message for
764
// asynchronous processing. If the queue is full, it returns false to indicate
765
// the message was dropped.
766
func (g *GossipSyncer) QueueTimestampRange(
767
        msg *lnwire.GossipTimestampRange) bool {
1,965✔
768

1,965✔
769
        // If timestamp queries are disabled, don't queue the message.
1,965✔
770
        if g.cfg.noTimestampQueryOption {
1,965✔
771
                return false
×
772
        }
×
773

774
        select {
1,965✔
775
        case g.timestampRangeQueue <- msg:
776✔
776
                return true
776✔
777

778
        // Queue is full, drop the message to prevent blocking.
779
        default:
1,189✔
780
                log.Warnf("Timestamp range queue full for peer %x, "+
1,189✔
781
                        "dropping message", g.cfg.peerPub[:])
1,189✔
782
                return false
1,189✔
783
        }
784
}
785

786
// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
787
// syncer and sends it to the remote peer.
788
func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
789
        firstTimestamp time.Time, timestampRange uint32) error {
31✔
790

31✔
791
        endTimestamp := firstTimestamp.Add(
31✔
792
                time.Duration(timestampRange) * time.Second,
31✔
793
        )
31✔
794

31✔
795
        log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)",
31✔
796
                g.cfg.peerPub[:], firstTimestamp, endTimestamp)
31✔
797

31✔
798
        localUpdateHorizon := &lnwire.GossipTimestampRange{
31✔
799
                ChainHash:      g.cfg.chainHash,
31✔
800
                FirstTimestamp: uint32(firstTimestamp.Unix()),
31✔
801
                TimestampRange: timestampRange,
31✔
802
        }
31✔
803

31✔
804
        if err := g.sendToPeer(ctx, localUpdateHorizon); err != nil {
31✔
805
                return err
×
806
        }
×
807

808
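        // Sending a zero timestamp with a zero range tells the peer we want
        // no new gossip, so we clear our local horizon rather than record it.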
        if firstTimestamp.Equal(zeroTimestamp) && timestampRange == 0 {
33✔
809
                g.localUpdateHorizon = nil
2✔
810
        } else {
31✔
811
                g.localUpdateHorizon = localUpdateHorizon
29✔
812
        }
29✔
813

814
        return nil
31✔
815
}
816

817
// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
818
// the remote peer for its known set of channel IDs within a particular block
819
// range. This method will be called repeatedly until the entire range has
820
// been queried and a response received. We'll chunk our requests as
821
// required to ensure they fit into a single message. We may re-enter this
822
// state in the case that chunking is required.
823
func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
9✔
824
        // If we're in this state yet there are no more new channels to query
9✔
825
        // for, then we'll transition to our final synced state and return true
9✔
826
        // to signal that we're fully synchronized.
9✔
827
        if len(g.newChansToQuery) == 0 {
13✔
828
                log.Infof("GossipSyncer(%x): no more chans to query",
4✔
829
                        g.cfg.peerPub[:])
4✔
830

4✔
831
                return true
4✔
832
        }
4✔
833

834
        // Otherwise, we'll issue our next chunked query to receive replies
835
        // for.
836
        var queryChunk []lnwire.ShortChannelID
8✔
837

8✔
838
        // If the number of channels to query for is less than the chunk size,
8✔
839
        // then we can issue a single query.
8✔
840
        if int32(len(g.newChansToQuery)) < g.cfg.batchSize {
13✔
841
                queryChunk = g.newChansToQuery
5✔
842
                g.newChansToQuery = nil
5✔
843

5✔
844
        } else {
8✔
845
                // Otherwise, we'll need to only query for the next chunk.
3✔
846
                // We'll slice into our query chunk, then slide our main
847
                // pointer down by the chunk size.
                // pointer down by the chunk size.
3✔
848
                queryChunk = g.newChansToQuery[:g.cfg.batchSize]
3✔
849
                g.newChansToQuery = g.newChansToQuery[g.cfg.batchSize:]
3✔
850
        }
3✔
851

852
        log.Infof("GossipSyncer(%x): querying for %v new channels",
8✔
853
                g.cfg.peerPub[:], len(queryChunk))
8✔
854

8✔
855
        // Change the state before sending the query msg.
8✔
856
        g.setSyncState(waitingQueryChanReply)
8✔
857

8✔
858
        // With our chunk obtained, we'll send over our next query, then return
8✔
859
        // false indicating that we're not yet fully synced.
8✔
860
        err := g.sendToPeer(ctx, &lnwire.QueryShortChanIDs{
8✔
861
                ChainHash:    g.cfg.chainHash,
8✔
862
                EncodingType: lnwire.EncodingSortedPlain,
8✔
863
                ShortChanIDs: queryChunk,
8✔
864
        })
8✔
865
        if err != nil {
8✔
866
                log.Errorf("Unable to sync chan IDs: %v", err)
×
867
        }
×
868

869
        return false
8✔
870
}
871

872
// isLegacyReplyChannelRange determines whether a ReplyChannelRange message is
873
// considered legacy. There was a point where lnd used to include the same query
874
// over multiple replies, rather than including the portion of the query the
875
// reply is handling. We'll use this as a way of detecting whether we are
876
// communicating with a legacy node so we can properly sync with them.
877
func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
878
        reply *lnwire.ReplyChannelRange) bool {
253✔
879

253✔
880
        return (reply.ChainHash == query.ChainHash &&
253✔
881
                reply.FirstBlockHeight == query.FirstBlockHeight &&
253✔
882
                reply.NumBlocks == query.NumBlocks)
253✔
883
}
253✔
884

885
// processChanRangeReply is called each time the GossipSyncer receives a new
886
// reply to the initial range query to discover new channels that it didn't
887
// previously know of.
888
func (g *GossipSyncer) processChanRangeReply(_ context.Context,
889
        msg *lnwire.ReplyChannelRange) error {
128✔
890

128✔
891
        // isStale returns whether the timestamp is too far into the past.
128✔
892
        isStale := func(timestamp time.Time) bool {
161✔
893
                return time.Since(timestamp) > graph.DefaultChannelPruneExpiry
33✔
894
        }
33✔
895

896
        // isSkewed returns whether the timestamp is too far into the future.
897
        isSkewed := func(timestamp time.Time) bool {
151✔
898
                return time.Until(timestamp) > graph.DefaultChannelPruneExpiry
23✔
899
        }
23✔
900

901
        // If we're not communicating with a legacy node, we'll apply some
902
        // further constraints on their reply to ensure it satisfies our query.
903
        if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
238✔
904
                // The first block should be within our original request.
110✔
905
                if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight {
110✔
906
                        return fmt.Errorf("reply includes channels for height "+
×
907
                                "%v prior to query %v", msg.FirstBlockHeight,
×
908
                                g.curQueryRangeMsg.FirstBlockHeight)
×
909
                }
×
910

911
                // The last block should also be within our original request.
912
                // We don't need to check the intermediate ones because they
913
                // should already be in sorted order.
914
                replyLastHeight := msg.LastBlockHeight()
110✔
915
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
110✔
916
                if replyLastHeight > queryLastHeight {
110✔
917
                        return fmt.Errorf("reply includes channels for height "+
×
918
                                "%v after query %v", replyLastHeight,
×
919
                                queryLastHeight)
×
920
                }
×
921

922
                // If we've previously received a reply for this query, look at
923
                // its last block to ensure the current reply properly follows
924
                // it.
925
                if g.prevReplyChannelRange != nil {
215✔
926
                        prevReply := g.prevReplyChannelRange
105✔
927
                        prevReplyLastHeight := prevReply.LastBlockHeight()
105✔
928

105✔
929
                        // The current reply can either start from the previous
105✔
930
                        // reply's last block, if there are still more channels
105✔
931
                        // for the same block, or the block after.
105✔
932
                        if msg.FirstBlockHeight != prevReplyLastHeight &&
105✔
933
                                msg.FirstBlockHeight != prevReplyLastHeight+1 {
105✔
934

×
935
                                return fmt.Errorf("first block of reply %v "+
×
936
                                        "does not continue from last block of "+
×
937
                                        "previous %v", msg.FirstBlockHeight,
×
938
                                        prevReplyLastHeight)
×
939
                        }
×
940
                }
941
        }
942

943
        g.prevReplyChannelRange = msg
128✔
944

128✔
945
        for i, scid := range msg.ShortChanIDs {
258✔
946
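                // Default to zero timestamps; if the peer included timestamps
                // below, they'll overwrite these and may cause the channel to
                // be skipped as stale or skewed.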
                info := graphdb.NewChannelUpdateInfo(
130✔
947
                        scid, time.Time{}, time.Time{},
130✔
948
                )
130✔
949

130✔
950
                if len(msg.Timestamps) != 0 {
145✔
951
                        t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0)
15✔
952
                        info.Node1UpdateTimestamp = t1
15✔
953

15✔
954
                        t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0)
15✔
955
                        info.Node2UpdateTimestamp = t2
15✔
956

15✔
957
                        // Sort out all channels with outdated or skewed
15✔
958
                        // timestamps. Both timestamps need to be out of
15✔
959
                        // boundaries for us to skip the channel and not query
15✔
960
                        // it later on.
15✔
961
                        switch {
15✔
962
                        case isStale(info.Node1UpdateTimestamp) &&
963
                                isStale(info.Node2UpdateTimestamp):
2✔
964

2✔
965
                                continue
2✔
966

967
                        case isSkewed(info.Node1UpdateTimestamp) &&
968
                                isSkewed(info.Node2UpdateTimestamp):
2✔
969

2✔
970
                                continue
2✔
971

972
                        case isStale(info.Node1UpdateTimestamp) &&
973
                                isSkewed(info.Node2UpdateTimestamp):
2✔
974

2✔
975
                                continue
2✔
976

977
                        case isStale(info.Node2UpdateTimestamp) &&
978
                                isSkewed(info.Node1UpdateTimestamp):
2✔
979

2✔
980
                                continue
2✔
981
                        }
982
                }
983

984
                g.bufferedChanRangeReplies = append(
122✔
985
                        g.bufferedChanRangeReplies, info,
122✔
986
                )
122✔
987
        }
988

989
        switch g.cfg.encodingType {
128✔
990
        case lnwire.EncodingSortedPlain:
128✔
991
                g.numChanRangeRepliesRcvd++
128✔
992
        case lnwire.EncodingSortedZlib:
×
993
                g.numChanRangeRepliesRcvd += maxQueryChanRangeRepliesZlibFactor
×
994
        default:
×
995
                return fmt.Errorf("unhandled encoding type %v", g.cfg.encodingType)
×
996
        }
997

998
        log.Infof("GossipSyncer(%x): buffering chan range reply of size=%v",
128✔
999
                g.cfg.peerPub[:], len(msg.ShortChanIDs))
128✔
1000

128✔
1001
        // If this isn't the last response and we can continue to receive more,
128✔
1002
        // then we can exit as we've already buffered the latest portion of the
128✔
1003
        // streaming reply.
128✔
1004
        maxReplies := g.cfg.maxQueryChanRangeReplies
128✔
1005
        switch {
128✔
1006
        // If we're communicating with a legacy node, we'll need to look at the
1007
        // complete field.
1008
        case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg):
18✔
1009
                if msg.Complete == 0 && g.numChanRangeRepliesRcvd < maxReplies {
21✔
1010
                        return nil
3✔
1011
                }
3✔
1012

1013
        // Otherwise, we'll look at the reply's height range.
1014
        default:
110✔
1015
                replyLastHeight := msg.LastBlockHeight()
110✔
1016
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
110✔
1017

110✔
1018
                // TODO(wilmer): This might require some padding if the remote
110✔
1019
                // node is not aware of the last height we sent them, i.e., is
110✔
1020
                // behind a few blocks from us.
110✔
1021
                if replyLastHeight < queryLastHeight &&
110✔
1022
                        g.numChanRangeRepliesRcvd < maxReplies {
215✔
1023

105✔
1024
                        return nil
105✔
1025
                }
105✔
1026
        }
1027

1028
        log.Infof("GossipSyncer(%x): filtering through %v chans",
20✔
1029
                g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))
20✔
1030

20✔
1031
        // Otherwise, this is the final response, so we'll now check to see
20✔
1032
        // which channels they know of that we don't.
20✔
1033
        newChans, err := g.cfg.channelSeries.FilterKnownChanIDs(
20✔
1034
                g.cfg.chainHash, g.bufferedChanRangeReplies,
20✔
1035
                g.cfg.isStillZombieChannel,
20✔
1036
        )
20✔
1037
        if err != nil {
20✔
1038
                return fmt.Errorf("unable to filter chan ids: %w", err)
×
1039
        }
×
1040

1041
        // As we've received the entirety of the reply, we no longer need to
1042
        // hold on to the set of buffered replies or the original query that
1043
        // prompted the replies, so we'll let that be garbage collected now.
1044
        g.curQueryRangeMsg = nil
20✔
1045
        g.prevReplyChannelRange = nil
20✔
1046
        g.bufferedChanRangeReplies = nil
20✔
1047
        g.numChanRangeRepliesRcvd = 0
20✔
1048

20✔
1049
        // If there aren't any channels that we don't know of, then we can
20✔
1050
        // switch straight to our terminal state.
20✔
1051
        if len(newChans) == 0 {
37✔
1052
                log.Infof("GossipSyncer(%x): remote peer has no new chans",
17✔
1053
                        g.cfg.peerPub[:])
17✔
1054

17✔
1055
                g.setSyncState(chansSynced)
17✔
1056

17✔
1057
                // Ensure that the sync manager becomes aware that the
17✔
1058
                // historical sync completed so synced_to_graph is updated over
17✔
1059
                // rpc.
17✔
1060
                g.cfg.markGraphSynced()
17✔
1061
                return nil
17✔
1062
        }
17✔
1063

1064
        // Otherwise, we'll set the set of channels that we need to query for
1065
        // the next state, and also transition our state.
1066
        g.newChansToQuery = newChans
6✔
1067
        g.setSyncState(queryNewChannels)
6✔
1068

6✔
1069
        log.Infof("GossipSyncer(%x): starting query for %v new chans",
6✔
1070
                g.cfg.peerPub[:], len(newChans))
6✔
1071

6✔
1072
        return nil
6✔
1073
}
1074

1075
// genChanRangeQuery generates the initial message we'll send to the remote
1076
// party when we're kicking off the channel graph synchronization upon
1077
// connection. The historicalQuery boolean can be used to generate a query from
1078
// the genesis block of the chain.
1079
func (g *GossipSyncer) genChanRangeQuery(ctx context.Context,
1080
        historicalQuery bool) (*lnwire.QueryChannelRange, error) {
91✔
1081

91✔
1082
        // First, we'll query our channel graph time series for its highest
91✔
1083
        // known channel ID.
91✔
1084
        newestChan, err := g.cfg.channelSeries.HighestChanID(
91✔
1085
                ctx, g.cfg.chainHash,
91✔
1086
        )
91✔
1087
        if err != nil {
91✔
1088
                return nil, err
×
1089
        }
×
1090

1091
        // Once we have the chan ID of the newest, we'll obtain the block height
1092
        // of the channel, then subtract our default horizon to ensure we don't
1093
        // miss any channels. By default, we go back 1 day from the newest
1094
        // channel, unless we're attempting a historical sync, where we'll
1095
        // actually start from the genesis block instead.
1096
        var startHeight uint32
91✔
1097
        switch {
91✔
1098
        case historicalQuery:
22✔
1099
                fallthrough
22✔
1100
        case newestChan.BlockHeight <= chanRangeQueryBuffer:
23✔
1101
                startHeight = 0
23✔
1102
        default:
68✔
1103
                startHeight = newestChan.BlockHeight - chanRangeQueryBuffer
68✔
1104
        }
1105

1106
        // Determine the number of blocks to request based on our best height.
1107
        // We'll take into account any potential underflows and explicitly set
1108
        // numBlocks to its minimum value of 1 if so.
1109
        bestHeight := g.cfg.bestHeight()
91✔
1110
        numBlocks := bestHeight - startHeight
91✔
1111
        if int64(numBlocks) < 1 {
91✔
1112
                numBlocks = 1
×
1113
        }
×
1114

1115
        log.Infof("GossipSyncer(%x): requesting new chans from height=%v "+
91✔
1116
                "and %v blocks after", g.cfg.peerPub[:], startHeight, numBlocks)
91✔
1117

91✔
1118
        // Finally, we'll craft the channel range query, using our starting
91✔
1119
        // height, then asking for all known channels to the foreseeable end of
91✔
1120
        // the main chain.
91✔
1121
        query := &lnwire.QueryChannelRange{
91✔
1122
                ChainHash:        g.cfg.chainHash,
91✔
1123
                FirstBlockHeight: startHeight,
91✔
1124
                NumBlocks:        numBlocks,
91✔
1125
        }
91✔
1126

91✔
1127
        if !g.cfg.noTimestampQueryOption {
174✔
1128
                query.QueryOptions = lnwire.NewTimestampQueryOption()
83✔
1129
        }
83✔
1130

1131
        g.curQueryRangeMsg = query
91✔
1132

91✔
1133
        return query, nil
91✔
1134
}
1135

1136
// replyPeerQueries is called in response to any query by the remote peer.
1137
// We'll examine our state and send back our best response.
1138
func (g *GossipSyncer) replyPeerQueries(ctx context.Context,
1139
        msg lnwire.Message) error {
8✔
1140

8✔
1141
        switch msg := msg.(type) {
8✔
1142

1143
        // In this state, we'll also handle any incoming channel range queries
1144
        // from the remote peer as they're trying to sync their state as well.
1145
        case *lnwire.QueryChannelRange:
6✔
1146
                return g.replyChanRangeQuery(ctx, msg)
6✔
1147

1148
        // If the remote peer skips straight to requesting new channels that
1149
        // they don't know of, then we'll ensure that we also handle this case.
1150
        case *lnwire.QueryShortChanIDs:
5✔
1151
                return g.replyShortChanIDs(ctx, msg)
5✔
1152

1153
        default:
×
1154
                return fmt.Errorf("unknown message: %T", msg)
×
1155
        }
1156
}
1157

1158
// replyChanRangeQuery will be dispatched in response to a channel range query
1159
// by the remote node. We'll query the channel time series for channels that
1160
// meet the channel range, then chunk our responses to the remote node. We also
1161
// ensure that our final fragment carries the "complete" bit to indicate the
1162
// end of our streaming response.
1163
func (g *GossipSyncer) replyChanRangeQuery(ctx context.Context,
1164
        query *lnwire.QueryChannelRange) error {
12✔
1165

12✔
1166
        // Before responding, we'll check to ensure that the remote peer is
12✔
1167
        // querying for the same chain that we're on. If not, we'll send back a
12✔
1168
        // response with a complete value of zero to indicate we're on a
12✔
1169
        // different chain.
12✔
1170
        if g.cfg.chainHash != query.ChainHash {
13✔
1171
                log.Warnf("Remote peer requested QueryChannelRange for "+
1✔
1172
                        "chain=%v, we're on chain=%v", query.ChainHash,
1✔
1173
                        g.cfg.chainHash)
1✔
1174

1✔
1175
                return g.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
1✔
1176
                        ChainHash:        query.ChainHash,
1✔
1177
                        FirstBlockHeight: query.FirstBlockHeight,
1✔
1178
                        NumBlocks:        query.NumBlocks,
1✔
1179
                        Complete:         0,
1✔
1180
                        EncodingType:     g.cfg.encodingType,
1✔
1181
                        ShortChanIDs:     nil,
1✔
1182
                })
1✔
1183
        }
1✔
1184

1185
        log.Infof("GossipSyncer(%x): filtering chan range: start_height=%v, "+
11✔
1186
                "num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
11✔
1187
                query.NumBlocks)
11✔
1188

11✔
1189
        // Check if the query asked for timestamps. We will only serve
11✔
1190
        // timestamps if this has not been disabled with
11✔
1191
        // noTimestampQueryOption.
11✔
1192
        withTimestamps := query.WithTimestamps() &&
11✔
1193
                !g.cfg.noTimestampQueryOption
11✔
1194

11✔
1195
        // Next, we'll consult the time series to obtain the set of known
11✔
1196
        // channel ID's that match their query.
11✔
1197
        startBlock := query.FirstBlockHeight
11✔
1198
        endBlock := query.LastBlockHeight()
11✔
1199
        channelRanges, err := g.cfg.channelSeries.FilterChannelRange(
11✔
1200
                query.ChainHash, startBlock, endBlock, withTimestamps,
11✔
1201
        )
11✔
1202
        if err != nil {
11✔
1203
                return err
×
1204
        }
×
1205

1206
        // TODO(roasbeef): means can't send max uint above?
1207
        //  * or make internal 64
1208

1209
        // We'll send our response in a streaming manner, chunk-by-chunk. We do
1210
        // this as there's a transport message size limit which we'll need to
1211
        // adhere to. We also need to make sure all of our replies cover the
1212
        // expected range of the query.
1213
        sendReplyForChunk := func(channelChunk []graphdb.ChannelUpdateInfo,
11✔
1214
                firstHeight, lastHeight uint32, finalChunk bool) error {
27✔
1215

16✔
1216
                // The number of blocks contained in the current chunk (the
16✔
1217
                // total span) is the difference between the last block height and
16✔
1218
                // the first in the range. We add one as even if all channels
16✔
1219
                // returned are in the same block, we need to count that.
16✔
1220
                numBlocks := lastHeight - firstHeight + 1
16✔
1221
                complete := uint8(0)
16✔
1222
                if finalChunk {
27✔
1223
                        complete = 1
11✔
1224
                }
11✔
1225

1226
                var timestamps lnwire.Timestamps
16✔
1227
                if withTimestamps {
19✔
1228
                        timestamps = make(lnwire.Timestamps, len(channelChunk))
3✔
1229
                }
3✔
1230

1231
                scids := make([]lnwire.ShortChannelID, len(channelChunk))
16✔
1232
                for i, info := range channelChunk {
33✔
1233
                        scids[i] = info.ShortChannelID
17✔
1234

17✔
1235
                        if !withTimestamps {
31✔
1236
                                continue
14✔
1237
                        }
1238

1239
                        timestamps[i].Timestamp1 = uint32(
3✔
1240
                                info.Node1UpdateTimestamp.Unix(),
3✔
1241
                        )
3✔
1242

3✔
1243
                        timestamps[i].Timestamp2 = uint32(
3✔
1244
                                info.Node2UpdateTimestamp.Unix(),
3✔
1245
                        )
3✔
1246
                }
1247

1248
                return g.sendToPeerSync(ctx, &lnwire.ReplyChannelRange{
16✔
1249
                        ChainHash:        query.ChainHash,
16✔
1250
                        NumBlocks:        numBlocks,
16✔
1251
                        FirstBlockHeight: firstHeight,
16✔
1252
                        Complete:         complete,
16✔
1253
                        EncodingType:     g.cfg.encodingType,
16✔
1254
                        ShortChanIDs:     scids,
16✔
1255
                        Timestamps:       timestamps,
16✔
1256
                })
16✔
1257
        }
1258

1259
        var (
11✔
1260
                firstHeight  = query.FirstBlockHeight
11✔
1261
                lastHeight   uint32
11✔
1262
                channelChunk []graphdb.ChannelUpdateInfo
11✔
1263
        )
11✔
1264

11✔
1265
        // chunkSize is the maximum number of SCIDs that we can safely put in a
11✔
1266
        // single message. If we also need to include timestamps though, then
11✔
1267
        // this number is halved since encoding two timestamps takes the same
11✔
1268
        // number of bytes as encoding an SCID.
11✔
1269
        chunkSize := g.cfg.chunkSize
11✔
1270
        if withTimestamps {
14✔
1271
                chunkSize /= 2
3✔
1272
        }
3✔
1273

1274
        for _, channelRange := range channelRanges {
28✔
1275
                channels := channelRange.Channels
17✔
1276
                numChannels := int32(len(channels))
17✔
1277
                numLeftToAdd := chunkSize - int32(len(channelChunk))
17✔
1278

17✔
1279
                // Include the current block in the ongoing chunk if it can fit
17✔
1280
                // and move on to the next block.
17✔
1281
                if numChannels <= numLeftToAdd {
29✔
1282
                        channelChunk = append(channelChunk, channels...)
12✔
1283
                        continue
12✔
1284
                }
1285

1286
                // Otherwise, we need to send our existing channel chunk as is
1287
                // as its own reply and start a new one for the current block.
1288
                // We'll mark the end of our current chunk as the height before
1289
                // the current block to ensure the whole query range is replied
1290
                // to.
1291
                log.Infof("GossipSyncer(%x): sending range chunk of size=%v",
5✔
1292
                        g.cfg.peerPub[:], len(channelChunk))
5✔
1293

5✔
1294
                lastHeight = channelRange.Height - 1
5✔
1295
                err := sendReplyForChunk(
5✔
1296
                        channelChunk, firstHeight, lastHeight, false,
5✔
1297
                )
5✔
1298
                if err != nil {
5✔
1299
                        return err
×
1300
                }
×
1301

1302
                // With the reply constructed, we'll start tallying channels for
1303
                // our next one keeping in mind our chunk size. This may result
1304
                // in channels for this block being left out from the reply, but
1305
                // this isn't an issue since we'll randomly shuffle them and we
1306
                // assume a historical gossip sync is performed at a later time.
1307
                firstHeight = channelRange.Height
5✔
1308
                finalChunkSize := numChannels
5✔
1309
                exceedsChunkSize := numChannels > chunkSize
5✔
1310
                if exceedsChunkSize {
5✔
1311
                        rand.Shuffle(len(channels), func(i, j int) {
×
1312
                                channels[i], channels[j] = channels[j], channels[i]
×
1313
                        })
×
1314
                        finalChunkSize = chunkSize
×
1315
                }
1316
                channelChunk = channels[:finalChunkSize]
5✔
1317

5✔
1318
                // Sort the chunk once again if we had to shuffle it.
5✔
1319
                if exceedsChunkSize {
5✔
1320
                        sort.Slice(channelChunk, func(i, j int) bool {
×
1321
                                id1 := channelChunk[i].ShortChannelID.ToUint64()
×
1322
                                id2 := channelChunk[j].ShortChannelID.ToUint64()
×
1323

×
1324
                                return id1 < id2
×
1325
                        })
×
1326
                }
1327
        }
1328

1329
        // Send the remaining chunk as the final reply.
1330
        log.Infof("GossipSyncer(%x): sending final chan range chunk, size=%v",
11✔
1331
                g.cfg.peerPub[:], len(channelChunk))
11✔
1332

11✔
1333
        return sendReplyForChunk(
11✔
1334
                channelChunk, firstHeight, query.LastBlockHeight(), true,
11✔
1335
        )
11✔
1336
}
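
The chunking rule above caps each reply at a fixed number of SCIDs and halves that budget when timestamps ride along. Below is a simplified sketch of that splitting logic; it ignores the block-boundary bookkeeping and shuffling the real code performs.

package main

import "fmt"

// chunkSCIDs splits a list of short channel IDs into reply-sized chunks.
// When timestamps accompany the SCIDs, the per-chunk budget is halved since
// two timestamps encode to roughly the same size as one SCID.
func chunkSCIDs(scids []uint64, chunkSize int, withTimestamps bool) [][]uint64 {
        if withTimestamps {
                chunkSize /= 2
        }

        var chunks [][]uint64
        for len(scids) > 0 {
                n := chunkSize
                if len(scids) < n {
                        n = len(scids)
                }
                chunks = append(chunks, scids[:n])
                scids = scids[n:]
        }

        return chunks
}

func main() {
        ids := []uint64{1, 2, 3, 4, 5}
        fmt.Println(len(chunkSCIDs(ids, 4, false))) // 2 chunks
        fmt.Println(len(chunkSCIDs(ids, 4, true)))  // 3 chunks: budget halved to 2
}
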
1337

1338
// replyShortChanIDs will be dispatched in response to a query by the remote
1339
// node for information concerning a set of short channel ID's. Our response
1340
// will be sent in a streaming chunked manner to ensure that we remain below
1341
// the current transport level message size.
1342
func (g *GossipSyncer) replyShortChanIDs(ctx context.Context,
1343
        query *lnwire.QueryShortChanIDs) error {
7✔
1344

7✔
1345
        // Before responding, we'll check to ensure that the remote peer is
7✔
1346
        // querying for the same chain that we're on. If not, we'll send back a
7✔
1347
        // response with a complete value of zero to indicate we're on a
7✔
1348
        // different chain.
7✔
1349
        if g.cfg.chainHash != query.ChainHash {
8✔
1350
                log.Warnf("Remote peer requested QueryShortChanIDs for "+
1✔
1351
                        "chain=%v, we're on chain=%v", query.ChainHash,
1✔
1352
                        g.cfg.chainHash)
1✔
1353

1✔
1354
                return g.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
1✔
1355
                        ChainHash: query.ChainHash,
1✔
1356
                        Complete:  0,
1✔
1357
                })
1✔
1358
        }
1✔
1359

1360
        if len(query.ShortChanIDs) == 0 {
6✔
1361
                log.Infof("GossipSyncer(%x): ignoring query for blank short chan ID's",
×
1362
                        g.cfg.peerPub[:])
×
1363
                return nil
×
1364
        }
×
1365

1366
        log.Infof("GossipSyncer(%x): fetching chan anns for %v chans",
6✔
1367
                g.cfg.peerPub[:], len(query.ShortChanIDs))
6✔
1368

6✔
1369
        // Now that we know we're on the same chain, we'll query the channel
6✔
1370
        // time series for the set of messages that we know of which satisfies
6✔
1371
        // the requirement of being a chan ann, chan update, or a node ann
6✔
1372
        // related to the set of queried channels.
6✔
1373
        replyMsgs, err := g.cfg.channelSeries.FetchChanAnns(
6✔
1374
                query.ChainHash, query.ShortChanIDs,
6✔
1375
        )
6✔
1376
        if err != nil {
6✔
1377
                return fmt.Errorf("unable to fetch chan anns for %v..., %w",
×
1378
                        query.ShortChanIDs[0].ToUint64(), err)
×
1379
        }
×
1380

1381
        // Reply with any messages related to those channel ID's, we'll write
1382
        // each one individually and synchronously to throttle the sends and
1383
        // perform buffering of responses in the syncer as opposed to the peer.
1384
        for _, msg := range replyMsgs {
12✔
1385
                err := g.sendToPeerSync(ctx, msg)
6✔
1386
                if err != nil {
6✔
1387
                        return err
×
1388
                }
×
1389
        }
1390

1391
        // Regardless of whether we had any messages to reply with, send over
1392
        // the sentinel message to signal that the stream has terminated.
1393
        return g.sendToPeerSync(ctx, &lnwire.ReplyShortChanIDsEnd{
6✔
1394
                ChainHash: query.ChainHash,
6✔
1395
                Complete:  1,
6✔
1396
        })
6✔
1397
}
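
The reply flow above always terminates the stream with a sentinel end message, whether or not any announcements were found. A tiny sketch of that pattern follows, with a hypothetical send callback standing in for the wire.

package main

import "fmt"

// replyWithSentinel writes each reply synchronously and then a terminating
// sentinel so the peer knows the stream has ended, even when there were no
// replies at all.
func replyWithSentinel(send func(string) error, replies []string) error {
        for _, r := range replies {
                if err := send(r); err != nil {
                        return err
                }
        }

        return send("reply_short_chan_ids_end complete=1")
}

func main() {
        send := func(m string) error {
                fmt.Println("sent:", m)
                return nil
        }
        fmt.Println(replyWithSentinel(send, []string{"chan_ann", "chan_update"}))
}
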
1398

1399
// ApplyGossipFilter applies a gossip filter sent by the remote node to the
1400
// state machine. Once applied, we'll ensure that we don't forward any messages
1401
// to the peer that aren't within the time range of the filter.
1402
func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
1403
        filter *lnwire.GossipTimestampRange) error {
78✔
1404

78✔
1405
        g.Lock()
78✔
1406

78✔
1407
        g.remoteUpdateHorizon = filter
78✔
1408

78✔
1409
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
78✔
1410
        endTime := startTime.Add(
78✔
1411
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
78✔
1412
        )
78✔
1413

78✔
1414
        g.Unlock()
78✔
1415

78✔
1416
        // If requested, don't reply with historical gossip data when the remote
78✔
1417
        // peer sets their gossip timestamp range.
78✔
1418
        if g.cfg.ignoreHistoricalFilters {
79✔
1419
                return nil
1✔
1420
        }
1✔
1421

1422
        // Check if a goroutine is already sending the backlog. If so, return
1423
        // early without attempting to acquire the semaphore.
1424
        if g.isSendingBacklog.Load() {
82✔
1425
                log.Debugf("GossipSyncer(%x): skipping ApplyGossipFilter, "+
5✔
1426
                        "backlog send already in progress", g.cfg.peerPub[:])
5✔
1427
                return nil
5✔
1428
        }
5✔
1429

1430
        select {
72✔
1431
        case <-g.syncerSema:
72✔
1432
        case <-g.cg.Done():
×
1433
                return ErrGossipSyncerExiting
×
1434
        case <-ctx.Done():
×
1435
                return ctx.Err()
×
1436
        }
1437

1438
        // We don't put this in a defer because if the goroutine is launched,
1439
        // it needs to be called when the goroutine is stopped.
1440
        returnSema := func() {
83✔
1441
                g.syncerSema <- struct{}{}
11✔
1442
        }
11✔
1443

1444
        // Now that the remote peer has applied their filter, we'll query the
1445
        // database for all the messages that are beyond this filter.
1446
        newUpdatestoSend := g.cfg.channelSeries.UpdatesInHorizon(
72✔
1447
                g.cfg.chainHash, startTime, endTime,
72✔
1448
        )
72✔
1449

72✔
1450
        // Create a pull-based iterator so we can check if there are any
72✔
1451
        // updates before launching the goroutine.
72✔
1452
        next, stop := iter.Pull2(newUpdatestoSend)
72✔
1453

72✔
1454
        // Check if we have any updates to send by attempting to get the first
72✔
1455
        // message.
72✔
1456
        firstMsg, firstErr, ok := next()
72✔
1457
        if firstErr != nil {
72✔
1458
                stop()
×
1459
                returnSema()
×
1460
                return firstErr
×
1461
        }
×
1462

1463
        log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
11✔
1464
                "start=%v, end=%v, has_updates=%v", g.cfg.peerPub[:],
11✔
1465
                startTime, endTime, ok)
11✔
1466

11✔
1467
        // If we don't have any to send, then we can return early.
11✔
1468
        if !ok {
20✔
1469
                stop()
9✔
1470
                returnSema()
9✔
1471
                return nil
9✔
1472
        }
9✔
1473

1474
        // Set the atomic flag to indicate we're starting to send the backlog.
1475
        // If the swap fails, it means another goroutine is already active, so
1476
        // we return early.
1477
        if !g.isSendingBacklog.CompareAndSwap(false, true) {
5✔
1478
                returnSema()
×
1479
                log.Debugf("GossipSyncer(%x): another goroutine already "+
×
1480
                        "sending backlog, skipping", g.cfg.peerPub[:])
×
1481

×
1482
                return nil
×
1483
        }
×
1484

1485
        // We'll conclude by launching a goroutine to send out any updates.
1486
        // The goroutine takes ownership of the iterator.
1487
        g.cg.WgAdd(1)
5✔
1488
        go func() {
10✔
1489
                defer g.cg.WgDone()
5✔
1490
                defer returnSema()
5✔
1491
                defer g.isSendingBacklog.Store(false)
5✔
1492
                defer stop()
5✔
1493

5✔
1494
                // Send the first message we already pulled.
5✔
1495
                err := g.sendToPeerSync(ctx, firstMsg)
5✔
1496
                switch {
5✔
1497
                case errors.Is(err, ErrGossipSyncerExiting):
×
1498
                        return
×
1499

1500
                case errors.Is(err, lnpeer.ErrPeerExiting):
×
1501
                        return
×
1502

1503
                case err != nil:
×
1504
                        log.Errorf("Unable to send message for "+
×
1505
                                "peer catch up: %v", err)
×
1506
                }
1507

1508
                // Continue with the rest of the messages using the same pull
1509
                // iterator.
1510
                for {
10✔
1511
                        msg, err, ok := next()
5✔
1512
                        if !ok {
10✔
1513
                                return
5✔
1514
                        }
5✔
1515

1516
                        // If the iterator yielded an error, log it and
1517
                        // continue.
1518
                        if err != nil {
3✔
1519
                                log.Errorf("Error fetching update for peer "+
×
1520
                                        "catch up: %v", err)
×
1521
                                continue
×
1522
                        }
1523

1524
                        err = g.sendToPeerSync(ctx, msg)
3✔
1525
                        switch {
3✔
1526
                        case err == ErrGossipSyncerExiting:
×
1527
                                return
×
1528

1529
                        case err == lnpeer.ErrPeerExiting:
×
1530
                                return
×
1531

1532
                        case err != nil:
×
1533
                                log.Errorf("Unable to send message for "+
×
1534
                                        "peer catch up: %v", err)
×
1535
                        }
1536
                }
1537
        }()
1538

1539
        return nil
5✔
1540
}
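
The filter above is just a Unix timestamp plus a range in seconds; updates outside that window are withheld. Below is a standalone sketch of how the window is derived; the field names mirror the wire message and the values are illustrative.

package main

import (
        "fmt"
        "time"
)

// horizon converts a gossip timestamp filter into an absolute time window:
// the window starts at FirstTimestamp and spans TimestampRange seconds.
func horizon(firstTimestamp, timestampRange uint32) (time.Time, time.Time) {
        start := time.Unix(int64(firstTimestamp), 0)
        end := start.Add(time.Duration(timestampRange) * time.Second)

        return start, end
}

func main() {
        start, end := horizon(1_700_000_000, 3600)
        fmt.Println(start.UTC(), "->", end.UTC()) // a one-hour window
}
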
1541

1542
// FilterGossipMsgs takes a set of gossip messages, and only sends a message to a peer
1543
// iff the message is within the bounds of their set gossip filter. If the peer
1544
// doesn't have a gossip filter set, then no messages will be forwarded.
1545
func (g *GossipSyncer) FilterGossipMsgs(ctx context.Context,
1546
        msgs ...msgWithSenders) {
5✔
1547

5✔
1548
        // If the peer doesn't have an update horizon set, then we won't send
5✔
1549
        // it any new update messages.
5✔
1550
        if g.remoteUpdateHorizon == nil {
9✔
1551
                log.Tracef("GossipSyncer(%x): skipped due to nil "+
4✔
1552
                        "remoteUpdateHorizon", g.cfg.peerPub[:])
4✔
1553
                return
4✔
1554
        }
4✔
1555

1556
        // If we've been signaled to exit, or are exiting, then we'll stop
1557
        // short.
1558
        select {
4✔
1559
        case <-g.cg.Done():
×
1560
                return
×
1561
        case <-ctx.Done():
×
1562
                return
×
1563
        default:
4✔
1564
        }
1565

1566
        // TODO(roasbeef): need to ensure that peer still online...send msg to
1567
        // gossiper on peer termination to signal peer disconnect?
1568

1569
        var err error
4✔
1570

4✔
1571
        // Before we filter out the messages, we'll construct an index over the
4✔
1572
        // set of channel announcements and channel updates. This will allow us
4✔
1573
        // to quickly check if we should forward a chan ann, based on the known
4✔
1574
        // channel updates for a channel.
4✔
1575
        chanUpdateIndex := make(
4✔
1576
                map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate1,
4✔
1577
        )
4✔
1578
        for _, msg := range msgs {
17✔
1579
                chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate1)
13✔
1580
                if !ok {
23✔
1581
                        continue
10✔
1582
                }
1583

1584
                chanUpdateIndex[chanUpdate.ShortChannelID] = append(
6✔
1585
                        chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate,
6✔
1586
                )
6✔
1587
        }
1588

1589
        // We'll construct a helper function that we'll use below to determine
1590
        // if a given messages passes the gossip msg filter.
1591
        g.Lock()
4✔
1592
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
4✔
1593
        endTime := startTime.Add(
4✔
1594
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
4✔
1595
        )
4✔
1596
        g.Unlock()
4✔
1597

4✔
1598
        passesFilter := func(timeStamp uint32) bool {
17✔
1599
                t := time.Unix(int64(timeStamp), 0)
13✔
1600
                return t.Equal(startTime) ||
13✔
1601
                        (t.After(startTime) && t.Before(endTime))
13✔
1602
        }
13✔
1603

1604
        msgsToSend := make([]lnwire.Message, 0, len(msgs))
4✔
1605
        for _, msg := range msgs {
17✔
1606
                // If the target peer is the peer that sent us this message,
13✔
1607
                // then we'll skip it as we don't need to filter this
13✔
1608
                // message.
13✔
1609
                if _, ok := msg.senders[g.cfg.peerPub]; ok {
16✔
1610
                        continue
3✔
1611
                }
1612

1613
                switch msg := msg.msg.(type) {
13✔
1614

1615
                // For each channel announcement message, we'll only send this
1616
                // message if the channel updates for the channel are between
1617
                // our time range.
1618
                case *lnwire.ChannelAnnouncement1:
7✔
1619
                        // First, we'll check if the channel updates are in
7✔
1620
                        // this message batch.
7✔
1621
                        chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID]
7✔
1622
                        if !ok {
11✔
1623
                                // If not, we'll attempt to query the database
4✔
1624
                                // to see if we know of the updates.
4✔
1625
                                chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates(
4✔
1626
                                        g.cfg.chainHash, msg.ShortChannelID,
4✔
1627
                                )
4✔
1628
                                if err != nil {
4✔
1629
                                        log.Warnf("no channel updates found for "+
×
1630
                                                "short_chan_id=%v",
×
1631
                                                msg.ShortChannelID)
×
1632
                                        continue
×
1633
                                }
1634
                        }
1635

1636
                        for _, chanUpdate := range chanUpdates {
14✔
1637
                                if passesFilter(chanUpdate.Timestamp) {
11✔
1638
                                        msgsToSend = append(msgsToSend, msg)
4✔
1639
                                        break
4✔
1640
                                }
1641
                        }
1642

1643
                        if len(chanUpdates) == 0 {
10✔
1644
                                msgsToSend = append(msgsToSend, msg)
3✔
1645
                        }
3✔
1646

1647
                // For each channel update, we'll only send it if the timestamp
1648
                // is within our time range.
1649
                case *lnwire.ChannelUpdate1:
6✔
1650
                        if passesFilter(msg.Timestamp) {
10✔
1651
                                msgsToSend = append(msgsToSend, msg)
4✔
1652
                        }
4✔
1653

1654
                // Similarly, we only send node announcements if the update
1655
                // timestamp ifs between our set gossip filter time range.
1656
                case *lnwire.NodeAnnouncement1:
6✔
1657
                        if passesFilter(msg.Timestamp) {
10✔
1658
                                msgsToSend = append(msgsToSend, msg)
4✔
1659
                        }
4✔
1660
                }
1661
        }
1662

1663
        log.Tracef("GossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
4✔
1664
                g.cfg.peerPub[:], len(msgs), len(msgsToSend))
4✔
1665

4✔
1666
        if len(msgsToSend) == 0 {
7✔
1667
                return
3✔
1668
        }
3✔
1669

1670
        if err = g.sendToPeer(ctx, msgsToSend...); err != nil {
4✔
1671
                log.Errorf("unable to send gossip msgs: %v", err)
×
1672
        }
×
1673

1674
}
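
The per-message filtering above boils down to a simple rule for channel announcements: forward one only if some known update for the channel lands inside the remote horizon, or if no updates are known at all. Below is a simplified sketch of that decision, inclusive at the start of the window and exclusive at the end.

package main

import (
        "fmt"
        "time"
)

// shouldForwardChanAnn reports whether a channel announcement should be sent
// to the peer, given the timestamps of its known channel updates and the
// peer's update horizon.
func shouldForwardChanAnn(updateTimes []time.Time, start, end time.Time) bool {
        // With no known updates, the announcement is forwarded as is.
        if len(updateTimes) == 0 {
                return true
        }

        for _, t := range updateTimes {
                if t.Equal(start) || (t.After(start) && t.Before(end)) {
                        return true
                }
        }

        return false
}

func main() {
        start := time.Unix(1_700_000_000, 0)
        end := start.Add(time.Hour)

        inWindow := []time.Time{start.Add(30 * time.Minute)}
        tooOld := []time.Time{start.Add(-time.Minute)}

        fmt.Println(shouldForwardChanAnn(inWindow, start, end)) // true
        fmt.Println(shouldForwardChanAnn(tooOld, start, end))   // false
}
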
1675

1676
// ProcessQueryMsg is used by outside callers to pass new channel time series
1677
// queries to the internal processing goroutine.
1678
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) error {
115✔
1679
        var msgChan chan lnwire.Message
115✔
1680
        switch msg.(type) {
115✔
1681
        case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs:
3✔
1682
                msgChan = g.queryMsgs
3✔
1683

1684
        // Reply messages should only be expected in states where we're waiting
1685
        // for a reply.
1686
        case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
115✔
1687
                g.Lock()
115✔
1688
                syncState := g.syncState()
115✔
1689
                g.Unlock()
115✔
1690

115✔
1691
                if syncState != waitingQueryRangeReply &&
115✔
1692
                        syncState != waitingQueryChanReply {
116✔
1693

1✔
1694
                        return fmt.Errorf("unexpected msg %T received in "+
1✔
1695
                                "state %v", msg, syncState)
1✔
1696
                }
1✔
1697
                msgChan = g.gossipMsgs
114✔
1698

1699
        default:
×
1700
                msgChan = g.gossipMsgs
×
1701
        }
1702

1703
        select {
114✔
1704
        case msgChan <- msg:
114✔
1705
        case <-peerQuit:
×
1706
        case <-g.cg.Done():
×
1707
        }
1708

1709
        return nil
114✔
1710
}
1711

1712
// setSyncState sets the gossip syncer's state to the given state.
1713
func (g *GossipSyncer) setSyncState(state syncerState) {
161✔
1714
        atomic.StoreUint32(&g.state, uint32(state))
161✔
1715
}
161✔
1716

1717
// syncState returns the current syncerState of the target GossipSyncer.
1718
func (g *GossipSyncer) syncState() syncerState {
556✔
1719
        return syncerState(atomic.LoadUint32(&g.state))
556✔
1720
}
556✔
1721

1722
// ResetSyncedSignal returns a channel that will be closed in order to serve as
1723
// a signal for when the GossipSyncer has reached its chansSynced state.
1724
func (g *GossipSyncer) ResetSyncedSignal() chan struct{} {
17✔
1725
        g.Lock()
17✔
1726
        defer g.Unlock()
17✔
1727

17✔
1728
        syncedSignal := make(chan struct{})
17✔
1729

17✔
1730
        syncState := syncerState(atomic.LoadUint32(&g.state))
17✔
1731
        if syncState == chansSynced {
21✔
1732
                close(syncedSignal)
4✔
1733
                return syncedSignal
4✔
1734
        }
4✔
1735

1736
        g.syncedSignal = syncedSignal
15✔
1737
        return g.syncedSignal
15✔
1738
}
1739

1740
// ProcessSyncTransition sends a request to the gossip syncer to transition its
1741
// sync type to a new one.
1742
//
1743
// NOTE: This can only be done once the gossip syncer has reached its final
1744
// chansSynced state.
1745
func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
17✔
1746
        errChan := make(chan error, 1)
17✔
1747
        select {
17✔
1748
        case g.syncTransitionReqs <- &syncTransitionReq{
1749
                newSyncType: newSyncType,
1750
                errChan:     errChan,
1751
        }:
17✔
1752
        case <-time.After(syncTransitionTimeout):
×
1753
                return ErrSyncTransitionTimeout
×
1754
        case <-g.cg.Done():
×
1755
                return ErrGossipSyncerExiting
×
1756
        }
1757

1758
        select {
17✔
1759
        case err := <-errChan:
17✔
1760
                return err
17✔
1761
        case <-g.cg.Done():
×
1762
                return ErrGossipSyncerExiting
×
1763
        }
1764
}
1765

1766
// handleSyncTransition handles a new sync type transition request.
1767
//
1768
// NOTE: The gossip syncer might have another sync state as a result of this
1769
// transition.
1770
func (g *GossipSyncer) handleSyncTransition(ctx context.Context,
1771
        req *syncTransitionReq) error {
17✔
1772

17✔
1773
        // Return early from any NOP sync transitions.
17✔
1774
        syncType := g.SyncType()
17✔
1775
        if syncType == req.newSyncType {
17✔
1776
                return nil
×
1777
        }
×
1778

1779
        log.Debugf("GossipSyncer(%x): transitioning from %v to %v",
17✔
1780
                g.cfg.peerPub, syncType, req.newSyncType)
17✔
1781

17✔
1782
        var (
17✔
1783
                firstTimestamp time.Time
17✔
1784
                timestampRange uint32
17✔
1785
        )
17✔
1786

17✔
1787
        switch req.newSyncType {
17✔
1788
        // If an active sync has been requested, then we should resume receiving
1789
        // new graph updates from the remote peer.
1790
        case ActiveSync, PinnedSync:
15✔
1791
                firstTimestamp = time.Now()
15✔
1792
                timestampRange = math.MaxUint32
15✔
1793

1794
        // If a PassiveSync transition has been requested, then we should no
1795
        // longer receive any new updates from the remote peer. We can do this
1796
        // by setting our update horizon to a range in the past ensuring no
1797
        // graph updates match the timestamp range.
1798
        case PassiveSync:
2✔
1799
                firstTimestamp = zeroTimestamp
2✔
1800
                timestampRange = 0
2✔
1801

1802
        default:
×
1803
                return fmt.Errorf("unhandled sync transition %v",
×
1804
                        req.newSyncType)
×
1805
        }
1806

1807
        err := g.sendGossipTimestampRange(ctx, firstTimestamp, timestampRange)
17✔
1808
        if err != nil {
17✔
1809
                return fmt.Errorf("unable to send local update horizon: %w",
×
1810
                        err)
×
1811
        }
×
1812

1813
        g.setSyncType(req.newSyncType)
17✔
1814

17✔
1815
        return nil
17✔
1816
}
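
The two branches above amount to two very different update horizons: an active or pinned transition opens the window from now onward, while a passive one collapses it so nothing can match. Here is a small sketch of that choice, with an assumed helper name.

package main

import (
        "fmt"
        "math"
        "time"
)

// horizonFor is a hypothetical helper illustrating the two horizons chosen
// during a sync transition: active syncs listen for everything from "now"
// onwards, while passive syncs set an empty window so no update ever matches.
func horizonFor(active bool) (time.Time, uint32) {
        if active {
                return time.Now(), math.MaxUint32
        }

        return time.Time{}, 0
}

func main() {
        start, span := horizonFor(false)
        fmt.Println(start.IsZero(), span) // true 0
}
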
1817

1818
// setSyncType sets the gossip syncer's sync type to the given type.
1819
func (g *GossipSyncer) setSyncType(syncType SyncerType) {
70✔
1820
        atomic.StoreUint32(&g.syncType, uint32(syncType))
70✔
1821
}
70✔
1822

1823
// SyncType returns the current SyncerType of the target GossipSyncer.
1824
func (g *GossipSyncer) SyncType() SyncerType {
426✔
1825
        return SyncerType(atomic.LoadUint32(&g.syncType))
426✔
1826
}
426✔
1827

1828
// historicalSync sends a request to the gossip syncer to perform a historical
1829
// sync.
1830
//
1831
// NOTE: This can only be done once the gossip syncer has reached its final
1832
// chansSynced state.
1833
func (g *GossipSyncer) historicalSync() error {
19✔
1834
        done := make(chan struct{})
19✔
1835

19✔
1836
        select {
19✔
1837
        case g.historicalSyncReqs <- &historicalSyncReq{
1838
                doneChan: done,
1839
        }:
19✔
1840
        case <-time.After(syncTransitionTimeout):
×
1841
                return ErrSyncTransitionTimeout
×
1842
        case <-g.cg.Done():
×
1843
                return ErrGossiperShuttingDown
×
1844
        }
1845

1846
        select {
19✔
1847
        case <-done:
19✔
1848
                return nil
19✔
1849
        case <-g.cg.Done():
×
1850
                return ErrGossiperShuttingDown
×
1851
        }
1852
}
1853

1854
// handleHistoricalSync handles a request to the gossip syncer to perform a
1855
// historical sync.
1856
func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
19✔
1857
        // We'll go back to our initial syncingChans state in order to request
19✔
1858
        // the remote peer to give us all of the channel IDs they know of
19✔
1859
        // starting from the genesis block.
19✔
1860
        g.genHistoricalChanRangeQuery = true
19✔
1861
        g.setSyncState(syncingChans)
19✔
1862
        close(req.doneChan)
19✔
1863
}
19✔
1864

1865
// sendToPeer sends a variadic number of messages to the remote peer. This
1866
// method should not block while waiting for sends to be written to the wire.
1867
func (g *GossipSyncer) sendToPeer(ctx context.Context,
1868
        msgs ...lnwire.Message) error {
121✔
1869

121✔
1870
        return g.sendMsgRateLimited(ctx, false, msgs...)
121✔
1871
}
121✔
1872

1873
// sendToPeerSync sends a variadic number of messages to the remote peer,
1874
// blocking until all messages have been sent successfully or a write error is
1875
// encountered.
1876
func (g *GossipSyncer) sendToPeerSync(ctx context.Context,
1877
        msgs ...lnwire.Message) error {
26✔
1878

26✔
1879
        return g.sendMsgRateLimited(ctx, true, msgs...)
26✔
1880
}
26✔
1881

1882
// sendMsgRateLimited sends a variadic number of messages to the remote peer,
1883
// applying our per-peer rate limit before each send. The sync boolean
1884
// determines if the send is blocking or not.
1885
func (g *GossipSyncer) sendMsgRateLimited(ctx context.Context, sync bool,
1886
        msgs ...lnwire.Message) error {
144✔
1887

144✔
1888
        for _, msg := range msgs {
290✔
1889
                err := maybeRateLimitMsg(
146✔
1890
                        ctx, g.rateLimiter, g.cfg.peerPub, msg, g.cg.Done(),
146✔
1891
                )
146✔
1892
                if err != nil {
146✔
1893
                        return err
×
1894
                }
×
1895

1896
                err = g.cfg.sendMsg(ctx, sync, msg)
146✔
1897
                if err != nil {
146✔
1898
                        return err
×
1899
                }
×
1900
        }
1901

1902
        return nil
144✔
1903
}
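
The send path above waits on a per-peer rate limiter before every write. Below is a minimal sketch of the same idea built directly on golang.org/x/time/rate; the limit and burst values are illustrative, not lnd's configuration.

package main

import (
        "context"
        "fmt"

        "golang.org/x/time/rate"
)

// sendAll blocks on the limiter before each message so a burst of gossip
// replies cannot overwhelm the peer's connection.
func sendAll(ctx context.Context, limiter *rate.Limiter,
        send func(string) error, msgs ...string) error {

        for _, msg := range msgs {
                // Wait until the limiter grants a token, or the context is
                // cancelled.
                if err := limiter.Wait(ctx); err != nil {
                        return err
                }
                if err := send(msg); err != nil {
                        return err
                }
        }

        return nil
}

func main() {
        limiter := rate.NewLimiter(rate.Limit(10), 5) // 10 msgs/sec, burst of 5
        err := sendAll(context.Background(), limiter,
                func(m string) error { fmt.Println("sent:", m); return nil },
                "chan_ann", "chan_update",
        )
        fmt.Println("err:", err)
}
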