• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 9915780197

13 Jul 2024 12:30AM UTC coverage: 49.268% (-9.1%) from 58.413%
9915780197

push

github

web-flow
Merge pull request #8653 from ProofOfKeags/fn-prim

DynComms [0/n]: `fn` package additions

92837 of 188433 relevant lines covered (49.27%)

1.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.19
/discovery/syncer.go
1
package discovery
2

3
import (
4
        "errors"
5
        "fmt"
6
        "math"
7
        "math/rand"
8
        "sort"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/btcsuite/btcd/chaincfg/chainhash"
14
        "github.com/lightningnetwork/lnd/channeldb"
15
        "github.com/lightningnetwork/lnd/lnpeer"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
        "golang.org/x/time/rate"
18
)
19

20
// SyncerType encapsulates the different types of syncing mechanisms for a
21
// gossip syncer.
22
type SyncerType uint8
23

24
const (
25
        // ActiveSync denotes that a gossip syncer:
26
        //
27
        // 1. Should not attempt to synchronize with the remote peer for
28
        //    missing channels.
29
        // 2. Should respond to queries from the remote peer.
30
        // 3. Should receive new updates from the remote peer.
31
        //
32
        // They are started in a chansSynced state in order to accomplish their
33
        // responsibilities above.
34
        ActiveSync SyncerType = iota
35

36
        // PassiveSync denotes that a gossip syncer:
37
        //
38
        // 1. Should not attempt to synchronize with the remote peer for
39
        //    missing channels.
40
        // 2. Should respond to queries from the remote peer.
41
        // 3. Should not receive new updates from the remote peer.
42
        //
43
        // They are started in a chansSynced state in order to accomplish their
44
        // responsibilities above.
45
        PassiveSync
46

47
        // PinnedSync denotes an ActiveSync that doesn't count towards the
48
        // default active syncer limits and is always active throughout the
49
        // duration of the peer's connection. Each pinned syncer will begin by
50
        // performing a historical sync to ensure we are well synchronized with
51
        // their routing table.
52
        PinnedSync
53
)
54

55
// String returns a human readable string describing the target SyncerType.
56
func (t SyncerType) String() string {
3✔
57
        switch t {
3✔
58
        case ActiveSync:
3✔
59
                return "ActiveSync"
3✔
60
        case PassiveSync:
3✔
61
                return "PassiveSync"
3✔
62
        case PinnedSync:
3✔
63
                return "PinnedSync"
3✔
64
        default:
×
65
                return fmt.Sprintf("unknown sync type %d", t)
×
66
        }
67
}
68

69
// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange
70
// allowing new gossip messages to be received from the peer.
71
func (t SyncerType) IsActiveSync() bool {
3✔
72
        switch t {
3✔
73
        case ActiveSync, PinnedSync:
3✔
74
                return true
3✔
75
        default:
3✔
76
                return false
3✔
77
        }
78
}
79

80
// syncerState is an enum that represents the current state of the GossipSyncer.
81
// As the syncer is a state machine, we'll gate our actions based off of the
82
// current state and the next incoming message.
83
type syncerState uint32
84

85
const (
86
        // syncingChans is the default state of the GossipSyncer. We start in
87
        // this state when a new peer first connects and we don't yet know if
88
        // we're fully synchronized.
89
        syncingChans syncerState = iota
90

91
        // waitingQueryRangeReply is the second main phase of the GossipSyncer.
92
        // We enter this state after we send out our first QueryChannelRange
93
        // reply. We'll stay in this state until the remote party sends us a
94
        // ReplyShortChanIDsEnd message that indicates they've responded to our
95
        // query entirely. After this state, we'll transition to
96
        // waitingQueryChanReply after we send out requests for all the new
97
        // chan ID's to us.
98
        waitingQueryRangeReply
99

100
        // queryNewChannels is the third main phase of the GossipSyncer.  In
101
        // this phase we'll send out all of our QueryShortChanIDs messages in
102
        // response to the new channels that we don't yet know about.
103
        queryNewChannels
104

105
        // waitingQueryChanReply is the fourth main phase of the GossipSyncer.
106
        // We enter this phase once we've sent off a query chink to the remote
107
        // peer.  We'll stay in this phase until we receive a
108
        // ReplyShortChanIDsEnd message which indicates that the remote party
109
        // has responded to all of our requests.
110
        waitingQueryChanReply
111

112
        // chansSynced is the terminal stage of the GossipSyncer. Once we enter
113
        // this phase, we'll send out our update horizon, which filters out the
114
        // set of channel updates that we're interested in. In this state,
115
        // we'll be able to accept any outgoing messages from the
116
        // AuthenticatedGossiper, and decide if we should forward them to our
117
        // target peer based on its update horizon.
118
        chansSynced
119

120
        // syncerIdle is a state in which the gossip syncer can handle external
121
        // requests to transition or perform historical syncs. It is used as the
122
        // initial state for pinned syncers, as well as a fallthrough case for
123
        // chansSynced allowing fully synced peers to facilitate requests.
124
        syncerIdle
125
)
126

127
// String returns a human readable string describing the target syncerState.
128
func (s syncerState) String() string {
3✔
129
        switch s {
3✔
130
        case syncingChans:
3✔
131
                return "syncingChans"
3✔
132

133
        case waitingQueryRangeReply:
3✔
134
                return "waitingQueryRangeReply"
3✔
135

136
        case queryNewChannels:
3✔
137
                return "queryNewChannels"
3✔
138

139
        case waitingQueryChanReply:
3✔
140
                return "waitingQueryChanReply"
3✔
141

142
        case chansSynced:
3✔
143
                return "chansSynced"
3✔
144

145
        case syncerIdle:
3✔
146
                return "syncerIdle"
3✔
147

148
        default:
×
149
                return "UNKNOWN STATE"
×
150
        }
151
}
152

153
const (
154
        // DefaultMaxUndelayedQueryReplies specifies how many gossip queries we
155
        // will respond to immediately before starting to delay responses.
156
        DefaultMaxUndelayedQueryReplies = 10
157

158
        // DefaultDelayedQueryReplyInterval is the length of time we will wait
159
        // before responding to gossip queries after replying to
160
        // maxUndelayedQueryReplies queries.
161
        DefaultDelayedQueryReplyInterval = 5 * time.Second
162

163
        // maxQueryChanRangeReplies specifies the default limit of replies to
164
        // process for a single QueryChannelRange request.
165
        maxQueryChanRangeReplies = 500
166

167
        // maxQueryChanRangeRepliesZlibFactor specifies the factor applied to
168
        // the maximum number of replies allowed for zlib encoded replies.
169
        maxQueryChanRangeRepliesZlibFactor = 4
170

171
        // chanRangeQueryBuffer is the number of blocks back that we'll go when
172
        // asking the remote peer for their any channels they know of beyond
173
        // our highest known channel ID.
174
        chanRangeQueryBuffer = 144
175

176
        // syncTransitionTimeout is the default timeout in which we'll wait up
177
        // to when attempting to perform a sync transition.
178
        syncTransitionTimeout = 5 * time.Second
179

180
        // requestBatchSize is the maximum number of channels we will query the
181
        // remote peer for in a QueryShortChanIDs message.
182
        requestBatchSize = 500
183

184
        // filterSemaSize is the capacity of gossipFilterSema.
185
        filterSemaSize = 5
186
)
187

188
var (
189
        // encodingTypeToChunkSize maps an encoding type, to the max number of
190
        // short chan ID's using the encoding type that we can fit into a
191
        // single message safely.
192
        encodingTypeToChunkSize = map[lnwire.QueryEncoding]int32{
193
                lnwire.EncodingSortedPlain: 8000,
194
        }
195

196
        // ErrGossipSyncerExiting signals that the syncer has been killed.
197
        ErrGossipSyncerExiting = errors.New("gossip syncer exiting")
198

199
        // ErrSyncTransitionTimeout is an error returned when we've timed out
200
        // attempting to perform a sync transition.
201
        ErrSyncTransitionTimeout = errors.New("timed out attempting to " +
202
                "transition sync type")
203

204
        // zeroTimestamp is the timestamp we'll use when we want to indicate to
205
        // peers that we do not want to receive any new graph updates.
206
        zeroTimestamp time.Time
207
)
208

209
// syncTransitionReq encapsulates a request for a gossip syncer sync transition.
210
type syncTransitionReq struct {
211
        newSyncType SyncerType
212
        errChan     chan error
213
}
214

215
// historicalSyncReq encapsulates a request for a gossip syncer to perform a
216
// historical sync.
217
type historicalSyncReq struct {
218
        // doneChan is a channel that serves as a signal and is closed to ensure
219
        // the historical sync is attempted by the time we return to the caller.
220
        doneChan chan struct{}
221
}
222

