
lightningnetwork / lnd · build 16811814134

07 Aug 2025 05:46PM UTC coverage: 57.463% (-9.5%) from 66.947%

Pull Request #9844: Refactor Payment PR 3
Merge 4b08ee16d into 2269859d9 (via GitHub web-flow)

434 of 645 new or added lines in 17 files covered. (67.29%)

28260 existing lines in 457 files now uncovered.

99053 of 172378 relevant lines covered (57.46%)

1.78 hits per line

Source file: /discovery/sync_manager.go (72.98% covered)

package discovery

import (
        "context"
        "errors"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/ticker"
        "golang.org/x/time/rate"
)

const (
        // DefaultSyncerRotationInterval is the default interval in which we'll
        // rotate a single active syncer.
        DefaultSyncerRotationInterval = 20 * time.Minute

        // DefaultHistoricalSyncInterval is the default interval in which we'll
        // force a historical sync to ensure we have as much of the public
        // network as possible.
        DefaultHistoricalSyncInterval = time.Hour

        // DefaultFilterConcurrency is the default maximum number of concurrent
        // gossip filter applications that can be processed.
        DefaultFilterConcurrency = 5

        // DefaultMsgBytesBurst is the allotted burst in bytes we'll permit.
        // This is the most that can be sent in a given go. Requests beyond
        // this will block indefinitely. Once tokens (bytes) are depleted,
        // they'll be refilled at the DefaultMsgBytesPerSecond rate.
        DefaultMsgBytesBurst = 2 * 1000 * 1_024

        // DefaultMsgBytesPerSecond is the max bytes/s we'll permit for outgoing
        // messages. Once tokens (bytes) have been taken from the bucket,
        // they'll be refilled at this rate.
        DefaultMsgBytesPerSecond = 1000 * 1_024

        // assumedMsgSize is the assumed size of a message if we can't compute
        // its serialized size. This comes out to 1 KB.
        assumedMsgSize = 1_024
)

var (
        // ErrSyncManagerExiting is an error returned when we attempt to
        // start/stop a gossip syncer for a connected/disconnected peer, but the
        // SyncManager has already been stopped.
        ErrSyncManagerExiting = errors.New("sync manager exiting")
)

// newSyncer is an internal message we'll use within the SyncManager to signal
// that we should create a GossipSyncer for a newly connected peer.
type newSyncer struct {
        // peer is the newly connected peer.
        peer lnpeer.Peer

        // doneChan serves as a signal to the caller that the SyncManager's
        // internal state correctly reflects the newly created GossipSyncer.
        doneChan chan struct{}
}

// staleSyncer is an internal message we'll use within the SyncManager to signal
// that a peer has disconnected and its GossipSyncer should be removed.
type staleSyncer struct {
        // peer is the peer that has disconnected.
        peer route.Vertex

        // doneChan serves as a signal to the caller that the SyncManager's
        // internal state correctly reflects the stale active syncer. This is
        // needed to ensure we always create a new syncer for a flappy peer
        // after they disconnect if they happened to be an active syncer.
        doneChan chan struct{}
}

// SyncManagerCfg contains all of the dependencies required for the SyncManager
// to carry out its duties.
type SyncManagerCfg struct {
        // ChainHash is a hash that indicates the specific network of the active
        // chain.
        ChainHash chainhash.Hash

        // ChanSeries is an interface that provides access to a time series view
        // of the current known channel graph. Each GossipSyncer enabled peer
        // will utilize this in order to create and respond to channel graph
        // time series queries.
        ChanSeries ChannelGraphTimeSeries

        // NumActiveSyncers is the number of peers we should have active
        // syncers with. After reaching NumActiveSyncers, any future gossip
        // syncers will be passive.
        NumActiveSyncers int

        // NoTimestampQueries will prevent the GossipSyncer from querying
        // timestamps of announcement messages from the peer and from
        // responding to timestamp queries.
        NoTimestampQueries bool

        // RotateTicker is a ticker responsible for notifying the SyncManager
        // when it should rotate its active syncers. A single active syncer with
        // a chansSynced state will be exchanged for a passive syncer in order
        // to ensure we don't keep syncing with the same peers.
        RotateTicker ticker.Ticker

        // HistoricalSyncTicker is a ticker responsible for notifying the
        // SyncManager when it should attempt a historical sync with a gossip
        // sync peer.
        HistoricalSyncTicker ticker.Ticker

        // IgnoreHistoricalFilters will prevent syncers from replying with
        // historical data when the remote peer sets a gossip_timestamp_range.
        // This prevents ranges with old start times from causing us to dump the
        // graph on connect.
        IgnoreHistoricalFilters bool

        // BestHeight returns the latest known height of the chain.
        BestHeight func() uint32

        // PinnedSyncers is a set of peers that will always transition to
        // ActiveSync upon connection. These peers will never transition to
        // PassiveSync.
        PinnedSyncers PinnedSyncers

        // IsStillZombieChannel takes the timestamps of the latest channel
        // updates for a channel and returns true if the channel should be
        // considered a zombie based on these timestamps.
        IsStillZombieChannel func(time.Time, time.Time) bool

        // AllotedMsgBytesPerSecond is the allotted bandwidth rate, expressed
        // in bytes/second, that the gossip manager can consume. Once we exceed
        // this rate, message sending will block until we're below the rate.
        AllotedMsgBytesPerSecond uint64

        // AllotedMsgBytesBurst is the amount of burst bytes we'll permit, if
        // we've exceeded the hard upper limit.
        AllotedMsgBytesBurst uint64

        // FilterConcurrency is the maximum number of concurrent gossip filter
        // applications that can be processed. If not set, defaults to 5.
        FilterConcurrency int
}

// SyncManager is a subsystem of the gossiper that manages the gossip syncers
// for peers currently connected. When a new peer is connected, the manager will
// create its accompanying gossip syncer and determine whether it should have an
// ActiveSync or PassiveSync sync type based on how many other gossip syncers
// are currently active. Any ActiveSync gossip syncers are started in a
// round-robin manner to ensure we're not syncing with multiple peers at the
// same time. The first GossipSyncer registered with the SyncManager will
// attempt a historical sync to ensure we have as much of the public channel
// graph as possible.
type SyncManager struct {
        // initialHistoricalSyncCompleted serves as a barrier when initializing
        // new active GossipSyncers. If 0, the initial historical sync has not
        // completed, so we'll defer initializing any active GossipSyncers. If
        // 1, then we can transition the GossipSyncer immediately. We set up
        // this barrier to ensure we have most of the graph before attempting to
        // accept new updates at tip.
        //
        // NOTE: This must be used atomically.
        initialHistoricalSyncCompleted int32

        start sync.Once
        stop  sync.Once

        cfg SyncManagerCfg

        // newSyncers is a channel we'll use to process requests to create
        // GossipSyncers for newly connected peers.
        newSyncers chan *newSyncer

        // staleSyncers is a channel we'll use to process requests to tear down
        // GossipSyncers for disconnected peers.
        staleSyncers chan *staleSyncer

        // syncersMu guards the read and write access to the activeSyncers and
        // inactiveSyncers maps below.
        syncersMu sync.Mutex

        // activeSyncers is the set of all syncers from which we are currently
        // receiving graph updates. The number of possible active syncers is
        // bounded by NumActiveSyncers.
        activeSyncers map[route.Vertex]*GossipSyncer

        // inactiveSyncers is the set of all syncers from which we are not
        // currently receiving new graph updates.
        inactiveSyncers map[route.Vertex]*GossipSyncer

        // pinnedActiveSyncers is the set of all syncers which are pinned into
        // an active sync. Pinned peers perform an initial historical sync on
        // each connection and will continue to receive graph updates for the
        // duration of the connection.
        pinnedActiveSyncers map[route.Vertex]*GossipSyncer

        // gossipFilterSema contains semaphores for the gossip timestamp
        // queries.
        gossipFilterSema chan struct{}

        // rateLimiter dictates the frequency with which we will reply to gossip
        // queries from a peer. This is used to delay responses to peers to
        // prevent DOS vulnerabilities if they are spamming with an unreasonable
        // number of queries.
        rateLimiter *rate.Limiter

        wg   sync.WaitGroup
        quit chan struct{}
}

// newSyncManager constructs a new SyncManager backed by the given config.
func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
        filterConcurrency := cfg.FilterConcurrency
        if filterConcurrency == 0 {
                filterConcurrency = DefaultFilterConcurrency
        }

        filterSema := make(chan struct{}, filterConcurrency)
        for i := 0; i < filterConcurrency; i++ {
                filterSema <- struct{}{}
        }

        bytesPerSecond := cfg.AllotedMsgBytesPerSecond
        if bytesPerSecond == 0 {
                bytesPerSecond = DefaultMsgBytesPerSecond
        }

        bytesBurst := cfg.AllotedMsgBytesBurst
        if bytesBurst == 0 {
                bytesBurst = DefaultMsgBytesBurst
        }

        // We'll use this rate limiter to limit our total outbound bandwidth for
        // gossip query peers.
        rateLimiter := rate.NewLimiter(
                rate.Limit(bytesPerSecond), int(bytesBurst),
        )

        return &SyncManager{
                cfg:          *cfg,
                rateLimiter:  rateLimiter,
                newSyncers:   make(chan *newSyncer),
                staleSyncers: make(chan *staleSyncer),
                activeSyncers: make(
                        map[route.Vertex]*GossipSyncer, cfg.NumActiveSyncers,
                ),
                inactiveSyncers: make(map[route.Vertex]*GossipSyncer),
                pinnedActiveSyncers: make(
                        map[route.Vertex]*GossipSyncer, len(cfg.PinnedSyncers),
                ),
                gossipFilterSema: filterSema,
                quit:             make(chan struct{}),
        }
}

// Start starts the SyncManager in order to properly carry out its duties.
func (m *SyncManager) Start() {
        m.start.Do(func() {
                m.wg.Add(1)
                go m.syncerHandler()
        })
}

// Stop stops the SyncManager from performing its duties.
func (m *SyncManager) Stop() {
        m.stop.Do(func() {
                log.Debugf("SyncManager is stopping")
                defer log.Debugf("SyncManager stopped")

                close(m.quit)
                m.wg.Wait()

                for _, syncer := range m.inactiveSyncers {
                        syncer.Stop()
                }
                for _, syncer := range m.activeSyncers {
                        syncer.Stop()
                }
        })
}

// syncerHandler is the SyncManager's main event loop responsible for:
//
//  1. Creating and tearing down GossipSyncers for connected/disconnected
//     peers.
//  2. Finding new peers to receive graph updates from to ensure we don't only
//     receive them from the same set of peers.
//  3. Finding new peers to force a historical sync with to ensure we have as
//     much of the public network as possible.
//
// NOTE: This must be run as a goroutine.
func (m *SyncManager) syncerHandler() {
        defer m.wg.Done()

        m.cfg.RotateTicker.Resume()
        defer m.cfg.RotateTicker.Stop()

        defer m.cfg.HistoricalSyncTicker.Stop()

        var (
                // initialHistoricalSyncer is the syncer we are currently
                // performing an initial historical sync with.
                initialHistoricalSyncer *GossipSyncer

                // initialHistoricalSyncSignal is a signal that will fire once
                // the initial historical sync has been completed. This is
                // crucial to ensure that another historical sync isn't
                // attempted just because the initialHistoricalSyncer was
                // disconnected.
                initialHistoricalSyncSignal chan struct{}
        )

        setInitialHistoricalSyncer := func(s *GossipSyncer) {
                initialHistoricalSyncer = s
                initialHistoricalSyncSignal = s.ResetSyncedSignal()

                // Restart the timer for our new historical sync peer. This will
                // ensure that all initial syncers receive an equivalent
                // duration before attempting the next sync. Without doing so we
                // might attempt two historical syncs back to back if a peer
                // disconnects just before the ticker fires.
                m.cfg.HistoricalSyncTicker.Pause()
                m.cfg.HistoricalSyncTicker.Resume()
        }

        for {
                select {
                // A new peer has been connected, so we'll create its
                // accompanying GossipSyncer.
                case newSyncer := <-m.newSyncers:
                        // If we already have a syncer, then we'll exit early as
                        // we don't want to override it.
                        if _, ok := m.GossipSyncer(newSyncer.peer.PubKey()); ok {
                                close(newSyncer.doneChan)
                                continue
                        }

                        s := m.createGossipSyncer(newSyncer.peer)

                        isPinnedSyncer := m.isPinnedSyncer(s)

                        // attemptHistoricalSync determines whether we should
                        // attempt an initial historical sync when a new peer
                        // connects.
                        attemptHistoricalSync := false

                        m.syncersMu.Lock()
                        switch {
                        // For pinned syncers, we will immediately transition
                        // the peer into an active (pinned) sync state.
                        case isPinnedSyncer:
                                attemptHistoricalSync = true
                                s.setSyncType(PinnedSync)
                                s.setSyncState(syncerIdle)
                                m.pinnedActiveSyncers[s.cfg.peerPub] = s

                        // Regardless of whether the initial historical sync
                        // has completed, we'll re-trigger a historical sync if
                        // we no longer have any syncers. This might be
                        // necessary if we lost all our peers at one point, and
                        // now we finally have one again.
                        case len(m.activeSyncers) == 0 &&
                                len(m.inactiveSyncers) == 0:

                                attemptHistoricalSync =
                                        m.cfg.NumActiveSyncers > 0
                                fallthrough

                        // If we've exceeded our total number of active syncers,
                        // we'll initialize this GossipSyncer as passive.
                        case len(m.activeSyncers) >= m.cfg.NumActiveSyncers:
                                fallthrough

                        // If the initial historical sync has yet to complete,
                        // then we'll declare it as passive and attempt to
                        // transition it when the initial historical sync
                        // completes.
                        case !m.IsGraphSynced():
                                s.setSyncType(PassiveSync)
                                m.inactiveSyncers[s.cfg.peerPub] = s

                        // The initial historical sync has completed, so we can
                        // immediately start the GossipSyncer as active.
                        default:
                                s.setSyncType(ActiveSync)
                                m.activeSyncers[s.cfg.peerPub] = s
                        }
                        m.syncersMu.Unlock()

                        s.Start()

                        // Once we create the GossipSyncer, we'll signal to the
                        // caller that they can proceed since the SyncManager's
                        // internal state has been updated.
                        close(newSyncer.doneChan)

                        // We'll force a historical sync with the first peer we
                        // connect to, to ensure we get as much of the graph as
                        // possible.
                        if !attemptHistoricalSync {
                                continue
                        }

                        log.Debugf("Attempting initial historical sync with "+
                                "GossipSyncer(%x)", s.cfg.peerPub)

                        if err := s.historicalSync(); err != nil {
                                log.Errorf("Unable to attempt initial "+
                                        "historical sync with "+
                                        "GossipSyncer(%x): %v", s.cfg.peerPub,
                                        err)
                                continue
                        }

                        // Once the historical sync has started, we'll keep
                        // track of the corresponding syncer to properly
                        // handle disconnects. We'll also use a signal to know
                        // when the historical sync completed.
                        if !isPinnedSyncer {
                                setInitialHistoricalSyncer(s)
                        }

                // An existing peer has disconnected, so we'll tear down its
                // corresponding GossipSyncer.
                case staleSyncer := <-m.staleSyncers:
                        // Once the corresponding GossipSyncer has been stopped
                        // and removed, we'll signal to the caller that they can
                        // proceed since the SyncManager's internal state has
                        // been updated.
                        m.removeGossipSyncer(staleSyncer.peer)
                        close(staleSyncer.doneChan)

                        // If we don't have an initialHistoricalSyncer, or we do
                        // but it is not the peer being disconnected, then we
                        // have nothing left to do and can proceed.
                        switch {
                        case initialHistoricalSyncer == nil:
                                fallthrough
                        case staleSyncer.peer != initialHistoricalSyncer.cfg.peerPub:
                                fallthrough
                        case m.cfg.NumActiveSyncers == 0:
                                continue
                        }

                        // Otherwise, our initialHistoricalSyncer corresponds to
                        // the peer being disconnected, so we'll have to find a
                        // replacement.
                        log.Debug("Finding replacement for initial " +
                                "historical sync")

                        s := m.forceHistoricalSync()
                        if s == nil {
                                log.Debug("No eligible replacement found " +
                                        "for initial historical sync")
                                continue
                        }

                        log.Debugf("Replaced initial historical "+
                                "GossipSyncer(%v) with GossipSyncer(%x)",
                                staleSyncer.peer, s.cfg.peerPub)

                        setInitialHistoricalSyncer(s)

                // Our initial historical sync signal has completed, so we'll
                // nil all of the relevant fields as they're no longer needed.
                case <-initialHistoricalSyncSignal:
                        initialHistoricalSyncer = nil
                        initialHistoricalSyncSignal = nil

                        log.Debug("Initial historical sync completed")

                        // With the initial historical sync complete, we can
                        // begin receiving new graph updates at tip. We'll
                        // determine whether we can have any more active
                        // GossipSyncers. If we do, we'll randomly select some
                        // that are currently passive to transition.
                        m.syncersMu.Lock()
                        numActiveLeft := m.cfg.NumActiveSyncers - len(m.activeSyncers)
                        if numActiveLeft <= 0 {
                                m.syncersMu.Unlock()
                                continue
                        }

                        // We may not even have enough inactive syncers to be
                        // transitioned. In that case, we will transition all
                        // the inactive syncers.
                        if len(m.inactiveSyncers) < numActiveLeft {
                                numActiveLeft = len(m.inactiveSyncers)
                        }

                        log.Debugf("Attempting to transition %v passive "+
                                "GossipSyncers to active", numActiveLeft)

                        for i := 0; i < numActiveLeft; i++ {
                                chooseRandomSyncer(
                                        m.inactiveSyncers, m.transitionPassiveSyncer,
                                )
                        }

                        m.syncersMu.Unlock()

                // Our RotateTicker has ticked, so we'll attempt to rotate a
                // single active syncer with a passive one.
                case <-m.cfg.RotateTicker.Ticks():
                        m.rotateActiveSyncerCandidate()

                // Our HistoricalSyncTicker has ticked, so we'll randomly select
                // a peer and force a historical sync with them.
                case <-m.cfg.HistoricalSyncTicker.Ticks():
                        // To be extra cautious, gate the forceHistoricalSync
                        // call such that it can only execute if we are
                        // configured to have a non-zero number of sync peers.
                        // This way even if the historical sync ticker manages
                        // to tick we can be sure that a historical sync won't
                        // accidentally begin.
                        if m.cfg.NumActiveSyncers == 0 {
                                continue
                        }

                        // If we don't have a syncer available we have nothing
                        // to do.
                        s := m.forceHistoricalSync()
                        if s == nil {
                                continue
                        }

                        // If we've already completed a historical sync, we'll
                        // skip setting the initial historical syncer.
                        if m.IsGraphSynced() {
                                continue
                        }

                        // Otherwise, we'll track the peer we've performed a
                        // historical sync with in order to handle the case
                        // where our previous historical sync peer did not
                        // respond to our queries and we haven't ingested as
                        // much of the graph as we should.
                        setInitialHistoricalSyncer(s)

                case <-m.quit:
                        return
                }
        }
}

// isPinnedSyncer returns true if the passed GossipSyncer is one of our pinned
// sync peers.
func (m *SyncManager) isPinnedSyncer(s *GossipSyncer) bool {
        _, isPinnedSyncer := m.cfg.PinnedSyncers[s.cfg.peerPub]
        return isPinnedSyncer
}

// deriveRateLimitReservation will take the current message and derive a
// reservation that can be used to wait on the rate limiter.
func (m *SyncManager) deriveRateLimitReservation(msg lnwire.Message,
) (*rate.Reservation, error) {

        var (
                msgSize uint32
                err     error
        )

        // Figure out the serialized size of the message. If we can't easily
        // compute it, then we'll use the assumed msg size.
        if sMsg, ok := msg.(lnwire.SizeableMessage); ok {
                msgSize, err = sMsg.SerializedSize()
                if err != nil {
                        return nil, err
                }
        } else {
                log.Warnf("Unable to compute serialized size of %T", msg)

                msgSize = assumedMsgSize
        }

        return m.rateLimiter.ReserveN(time.Now(), int(msgSize)), nil
}

// waitMsgDelay takes a delay, and waits until it has finished.
func (m *SyncManager) waitMsgDelay(ctx context.Context, peerPub [33]byte,
        limitReservation *rate.Reservation) error {

        // If we've already replied a handful of times, we will start to delay
        // responses back to the remote peer. This can help prevent DOS attacks
        // where the remote peer spams us endlessly.
        //
        // We skip checking for reservation.OK() here, as during config
        // validation, we ensure that the burst is enough for a single message
        // to be sent.
        delay := limitReservation.Delay()
        if delay > 0 {
                log.Debugf("GossipSyncer(%x): rate limiting gossip replies, "+
                        "responding in %s", peerPub, delay)

                select {
                case <-time.After(delay):

                case <-ctx.Done():
                        limitReservation.Cancel()

                        return ErrGossipSyncerExiting

                case <-m.quit:
                        limitReservation.Cancel()

                        return ErrGossipSyncerExiting
                }
        }

        return nil
}

// maybeRateLimitMsg takes a message, and may wait a period of time to rate
// limit the msg.
func (m *SyncManager) maybeRateLimitMsg(ctx context.Context, peerPub [33]byte,
        msg lnwire.Message) error {

        delay, err := m.deriveRateLimitReservation(msg)
        if err != nil {
                return nil
        }

        return m.waitMsgDelay(ctx, peerPub, delay)
}

// sendMessages sends a set of messages to the remote peer.
func (m *SyncManager) sendMessages(ctx context.Context, sync bool,
        peer lnpeer.Peer, nodeID route.Vertex, msgs ...lnwire.Message) error {

        for _, msg := range msgs {
                if err := m.maybeRateLimitMsg(ctx, nodeID, msg); err != nil {
                        return err
                }
                if err := peer.SendMessageLazy(sync, msg); err != nil {
                        return err
                }
        }

        return nil
}

// createGossipSyncer creates the GossipSyncer for a newly connected peer.
func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
        nodeID := route.Vertex(peer.PubKey())
        log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:])

        encoding := lnwire.EncodingSortedPlain
        s := newGossipSyncer(gossipSyncerCfg{
                chainHash:     m.cfg.ChainHash,
                peerPub:       nodeID,
                channelSeries: m.cfg.ChanSeries,
                encodingType:  encoding,
                chunkSize:     encodingTypeToChunkSize[encoding],
                batchSize:     requestBatchSize,
                sendToPeer: func(ctx context.Context,
                        msgs ...lnwire.Message) error {

                        return m.sendMessages(ctx, false, peer, nodeID, msgs...)
                },
                sendToPeerSync: func(ctx context.Context,
                        msgs ...lnwire.Message) error {

                        return m.sendMessages(ctx, true, peer, nodeID, msgs...)
                },
                ignoreHistoricalFilters:  m.cfg.IgnoreHistoricalFilters,
                bestHeight:               m.cfg.BestHeight,
                markGraphSynced:          m.markGraphSynced,
                maxQueryChanRangeReplies: maxQueryChanRangeReplies,
                noTimestampQueryOption:   m.cfg.NoTimestampQueries,
                isStillZombieChannel:     m.cfg.IsStillZombieChannel,
        }, m.gossipFilterSema)

        // Gossip syncers are initialized by default in a PassiveSync type
        // and chansSynced state so that they can reply to any peer queries or
        // handle any sync transitions.
        s.setSyncState(chansSynced)
        s.setSyncType(PassiveSync)

        log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%x",
                s.syncState(), s.SyncType(), peer.PubKey())

        return s
}

// removeGossipSyncer removes all internal references to the disconnected peer's
// GossipSyncer and stops it. In the event of an active GossipSyncer being
// disconnected, a passive GossipSyncer, if any, will take its place.
func (m *SyncManager) removeGossipSyncer(peer route.Vertex) {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        s, ok := m.gossipSyncer(peer)
        if !ok {
                return
        }

        log.Infof("Removing GossipSyncer for peer=%v", peer)

        // We'll stop the GossipSyncer for the disconnected peer in a goroutine
        // to prevent blocking the SyncManager.
        go s.Stop()

        // If it's a non-active syncer, then we can just exit now.
        if _, ok := m.inactiveSyncers[peer]; ok {
                delete(m.inactiveSyncers, peer)
                return
        }

        // If it's a pinned syncer, then we can just exit as this doesn't
        // affect our active syncer count.
        if _, ok := m.pinnedActiveSyncers[peer]; ok {
                delete(m.pinnedActiveSyncers, peer)
                return
        }

        // Otherwise, we'll need to find a new one to replace it, if any.
        delete(m.activeSyncers, peer)
        newActiveSyncer := chooseRandomSyncer(
                m.inactiveSyncers, m.transitionPassiveSyncer,
        )
        if newActiveSyncer == nil {
                return
        }

        log.Debugf("Replaced active GossipSyncer(%v) with GossipSyncer(%x)",
                peer, newActiveSyncer.cfg.peerPub)
}

// rotateActiveSyncerCandidate rotates a single active syncer. To achieve this,
// the active syncer must be in a chansSynced state so that it can process the
// sync transition.
func (m *SyncManager) rotateActiveSyncerCandidate() {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        // If we couldn't find an eligible active syncer to rotate, we can
        // return early.
        activeSyncer := chooseRandomSyncer(m.activeSyncers, nil)
        if activeSyncer == nil {
                log.Debug("No eligible active syncer to rotate")
                return
        }

        // Similarly, if we don't have a candidate to rotate with, we can return
        // early as well.
        candidate := chooseRandomSyncer(m.inactiveSyncers, nil)
        if candidate == nil {
                log.Debug("No eligible candidate to rotate active syncer")
                return
        }

        // Otherwise, we'll attempt to transition each syncer to their
        // respective new sync type.
        log.Debugf("Rotating active GossipSyncer(%x) with GossipSyncer(%x)",
                activeSyncer.cfg.peerPub, candidate.cfg.peerPub)

        if err := m.transitionActiveSyncer(activeSyncer); err != nil {
                log.Errorf("Unable to transition active GossipSyncer(%x): %v",
                        activeSyncer.cfg.peerPub, err)
                return
        }

        if err := m.transitionPassiveSyncer(candidate); err != nil {
                log.Errorf("Unable to transition passive GossipSyncer(%x): %v",
                        activeSyncer.cfg.peerPub, err)
                return
        }
}

// transitionActiveSyncer transitions an active syncer to a passive one.
//
// NOTE: This must be called with the syncersMu lock held.
func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error {
        log.Debugf("Transitioning active GossipSyncer(%x) to passive",
                s.cfg.peerPub)

        if err := s.ProcessSyncTransition(PassiveSync); err != nil {
                return err
        }

        delete(m.activeSyncers, s.cfg.peerPub)
        m.inactiveSyncers[s.cfg.peerPub] = s

        return nil
}

// transitionPassiveSyncer transitions a passive syncer to an active one.
//
// NOTE: This must be called with the syncersMu lock held.
func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error {
        log.Debugf("Transitioning passive GossipSyncer(%x) to active",
                s.cfg.peerPub)

        if err := s.ProcessSyncTransition(ActiveSync); err != nil {
                return err
        }

        delete(m.inactiveSyncers, s.cfg.peerPub)
        m.activeSyncers[s.cfg.peerPub] = s

        return nil
}

// forceHistoricalSync chooses a syncer with a remote peer at random and forces
// a historical sync with it.
func (m *SyncManager) forceHistoricalSync() *GossipSyncer {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        // We'll sample from both sets of active and inactive syncers in the
        // event that we don't have any inactive syncers.
        return chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error {
                return s.historicalSync()
        })
}

// chooseRandomSyncer iterates through the set of syncers given and returns the
// first one which was able to successfully perform the action enclosed in the
// function closure.
//
// NOTE: It's possible for a nil value to be returned if there are no eligible
// candidate syncers.
func chooseRandomSyncer(syncers map[route.Vertex]*GossipSyncer,
        action func(*GossipSyncer) error) *GossipSyncer {

        for _, s := range syncers {
                // Only syncers in a chansSynced state are viable for sync
                // transitions, so skip any that aren't.
                if s.syncState() != chansSynced {
                        continue
                }

                if action != nil {
                        if err := action(s); err != nil {
                                log.Debugf("Skipping eligible candidate "+
                                        "GossipSyncer(%x): %v", s.cfg.peerPub,
                                        err)
                                continue
                        }
                }

                return s
        }

        return nil
}

// InitSyncState is called by outside sub-systems when a connection is
// established to a new peer that understands how to perform channel range
// queries. We'll allocate a new GossipSyncer for it, and start any goroutines
// needed to handle new queries. The first GossipSyncer registered with the
// SyncManager will attempt a historical sync to ensure we have as much of the
// public channel graph as possible.
//
// TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer.
func (m *SyncManager) InitSyncState(peer lnpeer.Peer) error {
        done := make(chan struct{})

        select {
        case m.newSyncers <- &newSyncer{
                peer:     peer,
                doneChan: done,
        }:
        case <-m.quit:
                return ErrSyncManagerExiting
        }

        select {
        case <-done:
                return nil
        case <-m.quit:
                return ErrSyncManagerExiting
        }
}

// PruneSyncState is called by outside sub-systems once a peer that we were
// previously connected to has been disconnected. In this case we can stop the
// existing GossipSyncer assigned to the peer and free up resources.
func (m *SyncManager) PruneSyncState(peer route.Vertex) {
        done := make(chan struct{})

        // We avoid returning an error when the SyncManager is stopped since the
        // GossipSyncer will be stopped then anyway.
        select {
        case m.staleSyncers <- &staleSyncer{
                peer:     peer,
                doneChan: done,
        }:
        case <-m.quit:
                return
        }

        select {
        case <-done:
        case <-m.quit:
        }
}

// GossipSyncer returns the associated gossip syncer of a peer. The boolean
// returned signals whether there exists a gossip syncer for the peer.
func (m *SyncManager) GossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()
        return m.gossipSyncer(peer)
}

// gossipSyncer returns the associated gossip syncer of a peer. The boolean
// returned signals whether there exists a gossip syncer for the peer.
func (m *SyncManager) gossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
        syncer, ok := m.inactiveSyncers[peer]
        if ok {
                return syncer, true
        }
        syncer, ok = m.activeSyncers[peer]
        if ok {
                return syncer, true
        }
        syncer, ok = m.pinnedActiveSyncers[peer]
        if ok {
                return syncer, true
        }
        return nil, false
}

// GossipSyncers returns all of the currently initialized gossip syncers.
func (m *SyncManager) GossipSyncers() map[route.Vertex]*GossipSyncer {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()
        return m.gossipSyncers()
}

// gossipSyncers returns all of the currently initialized gossip syncers.
func (m *SyncManager) gossipSyncers() map[route.Vertex]*GossipSyncer {
        numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers)
        syncers := make(map[route.Vertex]*GossipSyncer, numSyncers)

        for _, syncer := range m.inactiveSyncers {
                syncers[syncer.cfg.peerPub] = syncer
        }
        for _, syncer := range m.activeSyncers {
                syncers[syncer.cfg.peerPub] = syncer
        }

        return syncers
}

// markGraphSynced allows us to report that the initial historical sync has
// completed.
func (m *SyncManager) markGraphSynced() {
        atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 1)
}

// IsGraphSynced determines whether we've completed our initial historical sync.
// The initial historical sync is done to ensure we've ingested as much of the
// public graph as possible.
func (m *SyncManager) IsGraphSynced() bool {
        return atomic.LoadInt32(&m.initialHistoricalSyncCompleted) == 1
}
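
The rate-limiting helpers above (deriveRateLimitReservation, waitMsgDelay and sendMessages) are built on the token-bucket reservation API from golang.org/x/time/rate. The standalone sketch below illustrates that pattern in isolation; it is not part of sync_manager.go, and the loop and message sizes are invented purely for illustration, with only the rate package calls mirroring the real code.

package main

import (
        "fmt"
        "time"

        "golang.org/x/time/rate"
)

func main() {
        // Mirror the SyncManager defaults: refill at 1000*1_024 bytes/s with a
        // burst bucket of 2*1000*1_024 bytes.
        limiter := rate.NewLimiter(rate.Limit(1000*1_024), 2*1000*1_024)

        // Pretend we're sending three large gossip replies (sizes invented for
        // the example). Each send reserves its byte count from the bucket and
        // then sleeps for whatever delay the reservation reports, which is
        // what waitMsgDelay does with the reservation it receives.
        for i := 0; i < 3; i++ {
                msgSize := 1_500_000

                res := limiter.ReserveN(time.Now(), msgSize)
                if delay := res.Delay(); delay > 0 {
                        fmt.Printf("msg %d: rate limited, waiting %s\n", i, delay)
                        time.Sleep(delay)
                }

                fmt.Printf("msg %d: sent %d bytes\n", i, msgSize)
        }
}

A single reservation larger than the configured burst can never be satisfied, which is why DefaultMsgBytesBurst is documented above as the most that can be sent in a given go.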