lightningnetwork / lnd · build 17830307614
18 Sep 2025 01:29PM UTC · coverage: 54.617% (-12.0%) from 66.637%
Pull Request #10200: github: change to form-based issue template
Merge 181a0a7bc into b34fc964b (committed via GitHub web-flow)
109249 of 200028 relevant lines covered (54.62%) · 21896.43 hits per line

Source File: /discovery/sync_manager.go (86.03%)
package discovery

import (
        "context"
        "errors"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/ticker"
        "golang.org/x/time/rate"
)

const (
        // DefaultSyncerRotationInterval is the default interval in which we'll
        // rotate a single active syncer.
        DefaultSyncerRotationInterval = 20 * time.Minute

        // DefaultHistoricalSyncInterval is the default interval in which we'll
        // force a historical sync to ensure we have as much of the public
        // network as possible.
        DefaultHistoricalSyncInterval = time.Hour

        // DefaultFilterConcurrency is the default maximum number of concurrent
        // gossip filter applications that can be processed.
        DefaultFilterConcurrency = 5

        // DefaultMsgBytesBurst is the allotted burst in bytes we'll permit.
        // This is the most that can be sent in a given go. Requests beyond
        // this will block indefinitely. Once tokens (bytes) are depleted,
        // they'll be refilled at the DefaultMsgBytesPerSecond rate.
        DefaultMsgBytesBurst = 2 * 1000 * 1_024

        // DefaultMsgBytesPerSecond is the max bytes/s we'll permit for outgoing
        // messages. Once tokens (bytes) have been taken from the bucket,
        // they'll be refilled at this rate.
        DefaultMsgBytesPerSecond = 1000 * 1_024

        // DefaultPeerMsgBytesPerSecond is the max bytes/s we'll permit for
        // outgoing messages for a single peer. Once tokens (bytes) have been
        // taken from the bucket, they'll be refilled at this rate.
        DefaultPeerMsgBytesPerSecond = 50 * 1_024

        // assumedMsgSize is the assumed size of a message if we can't compute
        // its serialized size. This comes out to 1 KB.
        assumedMsgSize = 1_024
)

var (
        // ErrSyncManagerExiting is an error returned when we attempt to
        // start/stop a gossip syncer for a connected/disconnected peer, but the
        // SyncManager has already been stopped.
        ErrSyncManagerExiting = errors.New("sync manager exiting")
)

// newSyncer is an internal message we'll use within the SyncManager to signal
// that we should create a GossipSyncer for a newly connected peer.
type newSyncer struct {
        // peer is the newly connected peer.
        peer lnpeer.Peer

        // doneChan serves as a signal to the caller that the SyncManager's
        // internal state correctly reflects the newly created syncer.
        doneChan chan struct{}
}

// staleSyncer is an internal message we'll use within the SyncManager to signal
// that a peer has disconnected and its GossipSyncer should be removed.
type staleSyncer struct {
        // peer is the peer that has disconnected.
        peer route.Vertex

        // doneChan serves as a signal to the caller that the SyncManager's
        // internal state correctly reflects the stale active syncer. This is
        // needed to ensure we always create a new syncer for a flappy peer
        // after they disconnect if they happened to be an active syncer.
        doneChan chan struct{}
}

84
// SyncManagerCfg contains all of the dependencies required for the SyncManager
85
// to carry out its duties.
86
type SyncManagerCfg struct {
87
        // ChainHash is a hash that indicates the specific network of the active
88
        // chain.
89
        ChainHash chainhash.Hash
90

91
        // ChanSeries is an interface that provides access to a time series view
92
        // of the current known channel graph. Each GossipSyncer enabled peer
93
        // will utilize this in order to create and respond to channel graph
94
        // time series queries.
95
        ChanSeries ChannelGraphTimeSeries
96

97
        // NumActiveSyncers is the number of peers for which we should have
98
        // active syncers with. After reaching NumActiveSyncers, any future
99
        // gossip syncers will be passive.
100
        NumActiveSyncers int
101

102
        // NoTimestampQueries will prevent the GossipSyncer from querying
103
        // timestamps of announcement messages from the peer and from responding
104
        // to timestamp queries
105
        NoTimestampQueries bool
106

107
        // RotateTicker is a ticker responsible for notifying the SyncManager
108
        // when it should rotate its active syncers. A single active syncer with
109
        // a chansSynced state will be exchanged for a passive syncer in order
110
        // to ensure we don't keep syncing with the same peers.
111
        RotateTicker ticker.Ticker
112

113
        // HistoricalSyncTicker is a ticker responsible for notifying the
114
        // SyncManager when it should attempt a historical sync with a gossip
115
        // sync peer.
116
        HistoricalSyncTicker ticker.Ticker
117

118
        // IgnoreHistoricalFilters will prevent syncers from replying with
119
        // historical data when the remote peer sets a gossip_timestamp_range.
120
        // This prevents ranges with old start times from causing us to dump the
121
        // graph on connect.
122
        IgnoreHistoricalFilters bool
123

124
        // BestHeight returns the latest height known of the chain.
125
        BestHeight func() uint32
126

127
        // PinnedSyncers is a set of peers that will always transition to
128
        // ActiveSync upon connection. These peers will never transition to
129
        // PassiveSync.
130
        PinnedSyncers PinnedSyncers
131

132
        // IsStillZombieChannel takes the timestamps of the latest channel
133
        // updates for a channel and returns true if the channel should be
134
        // considered a zombie based on these timestamps.
135
        IsStillZombieChannel func(time.Time, time.Time) bool
136

137
        // AllotedMsgBytesPerSecond is the allotted bandwidth rate, expressed in
138
        // bytes/second that the gossip manager can consume. Once we exceed this
139
        // rate, message sending will block until we're below the rate.
140
        AllotedMsgBytesPerSecond uint64
141

142
        // AllotedMsgBytesBurst is the amount of burst bytes we'll permit, if
143
        // we've exceeded the hard upper limit.
144
        AllotedMsgBytesBurst uint64
145

146
        // FilterConcurrency is the maximum number of concurrent gossip filter
147
        // applications that can be processed. If not set, defaults to 5.
148
        FilterConcurrency int
149

150
        // PeerMsgBytesPerSecond is the allotted bandwidth rate, expressed in
151
        // bytes/second that a single gossip syncer can consume. Once we exceed
152
        // this rate, message sending will block until we're below the rate.
153
        PeerMsgBytesPerSecond uint64
154
}
155

156
// SyncManager is a subsystem of the gossiper that manages the gossip syncers
157
// for peers currently connected. When a new peer is connected, the manager will
158
// create its accompanying gossip syncer and determine whether it should have an
159
// ActiveSync or PassiveSync sync type based on how many other gossip syncers
160
// are currently active. Any ActiveSync gossip syncers are started in a
161
// round-robin manner to ensure we're not syncing with multiple peers at the
162
// same time. The first GossipSyncer registered with the SyncManager will
163
// attempt a historical sync to ensure we have as much of the public channel
164
// graph as possible.
165
type SyncManager struct {
166
        // initialHistoricalSyncCompleted serves as a barrier when initializing
167
        // new active GossipSyncers. If 0, the initial historical sync has not
168
        // completed, so we'll defer initializing any active GossipSyncers. If
169
        // 1, then we can transition the GossipSyncer immediately. We set up
170
        // this barrier to ensure we have most of the graph before attempting to
171
        // accept new updates at tip.
172
        //
173
        // NOTE: This must be used atomically.
174
        initialHistoricalSyncCompleted int32
175

176
        start sync.Once
177
        stop  sync.Once
178

179
        cfg SyncManagerCfg
180

181
        // newSyncers is a channel we'll use to process requests to create
182
        // GossipSyncers for newly connected peers.
183
        newSyncers chan *newSyncer
184

185
        // staleSyncers is a channel we'll use to process requests to tear down
186
        // GossipSyncers for disconnected peers.
187
        staleSyncers chan *staleSyncer
188

189
        // syncersMu guards the read and write access to the activeSyncers and
190
        // inactiveSyncers maps below.
191
        syncersMu sync.Mutex
192

193
        // activeSyncers is the set of all syncers for which we are currently
194
        // receiving graph updates from. The number of possible active syncers
195
        // is bounded by NumActiveSyncers.
196
        activeSyncers map[route.Vertex]*GossipSyncer
197

198
        // inactiveSyncers is the set of all syncers for which we are not
199
        // currently receiving new graph updates from.
200
        inactiveSyncers map[route.Vertex]*GossipSyncer
201

202
        // pinnedActiveSyncers is the set of all syncers which are pinned into
203
        // an active sync. Pinned peers performan an initial historical sync on
204
        // each connection and will continue to receive graph updates for the
205
        // duration of the connection.
206
        pinnedActiveSyncers map[route.Vertex]*GossipSyncer
207

208
        // gossipFilterSema contains semaphores for the gossip timestamp
209
        // queries.
210
        gossipFilterSema chan struct{}
211

212
        // rateLimiter dictates the frequency with which we will reply to gossip
213
        // queries from all peers. This is used to delay responses to peers to
214
        // prevent DOS vulnerabilities if they are spamming with an unreasonable
215
        // number of queries.
216
        rateLimiter *rate.Limiter
217

218
        wg   sync.WaitGroup
219
        quit chan struct{}
220
}
221

222
// newSyncManager constructs a new SyncManager backed by the given config.
223
func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
39✔
224
        filterConcurrency := cfg.FilterConcurrency
39✔
225
        if filterConcurrency == 0 {
78✔
226
                filterConcurrency = DefaultFilterConcurrency
39✔
227
        }
39✔
228

229
        filterSema := make(chan struct{}, filterConcurrency)
39✔
230
        for i := 0; i < filterConcurrency; i++ {
234✔
231
                filterSema <- struct{}{}
195✔
232
        }
195✔
233

234
        bytesPerSecond := cfg.AllotedMsgBytesPerSecond
39✔
235
        if bytesPerSecond == 0 {
78✔
236
                bytesPerSecond = DefaultMsgBytesPerSecond
39✔
237
        }
39✔
238

239
        bytesBurst := cfg.AllotedMsgBytesBurst
39✔
240
        if bytesBurst == 0 {
78✔
241
                bytesBurst = DefaultMsgBytesBurst
39✔
242
        }
39✔
243

244
        // We'll use this rate limiter to limit our total outbound bandwidth for
245
        // gossip queries peers.
246
        rateLimiter := rate.NewLimiter(
39✔
247
                rate.Limit(bytesPerSecond), int(bytesBurst),
39✔
248
        )
39✔
249

39✔
250
        return &SyncManager{
39✔
251
                cfg:          *cfg,
39✔
252
                rateLimiter:  rateLimiter,
39✔
253
                newSyncers:   make(chan *newSyncer),
39✔
254
                staleSyncers: make(chan *staleSyncer),
39✔
255
                activeSyncers: make(
39✔
256
                        map[route.Vertex]*GossipSyncer, cfg.NumActiveSyncers,
39✔
257
                ),
39✔
258
                inactiveSyncers: make(map[route.Vertex]*GossipSyncer),
39✔
259
                pinnedActiveSyncers: make(
39✔
260
                        map[route.Vertex]*GossipSyncer, len(cfg.PinnedSyncers),
39✔
261
                ),
39✔
262
                gossipFilterSema: filterSema,
39✔
263
                quit:             make(chan struct{}),
39✔
264
        }
39✔
265
}
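
As a back-of-the-envelope check on the defaults used above: DefaultMsgBytesPerSecond = 1000 * 1_024 = 1,024,000 bytes/s (roughly 1 MB/s sustained), and DefaultMsgBytesBurst = 2 * 1000 * 1_024 = 2,048,000 bytes (roughly 2 MB that may be sent in one burst before refills take over). A minimal, standalone sketch of the same token-bucket construction using only golang.org/x/time/rate follows; the names and the main wrapper are illustrative and not part of this file:

package main

import (
        "fmt"

        "golang.org/x/time/rate"
)

func main() {
        // Mirror the defaults above: ~1 MB/s sustained with a ~2 MB burst.
        const (
                bytesPerSecond = 1000 * 1_024     // 1,024,000 bytes/s
                bytesBurst     = 2 * 1000 * 1_024 // 2,048,000 bytes
        )

        // A token bucket that refills at bytesPerSecond and holds at most
        // bytesBurst tokens, matching how newSyncManager builds its limiter.
        limiter := rate.NewLimiter(rate.Limit(bytesPerSecond), bytesBurst)

        fmt.Println(limiter.Limit(), limiter.Burst())
}
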

// Start starts the SyncManager in order to properly carry out its duties.
func (m *SyncManager) Start() {
        m.start.Do(func() {
                m.wg.Add(1)
                go m.syncerHandler()
        })
}

// Stop stops the SyncManager from performing its duties.
func (m *SyncManager) Stop() {
        m.stop.Do(func() {
                log.Debugf("SyncManager is stopping")
                defer log.Debugf("SyncManager stopped")

                close(m.quit)
                m.wg.Wait()

                for _, syncer := range m.inactiveSyncers {
                        syncer.Stop()
                }
                for _, syncer := range m.activeSyncers {
                        syncer.Stop()
                }
        })
}

// syncerHandler is the SyncManager's main event loop responsible for:
//
//  1. Creating and tearing down GossipSyncers for connected/disconnected
//     peers.
//
//  2. Finding new peers to receive graph updates from to ensure we don't only
//     receive them from the same set of peers.
//
//  3. Finding new peers to force a historical sync with to ensure we have as
//     much of the public network as possible.
//
// NOTE: This must be run as a goroutine.
func (m *SyncManager) syncerHandler() {
        defer m.wg.Done()

        m.cfg.RotateTicker.Resume()
        defer m.cfg.RotateTicker.Stop()

        defer m.cfg.HistoricalSyncTicker.Stop()

        var (
                // initialHistoricalSyncer is the syncer we are currently
                // performing an initial historical sync with.
                initialHistoricalSyncer *GossipSyncer

                // initialHistoricalSyncSignal is a signal that will fire once
                // the initial historical sync has been completed. This is
                // crucial to ensure that another historical sync isn't
                // attempted just because the initialHistoricalSyncer was
                // disconnected.
                initialHistoricalSyncSignal chan struct{}
        )

        setInitialHistoricalSyncer := func(s *GossipSyncer) {
                initialHistoricalSyncer = s
                initialHistoricalSyncSignal = s.ResetSyncedSignal()

                // Restart the timer for our new historical sync peer. This will
                // ensure that all initial syncers receive an equivalent
                // duration before attempting the next sync. Without doing so we
                // might attempt two historical syncs back-to-back if a peer
                // disconnects just before the ticker fires.
                m.cfg.HistoricalSyncTicker.Pause()
                m.cfg.HistoricalSyncTicker.Resume()
        }

        for {
                select {
                // A new peer has been connected, so we'll create its
                // accompanying GossipSyncer.
                case newSyncer := <-m.newSyncers:
                        // If we already have a syncer, then we'll exit early as
                        // we don't want to override it.
                        if _, ok := m.GossipSyncer(newSyncer.peer.PubKey()); ok {
                                close(newSyncer.doneChan)
                                continue
                        }

                        s := m.createGossipSyncer(newSyncer.peer)

                        isPinnedSyncer := m.isPinnedSyncer(s)

                        // attemptHistoricalSync determines whether we should
                        // attempt an initial historical sync when a new peer
                        // connects.
                        attemptHistoricalSync := false

                        m.syncersMu.Lock()
                        switch {
                        // For pinned syncers, we will immediately transition
                        // the peer into an active (pinned) sync state.
                        case isPinnedSyncer:
                                attemptHistoricalSync = true
                                s.setSyncType(PinnedSync)
                                s.setSyncState(syncerIdle)
                                m.pinnedActiveSyncers[s.cfg.peerPub] = s

                        // Regardless of whether the initial historical sync
                        // has completed, we'll re-trigger a historical sync if
                        // we no longer have any syncers. This might be
                        // necessary if we lost all our peers at one point, and
                        // now we finally have one again.
                        case len(m.activeSyncers) == 0 &&
                                len(m.inactiveSyncers) == 0:

                                attemptHistoricalSync =
                                        m.cfg.NumActiveSyncers > 0
                                fallthrough

                        // If we've exceeded our total number of active syncers,
                        // we'll initialize this GossipSyncer as passive.
                        case len(m.activeSyncers) >= m.cfg.NumActiveSyncers:
                                fallthrough

                        // If the initial historical sync has yet to complete,
                        // then we'll declare it as passive and attempt to
                        // transition it when the initial historical sync
                        // completes.
                        case !m.IsGraphSynced():
                                s.setSyncType(PassiveSync)
                                m.inactiveSyncers[s.cfg.peerPub] = s

                        // The initial historical sync has completed, so we can
                        // immediately start the GossipSyncer as active.
                        default:
                                s.setSyncType(ActiveSync)
                                m.activeSyncers[s.cfg.peerPub] = s
                        }
                        m.syncersMu.Unlock()

                        s.Start()

                        // Once we create the GossipSyncer, we'll signal to the
                        // caller that they can proceed since the SyncManager's
                        // internal state has been updated.
                        close(newSyncer.doneChan)

                        // We'll force a historical sync with the first peer we
                        // connect to, to ensure we get as much of the graph as
                        // possible.
                        if !attemptHistoricalSync {
                                continue
                        }

                        log.Debugf("Attempting initial historical sync with "+
                                "GossipSyncer(%x)", s.cfg.peerPub)

                        if err := s.historicalSync(); err != nil {
                                log.Errorf("Unable to attempt initial "+
                                        "historical sync with "+
                                        "GossipSyncer(%x): %v", s.cfg.peerPub,
                                        err)
                                continue
                        }

                        // Once the historical sync has started, we'll keep
                        // track of the corresponding syncer to properly
                        // handle disconnects. We'll also use a signal to know
                        // when the historical sync completed.
                        if !isPinnedSyncer {
                                setInitialHistoricalSyncer(s)
                        }

                // An existing peer has disconnected, so we'll tear down its
                // corresponding GossipSyncer.
                case staleSyncer := <-m.staleSyncers:
                        // Once the corresponding GossipSyncer has been stopped
                        // and removed, we'll signal to the caller that they can
                        // proceed since the SyncManager's internal state has
                        // been updated.
                        m.removeGossipSyncer(staleSyncer.peer)
                        close(staleSyncer.doneChan)

                        // If we don't have an initialHistoricalSyncer, or we do
                        // but it is not the peer being disconnected, then we
                        // have nothing left to do and can proceed.
                        switch {
                        case initialHistoricalSyncer == nil:
                                fallthrough
                        case staleSyncer.peer != initialHistoricalSyncer.cfg.peerPub:
                                fallthrough
                        case m.cfg.NumActiveSyncers == 0:
                                continue
                        }

                        // Otherwise, our initialHistoricalSyncer corresponds to
                        // the peer being disconnected, so we'll have to find a
                        // replacement.
                        log.Debug("Finding replacement for initial " +
                                "historical sync")

                        s := m.forceHistoricalSync()
                        if s == nil {
                                log.Debug("No eligible replacement found " +
                                        "for initial historical sync")
                                continue
                        }

                        log.Debugf("Replaced initial historical "+
                                "GossipSyncer(%v) with GossipSyncer(%x)",
                                staleSyncer.peer, s.cfg.peerPub)

                        setInitialHistoricalSyncer(s)

                // Our initial historical sync signal has completed, so we'll
                // nil all of the relevant fields as they're no longer needed.
                case <-initialHistoricalSyncSignal:
                        initialHistoricalSyncer = nil
                        initialHistoricalSyncSignal = nil

                        log.Debug("Initial historical sync completed")

                        // With the initial historical sync complete, we can
                        // begin receiving new graph updates at tip. We'll
                        // determine whether we can have any more active
                        // GossipSyncers. If we do, we'll randomly select some
                        // that are currently passive to transition.
                        m.syncersMu.Lock()
                        numActiveLeft := m.cfg.NumActiveSyncers - len(m.activeSyncers)
                        if numActiveLeft <= 0 {
                                m.syncersMu.Unlock()
                                continue
                        }

                        // We may not even have enough inactive syncers to be
                        // transitioned. In that case, we will transition all
                        // the inactive syncers.
                        if len(m.inactiveSyncers) < numActiveLeft {
                                numActiveLeft = len(m.inactiveSyncers)
                        }

                        log.Debugf("Attempting to transition %v passive "+
                                "GossipSyncers to active", numActiveLeft)

                        for i := 0; i < numActiveLeft; i++ {
                                chooseRandomSyncer(
                                        m.inactiveSyncers, m.transitionPassiveSyncer,
                                )
                        }

                        m.syncersMu.Unlock()

                // Our RotateTicker has ticked, so we'll attempt to rotate a
                // single active syncer with a passive one.
                case <-m.cfg.RotateTicker.Ticks():
                        m.rotateActiveSyncerCandidate()

                // Our HistoricalSyncTicker has ticked, so we'll randomly select
                // a peer and force a historical sync with them.
                case <-m.cfg.HistoricalSyncTicker.Ticks():
                        // To be extra cautious, gate the forceHistoricalSync
                        // call such that it can only execute if we are
                        // configured to have a non-zero number of sync peers.
                        // This way even if the historical sync ticker manages
                        // to tick we can be sure that a historical sync won't
                        // accidentally begin.
                        if m.cfg.NumActiveSyncers == 0 {
                                continue
                        }

                        // If we don't have a syncer available we have nothing
                        // to do.
                        s := m.forceHistoricalSync()
                        if s == nil {
                                continue
                        }

                        // If we've already completed a historical sync, we'll
                        // skip setting the initial historical syncer.
                        if m.IsGraphSynced() {
                                continue
                        }

                        // Otherwise, we'll track the peer we've performed a
                        // historical sync with in order to handle the case
                        // where our previous historical sync peer did not
                        // respond to our queries and we haven't ingested as
                        // much of the graph as we should.
                        setInitialHistoricalSyncer(s)

                case <-m.quit:
                        return
                }
        }
}

// isPinnedSyncer returns true if the passed GossipSyncer is one of our pinned
// sync peers.
func (m *SyncManager) isPinnedSyncer(s *GossipSyncer) bool {
        _, isPinnedSyncer := m.cfg.PinnedSyncers[s.cfg.peerPub]
        return isPinnedSyncer
}

// deriveRateLimitReservation will take the current message and derive a
// reservation that can be used to wait on the rate limiter.
func deriveRateLimitReservation(rl *rate.Limiter,
        msg lnwire.Message) (*rate.Reservation, error) {

        var (
                msgSize uint32
                err     error
        )

        // Figure out the serialized size of the message. If we can't easily
        // compute it, then we'll use the assumed msg size.
        if sMsg, ok := msg.(lnwire.SizeableMessage); ok {
                msgSize, err = sMsg.SerializedSize()
                if err != nil {
                        return nil, err
                }
        } else {
                log.Warnf("Unable to compute serialized size of %T", msg)

                msgSize = assumedMsgSize
        }

        return rl.ReserveN(time.Now(), int(msgSize)), nil
}

// waitMsgDelay takes a delay, and waits until it has finished.
func waitMsgDelay(ctx context.Context, peerPub [33]byte,
        limitReservation *rate.Reservation, quit <-chan struct{}) error {

        // If we've already replied a handful of times, we will start to delay
        // responses back to the remote peer. This can help prevent DOS attacks
        // where the remote peer spams us endlessly.
        //
        // We skip checking for reservation.OK() here, as during config
        // validation, we ensure that the burst is enough for a single message
        // to be sent.
        delay := limitReservation.Delay()
        if delay > 0 {
                log.Debugf("GossipSyncer(%x): rate limiting gossip replies, "+
                        "responding in %s", peerPub, delay)

                select {
                case <-time.After(delay):

                case <-ctx.Done():
                        limitReservation.Cancel()

                        return ErrGossipSyncerExiting

                case <-quit:
                        limitReservation.Cancel()

                        return ErrGossipSyncerExiting
                }
        }

        return nil
}

// maybeRateLimitMsg takes a message, and may wait a period of time to rate
// limit the msg.
func maybeRateLimitMsg(ctx context.Context, rl *rate.Limiter, peerPub [33]byte,
        msg lnwire.Message, quit <-chan struct{}) error {

        delay, err := deriveRateLimitReservation(rl, msg)
        if err != nil {
                return nil
        }

        return waitMsgDelay(ctx, peerPub, delay, quit)
}
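
The reserve-then-wait pattern implemented by deriveRateLimitReservation and waitMsgDelay above can be reduced to the following standalone sketch. The limiter values and message sizes are made up purely for illustration; only the ReserveN/OK/Delay calls mirror the code above:

package main

import (
        "fmt"
        "time"

        "golang.org/x/time/rate"
)

func main() {
        // A deliberately tiny limiter so the delay is visible: 1,000 bytes/s
        // with a 2,000-byte burst.
        limiter := rate.NewLimiter(rate.Limit(1_000), 2_000)

        // Reserve tokens for two 1,500-byte "messages". The first fits within
        // the burst and goes out immediately; the second must wait for the
        // bucket to refill.
        for i := 0; i < 2; i++ {
                res := limiter.ReserveN(time.Now(), 1_500)
                if !res.OK() {
                        // The request exceeds the burst and can never be
                        // satisfied, so there is nothing to wait for.
                        continue
                }

                if delay := res.Delay(); delay > 0 {
                        fmt.Printf("msg %d delayed by %s\n", i, delay)
                        time.Sleep(delay)
                }
        }
}
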

// sendMessages sends a set of messages to the remote peer.
func (m *SyncManager) sendMessages(ctx context.Context, sync bool,
        peer lnpeer.Peer, nodeID route.Vertex, msgs ...lnwire.Message) error {

        for _, msg := range msgs {
                err := maybeRateLimitMsg(
                        ctx, m.rateLimiter, nodeID, msg, m.quit,
                )
                if err != nil {
                        return err
                }

                if err := peer.SendMessageLazy(sync, msg); err != nil {
                        return err
                }
        }

        return nil
}

// createGossipSyncer creates the GossipSyncer for a newly connected peer.
func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
        nodeID := route.Vertex(peer.PubKey())
        log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:])

        encoding := lnwire.EncodingSortedPlain
        s := newGossipSyncer(gossipSyncerCfg{
                chainHash:     m.cfg.ChainHash,
                peerPub:       nodeID,
                channelSeries: m.cfg.ChanSeries,
                encodingType:  encoding,
                chunkSize:     encodingTypeToChunkSize[encoding],
                batchSize:     requestBatchSize,
                sendMsg: func(ctx context.Context, sync bool,
                        msgs ...lnwire.Message) error {

                        return m.sendMessages(ctx, sync, peer, nodeID, msgs...)
                },
                ignoreHistoricalFilters:  m.cfg.IgnoreHistoricalFilters,
                bestHeight:               m.cfg.BestHeight,
                markGraphSynced:          m.markGraphSynced,
                maxQueryChanRangeReplies: maxQueryChanRangeReplies,
                noTimestampQueryOption:   m.cfg.NoTimestampQueries,
                isStillZombieChannel:     m.cfg.IsStillZombieChannel,
                msgBytesPerSecond:        m.cfg.PeerMsgBytesPerSecond,
        }, m.gossipFilterSema)

        // Gossip syncers are initialized by default in a PassiveSync type
        // and chansSynced state so that they can reply to any peer queries or
        // handle any sync transitions.
        s.setSyncState(chansSynced)
        s.setSyncType(PassiveSync)

        log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%x",
                s.syncState(), s.SyncType(), peer.PubKey())

        return s
}

// removeGossipSyncer removes all internal references to the disconnected peer's
// GossipSyncer and stops it. In the event of an active GossipSyncer being
// disconnected, a passive GossipSyncer, if any, will take its place.
func (m *SyncManager) removeGossipSyncer(peer route.Vertex) {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        s, ok := m.gossipSyncer(peer)
        if !ok {
                return
        }

        log.Infof("Removing GossipSyncer for peer=%v", peer)

        // We'll stop the GossipSyncer for the disconnected peer in a goroutine
        // to prevent blocking the SyncManager.
        go s.Stop()

        // If it's a non-active syncer, then we can just exit now.
        if _, ok := m.inactiveSyncers[peer]; ok {
                delete(m.inactiveSyncers, peer)
                return
        }

        // If it's a pinned syncer, then we can just exit as this doesn't
        // affect our active syncer count.
        if _, ok := m.pinnedActiveSyncers[peer]; ok {
                delete(m.pinnedActiveSyncers, peer)
                return
        }

        // Otherwise, we'll need to find a new one to replace it, if any.
        delete(m.activeSyncers, peer)
        newActiveSyncer := chooseRandomSyncer(
                m.inactiveSyncers, m.transitionPassiveSyncer,
        )
        if newActiveSyncer == nil {
                return
        }

        log.Debugf("Replaced active GossipSyncer(%v) with GossipSyncer(%x)",
                peer, newActiveSyncer.cfg.peerPub)
}

// rotateActiveSyncerCandidate rotates a single active syncer. In order to
// achieve this, the active syncer must be in a chansSynced state so that it
// can process the sync transition.
func (m *SyncManager) rotateActiveSyncerCandidate() {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        // If we couldn't find an eligible active syncer to rotate, we can
        // return early.
        activeSyncer := chooseRandomSyncer(m.activeSyncers, nil)
        if activeSyncer == nil {
                log.Debug("No eligible active syncer to rotate")
                return
        }

        // Similarly, if we don't have a candidate to rotate with, we can return
        // early as well.
        candidate := chooseRandomSyncer(m.inactiveSyncers, nil)
        if candidate == nil {
                log.Debug("No eligible candidate to rotate active syncer")
                return
        }

        // Otherwise, we'll attempt to transition each syncer to their
        // respective new sync type.
        log.Debugf("Rotating active GossipSyncer(%x) with GossipSyncer(%x)",
                activeSyncer.cfg.peerPub, candidate.cfg.peerPub)

        if err := m.transitionActiveSyncer(activeSyncer); err != nil {
                log.Errorf("Unable to transition active GossipSyncer(%x): %v",
                        activeSyncer.cfg.peerPub, err)
                return
        }

        if err := m.transitionPassiveSyncer(candidate); err != nil {
                log.Errorf("Unable to transition passive GossipSyncer(%x): %v",
                        candidate.cfg.peerPub, err)
                return
        }
}

// transitionActiveSyncer transitions an active syncer to a passive one.
//
// NOTE: This must be called with the syncersMu lock held.
func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error {
        log.Debugf("Transitioning active GossipSyncer(%x) to passive",
                s.cfg.peerPub)

        if err := s.ProcessSyncTransition(PassiveSync); err != nil {
                return err
        }

        delete(m.activeSyncers, s.cfg.peerPub)
        m.inactiveSyncers[s.cfg.peerPub] = s

        return nil
}

// transitionPassiveSyncer transitions a passive syncer to an active one.
//
// NOTE: This must be called with the syncersMu lock held.
func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error {
        log.Debugf("Transitioning passive GossipSyncer(%x) to active",
                s.cfg.peerPub)

        if err := s.ProcessSyncTransition(ActiveSync); err != nil {
                return err
        }

        delete(m.inactiveSyncers, s.cfg.peerPub)
        m.activeSyncers[s.cfg.peerPub] = s

        return nil
}

// forceHistoricalSync chooses a syncer with a remote peer at random and forces
// a historical sync with it.
func (m *SyncManager) forceHistoricalSync() *GossipSyncer {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        // We'll sample from both sets of active and inactive syncers in the
        // event that we don't have any inactive syncers.
        return chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error {
                return s.historicalSync()
        })
}

// chooseRandomSyncer iterates through the set of syncers given and returns the
// first one which was able to successfully perform the action enclosed in the
// function closure.
//
// NOTE: It's possible for a nil value to be returned if there are no eligible
// candidate syncers.
func chooseRandomSyncer(syncers map[route.Vertex]*GossipSyncer,
        action func(*GossipSyncer) error) *GossipSyncer {

        for _, s := range syncers {
                // Only syncers in a chansSynced state are viable for sync
                // transitions, so skip any that aren't.
                if s.syncState() != chansSynced {
                        continue
                }

                if action != nil {
                        if err := action(s); err != nil {
                                log.Debugf("Skipping eligible candidate "+
                                        "GossipSyncer(%x): %v", s.cfg.peerPub,
                                        err)
                                continue
                        }
                }

                return s
        }

        return nil
}

// InitSyncState is called by outside sub-systems when a connection is
// established to a new peer that understands how to perform channel range
// queries. We'll allocate a new GossipSyncer for it, and start any goroutines
// needed to handle new queries. The first GossipSyncer registered with the
// SyncManager will attempt a historical sync to ensure we have as much of the
// public channel graph as possible.
//
// TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer.
func (m *SyncManager) InitSyncState(peer lnpeer.Peer) error {
        done := make(chan struct{})

        select {
        case m.newSyncers <- &newSyncer{
                peer:     peer,
                doneChan: done,
        }:
        case <-m.quit:
                return ErrSyncManagerExiting
        }

        select {
        case <-done:
                return nil
        case <-m.quit:
                return ErrSyncManagerExiting
        }
}

// PruneSyncState is called by outside sub-systems once a peer that we were
// previously connected to has been disconnected. In this case we can stop the
// existing GossipSyncer assigned to the peer and free up resources.
func (m *SyncManager) PruneSyncState(peer route.Vertex) {
        done := make(chan struct{})

        // We avoid returning an error when the SyncManager is stopped since the
        // GossipSyncer will be stopped then anyway.
        select {
        case m.staleSyncers <- &staleSyncer{
                peer:     peer,
                doneChan: done,
        }:
        case <-m.quit:
                return
        }

        select {
        case <-done:
        case <-m.quit:
        }
}

// GossipSyncer returns the associated gossip syncer of a peer. The boolean
// returned signals whether there exists a gossip syncer for the peer.
func (m *SyncManager) GossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()
        return m.gossipSyncer(peer)
}

// gossipSyncer returns the associated gossip syncer of a peer. The boolean
// returned signals whether there exists a gossip syncer for the peer.
func (m *SyncManager) gossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
        syncer, ok := m.inactiveSyncers[peer]
        if ok {
                return syncer, true
        }
        syncer, ok = m.activeSyncers[peer]
        if ok {
                return syncer, true
        }
        syncer, ok = m.pinnedActiveSyncers[peer]
        if ok {
                return syncer, true
        }
        return nil, false
}

// GossipSyncers returns all of the currently initialized gossip syncers.
func (m *SyncManager) GossipSyncers() map[route.Vertex]*GossipSyncer {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()
        return m.gossipSyncers()
}

// gossipSyncers returns all of the currently initialized gossip syncers.
func (m *SyncManager) gossipSyncers() map[route.Vertex]*GossipSyncer {
        numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers)
        syncers := make(map[route.Vertex]*GossipSyncer, numSyncers)

        for _, syncer := range m.inactiveSyncers {
                syncers[syncer.cfg.peerPub] = syncer
        }
        for _, syncer := range m.activeSyncers {
                syncers[syncer.cfg.peerPub] = syncer
        }

        return syncers
}

// markGraphSynced allows us to report that the initial historical sync has
// completed.
func (m *SyncManager) markGraphSynced() {
        atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 1)
}

// IsGraphSynced determines whether we've completed our initial historical sync.
// The initial historical sync is done to ensure we've ingested as much of the
// public graph as possible.
func (m *SyncManager) IsGraphSynced() bool {
        return atomic.LoadInt32(&m.initialHistoricalSyncCompleted) == 1
}
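
Taken together, the exported surface of this file is small: outside subsystems call InitSyncState when a gossip-query-capable peer connects, PruneSyncState when it disconnects, and GossipSyncer/GossipSyncers to inspect the current set of syncers. A rough sketch of that lifecycle from a caller's point of view follows; trackPeer is a hypothetical helper, error handling is trimmed, and the SyncManager itself is assumed to have been constructed and started by the gossiper rather than directly:

// trackPeer is a hypothetical helper (not part of this file) showing how a
// caller in this package would drive the SyncManager for one peer connection.
func trackPeer(m *SyncManager, peer lnpeer.Peer) error {
        // Register the peer; the first registered syncer also kicks off the
        // initial historical sync.
        if err := m.InitSyncState(peer); err != nil {
                return err
        }

        // Look up the syncer that was just created and log its sync type.
        pub := route.Vertex(peer.PubKey())
        if s, ok := m.GossipSyncer(pub); ok {
                log.Debugf("peer=%x syncing with type %v", pub, s.SyncType())
        }

        // ...later, once the peer disconnects, release its syncer.
        m.PruneSyncState(pub)

        return nil
}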