223
// gossipSyncerCfg is a struct that packages all the information a GossipSyncer
224
// needs to carry out its duties.
225
type gossipSyncerCfg struct {
226
        // chainHash is the chain that this syncer is responsible for.
227
        chainHash chainhash.Hash
228

229
        // peerPub is the public key of the peer we're syncing with, serialized
230
        // in compressed format.
231
        peerPub [33]byte
232

233
        // channelSeries is the primary interface that we'll use to generate
234
        // our queries and respond to the queries of the remote peer.
235
        channelSeries ChannelGraphTimeSeries
236

237
        // encodingType is the current encoding type we're aware of. Requests
238
        // with different encoding types will be rejected.
239
        encodingType lnwire.QueryEncoding
240

241
        // chunkSize is the max number of short chan IDs using the syncer's
242
        // encoding type that we can fit into a single message safely.
243
        chunkSize int32
244

245
        // batchSize is the max number of channels the syncer will query from
246
        // the remote node in a single QueryShortChanIDs request.
247
        batchSize int32
248

249
        // sendToPeer sends a variadic number of messages to the remote peer.
250
        // This method should not block while waiting for sends to be written
251
        // to the wire.
252
        sendToPeer func(...lnwire.Message) error
253

254
        // sendToPeerSync sends a variadic number of messages to the remote
255
        // peer, blocking until all messages have been sent successfully or a
256
        // write error is encountered.
257
        sendToPeerSync func(...lnwire.Message) error
258

259
        // maxUndelayedQueryReplies specifies how many gossip queries we will
260
        // respond to immediately before starting to delay responses.
261
        maxUndelayedQueryReplies int
262

263
        // delayedQueryReplyInterval is the length of time we will wait before
264
        // responding to gossip queries after replying to
265
        // maxUndelayedQueryReplies queries.
266
        delayedQueryReplyInterval time.Duration
267

268
        // noSyncChannels will prevent the GossipSyncer from spawning a
269
        // channelGraphSyncer, meaning we will not try to reconcile unknown
270
        // channels with the remote peer.
271
        noSyncChannels bool
272

273
        // noReplyQueries will prevent the GossipSyncer from spawning a
274
        // replyHandler, meaning we will not reply to queries from our remote
275
        // peer.
276
        noReplyQueries bool
277

278
        // noTimestampQueryOption will prevent the GossipSyncer from querying
279
        // timestamps of announcement messages from the peer, and it will
280
        // prevent it from responding to timestamp queries.
281
        noTimestampQueryOption bool
282

283
        // ignoreHistoricalFilters will prevent syncers from replying with
284
        // historical data when the remote peer sets a gossip_timestamp_range.
285
        // This prevents ranges with old start times from causing us to dump the
286
        // graph on connect.
287
        ignoreHistoricalFilters bool
288

289
        // bestHeight returns the latest height known of the chain.
290
        bestHeight func() uint32
291

292
        // markGraphSynced updates the SyncManager's perception of whether we
293
        // have completed at least one historical sync.
294
        markGraphSynced func()
295

296
        // maxQueryChanRangeReplies is the maximum number of replies we'll allow
297
        // for a single QueryChannelRange request.
298
        maxQueryChanRangeReplies uint32
299

300
        // isStillZombieChannel takes the timestamps of the latest channel
301
        // updates for a channel and returns true if the channel should be
302
        // considered a zombie based on these timestamps.
303
        isStillZombieChannel func(time.Time, time.Time) bool
304
}
305

306
// GossipSyncer is a struct that handles synchronizing the channel graph state
307
// with a remote peer. The GossipSyncer implements a state machine that will
308
// progressively ensure we're synchronized with the channel state of the remote
309
// node. Once both nodes have been synchronized, we'll use an update filter to
310
// filter out which messages should be sent to a remote peer based on their
311
// update horizon. If the update horizon isn't specified, then we won't send
312
// them any channel updates at all.
313
type GossipSyncer struct {
314
        started sync.Once
315
        stopped sync.Once
316

317
        // state is the current state of the GossipSyncer.
318
        //
319
        // NOTE: This variable MUST be used atomically.
320
        state uint32
321

322
        // syncType denotes the SyncerType the gossip syncer is currently
323
        // exercising.
324
        //
325
        // NOTE: This variable MUST be used atomically.
326
        syncType uint32
327

328
        // remoteUpdateHorizon is the update horizon of the remote peer. We'll
329
        // use this to properly filter out any messages.
330
        remoteUpdateHorizon *lnwire.GossipTimestampRange
331

332
        // localUpdateHorizon is our local update horizon, we'll use this to
333
        // determine if we've already sent out our update.
334
        localUpdateHorizon *lnwire.GossipTimestampRange
335

336
        // syncTransitions is a channel through which new sync type transition
337
        // requests will be sent through. These requests should only be handled
338
        // when the gossip syncer is in a chansSynced state to ensure its state
339
        // machine behaves as expected.
340
        syncTransitionReqs chan *syncTransitionReq
341

342
        // historicalSyncReqs is a channel that serves as a signal for the
343
        // gossip syncer to perform a historical sync. These can only be done
344
        // once the gossip syncer is in a chansSynced state to ensure its state
345
        // machine behaves as expected.
346
        historicalSyncReqs chan *historicalSyncReq
347

348
        // genHistoricalChanRangeQuery when true signals to the gossip syncer
349
        // that it should request the remote peer for all of its known channel
350
        // IDs starting from the genesis block of the chain. This can only
351
        // happen if the gossip syncer receives a request to attempt a
352
        // historical sync. It can be unset if the syncer ever transitions from
353
        // PassiveSync to ActiveSync.
354
        genHistoricalChanRangeQuery bool
355

356
        // gossipMsgs is a channel that all responses to our queries from the
357
        // target peer will be sent over, these will be read by the
358
        // channelGraphSyncer.
359
        gossipMsgs chan lnwire.Message
360

361
        // queryMsgs is a channel that all queries from the remote peer will be
362
        // received over, these will be read by the replyHandler.
363
        queryMsgs chan lnwire.Message
364

365
        // curQueryRangeMsg keeps track of the latest QueryChannelRange message
366
        // we've sent to a peer to ensure we've consumed all expected replies.
367
        // This field is primarily used within the waitingQueryChanReply state.
368
        curQueryRangeMsg *lnwire.QueryChannelRange
369

370
        // prevReplyChannelRange keeps track of the previous ReplyChannelRange
371
        // message we've received from a peer to ensure they've fully replied to
372
        // our query by ensuring they covered our requested block range. This
373
        // field is primarily used within the waitingQueryChanReply state.
374
        prevReplyChannelRange *lnwire.ReplyChannelRange
375

376
        // bufferedChanRangeReplies is used in the waitingQueryChanReply to
377
        // buffer all the chunked response to our query.
378
        bufferedChanRangeReplies []channeldb.ChannelUpdateInfo
379

380
        // numChanRangeRepliesRcvd is used to track the number of replies
381
        // received as part of a QueryChannelRange. This field is primarily used
382
        // within the waitingQueryChanReply state.
383
        numChanRangeRepliesRcvd uint32
384

385
        // newChansToQuery is used to pass the set of channels we should query
386
        // for from the waitingQueryChanReply state to the queryNewChannels
387
        // state.
388
        newChansToQuery []lnwire.ShortChannelID
389

390
        cfg gossipSyncerCfg
391

392
        // rateLimiter dictates the frequency with which we will reply to gossip
393
        // queries from a peer. This is used to delay responses to peers to
394
        // prevent DOS vulnerabilities if they are spamming with an unreasonable
395
        // number of queries.
396
        rateLimiter *rate.Limiter
397

398
        // syncedSignal is a channel that, if set, will be closed when the
399
        // GossipSyncer reaches its terminal chansSynced state.
400
        syncedSignal chan struct{}
401

402
        sync.Mutex
403

404
        gossipFilterSema chan struct{}
405

406
        quit chan struct{}
407
        wg   sync.WaitGroup
408
}
409

410
// newGossipSyncer returns a new instance of the GossipSyncer populated using
411
// the passed config.
412
func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer {
3✔
413
        // If no parameter was specified for max undelayed query replies, set it
3✔
414
        // to the default of 5 queries.
3✔
415
        if cfg.maxUndelayedQueryReplies <= 0 {
3✔
416
                cfg.maxUndelayedQueryReplies = DefaultMaxUndelayedQueryReplies
×
417
        }
×
418

419
        // If no parameter was specified for delayed query reply interval, set
420
        // to the default of 5 seconds.
421
        if cfg.delayedQueryReplyInterval <= 0 {
3✔
422
                cfg.delayedQueryReplyInterval = DefaultDelayedQueryReplyInterval
×
423
        }
×
424

425
        // Construct a rate limiter that will govern how frequently we reply to
426
        // gossip queries from this peer. The limiter will automatically adjust
427
        // during periods of quiescence, and increase the reply interval under
428
        // load.
429
        interval := rate.Every(cfg.delayedQueryReplyInterval)
3✔
430
        rateLimiter := rate.NewLimiter(
3✔
431
                interval, cfg.maxUndelayedQueryReplies,
3✔
432
        )
3✔
433

3✔
434
        filterSema := make(chan struct{}, filterSemaSize)
3✔
435
        for i := 0; i < filterSemaSize; i++ {
6✔
436
                filterSema <- struct{}{}
3✔
437
        }
3✔
438

439
        return &GossipSyncer{
3✔
440
                cfg:                cfg,
3✔
441
                rateLimiter:        rateLimiter,
3✔
442
                syncTransitionReqs: make(chan *syncTransitionReq),
3✔
443
                historicalSyncReqs: make(chan *historicalSyncReq),
3✔
444
                gossipMsgs:         make(chan lnwire.Message, 100),
3✔
445
                queryMsgs:          make(chan lnwire.Message, 100),
3✔
446
                gossipFilterSema:   filterSema,
3✔
447
                quit:               make(chan struct{}),
3✔
448
        }
3✔
449
}
450

