lightningnetwork / lnd / build 14358372723

09 Apr 2025 01:26PM UTC coverage: 56.696% (-12.3%) from 69.037%

Pull Request #9696: Add `development_guidelines.md` for both human and machine
Merge e2837e400 into 867d27d68 (github / web-flow)

107055 of 188823 relevant lines covered (56.7%)
22721.56 hits per line

Source File: /discovery/sync_manager.go (84.76% of lines covered)
package discovery

import (
        "context"
        "errors"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/ticker"
        "golang.org/x/time/rate"
)

const (
        // DefaultSyncerRotationInterval is the default interval in which we'll
        // rotate a single active syncer.
        DefaultSyncerRotationInterval = 20 * time.Minute

        // DefaultHistoricalSyncInterval is the default interval in which we'll
        // force a historical sync to ensure we have as much of the public
        // network as possible.
        DefaultHistoricalSyncInterval = time.Hour

        // filterSemaSize is the capacity of gossipFilterSema.
        filterSemaSize = 5

        // DefaultMsgBytesBurst is the allotted burst in bytes we'll permit.
        // This is the most that can be sent in a single go. Requests beyond
        // this will block indefinitely. Once tokens (bytes) are depleted,
        // they'll be refilled at the DefaultMsgBytesPerSecond rate.
        DefaultMsgBytesBurst = 2 * 100 * 1_024

        // DefaultMsgBytesPerSecond is the max bytes/s we'll permit for outgoing
        // messages. Once tokens (bytes) have been taken from the bucket,
        // they'll be refilled at this rate.
        DefaultMsgBytesPerSecond = 100 * 1_024

        // assumedMsgSize is the assumed size of a message if we can't compute
        // its serialized size. This comes out to 1 KB.
        assumedMsgSize = 1_024
)

var (
        // ErrSyncManagerExiting is an error returned when we attempt to
        // start/stop a gossip syncer for a connected/disconnected peer, but the
        // SyncManager has already been stopped.
        ErrSyncManagerExiting = errors.New("sync manager exiting")
)

// newSyncer is an internal message we'll use within the SyncManager to signal
// that we should create a GossipSyncer for a newly connected peer.
type newSyncer struct {
        // peer is the newly connected peer.
        peer lnpeer.Peer

        // doneChan serves as a signal to the caller that the SyncManager's
        // internal state correctly reflects the newly created GossipSyncer.
        doneChan chan struct{}
}

// staleSyncer is an internal message we'll use within the SyncManager to signal
// that a peer has disconnected and its GossipSyncer should be removed.
type staleSyncer struct {
        // peer is the peer that has disconnected.
        peer route.Vertex

        // doneChan serves as a signal to the caller that the SyncManager's
        // internal state correctly reflects the stale active syncer. This is
        // needed to ensure we always create a new syncer for a flappy peer
        // after they disconnect if they happened to be an active syncer.
        doneChan chan struct{}
}

// SyncManagerCfg contains all of the dependencies required for the SyncManager
// to carry out its duties.
type SyncManagerCfg struct {
        // ChainHash is a hash that indicates the specific network of the active
        // chain.
        ChainHash chainhash.Hash

        // ChanSeries is an interface that provides access to a time series view
        // of the current known channel graph. Each GossipSyncer enabled peer
        // will utilize this in order to create and respond to channel graph
        // time series queries.
        ChanSeries ChannelGraphTimeSeries

        // NumActiveSyncers is the number of peers for which we should have
        // active syncers. After reaching NumActiveSyncers, any future gossip
        // syncers will be passive.
        NumActiveSyncers int

        // NoTimestampQueries will prevent the GossipSyncer from querying
        // timestamps of announcement messages from the peer, and from
        // responding to timestamp queries.
        NoTimestampQueries bool

        // RotateTicker is a ticker responsible for notifying the SyncManager
        // when it should rotate its active syncers. A single active syncer with
        // a chansSynced state will be exchanged for a passive syncer in order
        // to ensure we don't keep syncing with the same peers.
        RotateTicker ticker.Ticker

        // HistoricalSyncTicker is a ticker responsible for notifying the
        // SyncManager when it should attempt a historical sync with a gossip
        // sync peer.
        HistoricalSyncTicker ticker.Ticker

        // IgnoreHistoricalFilters will prevent syncers from replying with
        // historical data when the remote peer sets a gossip_timestamp_range.
        // This prevents ranges with old start times from causing us to dump the
        // graph on connect.
        IgnoreHistoricalFilters bool

        // BestHeight returns the latest height known of the chain.
        BestHeight func() uint32

        // PinnedSyncers is a set of peers that will always transition to
        // ActiveSync upon connection. These peers will never transition to
        // PassiveSync.
        PinnedSyncers PinnedSyncers

        // IsStillZombieChannel takes the timestamps of the latest channel
        // updates for a channel and returns true if the channel should be
        // considered a zombie based on these timestamps.
        IsStillZombieChannel func(time.Time, time.Time) bool

        // AllotedMsgBytesPerSecond is the allotted bandwidth rate, expressed in
        // bytes/second, that the gossip manager can consume. Once we exceed
        // this rate, message sending will block until we're below the rate.
        AllotedMsgBytesPerSecond uint64

        // AllotedMsgBytesBurst is the amount of burst bytes we'll permit, if
        // we've exceeded the hard upper limit.
        AllotedMsgBytesBurst uint64
}

// SyncManager is a subsystem of the gossiper that manages the gossip syncers
// for peers currently connected. When a new peer is connected, the manager will
// create its accompanying gossip syncer and determine whether it should have an
// ActiveSync or PassiveSync sync type based on how many other gossip syncers
// are currently active. Any ActiveSync gossip syncers are started in a
// round-robin manner to ensure we're not syncing with multiple peers at the
// same time. The first GossipSyncer registered with the SyncManager will
// attempt a historical sync to ensure we have as much of the public channel
// graph as possible.
type SyncManager struct {
        // initialHistoricalSyncCompleted serves as a barrier when initializing
        // new active GossipSyncers. If 0, the initial historical sync has not
        // completed, so we'll defer initializing any active GossipSyncers. If
        // 1, then we can transition the GossipSyncer immediately. We set up
        // this barrier to ensure we have most of the graph before attempting to
        // accept new updates at tip.
        //
        // NOTE: This must be used atomically.
        initialHistoricalSyncCompleted int32

        start sync.Once
        stop  sync.Once

        cfg SyncManagerCfg

        // newSyncers is a channel we'll use to process requests to create
        // GossipSyncers for newly connected peers.
        newSyncers chan *newSyncer

        // staleSyncers is a channel we'll use to process requests to tear down
        // GossipSyncers for disconnected peers.
        staleSyncers chan *staleSyncer

        // syncersMu guards the read and write access to the activeSyncers and
        // inactiveSyncers maps below.
        syncersMu sync.Mutex

        // activeSyncers is the set of all syncers from which we are currently
        // receiving graph updates. The number of possible active syncers is
        // bounded by NumActiveSyncers.
        activeSyncers map[route.Vertex]*GossipSyncer

        // inactiveSyncers is the set of all syncers from which we are not
        // currently receiving new graph updates.
        inactiveSyncers map[route.Vertex]*GossipSyncer

        // pinnedActiveSyncers is the set of all syncers which are pinned into
        // an active sync. Pinned peers perform an initial historical sync on
        // each connection and will continue to receive graph updates for the
        // duration of the connection.
        pinnedActiveSyncers map[route.Vertex]*GossipSyncer

        // gossipFilterSema contains semaphores for the gossip timestamp
        // queries.
        gossipFilterSema chan struct{}

        // rateLimiter dictates the frequency with which we will reply to gossip
        // queries from a peer. This is used to delay responses to peers to
        // prevent DOS vulnerabilities if they are spamming with an unreasonable
        // number of queries.
        rateLimiter *rate.Limiter

        wg   sync.WaitGroup
        quit chan struct{}
}

// newSyncManager constructs a new SyncManager backed by the given config.
func newSyncManager(cfg *SyncManagerCfg) *SyncManager {

        filterSema := make(chan struct{}, filterSemaSize)
        for i := 0; i < filterSemaSize; i++ {
                filterSema <- struct{}{}
        }

        bytesPerSecond := cfg.AllotedMsgBytesPerSecond
        if bytesPerSecond == 0 {
                bytesPerSecond = DefaultMsgBytesPerSecond
        }

        bytesBurst := cfg.AllotedMsgBytesBurst
        if bytesBurst == 0 {
                bytesBurst = DefaultMsgBytesBurst
        }

        // We'll use this rate limiter to limit our total outbound bandwidth for
        // gossip queries to peers.
        rateLimiter := rate.NewLimiter(
                rate.Limit(bytesPerSecond), int(bytesBurst),
        )

        return &SyncManager{
                cfg:          *cfg,
                rateLimiter:  rateLimiter,
                newSyncers:   make(chan *newSyncer),
                staleSyncers: make(chan *staleSyncer),
                activeSyncers: make(
                        map[route.Vertex]*GossipSyncer, cfg.NumActiveSyncers,
                ),
                inactiveSyncers: make(map[route.Vertex]*GossipSyncer),
                pinnedActiveSyncers: make(
                        map[route.Vertex]*GossipSyncer, len(cfg.PinnedSyncers),
                ),
                gossipFilterSema: filterSema,
                quit:             make(chan struct{}),
        }
}

// Start starts the SyncManager in order to properly carry out its duties.
func (m *SyncManager) Start() {
        m.start.Do(func() {
                m.wg.Add(1)
                go m.syncerHandler()
        })
}

// Stop stops the SyncManager from performing its duties.
func (m *SyncManager) Stop() {
        m.stop.Do(func() {
                log.Debugf("SyncManager is stopping")
                defer log.Debugf("SyncManager stopped")

                close(m.quit)
                m.wg.Wait()

                for _, syncer := range m.inactiveSyncers {
                        syncer.Stop()
                }
                for _, syncer := range m.activeSyncers {
                        syncer.Stop()
                }
        })
}

// syncerHandler is the SyncManager's main event loop responsible for:
//
//  1. Creating and tearing down GossipSyncers for connected/disconnected
//     peers.
//  2. Finding new peers to receive graph updates from to ensure we don't only
//     receive them from the same set of peers.
//  3. Finding new peers to force a historical sync with to ensure we have as
//     much of the public network as possible.
//
// NOTE: This must be run as a goroutine.
func (m *SyncManager) syncerHandler() {
        defer m.wg.Done()

        m.cfg.RotateTicker.Resume()
        defer m.cfg.RotateTicker.Stop()

        defer m.cfg.HistoricalSyncTicker.Stop()

        var (
                // initialHistoricalSyncer is the syncer we are currently
                // performing an initial historical sync with.
                initialHistoricalSyncer *GossipSyncer

                // initialHistoricalSyncSignal is a signal that will fire once
                // the initial historical sync has been completed. This is
                // crucial to ensure that another historical sync isn't
                // attempted just because the initialHistoricalSyncer was
                // disconnected.
                initialHistoricalSyncSignal chan struct{}
        )

        setInitialHistoricalSyncer := func(s *GossipSyncer) {
                initialHistoricalSyncer = s
                initialHistoricalSyncSignal = s.ResetSyncedSignal()

                // Restart the timer for our new historical sync peer. This will
                // ensure that all initial syncers receive an equivalent
                // duration before attempting the next sync. Without doing so we
                // might attempt two historical syncs back to back if a peer
                // disconnects just before the ticker fires.
                m.cfg.HistoricalSyncTicker.Pause()
                m.cfg.HistoricalSyncTicker.Resume()
        }

        for {
                select {
                // A new peer has been connected, so we'll create its
                // accompanying GossipSyncer.
                case newSyncer := <-m.newSyncers:
                        // If we already have a syncer, then we'll exit early as
                        // we don't want to override it.
                        if _, ok := m.GossipSyncer(newSyncer.peer.PubKey()); ok {
                                close(newSyncer.doneChan)
                                continue
                        }

                        s := m.createGossipSyncer(newSyncer.peer)

                        isPinnedSyncer := m.isPinnedSyncer(s)

                        // attemptHistoricalSync determines whether we should
                        // attempt an initial historical sync when a new peer
                        // connects.
                        attemptHistoricalSync := false

                        m.syncersMu.Lock()
                        switch {
                        // For pinned syncers, we will immediately transition
                        // the peer into an active (pinned) sync state.
                        case isPinnedSyncer:
                                attemptHistoricalSync = true
                                s.setSyncType(PinnedSync)
                                s.setSyncState(syncerIdle)
                                m.pinnedActiveSyncers[s.cfg.peerPub] = s

                        // Regardless of whether the initial historical sync
                        // has completed, we'll re-trigger a historical sync if
                        // we no longer have any syncers. This might be
                        // necessary if we lost all our peers at one point, and
                        // now we finally have one again.
                        case len(m.activeSyncers) == 0 &&
                                len(m.inactiveSyncers) == 0:

                                attemptHistoricalSync =
                                        m.cfg.NumActiveSyncers > 0
                                fallthrough

                        // If we've exceeded our total number of active syncers,
                        // we'll initialize this GossipSyncer as passive.
                        case len(m.activeSyncers) >= m.cfg.NumActiveSyncers:
                                fallthrough

                        // If the initial historical sync has yet to complete,
                        // then we'll declare it as passive and attempt to
                        // transition it when the initial historical sync
                        // completes.
                        case !m.IsGraphSynced():
                                s.setSyncType(PassiveSync)
                                m.inactiveSyncers[s.cfg.peerPub] = s

                        // The initial historical sync has completed, so we can
                        // immediately start the GossipSyncer as active.
                        default:
                                s.setSyncType(ActiveSync)
                                m.activeSyncers[s.cfg.peerPub] = s
                        }
                        m.syncersMu.Unlock()

                        s.Start()

                        // Once we create the GossipSyncer, we'll signal to the
                        // caller that they can proceed since the SyncManager's
                        // internal state has been updated.
                        close(newSyncer.doneChan)

                        // We'll force a historical sync with the first peer we
                        // connect to, to ensure we get as much of the graph as
                        // possible.
                        if !attemptHistoricalSync {
                                continue
                        }

                        log.Debugf("Attempting initial historical sync with "+
                                "GossipSyncer(%x)", s.cfg.peerPub)

                        if err := s.historicalSync(); err != nil {
                                log.Errorf("Unable to attempt initial "+
                                        "historical sync with "+
                                        "GossipSyncer(%x): %v", s.cfg.peerPub,
                                        err)
                                continue
                        }

                        // Once the historical sync has started, we'll keep
                        // track of the corresponding syncer to properly
                        // handle disconnects. We'll also use a signal to know
                        // when the historical sync completed.
                        if !isPinnedSyncer {
                                setInitialHistoricalSyncer(s)
                        }

                // An existing peer has disconnected, so we'll tear down its
                // corresponding GossipSyncer.
                case staleSyncer := <-m.staleSyncers:
                        // Once the corresponding GossipSyncer has been stopped
                        // and removed, we'll signal to the caller that they can
                        // proceed since the SyncManager's internal state has
                        // been updated.
                        m.removeGossipSyncer(staleSyncer.peer)
                        close(staleSyncer.doneChan)

                        // If we don't have an initialHistoricalSyncer, or we do
                        // but it is not the peer being disconnected, then we
                        // have nothing left to do and can proceed.
                        switch {
                        case initialHistoricalSyncer == nil:
                                fallthrough
                        case staleSyncer.peer != initialHistoricalSyncer.cfg.peerPub:
                                fallthrough
                        case m.cfg.NumActiveSyncers == 0:
                                continue
                        }

                        // Otherwise, our initialHistoricalSyncer corresponds to
                        // the peer being disconnected, so we'll have to find a
                        // replacement.
                        log.Debug("Finding replacement for initial " +
                                "historical sync")

                        s := m.forceHistoricalSync()
                        if s == nil {
                                log.Debug("No eligible replacement found " +
                                        "for initial historical sync")
                                continue
                        }

                        log.Debugf("Replaced initial historical "+
                                "GossipSyncer(%v) with GossipSyncer(%x)",
                                staleSyncer.peer, s.cfg.peerPub)

                        setInitialHistoricalSyncer(s)

                // Our initial historical sync signal has completed, so we'll
                // nil all of the relevant fields as they're no longer needed.
                case <-initialHistoricalSyncSignal:
                        initialHistoricalSyncer = nil
                        initialHistoricalSyncSignal = nil

                        log.Debug("Initial historical sync completed")

                        // With the initial historical sync complete, we can
                        // begin receiving new graph updates at tip. We'll
                        // determine whether we can have any more active
                        // GossipSyncers. If we do, we'll randomly select some
                        // that are currently passive to transition.
                        m.syncersMu.Lock()
                        numActiveLeft := m.cfg.NumActiveSyncers - len(m.activeSyncers)
                        if numActiveLeft <= 0 {
                                m.syncersMu.Unlock()
                                continue
                        }

                        // We may not even have enough inactive syncers to be
                        // transitioned. In that case, we will transition all
                        // the inactive syncers.
                        if len(m.inactiveSyncers) < numActiveLeft {
                                numActiveLeft = len(m.inactiveSyncers)
                        }

                        log.Debugf("Attempting to transition %v passive "+
                                "GossipSyncers to active", numActiveLeft)

                        for i := 0; i < numActiveLeft; i++ {
                                chooseRandomSyncer(
                                        m.inactiveSyncers, m.transitionPassiveSyncer,
                                )
                        }

                        m.syncersMu.Unlock()

                // Our RotateTicker has ticked, so we'll attempt to rotate a
                // single active syncer with a passive one.
                case <-m.cfg.RotateTicker.Ticks():
                        m.rotateActiveSyncerCandidate()

                // Our HistoricalSyncTicker has ticked, so we'll randomly select
                // a peer and force a historical sync with them.
                case <-m.cfg.HistoricalSyncTicker.Ticks():
                        // To be extra cautious, gate the forceHistoricalSync
                        // call such that it can only execute if we are
                        // configured to have a non-zero number of sync peers.
                        // This way even if the historical sync ticker manages
                        // to tick we can be sure that a historical sync won't
                        // accidentally begin.
                        if m.cfg.NumActiveSyncers == 0 {
                                continue
                        }

                        // If we don't have a syncer available we have nothing
                        // to do.
                        s := m.forceHistoricalSync()
                        if s == nil {
                                continue
                        }

                        // If we've already completed a historical sync, we'll
                        // skip setting the initial historical syncer.
                        if m.IsGraphSynced() {
                                continue
                        }

                        // Otherwise, we'll track the peer we've performed a
                        // historical sync with in order to handle the case
                        // where our previous historical sync peer did not
                        // respond to our queries and we haven't ingested as
                        // much of the graph as we should.
                        setInitialHistoricalSyncer(s)

                case <-m.quit:
                        return
                }
        }
}

// isPinnedSyncer returns true if the passed GossipSyncer is one of our pinned
// sync peers.
func (m *SyncManager) isPinnedSyncer(s *GossipSyncer) bool {
        _, isPinnedSyncer := m.cfg.PinnedSyncers[s.cfg.peerPub]
        return isPinnedSyncer
}

// deriveRateLimitReservation will take the current message and derive a
// reservation that can be used to wait on the rate limiter.
func (m *SyncManager) deriveRateLimitReservation(msg lnwire.Message,
) (*rate.Reservation, error) {

        var (
                msgSize uint32
                err     error
        )

        // Figure out the serialized size of the message. If we can't easily
        // compute it, then we'll use the assumed msg size.
        if sMsg, ok := msg.(lnwire.SizeableMessage); ok {
                msgSize, err = sMsg.SerializedSize()
                if err != nil {
                        return nil, err
                }
        } else {
                log.Warnf("Unable to compute serialized size of %T", msg)

                msgSize = assumedMsgSize
        }

        return m.rateLimiter.ReserveN(time.Now(), int(msgSize)), nil
}

// waitMsgDelay takes a delay, and waits until it has finished.
func (m *SyncManager) waitMsgDelay(ctx context.Context, peerPub [33]byte,
        limitReservation *rate.Reservation) error {

        // If we've already replied a handful of times, we will start to delay
        // responses back to the remote peer. This can help prevent DOS attacks
        // where the remote peer spams us endlessly.
        //
        // We skip checking for reservation.OK() here, as during config
        // validation, we ensure that the burst is enough for a single message
        // to be sent.
        delay := limitReservation.Delay()
        if delay > 0 {
                log.Infof("GossipSyncer(%x): rate limiting gossip replies, "+
                        "responding in %s", peerPub, delay)

                select {
                case <-time.After(delay):

                case <-ctx.Done():
                        limitReservation.Cancel()

                        return ErrGossipSyncerExiting

                case <-m.quit:
                        limitReservation.Cancel()

                        return ErrGossipSyncerExiting
                }
        }

        return nil
}

// maybeRateLimitMsg takes a message, and may wait a period of time to rate
// limit the msg.
func (m *SyncManager) maybeRateLimitMsg(ctx context.Context, peerPub [33]byte,
        msg lnwire.Message) error {

        delay, err := m.deriveRateLimitReservation(msg)
        if err != nil {
                return nil
        }

        return m.waitMsgDelay(ctx, peerPub, delay)
}

// sendMessages sends a set of messages to the remote peer.
func (m *SyncManager) sendMessages(ctx context.Context, sync bool,
        peer lnpeer.Peer, nodeID route.Vertex, msgs ...lnwire.Message) error {

        for _, msg := range msgs {
                if err := m.maybeRateLimitMsg(ctx, nodeID, msg); err != nil {
                        return err
                }
                if err := peer.SendMessageLazy(sync, msg); err != nil {
                        return err
                }
        }

        return nil
}

// createGossipSyncer creates the GossipSyncer for a newly connected peer.
func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
        nodeID := route.Vertex(peer.PubKey())
        log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:])

        encoding := lnwire.EncodingSortedPlain
        s := newGossipSyncer(gossipSyncerCfg{
                chainHash:     m.cfg.ChainHash,
                peerPub:       nodeID,
                channelSeries: m.cfg.ChanSeries,
                encodingType:  encoding,
                chunkSize:     encodingTypeToChunkSize[encoding],
                batchSize:     requestBatchSize,
                sendToPeer: func(ctx context.Context,
                        msgs ...lnwire.Message) error {

                        return m.sendMessages(ctx, false, peer, nodeID, msgs...)
                },
                sendToPeerSync: func(ctx context.Context,
                        msgs ...lnwire.Message) error {

                        return m.sendMessages(ctx, true, peer, nodeID, msgs...)
                },
                ignoreHistoricalFilters:  m.cfg.IgnoreHistoricalFilters,
                bestHeight:               m.cfg.BestHeight,
                markGraphSynced:          m.markGraphSynced,
                maxQueryChanRangeReplies: maxQueryChanRangeReplies,
                noTimestampQueryOption:   m.cfg.NoTimestampQueries,
                isStillZombieChannel:     m.cfg.IsStillZombieChannel,
        }, m.gossipFilterSema)

        // Gossip syncers are initialized by default in a PassiveSync type
        // and chansSynced state so that they can reply to any peer queries or
        // handle any sync transitions.
        s.setSyncState(chansSynced)
        s.setSyncType(PassiveSync)

        log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%x",
                s.syncState(), s.SyncType(), peer.PubKey())

        return s
}

// removeGossipSyncer removes all internal references to the disconnected peer's
// GossipSyncer and stops it. In the event of an active GossipSyncer being
// disconnected, a passive GossipSyncer, if any, will take its place.
func (m *SyncManager) removeGossipSyncer(peer route.Vertex) {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        s, ok := m.gossipSyncer(peer)
        if !ok {
                return
        }

        log.Infof("Removing GossipSyncer for peer=%v", peer)

        // We'll stop the GossipSyncer for the disconnected peer in a goroutine
        // to prevent blocking the SyncManager.
        go s.Stop()

        // If it's a non-active syncer, then we can just exit now.
        if _, ok := m.inactiveSyncers[peer]; ok {
                delete(m.inactiveSyncers, peer)
                return
        }

        // If it's a pinned syncer, then we can just exit as this doesn't
        // affect our active syncer count.
        if _, ok := m.pinnedActiveSyncers[peer]; ok {
                delete(m.pinnedActiveSyncers, peer)
                return
        }

        // Otherwise, we'll need to find a new one to replace it, if any.
        delete(m.activeSyncers, peer)
        newActiveSyncer := chooseRandomSyncer(
                m.inactiveSyncers, m.transitionPassiveSyncer,
        )
        if newActiveSyncer == nil {
                return
        }

        log.Debugf("Replaced active GossipSyncer(%v) with GossipSyncer(%x)",
                peer, newActiveSyncer.cfg.peerPub)
}

// rotateActiveSyncerCandidate rotates a single active syncer. In order to
// achieve this, the active syncer must be in a chansSynced state in order to
// process the sync transition.
func (m *SyncManager) rotateActiveSyncerCandidate() {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        // If we couldn't find an eligible active syncer to rotate, we can
        // return early.
        activeSyncer := chooseRandomSyncer(m.activeSyncers, nil)
        if activeSyncer == nil {
                log.Debug("No eligible active syncer to rotate")
                return
        }

        // Similarly, if we don't have a candidate to rotate with, we can return
        // early as well.
        candidate := chooseRandomSyncer(m.inactiveSyncers, nil)
        if candidate == nil {
                log.Debug("No eligible candidate to rotate active syncer")
                return
        }

        // Otherwise, we'll attempt to transition each syncer to their
        // respective new sync type.
        log.Debugf("Rotating active GossipSyncer(%x) with GossipSyncer(%x)",
                activeSyncer.cfg.peerPub, candidate.cfg.peerPub)

        if err := m.transitionActiveSyncer(activeSyncer); err != nil {
                log.Errorf("Unable to transition active GossipSyncer(%x): %v",
                        activeSyncer.cfg.peerPub, err)
                return
        }

        if err := m.transitionPassiveSyncer(candidate); err != nil {
                log.Errorf("Unable to transition passive GossipSyncer(%x): %v",
                        candidate.cfg.peerPub, err)
                return
        }
}

// transitionActiveSyncer transitions an active syncer to a passive one.
//
// NOTE: This must be called with the syncersMu lock held.
func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error {
        log.Debugf("Transitioning active GossipSyncer(%x) to passive",
                s.cfg.peerPub)

        if err := s.ProcessSyncTransition(PassiveSync); err != nil {
                return err
        }

        delete(m.activeSyncers, s.cfg.peerPub)
        m.inactiveSyncers[s.cfg.peerPub] = s

        return nil
}

// transitionPassiveSyncer transitions a passive syncer to an active one.
//
// NOTE: This must be called with the syncersMu lock held.
func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error {
        log.Debugf("Transitioning passive GossipSyncer(%x) to active",
                s.cfg.peerPub)

        if err := s.ProcessSyncTransition(ActiveSync); err != nil {
                return err
        }

        delete(m.inactiveSyncers, s.cfg.peerPub)
        m.activeSyncers[s.cfg.peerPub] = s

        return nil
}

// forceHistoricalSync chooses a syncer with a remote peer at random and forces
// a historical sync with it.
func (m *SyncManager) forceHistoricalSync() *GossipSyncer {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        // We'll sample from both sets of active and inactive syncers in the
        // event that we don't have any inactive syncers.
        return chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error {
                return s.historicalSync()
        })
}

// chooseRandomSyncer iterates through the set of syncers given and returns the
// first one which was able to successfully perform the action enclosed in the
// function closure.
//
// NOTE: It's possible for a nil value to be returned if there are no eligible
// candidate syncers.
func chooseRandomSyncer(syncers map[route.Vertex]*GossipSyncer,
        action func(*GossipSyncer) error) *GossipSyncer {

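        // The "random" selection below comes from ranging over a Go map, whose
        // iteration order is randomized by the runtime on each traversal.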
        for _, s := range syncers {
                // Only syncers in a chansSynced state are viable for sync
                // transitions, so skip any that aren't.
                if s.syncState() != chansSynced {
                        continue
                }

                if action != nil {
                        if err := action(s); err != nil {
                                log.Debugf("Skipping eligible candidate "+
                                        "GossipSyncer(%x): %v", s.cfg.peerPub,
                                        err)
                                continue
                        }
                }

                return s
        }

        return nil
}

// InitSyncState is called by outside sub-systems when a connection is
// established to a new peer that understands how to perform channel range
// queries. We'll allocate a new GossipSyncer for it, and start any goroutines
// needed to handle new queries. The first GossipSyncer registered with the
// SyncManager will attempt a historical sync to ensure we have as much of the
// public channel graph as possible.
//
// TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer.
func (m *SyncManager) InitSyncState(peer lnpeer.Peer) error {
        done := make(chan struct{})

        select {
        case m.newSyncers <- &newSyncer{
                peer:     peer,
                doneChan: done,
        }:
        case <-m.quit:
                return ErrSyncManagerExiting
        }

        select {
        case <-done:
                return nil
        case <-m.quit:
                return ErrSyncManagerExiting
        }
}

// PruneSyncState is called by outside sub-systems once a peer that we were
// previously connected to has been disconnected. In this case we can stop the
// existing GossipSyncer assigned to the peer and free up resources.
func (m *SyncManager) PruneSyncState(peer route.Vertex) {
        done := make(chan struct{})

        // We avoid returning an error when the SyncManager is stopped since the
        // GossipSyncer will be stopped then anyway.
        select {
        case m.staleSyncers <- &staleSyncer{
                peer:     peer,
                doneChan: done,
        }:
        case <-m.quit:
                return
        }

        select {
        case <-done:
        case <-m.quit:
        }
}

// GossipSyncer returns the associated gossip syncer of a peer. The boolean
// returned signals whether there exists a gossip syncer for the peer.
func (m *SyncManager) GossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()
        return m.gossipSyncer(peer)
}

// gossipSyncer returns the associated gossip syncer of a peer. The boolean
// returned signals whether there exists a gossip syncer for the peer.
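//
// NOTE: This must be called with the syncersMu lock held.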
func (m *SyncManager) gossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
        syncer, ok := m.inactiveSyncers[peer]
        if ok {
                return syncer, true
        }
        syncer, ok = m.activeSyncers[peer]
        if ok {
                return syncer, true
        }
        syncer, ok = m.pinnedActiveSyncers[peer]
        if ok {
                return syncer, true
        }
        return nil, false
}

// GossipSyncers returns all of the currently initialized gossip syncers.
func (m *SyncManager) GossipSyncers() map[route.Vertex]*GossipSyncer {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()
        return m.gossipSyncers()
}

// gossipSyncers returns all of the currently initialized gossip syncers.
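//
// NOTE: This must be called with the syncersMu lock held.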
func (m *SyncManager) gossipSyncers() map[route.Vertex]*GossipSyncer {
        numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers)
        syncers := make(map[route.Vertex]*GossipSyncer, numSyncers)

        for _, syncer := range m.inactiveSyncers {
                syncers[syncer.cfg.peerPub] = syncer
        }
        for _, syncer := range m.activeSyncers {
                syncers[syncer.cfg.peerPub] = syncer
        }

        return syncers
}

// markGraphSynced allows us to report that the initial historical sync has
// completed.
func (m *SyncManager) markGraphSynced() {
        atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 1)
}

// IsGraphSynced determines whether we've completed our initial historical sync.
// The initial historical sync is done to ensure we've ingested as much of the
// public graph as possible.
func (m *SyncManager) IsGraphSynced() bool {
        return atomic.LoadInt32(&m.initialHistoricalSyncCompleted) == 1
}
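
For readers unfamiliar with golang.org/x/time/rate, the standalone sketch below illustrates, in isolation, the token-bucket pattern the SyncManager uses above (rate.NewLimiter, ReserveN, and Delay), with the same default byte values defined in this file. It is not part of lnd; the 64 KiB message size and the loop are invented purely for illustration.

package main

import (
        "fmt"
        "time"

        "golang.org/x/time/rate"
)

const (
        msgBytesPerSecond = 100 * 1_024     // mirrors DefaultMsgBytesPerSecond
        msgBytesBurst     = 2 * 100 * 1_024 // mirrors DefaultMsgBytesBurst
)

func main() {
        // Tokens are bytes: the bucket holds up to msgBytesBurst bytes and
        // refills at msgBytesPerSecond.
        limiter := rate.NewLimiter(rate.Limit(msgBytesPerSecond), msgBytesBurst)

        // Pretend we're sending a series of 64 KiB gossip replies.
        for i := 0; i < 5; i++ {
                res := limiter.ReserveN(time.Now(), 64*1_024)
                if !res.OK() {
                        fmt.Println("message larger than burst; it could never be sent")
                        return
                }

                // Delay reports how long we must wait before this send stays
                // within the configured rate. The SyncManager likewise sleeps
                // for this duration (or aborts on shutdown) before writing the
                // message to the peer.
                if delay := res.Delay(); delay > 0 {
                        fmt.Printf("msg %d: waiting %v to respect the rate limit\n",
                                i, delay)
                        time.Sleep(delay)
                }
                fmt.Printf("msg %d: sent\n", i)
        }
}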