lightningnetwork / lnd / 10182761790

31 Jul 2024 03:02PM UTC coverage: 58.471% (+0.01%) from 58.459%

Pull Request #8959 (github): mod: bump kvdb to v1.4.9
Author: guggero

To support the new comma-separated list of etcd hosts in db.etcd.host,
we need to bump the `kvdb` submodule version.
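
The option referenced above accepts a comma-separated list of host:port
pairs, along these lines (illustrative placeholder hosts, not taken from
the PR):

    db.etcd.host=etcd-0:2379,etcd-1:2379,etcd-2:2379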

124820 of 213473 relevant lines covered (58.47%)

27892.83 hits per line

Source File

/discovery/sync_manager.go (91.2% covered)
package discovery

import (
	"errors"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/lightningnetwork/lnd/lnpeer"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/routing/route"
	"github.com/lightningnetwork/lnd/ticker"
)

const (
	// DefaultSyncerRotationInterval is the default interval in which we'll
	// rotate a single active syncer.
	DefaultSyncerRotationInterval = 20 * time.Minute

	// DefaultHistoricalSyncInterval is the default interval in which we'll
	// force a historical sync to ensure we have as much of the public
	// network as possible.
	DefaultHistoricalSyncInterval = time.Hour
)

var (
	// ErrSyncManagerExiting is an error returned when we attempt to
	// start/stop a gossip syncer for a connected/disconnected peer, but the
	// SyncManager has already been stopped.
	ErrSyncManagerExiting = errors.New("sync manager exiting")
)

// newSyncer is an internal message we'll use within the SyncManager to signal
// that we should create a GossipSyncer for a newly connected peer.
type newSyncer struct {
	// peer is the newly connected peer.
	peer lnpeer.Peer

	// doneChan serves as a signal to the caller that the SyncManager's
	// internal state correctly reflects the newly created syncer.
	doneChan chan struct{}
}

// staleSyncer is an internal message we'll use within the SyncManager to signal
// that a peer has disconnected and its GossipSyncer should be removed.
type staleSyncer struct {
	// peer is the peer that has disconnected.
	peer route.Vertex

	// doneChan serves as a signal to the caller that the SyncManager's
	// internal state correctly reflects the stale active syncer. This is
	// needed to ensure we always create a new syncer for a flappy peer
	// after they disconnect if they happened to be an active syncer.
	doneChan chan struct{}
}
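
// Note: newSyncer and staleSyncer form a small request/ack protocol: a
// caller sends one of these messages to the event loop and then blocks on
// doneChan until the loop has updated the SyncManager's internal state
// (see InitSyncState and PruneSyncState below for the sending side).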

// SyncManagerCfg contains all of the dependencies required for the SyncManager
// to carry out its duties.
type SyncManagerCfg struct {
	// ChainHash is a hash that indicates the specific network of the active
	// chain.
	ChainHash chainhash.Hash

	// ChanSeries is an interface that provides access to a time series view
	// of the current known channel graph. Each GossipSyncer enabled peer
	// will utilize this in order to create and respond to channel graph
	// time series queries.
	ChanSeries ChannelGraphTimeSeries

	// NumActiveSyncers is the number of peers for which we should have
	// active syncers. After reaching NumActiveSyncers, any future gossip
	// syncers will be passive.
	NumActiveSyncers int

	// NoTimestampQueries will prevent the GossipSyncer from querying
	// timestamps of announcement messages from the peer, and from
	// responding to timestamp queries.
	NoTimestampQueries bool

	// RotateTicker is a ticker responsible for notifying the SyncManager
	// when it should rotate its active syncers. A single active syncer with
	// a chansSynced state will be exchanged for a passive syncer in order
	// to ensure we don't keep syncing with the same peers.
	RotateTicker ticker.Ticker

	// HistoricalSyncTicker is a ticker responsible for notifying the
	// SyncManager when it should attempt a historical sync with a gossip
	// sync peer.
	HistoricalSyncTicker ticker.Ticker

	// IgnoreHistoricalFilters will prevent syncers from replying with
	// historical data when the remote peer sets a gossip_timestamp_range.
	// This prevents ranges with old start times from causing us to dump the
	// graph on connect.
	IgnoreHistoricalFilters bool

	// BestHeight returns the latest height known of the chain.
	BestHeight func() uint32

	// PinnedSyncers is a set of peers that will always transition to
	// ActiveSync upon connection. These peers will never transition to
	// PassiveSync.
	PinnedSyncers PinnedSyncers

	// IsStillZombieChannel takes the timestamps of the latest channel
	// updates for a channel and returns true if the channel should be
	// considered a zombie based on these timestamps.
	IsStillZombieChannel func(time.Time, time.Time) bool
}
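
// For orientation, a minimal sketch of how a caller might populate this
// config (illustrative only; chanSeries and bestHeight are hypothetical
// placeholders, and the real wiring lives in the gossiper):
//
//	cfg := &SyncManagerCfg{
//		ChainHash:            *chaincfg.MainNetParams.GenesisHash,
//		ChanSeries:           chanSeries,
//		NumActiveSyncers:     3,
//		RotateTicker:         ticker.New(DefaultSyncerRotationInterval),
//		HistoricalSyncTicker: ticker.New(DefaultHistoricalSyncInterval),
//		BestHeight:           bestHeight,
//	}
//	syncMgr := newSyncManager(cfg)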

// SyncManager is a subsystem of the gossiper that manages the gossip syncers
// for peers currently connected. When a new peer is connected, the manager will
// create its accompanying gossip syncer and determine whether it should have an
// ActiveSync or PassiveSync sync type based on how many other gossip syncers
// are currently active. Any ActiveSync gossip syncers are started in a
// round-robin manner to ensure we're not syncing with multiple peers at the
// same time. The first GossipSyncer registered with the SyncManager will
// attempt a historical sync to ensure we have as much of the public channel
// graph as possible.
type SyncManager struct {
	// initialHistoricalSyncCompleted serves as a barrier when initializing
	// new active GossipSyncers. If 0, the initial historical sync has not
	// completed, so we'll defer initializing any active GossipSyncers. If
	// 1, then we can transition the GossipSyncer immediately. We set up
	// this barrier to ensure we have most of the graph before attempting to
	// accept new updates at tip.
	//
	// NOTE: This must be used atomically.
	initialHistoricalSyncCompleted int32

	start sync.Once
	stop  sync.Once

	cfg SyncManagerCfg

	// newSyncers is a channel we'll use to process requests to create
	// GossipSyncers for newly connected peers.
	newSyncers chan *newSyncer

	// staleSyncers is a channel we'll use to process requests to tear down
	// GossipSyncers for disconnected peers.
	staleSyncers chan *staleSyncer

	// syncersMu guards the read and write access to the activeSyncers and
	// inactiveSyncers maps below.
	syncersMu sync.Mutex

	// activeSyncers is the set of all syncers from which we are currently
	// receiving graph updates. The number of possible active syncers is
	// bounded by NumActiveSyncers.
	activeSyncers map[route.Vertex]*GossipSyncer

	// inactiveSyncers is the set of all syncers from which we are not
	// currently receiving new graph updates.
	inactiveSyncers map[route.Vertex]*GossipSyncer

	// pinnedActiveSyncers is the set of all syncers which are pinned into
	// an active sync. Pinned peers perform an initial historical sync on
	// each connection and will continue to receive graph updates for the
	// duration of the connection.
	pinnedActiveSyncers map[route.Vertex]*GossipSyncer

	wg   sync.WaitGroup
	quit chan struct{}
}
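
// Note: initialHistoricalSyncCompleted above is a plain int32 toggled via
// sync/atomic (see markGraphSynced and IsGraphSynced below). On Go 1.19+,
// the equivalent could be written with atomic.Bool:
//
//	var synced atomic.Bool
//	synced.Store(true) // markGraphSynced
//	_ = synced.Load()  // IsGraphSynced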

// newSyncManager constructs a new SyncManager backed by the given config.
func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
	return &SyncManager{
		cfg:          *cfg,
		newSyncers:   make(chan *newSyncer),
		staleSyncers: make(chan *staleSyncer),
		activeSyncers: make(
			map[route.Vertex]*GossipSyncer, cfg.NumActiveSyncers,
		),
		inactiveSyncers: make(map[route.Vertex]*GossipSyncer),
		pinnedActiveSyncers: make(
			map[route.Vertex]*GossipSyncer, len(cfg.PinnedSyncers),
		),
		quit: make(chan struct{}),
	}
}

// Start starts the SyncManager in order to properly carry out its duties.
func (m *SyncManager) Start() {
	m.start.Do(func() {
		m.wg.Add(1)
		go m.syncerHandler()
	})
}

// Stop stops the SyncManager from performing its duties.
func (m *SyncManager) Stop() {
	m.stop.Do(func() {
		log.Debugf("SyncManager is stopping")
		defer log.Debugf("SyncManager stopped")

		close(m.quit)
		m.wg.Wait()

		for _, syncer := range m.inactiveSyncers {
			syncer.Stop()
		}
		for _, syncer := range m.activeSyncers {
			syncer.Stop()
		}
	})
}

// syncerHandler is the SyncManager's main event loop responsible for:
//
//  1. Creating and tearing down GossipSyncers for connected/disconnected
//     peers.
//  2. Finding new peers to receive graph updates from to ensure we don't only
//     receive them from the same set of peers.
//  3. Finding new peers to force a historical sync with to ensure we have as
//     much of the public network as possible.
//
// NOTE: This must be run as a goroutine.
func (m *SyncManager) syncerHandler() {
	defer m.wg.Done()

	m.cfg.RotateTicker.Resume()
	defer m.cfg.RotateTicker.Stop()

	defer m.cfg.HistoricalSyncTicker.Stop()

	var (
		// initialHistoricalSyncer is the syncer we are currently
		// performing an initial historical sync with.
		initialHistoricalSyncer *GossipSyncer

		// initialHistoricalSyncSignal is a signal that will fire once
		// the initial historical sync has been completed. This is
		// crucial to ensure that another historical sync isn't
		// attempted just because the initialHistoricalSyncer was
		// disconnected.
		initialHistoricalSyncSignal chan struct{}
	)

	setInitialHistoricalSyncer := func(s *GossipSyncer) {
		initialHistoricalSyncer = s
		initialHistoricalSyncSignal = s.ResetSyncedSignal()

		// Restart the timer for our new historical sync peer. This will
		// ensure that all initial syncers receive an equivalent
		// duration before attempting the next sync. Without doing so we
		// might attempt two historical syncs back to back if a peer
		// disconnects just before the ticker fires.
		m.cfg.HistoricalSyncTicker.Pause()
		m.cfg.HistoricalSyncTicker.Resume()
	}

	for {
		select {
		// A new peer has been connected, so we'll create its
		// accompanying GossipSyncer.
		case newSyncer := <-m.newSyncers:
			// If we already have a syncer, then we'll exit early as
			// we don't want to override it.
			if _, ok := m.GossipSyncer(newSyncer.peer.PubKey()); ok {
				close(newSyncer.doneChan)
				continue
			}

			s := m.createGossipSyncer(newSyncer.peer)

			isPinnedSyncer := m.isPinnedSyncer(s)

			// attemptHistoricalSync determines whether we should
			// attempt an initial historical sync when a new peer
			// connects.
			attemptHistoricalSync := false

			m.syncersMu.Lock()
			switch {
			// For pinned syncers, we will immediately transition
			// the peer into an active (pinned) sync state.
			case isPinnedSyncer:
				attemptHistoricalSync = true
				s.setSyncType(PinnedSync)
				s.setSyncState(syncerIdle)
				m.pinnedActiveSyncers[s.cfg.peerPub] = s

			// Regardless of whether the initial historical sync
			// has completed, we'll re-trigger a historical sync if
			// we no longer have any syncers. This might be
			// necessary if we lost all our peers at one point, and
			// now we finally have one again.
			case len(m.activeSyncers) == 0 &&
				len(m.inactiveSyncers) == 0:

				attemptHistoricalSync =
					m.cfg.NumActiveSyncers > 0
				fallthrough

			// If we've exceeded our total number of active syncers,
			// we'll initialize this GossipSyncer as passive.
			case len(m.activeSyncers) >= m.cfg.NumActiveSyncers:
				fallthrough

			// If the initial historical sync has yet to complete,
			// then we'll declare it as passive and attempt to
			// transition it when the initial historical sync
			// completes.
			case !m.IsGraphSynced():
				s.setSyncType(PassiveSync)
				m.inactiveSyncers[s.cfg.peerPub] = s

			// The initial historical sync has completed, so we can
			// immediately start the GossipSyncer as active.
			default:
				s.setSyncType(ActiveSync)
				m.activeSyncers[s.cfg.peerPub] = s
			}
			m.syncersMu.Unlock()

			s.Start()

			// Once we create the GossipSyncer, we'll signal to the
			// caller that they can proceed since the SyncManager's
			// internal state has been updated.
			close(newSyncer.doneChan)

			// We'll force a historical sync with the first peer we
			// connect to, to ensure we get as much of the graph as
			// possible.
			if !attemptHistoricalSync {
				continue
			}

			log.Debugf("Attempting initial historical sync with "+
				"GossipSyncer(%x)", s.cfg.peerPub)

			if err := s.historicalSync(); err != nil {
				log.Errorf("Unable to attempt initial "+
					"historical sync with "+
					"GossipSyncer(%x): %v", s.cfg.peerPub,
					err)
				continue
			}

			// Once the historical sync has started, we'll keep
			// track of the corresponding syncer to properly handle
			// disconnects. We'll also use a signal to know when
			// the historical sync completed.
			if !isPinnedSyncer {
				setInitialHistoricalSyncer(s)
			}

		// An existing peer has disconnected, so we'll tear down its
		// corresponding GossipSyncer.
		case staleSyncer := <-m.staleSyncers:
			// Once the corresponding GossipSyncer has been stopped
			// and removed, we'll signal to the caller that they can
			// proceed since the SyncManager's internal state has
			// been updated.
			m.removeGossipSyncer(staleSyncer.peer)
			close(staleSyncer.doneChan)

			// If we don't have an initialHistoricalSyncer, or we do
			// but it is not the peer being disconnected, then we
			// have nothing left to do and can proceed.
			switch {
			case initialHistoricalSyncer == nil:
				fallthrough
			case staleSyncer.peer != initialHistoricalSyncer.cfg.peerPub:
				fallthrough
			case m.cfg.NumActiveSyncers == 0:
				continue
			}

			// Otherwise, our initialHistoricalSyncer corresponds to
			// the peer being disconnected, so we'll have to find a
			// replacement.
			log.Debug("Finding replacement for initial " +
				"historical sync")

			s := m.forceHistoricalSync()
			if s == nil {
				log.Debug("No eligible replacement found " +
					"for initial historical sync")
				continue
			}

			log.Debugf("Replaced initial historical "+
				"GossipSyncer(%v) with GossipSyncer(%x)",
				staleSyncer.peer, s.cfg.peerPub)

			setInitialHistoricalSyncer(s)

		// Our initial historical sync has completed, so we'll nil all
		// of the relevant fields as they're no longer needed.
		case <-initialHistoricalSyncSignal:
			initialHistoricalSyncer = nil
			initialHistoricalSyncSignal = nil

			log.Debug("Initial historical sync completed")

			// With the initial historical sync complete, we can
			// begin receiving new graph updates at tip. We'll
			// determine whether we can have any more active
			// GossipSyncers. If we do, we'll randomly select some
			// that are currently passive to transition.
			m.syncersMu.Lock()
			numActiveLeft := m.cfg.NumActiveSyncers - len(m.activeSyncers)
			if numActiveLeft <= 0 {
				m.syncersMu.Unlock()
				continue
			}

			// We may not even have enough inactive syncers to be
			// transitioned. In that case, we will transition all
			// the inactive syncers.
			if len(m.inactiveSyncers) < numActiveLeft {
				numActiveLeft = len(m.inactiveSyncers)
			}

			log.Debugf("Attempting to transition %v passive "+
				"GossipSyncers to active", numActiveLeft)

			for i := 0; i < numActiveLeft; i++ {
				chooseRandomSyncer(
					m.inactiveSyncers, m.transitionPassiveSyncer,
				)
			}

			m.syncersMu.Unlock()

		// Our RotateTicker has ticked, so we'll attempt to rotate a
		// single active syncer with a passive one.
		case <-m.cfg.RotateTicker.Ticks():
			m.rotateActiveSyncerCandidate()

		// Our HistoricalSyncTicker has ticked, so we'll randomly select
		// a peer and force a historical sync with them.
		case <-m.cfg.HistoricalSyncTicker.Ticks():
			// To be extra cautious, gate the forceHistoricalSync
			// call such that it can only execute if we are
			// configured to have a non-zero number of sync peers.
			// This way even if the historical sync ticker manages
			// to tick we can be sure that a historical sync won't
			// accidentally begin.
			if m.cfg.NumActiveSyncers == 0 {
				continue
			}

			// If we don't have a syncer available we have nothing
			// to do.
			s := m.forceHistoricalSync()
			if s == nil {
				continue
			}

			// If we've already completed a historical sync, we'll
			// skip setting the initial historical syncer.
			if m.IsGraphSynced() {
				continue
			}

			// Otherwise, we'll track the peer we've performed a
			// historical sync with in order to handle the case
			// where our previous historical sync peer did not
			// respond to our queries and we haven't ingested as
			// much of the graph as we should.
			setInitialHistoricalSyncer(s)

		case <-m.quit:
			return
		}
	}
}

// isPinnedSyncer returns true if the passed GossipSyncer is one of our pinned
// sync peers.
func (m *SyncManager) isPinnedSyncer(s *GossipSyncer) bool {
	_, isPinnedSyncer := m.cfg.PinnedSyncers[s.cfg.peerPub]
	return isPinnedSyncer
}

// createGossipSyncer creates the GossipSyncer for a newly connected peer.
func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
	nodeID := route.Vertex(peer.PubKey())
	log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:])

	encoding := lnwire.EncodingSortedPlain
	s := newGossipSyncer(gossipSyncerCfg{
		chainHash:     m.cfg.ChainHash,
		peerPub:       nodeID,
		channelSeries: m.cfg.ChanSeries,
		encodingType:  encoding,
		chunkSize:     encodingTypeToChunkSize[encoding],
		batchSize:     requestBatchSize,
		sendToPeer: func(msgs ...lnwire.Message) error {
			return peer.SendMessageLazy(false, msgs...)
		},
		sendToPeerSync: func(msgs ...lnwire.Message) error {
			return peer.SendMessageLazy(true, msgs...)
		},
		ignoreHistoricalFilters:   m.cfg.IgnoreHistoricalFilters,
		maxUndelayedQueryReplies:  DefaultMaxUndelayedQueryReplies,
		delayedQueryReplyInterval: DefaultDelayedQueryReplyInterval,
		bestHeight:                m.cfg.BestHeight,
		markGraphSynced:           m.markGraphSynced,
		maxQueryChanRangeReplies:  maxQueryChanRangeReplies,
		noTimestampQueryOption:    m.cfg.NoTimestampQueries,
		isStillZombieChannel:      m.cfg.IsStillZombieChannel,
	})

	// Gossip syncers are initialized by default in a PassiveSync type
	// and chansSynced state so that they can reply to any peer queries or
	// handle any sync transitions.
	s.setSyncState(chansSynced)
	s.setSyncType(PassiveSync)

	log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%v",
		s.syncState(), s.SyncType(), peer)

	return s
}

// removeGossipSyncer removes all internal references to the disconnected peer's
// GossipSyncer and stops it. In the event of an active GossipSyncer being
// disconnected, a passive GossipSyncer, if any, will take its place.
func (m *SyncManager) removeGossipSyncer(peer route.Vertex) {
	m.syncersMu.Lock()
	defer m.syncersMu.Unlock()

	s, ok := m.gossipSyncer(peer)
	if !ok {
		return
	}

	log.Infof("Removing GossipSyncer for peer=%v", peer)

	// We'll stop the GossipSyncer for the disconnected peer in a goroutine
	// to prevent blocking the SyncManager.
	go s.Stop()

	// If it's a non-active syncer, then we can just exit now.
	if _, ok := m.inactiveSyncers[peer]; ok {
		delete(m.inactiveSyncers, peer)
		return
	}

	// If it's a pinned syncer, then we can just exit as this doesn't
	// affect our active syncer count.
	if _, ok := m.pinnedActiveSyncers[peer]; ok {
		delete(m.pinnedActiveSyncers, peer)
		return
	}

	// Otherwise, we'll need to find a new one to replace it, if any.
	delete(m.activeSyncers, peer)
	newActiveSyncer := chooseRandomSyncer(
		m.inactiveSyncers, m.transitionPassiveSyncer,
	)
	if newActiveSyncer == nil {
		return
	}

	log.Debugf("Replaced active GossipSyncer(%x) with GossipSyncer(%x)",
		peer, newActiveSyncer.cfg.peerPub)
}

// rotateActiveSyncerCandidate rotates a single active syncer. In order to
// achieve this, the active syncer must be in a chansSynced state in order to
// process the sync transition.
func (m *SyncManager) rotateActiveSyncerCandidate() {
	m.syncersMu.Lock()
	defer m.syncersMu.Unlock()

	// If we couldn't find an eligible active syncer to rotate, we can
	// return early.
	activeSyncer := chooseRandomSyncer(m.activeSyncers, nil)
	if activeSyncer == nil {
		log.Debug("No eligible active syncer to rotate")
		return
	}

	// Similarly, if we don't have a candidate to rotate with, we can return
	// early as well.
	candidate := chooseRandomSyncer(m.inactiveSyncers, nil)
	if candidate == nil {
		log.Debug("No eligible candidate to rotate active syncer")
		return
	}

	// Otherwise, we'll attempt to transition each syncer to their
	// respective new sync type.
	log.Debugf("Rotating active GossipSyncer(%x) with GossipSyncer(%x)",
		activeSyncer.cfg.peerPub, candidate.cfg.peerPub)

	if err := m.transitionActiveSyncer(activeSyncer); err != nil {
		log.Errorf("Unable to transition active GossipSyncer(%x): %v",
			activeSyncer.cfg.peerPub, err)
		return
	}

	if err := m.transitionPassiveSyncer(candidate); err != nil {
		log.Errorf("Unable to transition passive GossipSyncer(%x): %v",
			candidate.cfg.peerPub, err)
		return
	}
}

// transitionActiveSyncer transitions an active syncer to a passive one.
//
// NOTE: This must be called with the syncersMu lock held.
func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error {
	log.Debugf("Transitioning active GossipSyncer(%x) to passive",
		s.cfg.peerPub)

	if err := s.ProcessSyncTransition(PassiveSync); err != nil {
		return err
	}

	delete(m.activeSyncers, s.cfg.peerPub)
	m.inactiveSyncers[s.cfg.peerPub] = s

	return nil
}

// transitionPassiveSyncer transitions a passive syncer to an active one.
//
// NOTE: This must be called with the syncersMu lock held.
func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error {
	log.Debugf("Transitioning passive GossipSyncer(%x) to active",
		s.cfg.peerPub)

	if err := s.ProcessSyncTransition(ActiveSync); err != nil {
		return err
	}

	delete(m.inactiveSyncers, s.cfg.peerPub)
	m.activeSyncers[s.cfg.peerPub] = s

	return nil
}

// forceHistoricalSync chooses a syncer with a remote peer at random and forces
// a historical sync with it.
func (m *SyncManager) forceHistoricalSync() *GossipSyncer {
	m.syncersMu.Lock()
	defer m.syncersMu.Unlock()

	// We'll sample from both sets of active and inactive syncers in the
	// event that we don't have any inactive syncers.
	return chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error {
		return s.historicalSync()
	})
}

// chooseRandomSyncer iterates through the set of syncers given and returns the
// first one which was able to successfully perform the action enclosed in the
// function closure.
//
// NOTE: It's possible for a nil value to be returned if there are no eligible
// candidate syncers.
func chooseRandomSyncer(syncers map[route.Vertex]*GossipSyncer,
	action func(*GossipSyncer) error) *GossipSyncer {

	for _, s := range syncers {
		// Only syncers in a chansSynced state are viable for sync
		// transitions, so skip any that aren't.
		if s.syncState() != chansSynced {
			continue
		}

		if action != nil {
			if err := action(s); err != nil {
				log.Debugf("Skipping eligible candidate "+
					"GossipSyncer(%x): %v", s.cfg.peerPub,
					err)
				continue
			}
		}

		return s
	}

	return nil
}
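
// Note: the randomness in chooseRandomSyncer comes from ranging over a Go
// map, whose iteration order is unspecified and randomized by the runtime,
// so repeated calls visit syncers in a different order each time.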

// InitSyncState is called by outside sub-systems when a connection is
// established to a new peer that understands how to perform channel range
// queries. We'll allocate a new GossipSyncer for it, and start any goroutines
// needed to handle new queries. The first GossipSyncer registered with the
// SyncManager will attempt a historical sync to ensure we have as much of the
// public channel graph as possible.
//
// TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer.
func (m *SyncManager) InitSyncState(peer lnpeer.Peer) error {
	done := make(chan struct{})

	select {
	case m.newSyncers <- &newSyncer{
		peer:     peer,
		doneChan: done,
	}:
	case <-m.quit:
		return ErrSyncManagerExiting
	}

	select {
	case <-done:
		return nil
	case <-m.quit:
		return ErrSyncManagerExiting
	}
}

// PruneSyncState is called by outside sub-systems once a peer that we were
// previously connected to has been disconnected. In this case we can stop the
// existing GossipSyncer assigned to the peer and free up resources.
func (m *SyncManager) PruneSyncState(peer route.Vertex) {
	done := make(chan struct{})

	// We avoid returning an error when the SyncManager is stopped since the
	// GossipSyncer will be stopped then anyway.
	select {
	case m.staleSyncers <- &staleSyncer{
		peer:     peer,
		doneChan: done,
	}:
	case <-m.quit:
		return
	}

	select {
	case <-done:
	case <-m.quit:
	}
}

// GossipSyncer returns the associated gossip syncer of a peer. The boolean
// returned signals whether there exists a gossip syncer for the peer.
func (m *SyncManager) GossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
	m.syncersMu.Lock()
	defer m.syncersMu.Unlock()
	return m.gossipSyncer(peer)
}

// gossipSyncer returns the associated gossip syncer of a peer. The boolean
// returned signals whether there exists a gossip syncer for the peer.
func (m *SyncManager) gossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
	syncer, ok := m.inactiveSyncers[peer]
	if ok {
		return syncer, true
	}
	syncer, ok = m.activeSyncers[peer]
	if ok {
		return syncer, true
	}
	syncer, ok = m.pinnedActiveSyncers[peer]
	if ok {
		return syncer, true
	}
	return nil, false
}

// GossipSyncers returns all of the currently initialized gossip syncers.
func (m *SyncManager) GossipSyncers() map[route.Vertex]*GossipSyncer {
	m.syncersMu.Lock()
	defer m.syncersMu.Unlock()
	return m.gossipSyncers()
}

// gossipSyncers returns all of the currently initialized gossip syncers.
func (m *SyncManager) gossipSyncers() map[route.Vertex]*GossipSyncer {
	numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers)
	syncers := make(map[route.Vertex]*GossipSyncer, numSyncers)

	for _, syncer := range m.inactiveSyncers {
		syncers[syncer.cfg.peerPub] = syncer
	}
	for _, syncer := range m.activeSyncers {
		syncers[syncer.cfg.peerPub] = syncer
	}

	return syncers
}

// markGraphSynced allows us to report that the initial historical sync has
// completed.
func (m *SyncManager) markGraphSynced() {
	atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 1)
}

// IsGraphSynced determines whether we've completed our initial historical sync.
// The initial historical sync is done to ensure we've ingested as much of the
// public graph as possible.
func (m *SyncManager) IsGraphSynced() bool {
	return atomic.LoadInt32(&m.initialHistoricalSyncCompleted) == 1
}
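
For context, a minimal sketch of how an outside sub-system would drive this
API around a peer's lifecycle (hypothetical syncMgr and peer variables; error
handling elided):

	// On connect: allocate a GossipSyncer for the peer and block until
	// the SyncManager's event loop has registered it.
	if err := syncMgr.InitSyncState(peer); err != nil {
		return err // ErrSyncManagerExiting if shutting down.
	}

	// On disconnect: tear down the peer's syncer and free its resources.
	syncMgr.PruneSyncState(route.Vertex(peer.PubKey()))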