451
// Start starts the GossipSyncer and any goroutines that it needs to carry out
452
// its duties.
453
func (g *GossipSyncer) Start() {
3✔
454
        g.started.Do(func() {
6✔
455
                log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])
3✔
456

3✔
457
                // TODO(conner): only spawn channelGraphSyncer if remote
3✔
458
                // supports gossip queries, and only spawn replyHandler if we
3✔
459
                // advertise support
3✔
460
                if !g.cfg.noSyncChannels {
6✔
461
                        g.wg.Add(1)
3✔
462
                        go g.channelGraphSyncer()
3✔
463
                }
3✔
464
                if !g.cfg.noReplyQueries {
6✔
465
                        g.wg.Add(1)
3✔
466
                        go g.replyHandler()
3✔
467
                }
3✔
468
        })
469
}
470

471
// Stop signals the GossipSyncer for a graceful exit, then waits until it has
472
// exited.
473
func (g *GossipSyncer) Stop() {
3✔
474
        g.stopped.Do(func() {
6✔
475
                log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
3✔
476
                defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])
3✔
477

3✔
478
                close(g.quit)
3✔
479
                g.wg.Wait()
3✔
480
        })
3✔
481
}
482

483
// channelGraphSyncer is the main goroutine responsible for ensuring that we
484
// properly channel graph state with the remote peer, and also that we only
485
// send them messages which actually pass their defined update horizon.
486
func (g *GossipSyncer) channelGraphSyncer() {
3✔
487
        defer g.wg.Done()
3✔
488

3✔
489
        for {
6✔
490
                state := g.syncState()
3✔
491
                syncType := g.SyncType()
3✔
492

3✔
493
                log.Debugf("GossipSyncer(%x): state=%v, type=%v",
3✔
494
                        g.cfg.peerPub[:], state, syncType)
3✔
495

3✔
496
                switch state {
3✔
497
                // When we're in this state, we're trying to synchronize our
498
                // view of the network with the remote peer. We'll kick off
499
                // this sync by asking them for the set of channels they
500
                // understand, as we'll as responding to any other queries by
501
                // them.
502
                case syncingChans:
3✔
503
                        // If we're in this state, then we'll send the remote
3✔
504
                        // peer our opening QueryChannelRange message.
3✔
505
                        queryRangeMsg, err := g.genChanRangeQuery(
3✔
506
                                g.genHistoricalChanRangeQuery,
3✔
507
                        )
3✔
508
                        if err != nil {
3✔
509
                                log.Errorf("Unable to gen chan range "+
×
510
                                        "query: %v", err)
×
511
                                return
×
512
                        }
×
513

514
                        err = g.cfg.sendToPeer(queryRangeMsg)
3✔
515
                        if err != nil {
3✔
516
                                log.Errorf("Unable to send chan range "+
×
517
                                        "query: %v", err)
×
518
                                return
×
519
                        }
×
520

521
                        // With the message sent successfully, we'll transition
522
                        // into the next state where we wait for their reply.
523
                        g.setSyncState(waitingQueryRangeReply)
3✔
524

525
                // In this state, we've sent out our initial channel range
526
                // query and are waiting for the final response from the remote
527
                // peer before we perform a diff to see with channels they know
528
                // of that we don't.
529
                case waitingQueryRangeReply:
3✔
530
                        // We'll wait to either process a new message from the
3✔
531
                        // remote party, or exit due to the gossiper exiting,
3✔
532
                        // or us being signalled to do so.
3✔
533
                        select {
3✔
534
                        case msg := <-g.gossipMsgs:
3✔
535
                                // The remote peer is sending a response to our
3✔
536
                                // initial query, we'll collate this response,
3✔
537
                                // and see if it's the final one in the series.
3✔
538
                                // If so, we can then transition to querying
3✔
539
                                // for the new channels.
3✔
540
                                queryReply, ok := msg.(*lnwire.ReplyChannelRange)
3✔
541
                                if ok {
6✔
542
                                        err := g.processChanRangeReply(queryReply)
3✔
543
                                        if err != nil {
3✔
544
                                                log.Errorf("Unable to "+
×
545
                                                        "process chan range "+
×
546
                                                        "query: %v", err)
×
547
                                                return
×
548
                                        }
×
549
                                        continue
3✔
550
                                }
551

552
                                log.Warnf("Unexpected message: %T in state=%v",
×
553
                                        msg, state)
×
554

555
                        case <-g.quit:
×
556
                                return
×
557
                        }
558

559
                // We'll enter this state once we've discovered which channels
560
                // the remote party knows of that we don't yet know of
561
                // ourselves.
562
                case queryNewChannels:
3✔
563
                        // First, we'll attempt to continue our channel
3✔
564
                        // synchronization by continuing to send off another
3✔
565
                        // query chunk.
3✔
566
                        done, err := g.synchronizeChanIDs()
3✔
567
                        if err != nil {
3✔
568
                                log.Errorf("Unable to sync chan IDs: %v", err)
×
569
                        }
×
570

571
                        // If this wasn't our last query, then we'll need to
572
                        // transition to our waiting state.
573
                        if !done {
6✔
574
                                g.setSyncState(waitingQueryChanReply)
3✔
575
                                continue
3✔
576
                        }
577

578
                        // If we're fully synchronized, then we can transition
579
                        // to our terminal state.
580
                        g.setSyncState(chansSynced)
3✔
581

3✔
582
                        // Ensure that the sync manager becomes aware that the
3✔
583
                        // historical sync completed so synced_to_graph is
3✔
584
                        // updated over rpc.
3✔
585
                        g.cfg.markGraphSynced()
3✔
586

587
                // In this state, we've just sent off a new query for channels
588
                // that we don't yet know of. We'll remain in this state until
589
                // the remote party signals they've responded to our query in
590
                // totality.
591
                case waitingQueryChanReply:
3✔
592
                        // Once we've sent off our query, we'll wait for either
3✔
593
                        // an ending reply, or just another query from the
3✔
594
                        // remote peer.
3✔
595
                        select {
3✔
596
                        case msg := <-g.gossipMsgs:
3✔
597
                                // If this is the final reply to one of our
3✔
598
                                // queries, then we'll loop back into our query
3✔
599
                                // state to send of the remaining query chunks.
3✔
600
                                _, ok := msg.(*lnwire.ReplyShortChanIDsEnd)
3✔
601
                                if ok {
6✔
602
                                        g.setSyncState(queryNewChannels)
3✔
603
                                        continue
3✔
604
                                }
605

606
                                log.Warnf("Unexpected message: %T in state=%v",
×
607
                                        msg, state)
×
608

609
                        case <-g.quit:
×
610
                                return
×
611
                        }
612

613
                // This is our final terminal state where we'll only reply to
614
                // any further queries by the remote peer.
615
                case chansSynced:
3✔
616
                        g.Lock()
3✔
617
                        if g.syncedSignal != nil {
6✔
618
                                close(g.syncedSignal)
3✔
619
                                g.syncedSignal = nil
3✔
620
                        }
3✔
621
                        g.Unlock()
3✔
622

3✔
623
                        // If we haven't yet sent out our update horizon, and
3✔
624
                        // we want to receive real-time channel updates, we'll
3✔
625
                        // do so now.
3✔
626
                        if g.localUpdateHorizon == nil &&
3✔
627
                                syncType.IsActiveSync() {
6✔
628

3✔
629
                                err := g.sendGossipTimestampRange(
3✔
630
                                        time.Now(), math.MaxUint32,
3✔
631
                                )
3✔
632
                                if err != nil {
3✔
633
                                        log.Errorf("Unable to send update "+
×
634
                                                "horizon to %x: %v",
×
635
                                                g.cfg.peerPub, err)
×
636
                                }
×
637
                        }
638
                        // With our horizon set, we'll simply reply to any new
639
                        // messages or process any state transitions and exit if
640
                        // needed.
641
                        fallthrough
3✔
642

643
                // Pinned peers will begin in this state, since they will
644
                // immediately receive a request to perform a historical sync.
645
                // Otherwise, we fall through after ending in chansSynced to
646
                // facilitate new requests.
647
                case syncerIdle:
3✔
648
                        select {
3✔
649
                        case req := <-g.syncTransitionReqs:
3✔
650
                                req.errChan <- g.handleSyncTransition(req)
3✔
651

652
                        case req := <-g.historicalSyncReqs:
3✔
653
                                g.handleHistoricalSync(req)
3✔
654

655
                        case <-g.quit:
3✔
656
                                return
3✔
657
                        }
658
                }
659
        }
660
}
661

662
// replyHandler is an event loop whose sole purpose is to reply to the remote
663
// peers queries. Our replyHandler will respond to messages generated by their
664
// channelGraphSyncer, and vice versa. Each party's channelGraphSyncer drives
665
// the other's replyHandler, allowing the replyHandler to operate independently
666
// from the state machine maintained on the same node.
667
//
668
// NOTE: This method MUST be run as a goroutine.
669
func (g *GossipSyncer) replyHandler() {
3✔
670
        defer g.wg.Done()
3✔
671

3✔
672
        for {
6✔
673
                select {
3✔
674
                case msg := <-g.queryMsgs:
3✔
675
                        err := g.replyPeerQueries(msg)
3✔
676
                        switch {
3✔
677
                        case err == ErrGossipSyncerExiting:
×
678
                                return
×
679

680
                        case err == lnpeer.ErrPeerExiting:
1✔
681
                                return
1✔
682

683
                        case err != nil:
×
684
                                log.Errorf("Unable to reply to peer "+
×
685
                                        "query: %v", err)
×
686
                        }
687

688
                case <-g.quit:
3✔
689
                        return
3✔
690
                }
691
        }
692
}
693

694
// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
695
// syncer and sends it to the remote peer.
696
func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time,
697
        timestampRange uint32) error {
3✔
698

3✔
699
        endTimestamp := firstTimestamp.Add(
3✔
700
                time.Duration(timestampRange) * time.Second,
3✔
701
        )
3✔
702

3✔
703
        log.Infof("GossipSyncer(%x): applying gossipFilter(start=%v, end=%v)",
3✔
704
                g.cfg.peerPub[:], firstTimestamp, endTimestamp)
3✔
705

3✔
706
        localUpdateHorizon := &lnwire.GossipTimestampRange{
3✔
707
                ChainHash:      g.cfg.chainHash,
3✔
708
                FirstTimestamp: uint32(firstTimestamp.Unix()),
3✔
709
                TimestampRange: timestampRange,
3✔
710
        }
3✔
711

3✔
712
        if err := g.cfg.sendToPeer(localUpdateHorizon); err != nil {
3✔
713
                return err
×
714
        }
×
715

716
        if firstTimestamp == zeroTimestamp && timestampRange == 0 {
3✔
717
                g.localUpdateHorizon = nil
×
718
        } else {
3✔
719
                g.localUpdateHorizon = localUpdateHorizon
3✔
720
        }
3✔
721

722
        return nil
3✔
723
}
724

725
// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
726
// the remote peer for its known set of channel IDs within a particular block
727
// range. This method will be called continually until the entire range has
728
// been queried for with a response received. We'll chunk our requests as
729
// required to ensure they fit into a single message. We may re-renter this
730
// state in the case that chunking is required.
731
func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
3✔
732
        // If we're in this state yet there are no more new channels to query
3✔
733
        // for, then we'll transition to our final synced state and return true
3✔
734
        // to signal that we're fully synchronized.
3✔
735
        if len(g.newChansToQuery) == 0 {
6✔
736
                log.Infof("GossipSyncer(%x): no more chans to query",
3✔
737
                        g.cfg.peerPub[:])
3✔
738
                return true, nil
3✔
739
        }
3✔
740

741
        // Otherwise, we'll issue our next chunked query to receive replies
742
        // for.
743
        var queryChunk []lnwire.ShortChannelID
3✔
744

3✔
745
        // If the number of channels to query for is less than the chunk size,
3✔
746
        // then we can issue a single query.
3✔
747
        if int32(len(g.newChansToQuery)) < g.cfg.batchSize {
6✔
748
                queryChunk = g.newChansToQuery
3✔
749
                g.newChansToQuery = nil
3✔
750

3✔
751
        } else {
3✔
752
                // Otherwise, we'll need to only query for the next chunk.
×
753
                // We'll slice into our query chunk, then slide down our main
×
754
                // pointer down by the chunk size.
×
755
                queryChunk = g.newChansToQuery[:g.cfg.batchSize]
×
756
                g.newChansToQuery = g.newChansToQuery[g.cfg.batchSize:]
×
757
        }
×
758

759
        log.Infof("GossipSyncer(%x): querying for %v new channels",
3✔
760
                g.cfg.peerPub[:], len(queryChunk))
3✔
761

3✔
762
        // With our chunk obtained, we'll send over our next query, then return
3✔
763
        // false indicating that we're net yet fully synced.
3✔
764
        err := g.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
3✔
765
                ChainHash:    g.cfg.chainHash,
3✔
766
                EncodingType: lnwire.EncodingSortedPlain,
3✔
767
                ShortChanIDs: queryChunk,
3✔
768
        })
3✔
769

3✔
770
        return false, err
3✔
771
}
772

773
// isLegacyReplyChannelRange determines where a ReplyChannelRange message is
774
// considered legacy. There was a point where lnd used to include the same query
775
// over multiple replies, rather than including the portion of the query the
776
// reply is handling. We'll use this as a way of detecting whether we are
777
// communicating with a legacy node so we can properly sync with them.
778
func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
779
        reply *lnwire.ReplyChannelRange) bool {
3✔
780

3✔
781
        return (reply.ChainHash == query.ChainHash &&
3✔
782
                reply.FirstBlockHeight == query.FirstBlockHeight &&
3✔
783
                reply.NumBlocks == query.NumBlocks)
3✔
784
}
3✔
785

786
// processChanRangeReply is called each time the GossipSyncer receives a new
787
// reply to the initial range query to discover new channels that it didn't
788
// previously know of.
789
func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error {
3✔
790
        // If we're not communicating with a legacy node, we'll apply some
3✔
791
        // further constraints on their reply to ensure it satisfies our query.
3✔
792
        if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
3✔
793
                // The first block should be within our original request.
×
794
                if msg.FirstBlockHeight < g.curQueryRangeMsg.FirstBlockHeight {
×
795
                        return fmt.Errorf("reply includes channels for height "+
×
796
                                "%v prior to query %v", msg.FirstBlockHeight,
×
797
                                g.curQueryRangeMsg.FirstBlockHeight)
×
798
                }
×
799

800
                // The last block should also be. We don't need to check the
801
                // intermediate ones because they should already be in sorted
802
                // order.
803
                replyLastHeight := msg.LastBlockHeight()
×
804
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
×
805
                if replyLastHeight > queryLastHeight {
×
806
                        return fmt.Errorf("reply includes channels for height "+
×
807
                                "%v after query %v", replyLastHeight,
×
808
                                queryLastHeight)
×
809
                }
×
810

811
                // If we've previously received a reply for this query, look at
812
                // its last block to ensure the current reply properly follows
813
                // it.
814
                if g.prevReplyChannelRange != nil {
×
815
                        prevReply := g.prevReplyChannelRange
×
816
                        prevReplyLastHeight := prevReply.LastBlockHeight()
×
817

×
818
                        // The current reply can either start from the previous
×
819
                        // reply's last block, if there are still more channels
×
820
                        // for the same block, or the block after.
×
821
                        if msg.FirstBlockHeight != prevReplyLastHeight &&
×
822
                                msg.FirstBlockHeight != prevReplyLastHeight+1 {
×
823

×
824
                                return fmt.Errorf("first block of reply %v "+
×
825
                                        "does not continue from last block of "+
×
826
                                        "previous %v", msg.FirstBlockHeight,
×
827
                                        prevReplyLastHeight)
×
828
                        }
×
829
                }
830
        }
831

832
        g.prevReplyChannelRange = msg
3✔
833
        if len(msg.Timestamps) != 0 &&
3✔
834
                len(msg.Timestamps) != len(msg.ShortChanIDs) {
3✔
835

×
836
                return fmt.Errorf("number of timestamps not equal to " +
×
837
                        "number of SCIDs")
×
838
        }
×
839

840
        for i, scid := range msg.ShortChanIDs {
6✔
841
                info := channeldb.ChannelUpdateInfo{
3✔
842
                        ShortChannelID: scid,
3✔
843
                }
3✔
844

3✔
845
                if len(msg.Timestamps) != 0 {
6✔
846
                        t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0)
3✔
847
                        info.Node1UpdateTimestamp = t1
3✔
848

3✔
849
                        t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0)
3✔
850
                        info.Node2UpdateTimestamp = t2
3✔
851
                }
3✔
852

853
                g.bufferedChanRangeReplies = append(
3✔
854
                        g.bufferedChanRangeReplies, info,
3✔
855
                )
3✔
856
        }
857

858
        switch g.cfg.encodingType {
3✔
859
        case lnwire.EncodingSortedPlain:
3✔
860
                g.numChanRangeRepliesRcvd++
3✔
861
        case lnwire.EncodingSortedZlib:
×
862
                g.numChanRangeRepliesRcvd += maxQueryChanRangeRepliesZlibFactor
×
863
        default:
×
864
                return fmt.Errorf("unhandled encoding type %v", g.cfg.encodingType)
×
865
        }
866

867
        log.Infof("GossipSyncer(%x): buffering chan range reply of size=%v",
3✔
868
                g.cfg.peerPub[:], len(msg.ShortChanIDs))
3✔
869

3✔
870
        // If this isn't the last response and we can continue to receive more,
3✔
871
        // then we can exit as we've already buffered the latest portion of the
3✔
872
        // streaming reply.
3✔
873
        maxReplies := g.cfg.maxQueryChanRangeReplies
3✔
874
        switch {
3✔
875
        // If we're communicating with a legacy node, we'll need to look at the
876
        // complete field.
877
        case isLegacyReplyChannelRange(g.curQueryRangeMsg, msg):
3✔
878
                if msg.Complete == 0 && g.numChanRangeRepliesRcvd < maxReplies {
3✔
879
                        return nil
×
880
                }
×
881

882
        // Otherwise, we'll look at the reply's height range.
883
        default:
×
884
                replyLastHeight := msg.LastBlockHeight()
×
885
                queryLastHeight := g.curQueryRangeMsg.LastBlockHeight()
×
886

×
887
                // TODO(wilmer): This might require some padding if the remote
×
888
                // node is not aware of the last height we sent them, i.e., is
×
889
                // behind a few blocks from us.
×
890
                if replyLastHeight < queryLastHeight &&
×
891
                        g.numChanRangeRepliesRcvd < maxReplies {
×
892

×
893
                        return nil
×
894
                }
×
895
        }
896

897
        log.Infof("GossipSyncer(%x): filtering through %v chans",
3✔
898
                g.cfg.peerPub[:], len(g.bufferedChanRangeReplies))
3✔
899

3✔
900
        // Otherwise, this is the final response, so we'll now check to see
3✔
901
        // which channels they know of that we don't.
3✔
902
        newChans, err := g.cfg.channelSeries.FilterKnownChanIDs(
3✔
903
                g.cfg.chainHash, g.bufferedChanRangeReplies,
3✔
904
                g.cfg.isStillZombieChannel,
3✔
905
        )
3✔
906
        if err != nil {
3✔
907
                return fmt.Errorf("unable to filter chan ids: %w", err)
×
908
        }
×
909

910
        // As we've received the entirety of the reply, we no longer need to
911
        // hold on to the set of buffered replies or the original query that
912
        // prompted the replies, so we'll let that be garbage collected now.
913
        g.curQueryRangeMsg = nil
3✔
914
        g.prevReplyChannelRange = nil
3✔
915
        g.bufferedChanRangeReplies = nil
3✔
916
        g.numChanRangeRepliesRcvd = 0
3✔
917

3✔
918
        // If there aren't any channels that we don't know of, then we can
3✔
919
        // switch straight to our terminal state.
3✔
920
        if len(newChans) == 0 {
6✔
921
                log.Infof("GossipSyncer(%x): remote peer has no new chans",
3✔
922
                        g.cfg.peerPub[:])
3✔
923

3✔
924
                g.setSyncState(chansSynced)
3✔
925

3✔
926
                // Ensure that the sync manager becomes aware that the
3✔
927
                // historical sync completed so synced_to_graph is updated over
3✔
928
                // rpc.
3✔
929
                g.cfg.markGraphSynced()
3✔
930
                return nil
3✔
931
        }
3✔
932

933
        // Otherwise, we'll set the set of channels that we need to query for
934
        // the next state, and also transition our state.
935
        g.newChansToQuery = newChans
3✔
936
        g.setSyncState(queryNewChannels)
3✔
937

3✔
938
        log.Infof("GossipSyncer(%x): starting query for %v new chans",
3✔
939
                g.cfg.peerPub[:], len(newChans))
3✔
940

3✔
941
        return nil
3✔
942
}
943

944
// genChanRangeQuery generates the initial message we'll send to the remote
945
// party when we're kicking off the channel graph synchronization upon
946
// connection. The historicalQuery boolean can be used to generate a query from
947
// the genesis block of the chain.
948
func (g *GossipSyncer) genChanRangeQuery(
949
        historicalQuery bool) (*lnwire.QueryChannelRange, error) {
3✔
950

3✔
951
        // First, we'll query our channel graph time series for its highest
3✔
952
        // known channel ID.
3✔
953
        newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash)
3✔
954
        if err != nil {
3✔
955
                return nil, err
×
956
        }
×
957

958
        // Once we have the chan ID of the newest, we'll obtain the block height
959
        // of the channel, then subtract our default horizon to ensure we don't
960
        // miss any channels. By default, we go back 1 day from the newest
961
        // channel, unless we're attempting a historical sync, where we'll
962
        // actually start from the genesis block instead.
963
        var startHeight uint32
3✔
964
        switch {
3✔
965
        case historicalQuery:
3✔
966
                fallthrough
3✔
967
        case newestChan.BlockHeight <= chanRangeQueryBuffer:
3✔
968
                startHeight = 0
3✔
969
        default:
×
970
                startHeight = newestChan.BlockHeight - chanRangeQueryBuffer
×
971
        }
972

973
        // Determine the number of blocks to request based on our best height.
974
        // We'll take into account any potential underflows and explicitly set
975
        // numBlocks to its minimum value of 1 if so.
976
        bestHeight := g.cfg.bestHeight()
3✔
977
        numBlocks := bestHeight - startHeight
3✔
978
        if int64(numBlocks) < 1 {
3✔
979
                numBlocks = 1
×
980
        }
×
981

982
        log.Infof("GossipSyncer(%x): requesting new chans from height=%v "+
3✔
983
                "and %v blocks after", g.cfg.peerPub[:], startHeight, numBlocks)
3✔
984

3✔
985
        // Finally, we'll craft the channel range query, using our starting
3✔
986
        // height, then asking for all known channels to the foreseeable end of
3✔
987
        // the main chain.
3✔
988
        query := &lnwire.QueryChannelRange{
3✔
989
                ChainHash:        g.cfg.chainHash,
3✔
990
                FirstBlockHeight: startHeight,
3✔
991
                NumBlocks:        numBlocks,
3✔
992
        }
3✔
993

3✔
994
        if !g.cfg.noTimestampQueryOption {
6✔
995
                query.QueryOptions = lnwire.NewTimestampQueryOption()
3✔
996
        }
3✔
997

998
        g.curQueryRangeMsg = query
3✔
999

3✔
1000
        return query, nil
3✔
1001
}
1002

1003
// replyPeerQueries is called in response to any query by the remote peer.
1004
// We'll examine our state and send back our best response.
1005
func (g *GossipSyncer) replyPeerQueries(msg lnwire.Message) error {
3✔
1006
        reservation := g.rateLimiter.Reserve()
3✔
1007
        delay := reservation.Delay()
3✔
1008

3✔
1009
        // If we've already replied a handful of times, we will start to delay
3✔
1010
        // responses back to the remote peer. This can help prevent DOS attacks
3✔
1011
        // where the remote peer spams us endlessly.
3✔
1012
        if delay > 0 {
3✔
1013
                log.Infof("GossipSyncer(%x): rate limiting gossip replies, "+
×
1014
                        "responding in %s", g.cfg.peerPub[:], delay)
×
1015

×
1016
                select {
×
1017
                case <-time.After(delay):
×
1018
                case <-g.quit:
×
1019
                        return ErrGossipSyncerExiting
×
1020
                }
1021
        }
1022

1023
        switch msg := msg.(type) {
3✔
1024

1025
        // In this state, we'll also handle any incoming channel range queries
1026
        // from the remote peer as they're trying to sync their state as well.
1027
        case *lnwire.QueryChannelRange:
3✔
1028
                return g.replyChanRangeQuery(msg)
3✔
1029

1030
        // If the remote peer skips straight to requesting new channels that
1031
        // they don't know of, then we'll ensure that we also handle this case.
1032
        case *lnwire.QueryShortChanIDs:
3✔
1033
                return g.replyShortChanIDs(msg)
3✔
1034

1035
        default:
×
1036
                return fmt.Errorf("unknown message: %T", msg)
×
1037
        }
1038
}
1039

1040
// replyChanRangeQuery will be dispatched in response to a channel range query
1041
// by the remote node. We'll query the channel time series for channels that
1042
// meet the channel range, then chunk our responses to the remote node. We also
1043
// ensure that our final fragment carries the "complete" bit to indicate the
1044
// end of our streaming response.
1045
func (g *GossipSyncer) replyChanRangeQuery(query *lnwire.QueryChannelRange) error {
3✔
1046
        // Before responding, we'll check to ensure that the remote peer is
3✔
1047
        // querying for the same chain that we're on. If not, we'll send back a
3✔
1048
        // response with a complete value of zero to indicate we're on a
3✔
1049
        // different chain.
3✔
1050
        if g.cfg.chainHash != query.ChainHash {
3✔
1051
                log.Warnf("Remote peer requested QueryChannelRange for "+
×
1052
                        "chain=%v, we're on chain=%v", query.ChainHash,
×
1053
                        g.cfg.chainHash)
×
1054

×
1055
                return g.cfg.sendToPeerSync(&lnwire.ReplyChannelRange{
×
1056
                        ChainHash:        query.ChainHash,
×
1057
                        FirstBlockHeight: query.FirstBlockHeight,
×
1058
                        NumBlocks:        query.NumBlocks,
×
1059
                        Complete:         0,
×
1060
                        EncodingType:     g.cfg.encodingType,
×
1061
                        ShortChanIDs:     nil,
×
1062
                })
×
1063
        }
×
1064

1065
        log.Infof("GossipSyncer(%x): filtering chan range: start_height=%v, "+
3✔
1066
                "num_blocks=%v", g.cfg.peerPub[:], query.FirstBlockHeight,
3✔
1067
                query.NumBlocks)
3✔
1068

3✔
1069
        // Check if the query asked for timestamps. We will only serve
3✔
1070
        // timestamps if this has not been disabled with
3✔
1071
        // noTimestampQueryOption.
3✔
1072
        withTimestamps := query.WithTimestamps() &&
3✔
1073
                !g.cfg.noTimestampQueryOption
3✔
1074

3✔
1075
        // Next, we'll consult the time series to obtain the set of known
3✔
1076
        // channel ID's that match their query.
3✔
1077
        startBlock := query.FirstBlockHeight
3✔
1078
        endBlock := query.LastBlockHeight()
3✔
1079
        channelRanges, err := g.cfg.channelSeries.FilterChannelRange(
3✔
1080
                query.ChainHash, startBlock, endBlock, withTimestamps,
3✔
1081
        )
3✔
1082
        if err != nil {
3✔
1083
                return err
×
1084
        }
×
1085

1086
        // TODO(roasbeef): means can't send max uint above?
1087
        //  * or make internal 64
1088

1089
        // We'll send our response in a streaming manner, chunk-by-chunk. We do
1090
        // this as there's a transport message size limit which we'll need to
1091
        // adhere to. We also need to make sure all of our replies cover the
1092
        // expected range of the query.
1093
        sendReplyForChunk := func(channelChunk []channeldb.ChannelUpdateInfo,
3✔
1094
                firstHeight, lastHeight uint32, finalChunk bool) error {
6✔
1095

3✔
1096
                // The number of blocks contained in the current chunk (the
3✔
1097
                // total span) is the difference between the last channel ID and
3✔
1098
                // the first in the range. We add one as even if all channels
3✔
1099
                // returned are in the same block, we need to count that.
3✔
1100
                numBlocks := lastHeight - firstHeight + 1
3✔
1101
                complete := uint8(0)
3✔
1102
                if finalChunk {
6✔
1103
                        complete = 1
3✔
1104
                }
3✔
1105

1106
                var timestamps lnwire.Timestamps
3✔
1107
                if withTimestamps {
6✔
1108
                        timestamps = make(lnwire.Timestamps, len(channelChunk))
3✔
1109
                }
3✔
1110

1111
                scids := make([]lnwire.ShortChannelID, len(channelChunk))
3✔
1112
                for i, info := range channelChunk {
6✔
1113
                        scids[i] = info.ShortChannelID
3✔
1114

3✔
1115
                        if !withTimestamps {
3✔
1116
                                continue
×
1117
                        }
1118

1119
                        timestamps[i].Timestamp1 = uint32(
3✔
1120
                                info.Node1UpdateTimestamp.Unix(),
3✔
1121
                        )
3✔
1122

3✔
1123
                        timestamps[i].Timestamp2 = uint32(
3✔
1124
                                info.Node2UpdateTimestamp.Unix(),
3✔
1125
                        )
3✔
1126
                }
1127

1128
                return g.cfg.sendToPeerSync(&lnwire.ReplyChannelRange{
3✔
1129
                        ChainHash:        query.ChainHash,
3✔
1130
                        NumBlocks:        numBlocks,
3✔
1131
                        FirstBlockHeight: firstHeight,
3✔
1132
                        Complete:         complete,
3✔
1133
                        EncodingType:     g.cfg.encodingType,
3✔
1134
                        ShortChanIDs:     scids,
3✔
1135
                        Timestamps:       timestamps,
3✔
1136
                })
3✔
1137
        }
1138

1139
        var (
3✔
1140
                firstHeight  = query.FirstBlockHeight
3✔
1141
                lastHeight   uint32
3✔
1142
                channelChunk []channeldb.ChannelUpdateInfo
3✔
1143
        )
3✔
1144

3✔
1145
        // chunkSize is the maximum number of SCIDs that we can safely put in a
3✔
1146
        // single message. If we also need to include timestamps though, then
3✔
1147
        // this number is halved since encoding two timestamps takes the same
3✔
1148
        // number of bytes as encoding an SCID.
3✔
1149
        chunkSize := g.cfg.chunkSize
3✔
1150
        if withTimestamps {
6✔
1151
                chunkSize /= 2
3✔
1152
        }
3✔
1153

1154
        for _, channelRange := range channelRanges {
6✔
1155
                channels := channelRange.Channels
3✔
1156
                numChannels := int32(len(channels))
3✔
1157
                numLeftToAdd := chunkSize - int32(len(channelChunk))
3✔
1158

3✔
1159
                // Include the current block in the ongoing chunk if it can fit
3✔
1160
                // and move on to the next block.
3✔
1161
                if numChannels <= numLeftToAdd {
6✔
1162
                        channelChunk = append(channelChunk, channels...)
3✔
1163
                        continue
3✔
1164
                }
1165

1166
                // Otherwise, we need to send our existing channel chunk as is
1167
                // as its own reply and start a new one for the current block.
1168
                // We'll mark the end of our current chunk as the height before
1169
                // the current block to ensure the whole query range is replied
1170
                // to.
1171
                log.Infof("GossipSyncer(%x): sending range chunk of size=%v",
×
1172
                        g.cfg.peerPub[:], len(channelChunk))
×
1173

×
1174
                lastHeight = channelRange.Height - 1
×
1175
                err := sendReplyForChunk(
×
1176
                        channelChunk, firstHeight, lastHeight, false,
×
1177
                )
×
1178
                if err != nil {
×
1179
                        return err
×
1180
                }
×
1181

1182
                // With the reply constructed, we'll start tallying channels for
1183
                // our next one keeping in mind our chunk size. This may result
1184
                // in channels for this block being left out from the reply, but
1185
                // this isn't an issue since we'll randomly shuffle them and we
1186
                // assume a historical gossip sync is performed at a later time.
1187
                firstHeight = channelRange.Height
×
1188
                finalChunkSize := numChannels
×
1189
                exceedsChunkSize := numChannels > chunkSize
×
1190
                if exceedsChunkSize {
×
1191
                        rand.Shuffle(len(channels), func(i, j int) {
×
1192
                                channels[i], channels[j] = channels[j], channels[i]
×
1193
                        })
×
1194
                        finalChunkSize = chunkSize
×
1195
                }
1196
                channelChunk = channels[:finalChunkSize]
×
1197

×
1198
                // Sort the chunk once again if we had to shuffle it.
×
1199
                if exceedsChunkSize {
×
1200
                        sort.Slice(channelChunk, func(i, j int) bool {
×
1201
                                id1 := channelChunk[i].ShortChannelID.ToUint64()
×
1202
                                id2 := channelChunk[j].ShortChannelID.ToUint64()
×
1203

×
1204
                                return id1 < id2
×
1205
                        })
×
1206
                }
1207
        }
1208

1209
        // Send the remaining chunk as the final reply.
1210
        log.Infof("GossipSyncer(%x): sending final chan range chunk, size=%v",
3✔
1211
                g.cfg.peerPub[:], len(channelChunk))
3✔
1212

3✔
1213
        return sendReplyForChunk(
3✔
1214
                channelChunk, firstHeight, query.LastBlockHeight(), true,
3✔
1215
        )
3✔
1216
}
1217

1218
// replyShortChanIDs will be dispatched in response to a query by the remote
1219
// node for information concerning a set of short channel ID's. Our response
1220
// will be sent in a streaming chunked manner to ensure that we remain below
1221
// the current transport level message size.
1222
func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error {
3✔
1223
        // Before responding, we'll check to ensure that the remote peer is
3✔
1224
        // querying for the same chain that we're on. If not, we'll send back a
3✔
1225
        // response with a complete value of zero to indicate we're on a
3✔
1226
        // different chain.
3✔
1227
        if g.cfg.chainHash != query.ChainHash {
3✔
1228
                log.Warnf("Remote peer requested QueryShortChanIDs for "+
×
1229
                        "chain=%v, we're on chain=%v", query.ChainHash,
×
1230
                        g.cfg.chainHash)
×
1231

×
1232
                return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
×
1233
                        ChainHash: query.ChainHash,
×
1234
                        Complete:  0,
×
1235
                })
×
1236
        }
×
1237

1238
        if len(query.ShortChanIDs) == 0 {
3✔
1239
                log.Infof("GossipSyncer(%x): ignoring query for blank short chan ID's",
×
1240
                        g.cfg.peerPub[:])
×
1241
                return nil
×
1242
        }
×
1243

1244
        log.Infof("GossipSyncer(%x): fetching chan anns for %v chans",
3✔
1245
                g.cfg.peerPub[:], len(query.ShortChanIDs))
3✔
1246

3✔
1247
        // Now that we know we're on the same chain, we'll query the channel
3✔
1248
        // time series for the set of messages that we know of which satisfies
3✔
1249
        // the requirement of being a chan ann, chan update, or a node ann
3✔
1250
        // related to the set of queried channels.
3✔
1251
        replyMsgs, err := g.cfg.channelSeries.FetchChanAnns(
3✔
1252
                query.ChainHash, query.ShortChanIDs,
3✔
1253
        )
3✔
1254
        if err != nil {
3✔
1255
                return fmt.Errorf("unable to fetch chan anns for %v..., %w",
×
1256
                        query.ShortChanIDs[0].ToUint64(), err)
×
1257
        }
×
1258

1259
        // Reply with any messages related to those channel ID's, we'll write
1260
        // each one individually and synchronously to throttle the sends and
1261
        // perform buffering of responses in the syncer as opposed to the peer.
1262
        for _, msg := range replyMsgs {
6✔
1263
                err := g.cfg.sendToPeerSync(msg)
3✔
1264
                if err != nil {
3✔
1265
                        return err
×
1266
                }
×
1267
        }
1268

1269
        // Regardless of whether we had any messages to reply with, send over
1270
        // the sentinel message to signal that the stream has terminated.
1271
        return g.cfg.sendToPeerSync(&lnwire.ReplyShortChanIDsEnd{
3✔
1272
                ChainHash: query.ChainHash,
3✔
1273
                Complete:  1,
3✔
1274
        })
3✔
1275
}
1276

1277
// ApplyGossipFilter applies a gossiper filter sent by the remote node to the
1278
// state machine. Once applied, we'll ensure that we don't forward any messages
1279
// to the peer that aren't within the time range of the filter.
1280
func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error {
3✔
1281
        g.Lock()
3✔
1282

3✔
1283
        g.remoteUpdateHorizon = filter
3✔
1284

3✔
1285
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
3✔
1286
        endTime := startTime.Add(
3✔
1287
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
3✔
1288
        )
3✔
1289

3✔
1290
        g.Unlock()
3✔
1291

3✔
1292
        // If requested, don't reply with historical gossip data when the remote
3✔
1293
        // peer sets their gossip timestamp range.
3✔
1294
        if g.cfg.ignoreHistoricalFilters {
3✔
1295
                return nil
×
1296
        }
×
1297

1298
        // Now that the remote peer has applied their filter, we'll query the
1299
        // database for all the messages that are beyond this filter.
1300
        newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon(
3✔
1301
                g.cfg.chainHash, startTime, endTime,
3✔
1302
        )
3✔
1303
        if err != nil {
3✔
1304
                return err
×
1305
        }
×
1306

1307
        log.Infof("GossipSyncer(%x): applying new update horizon: start=%v, "+
3✔
1308
                "end=%v, backlog_size=%v", g.cfg.peerPub[:], startTime, endTime,
3✔
1309
                len(newUpdatestoSend))
3✔
1310

3✔
1311
        // If we don't have any to send, then we can return early.
3✔
1312
        if len(newUpdatestoSend) == 0 {
6✔
1313
                return nil
3✔
1314
        }
3✔
1315

1316
        select {
3✔
1317
        case <-g.gossipFilterSema:
3✔
1318
        case <-g.quit:
×
1319
                return ErrGossipSyncerExiting
×
1320
        }
1321

1322
        // We'll conclude by launching a goroutine to send out any updates.
1323
        g.wg.Add(1)
3✔
1324
        go func() {
6✔
1325
                defer g.wg.Done()
3✔
1326
                defer func() {
6✔
1327
                        g.gossipFilterSema <- struct{}{}
3✔
1328
                }()
3✔
1329

1330
                for _, msg := range newUpdatestoSend {
6✔
1331
                        err := g.cfg.sendToPeerSync(msg)
3✔
1332
                        switch {
3✔
1333
                        case err == ErrGossipSyncerExiting:
×
1334
                                return
×
1335

1336
                        case err == lnpeer.ErrPeerExiting:
×
1337
                                return
×
1338

1339
                        case err != nil:
×
1340
                                log.Errorf("Unable to send message for "+
×
1341
                                        "peer catch up: %v", err)
×
1342
                        }
1343
                }
1344
        }()
1345

1346
        return nil
3✔
1347
}
1348

1349
// FilterGossipMsgs takes a set of gossip messages, and only send it to a peer
1350
// iff the message is within the bounds of their set gossip filter. If the peer
1351
// doesn't have a gossip filter set, then no messages will be forwarded.
1352
func (g *GossipSyncer) FilterGossipMsgs(msgs ...msgWithSenders) {
3✔
1353
        // If the peer doesn't have an update horizon set, then we won't send
3✔
1354
        // it any new update messages.
3✔
1355
        if g.remoteUpdateHorizon == nil {
6✔
1356
                log.Tracef("GossipSyncer(%x): skipped due to nil "+
3✔
1357
                        "remoteUpdateHorizon", g.cfg.peerPub[:])
3✔
1358
                return
3✔
1359
        }
3✔
1360

1361
        // If we've been signaled to exit, or are exiting, then we'll stop
1362
        // short.
1363
        select {
3✔
1364
        case <-g.quit:
×
1365
                return
×
1366
        default:
3✔
1367
        }
1368

1369
        // TODO(roasbeef): need to ensure that peer still online...send msg to
1370
        // gossiper on peer termination to signal peer disconnect?
1371

1372
        var err error
3✔
1373

3✔
1374
        // Before we filter out the messages, we'll construct an index over the
3✔
1375
        // set of channel announcements and channel updates. This will allow us
3✔
1376
        // to quickly check if we should forward a chan ann, based on the known
3✔
1377
        // channel updates for a channel.
3✔
1378
        chanUpdateIndex := make(map[lnwire.ShortChannelID][]*lnwire.ChannelUpdate)
3✔
1379
        for _, msg := range msgs {
6✔
1380
                chanUpdate, ok := msg.msg.(*lnwire.ChannelUpdate)
3✔
1381
                if !ok {
6✔
1382
                        continue
3✔
1383
                }
1384

1385
                chanUpdateIndex[chanUpdate.ShortChannelID] = append(
3✔
1386
                        chanUpdateIndex[chanUpdate.ShortChannelID], chanUpdate,
3✔
1387
                )
3✔
1388
        }
1389

1390
        // We'll construct a helper function that we'll us below to determine
1391
        // if a given messages passes the gossip msg filter.
1392
        g.Lock()
3✔
1393
        startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
3✔
1394
        endTime := startTime.Add(
3✔
1395
                time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
3✔
1396
        )
3✔
1397
        g.Unlock()
3✔
1398

3✔
1399
        passesFilter := func(timeStamp uint32) bool {
6✔
1400
                t := time.Unix(int64(timeStamp), 0)
3✔
1401
                return t.Equal(startTime) ||
3✔
1402
                        (t.After(startTime) && t.Before(endTime))
3✔
1403
        }
3✔
1404

1405
        msgsToSend := make([]lnwire.Message, 0, len(msgs))
3✔
1406
        for _, msg := range msgs {
6✔
1407
                // If the target peer is the peer that sent us this message,
3✔
1408
                // then we'll exit early as we don't need to filter this
3✔
1409
                // message.
3✔
1410
                if _, ok := msg.senders[g.cfg.peerPub]; ok {
6✔
1411
                        continue
3✔
1412
                }
1413

1414
                switch msg := msg.msg.(type) {
3✔
1415

1416
                // For each channel announcement message, we'll only send this
1417
                // message if the channel updates for the channel are between
1418
                // our time range.
1419
                case *lnwire.ChannelAnnouncement:
3✔
1420
                        // First, we'll check if the channel updates are in
3✔
1421
                        // this message batch.
3✔
1422
                        chanUpdates, ok := chanUpdateIndex[msg.ShortChannelID]
3✔
1423
                        if !ok {
6✔
1424
                                // If not, we'll attempt to query the database
3✔
1425
                                // to see if we know of the updates.
3✔
1426
                                chanUpdates, err = g.cfg.channelSeries.FetchChanUpdates(
3✔
1427
                                        g.cfg.chainHash, msg.ShortChannelID,
3✔
1428
                                )
3✔
1429
                                if err != nil {
3✔
1430
                                        log.Warnf("no channel updates found for "+
×
1431
                                                "short_chan_id=%v",
×
1432
                                                msg.ShortChannelID)
×
1433
                                        continue
×
1434
                                }
1435
                        }
1436

1437
                        for _, chanUpdate := range chanUpdates {
6✔
1438
                                if passesFilter(chanUpdate.Timestamp) {
6✔
1439
                                        msgsToSend = append(msgsToSend, msg)
3✔
1440
                                        break
3✔
1441
                                }
1442
                        }
1443

1444
                        if len(chanUpdates) == 0 {
6✔
1445
                                msgsToSend = append(msgsToSend, msg)
3✔
1446
                        }
3✔
1447

1448
                // For each channel update, we'll only send if it the timestamp
1449
                // is between our time range.
1450
                case *lnwire.ChannelUpdate:
3✔
1451
                        if passesFilter(msg.Timestamp) {
6✔
1452
                                msgsToSend = append(msgsToSend, msg)
3✔
1453
                        }
3✔
1454

1455
                // Similarly, we only send node announcements if the update
1456
                // timestamp ifs between our set gossip filter time range.
1457
                case *lnwire.NodeAnnouncement:
3✔
1458
                        if passesFilter(msg.Timestamp) {
6✔
1459
                                msgsToSend = append(msgsToSend, msg)
3✔
1460
                        }
3✔
1461
                }
1462
        }
1463

1464
        log.Tracef("GossipSyncer(%x): filtered gossip msgs: set=%v, sent=%v",
3✔
1465
                g.cfg.peerPub[:], len(msgs), len(msgsToSend))
3✔
1466

3✔
1467
        if len(msgsToSend) == 0 {
6✔
1468
                return
3✔
1469
        }
3✔
1470

1471
        g.cfg.sendToPeer(msgsToSend...)
3✔
1472
}
1473

1474
// ProcessQueryMsg is used by outside callers to pass new channel time series
1475
// queries to the internal processing goroutine.
1476
func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) error {
3✔
1477
        var msgChan chan lnwire.Message
3✔
1478
        switch msg.(type) {
3✔
1479
        case *lnwire.QueryChannelRange, *lnwire.QueryShortChanIDs:
3✔
1480
                msgChan = g.queryMsgs
3✔
1481

1482
        // Reply messages should only be expected in states where we're waiting
1483
        // for a reply.
1484
        case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
3✔
1485
                syncState := g.syncState()
3✔
1486
                if syncState != waitingQueryRangeReply &&
3✔
1487
                        syncState != waitingQueryChanReply {
3✔
1488

×
1489
                        return fmt.Errorf("received unexpected query reply "+
×
1490
                                "message %T", msg)
×
1491
                }
×
1492
                msgChan = g.gossipMsgs
3✔
1493

1494
        default:
×
1495
                msgChan = g.gossipMsgs
×
1496
        }
1497

1498
        select {
3✔
1499
        case msgChan <- msg:
3✔
1500
        case <-peerQuit:
×
1501
        case <-g.quit:
×
1502
        }
1503

1504
        return nil
3✔
1505
}
1506

1507
// setSyncState sets the gossip syncer's state to the given state.
1508
func (g *GossipSyncer) setSyncState(state syncerState) {
3✔
1509
        atomic.StoreUint32(&g.state, uint32(state))
3✔
1510
}
3✔
1511

1512
// syncState returns the current syncerState of the target GossipSyncer.
1513
func (g *GossipSyncer) syncState() syncerState {
3✔
1514
        return syncerState(atomic.LoadUint32(&g.state))
3✔
1515
}
3✔
1516

1517
// ResetSyncedSignal returns a channel that will be closed in order to serve as
1518
// a signal for when the GossipSyncer has reached its chansSynced state.
1519
func (g *GossipSyncer) ResetSyncedSignal() chan struct{} {
3✔
1520
        g.Lock()
3✔
1521
        defer g.Unlock()
3✔
1522

3✔
1523
        syncedSignal := make(chan struct{})
3✔
1524

3✔
1525
        syncState := syncerState(atomic.LoadUint32(&g.state))
3✔
1526
        if syncState == chansSynced {
3✔
1527
                close(syncedSignal)
×
1528
                return syncedSignal
×
1529
        }
×
1530

1531
        g.syncedSignal = syncedSignal
3✔
1532
        return g.syncedSignal
3✔
1533
}
1534

1535
// ProcessSyncTransition sends a request to the gossip syncer to transition its
1536
// sync type to a new one.
1537
//
1538
// NOTE: This can only be done once the gossip syncer has reached its final
1539
// chansSynced state.
1540
func (g *GossipSyncer) ProcessSyncTransition(newSyncType SyncerType) error {
3✔
1541
        errChan := make(chan error, 1)
3✔
1542
        select {
3✔
1543
        case g.syncTransitionReqs <- &syncTransitionReq{
1544
                newSyncType: newSyncType,
1545
                errChan:     errChan,
1546
        }:
3✔
1547
        case <-time.After(syncTransitionTimeout):
×
1548
                return ErrSyncTransitionTimeout
×
1549
        case <-g.quit:
×
1550
                return ErrGossipSyncerExiting
×
1551
        }
1552

1553
        select {
3✔
1554
        case err := <-errChan:
3✔
1555
                return err
3✔
1556
        case <-g.quit:
×
1557
                return ErrGossipSyncerExiting
×
1558
        }
1559
}
1560

1561
// handleSyncTransition handles a new sync type transition request.
1562
//
1563
// NOTE: The gossip syncer might have another sync state as a result of this
1564
// transition.
1565
func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error {
3✔
1566
        // Return early from any NOP sync transitions.
3✔
1567
        syncType := g.SyncType()
3✔
1568
        if syncType == req.newSyncType {
3✔
1569
                return nil
×
1570
        }
×
1571

1572
        log.Debugf("GossipSyncer(%x): transitioning from %v to %v",
3✔
1573
                g.cfg.peerPub, syncType, req.newSyncType)
3✔
1574

3✔
1575
        var (
3✔
1576
                firstTimestamp time.Time
3✔
1577
                timestampRange uint32
3✔
1578
        )
3✔
1579

3✔
1580
        switch req.newSyncType {
3✔
1581
        // If an active sync has been requested, then we should resume receiving
1582
        // new graph updates from the remote peer.
1583
        case ActiveSync, PinnedSync:
3✔
1584
                firstTimestamp = time.Now()
3✔
1585
                timestampRange = math.MaxUint32
3✔
1586

1587
        // If a PassiveSync transition has been requested, then we should no
1588
        // longer receive any new updates from the remote peer. We can do this
1589
        // by setting our update horizon to a range in the past ensuring no
1590
        // graph updates match the timestamp range.
1591
        case PassiveSync:
×
1592
                firstTimestamp = zeroTimestamp
×
1593
                timestampRange = 0
×
1594

1595
        default:
×
1596
                return fmt.Errorf("unhandled sync transition %v",
×
1597
                        req.newSyncType)
×
1598
        }
1599

1600
        err := g.sendGossipTimestampRange(firstTimestamp, timestampRange)
3✔
1601
        if err != nil {
3✔
1602
                return fmt.Errorf("unable to send local update horizon: %w",
×
1603
                        err)
×
1604
        }
×
1605

1606
        g.setSyncType(req.newSyncType)
3✔
1607

3✔
1608
        return nil
3✔
1609
}
1610

1611
// setSyncType sets the gossip syncer's sync type to the given type.
1612
func (g *GossipSyncer) setSyncType(syncType SyncerType) {
3✔
1613
        atomic.StoreUint32(&g.syncType, uint32(syncType))
3✔
1614
}
3✔
1615

1616
// SyncType returns the current SyncerType of the target GossipSyncer.
1617
func (g *GossipSyncer) SyncType() SyncerType {
3✔
1618
        return SyncerType(atomic.LoadUint32(&g.syncType))
3✔
1619
}
3✔
1620

1621
// historicalSync sends a request to the gossip syncer to perofmr a historical
1622
// sync.
1623
//
1624
// NOTE: This can only be done once the gossip syncer has reached its final
1625
// chansSynced state.
1626
func (g *GossipSyncer) historicalSync() error {
3✔
1627
        done := make(chan struct{})
3✔
1628

3✔
1629
        select {
3✔
1630
        case g.historicalSyncReqs <- &historicalSyncReq{
1631
                doneChan: done,
1632
        }:
3✔
1633
        case <-time.After(syncTransitionTimeout):
×
1634
                return ErrSyncTransitionTimeout
×
1635
        case <-g.quit:
×
1636
                return ErrGossiperShuttingDown
×
1637
        }
1638

1639
        select {
3✔
1640
        case <-done:
3✔
1641
                return nil
3✔
1642
        case <-g.quit:
×
1643
                return ErrGossiperShuttingDown
×
1644
        }
1645
}
1646

1647
// handleHistoricalSync handles a request to the gossip syncer to perform a
1648
// historical sync.
1649
func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
3✔
1650
        // We'll go back to our initial syncingChans state in order to request
3✔
1651
        // the remote peer to give us all of the channel IDs they know of
3✔
1652
        // starting from the genesis block.
3✔
1653
        g.genHistoricalChanRangeQuery = true
3✔
1654
        g.setSyncState(syncingChans)
3✔
1655
        close(req.doneChan)
3✔
1656
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc