lightningnetwork / lnd / build 12280337084

11 Dec 2024 04:09PM UTC coverage: 49.495% (-0.05%) from 49.54%

Pull #9343 (via github) by ellemouton

fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is reworked such that the
context provided by its new main WithCtx method is cancelled in sync
with a parent context being cancelled or with its quit channel being
closed. Tests are added to assert the behaviour. In order for the
close of the quit channel to be consistent with the cancelling of the
derived context, the quit channel _must_ be kept internal to the
ContextGuard so that callers are only able to close the channel via the
exposed Quit method, which will then take care to first cancel any
derived contexts that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests
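
Below is a minimal sketch of the behaviour the commit message describes, not the actual fn package code: WithCtx derives a context that dies with either its parent or the guard, and Quit cancels all derived contexts before closing the internal quit channel. All names and signatures here are illustrative assumptions.

package fn // sketch only: the real fn.ContextGuard API may differ

import (
        "context"
        "sync"
)

// ContextGuard sketches the reworked guard. The quit channel is
// unexported so callers can only close it via the Quit method.
type ContextGuard struct {
        mu      sync.Mutex
        quit    chan struct{}
        cancels []context.CancelFunc
}

// WithCtx derives a context that is cancelled either when the parent is
// cancelled (handled by context.WithCancel) or when Quit is called.
func (g *ContextGuard) WithCtx(parent context.Context) (context.Context,
        context.CancelFunc) {

        ctx, cancel := context.WithCancel(parent)

        g.mu.Lock()
        g.cancels = append(g.cancels, cancel)
        g.mu.Unlock()

        return ctx, cancel
}

// Quit first cancels every derived context and only then closes the
// quit channel, keeping the two signals consistent. A real
// implementation would also guard against Quit being called twice.
func (g *ContextGuard) Quit() {
        g.mu.Lock()
        for _, cancel := range g.cancels {
                cancel()
        }
        g.mu.Unlock()

        close(g.quit)
}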

100285 of 202617 relevant lines covered (49.49%)

1.54 hits per line

Source File: /discovery/sync_manager.go (74.88% of lines covered)
package discovery

import (
        "errors"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/ticker"
)

const (
        // DefaultSyncerRotationInterval is the default interval in which we'll
        // rotate a single active syncer.
        DefaultSyncerRotationInterval = 20 * time.Minute

        // DefaultHistoricalSyncInterval is the default interval in which we'll
        // force a historical sync to ensure we have as much of the public
        // network as possible.
        DefaultHistoricalSyncInterval = time.Hour

        // filterSemaSize is the capacity of gossipFilterSema.
        filterSemaSize = 5
)

var (
        // ErrSyncManagerExiting is an error returned when we attempt to
        // start/stop a gossip syncer for a connected/disconnected peer, but the
        // SyncManager has already been stopped.
        ErrSyncManagerExiting = errors.New("sync manager exiting")
)

// newSyncer is an internal message we'll use within the SyncManager to signal
// that we should create a GossipSyncer for a newly connected peer.
type newSyncer struct {
        // peer is the newly connected peer.
        peer lnpeer.Peer

        // doneChan serves as a signal to the caller that the SyncManager's
        // internal state correctly reflects the newly created GossipSyncer.
        doneChan chan struct{}
}

// staleSyncer is an internal message we'll use within the SyncManager to signal
// that a peer has disconnected and its GossipSyncer should be removed.
type staleSyncer struct {
        // peer is the peer that has disconnected.
        peer route.Vertex

        // doneChan serves as a signal to the caller that the SyncManager's
        // internal state correctly reflects the stale active syncer. This is
        // needed to ensure we always create a new syncer for a flappy peer
        // after they disconnect if they happened to be an active syncer.
        doneChan chan struct{}
}

// SyncManagerCfg contains all of the dependencies required for the SyncManager
// to carry out its duties.
type SyncManagerCfg struct {
        // ChainHash is a hash that indicates the specific network of the active
        // chain.
        ChainHash chainhash.Hash

        // ChanSeries is an interface that provides access to a time series view
        // of the current known channel graph. Each GossipSyncer-enabled peer
        // will utilize this in order to create and respond to channel graph
        // time series queries.
        ChanSeries ChannelGraphTimeSeries

        // NumActiveSyncers is the number of peers with which we should have
        // active syncers. After reaching NumActiveSyncers, any future gossip
        // syncers will be passive.
        NumActiveSyncers int

        // NoTimestampQueries will prevent the GossipSyncer from querying
        // timestamps of announcement messages from the peer, and from
        // responding to timestamp queries.
        NoTimestampQueries bool

        // RotateTicker is a ticker responsible for notifying the SyncManager
        // when it should rotate its active syncers. A single active syncer with
        // a chansSynced state will be exchanged for a passive syncer in order
        // to ensure we don't keep syncing with the same peers.
        RotateTicker ticker.Ticker

        // HistoricalSyncTicker is a ticker responsible for notifying the
        // SyncManager when it should attempt a historical sync with a gossip
        // sync peer.
        HistoricalSyncTicker ticker.Ticker

        // IgnoreHistoricalFilters will prevent syncers from replying with
        // historical data when the remote peer sets a gossip_timestamp_range.
        // This prevents ranges with old start times from causing us to dump the
        // graph on connect.
        IgnoreHistoricalFilters bool

        // BestHeight returns the latest known height of the chain.
        BestHeight func() uint32

        // PinnedSyncers is a set of peers that will always transition to
        // ActiveSync upon connection. These peers will never transition to
        // PassiveSync.
        PinnedSyncers PinnedSyncers

        // IsStillZombieChannel takes the timestamps of the latest channel
        // updates for a channel and returns true if the channel should be
        // considered a zombie based on these timestamps.
        IsStillZombieChannel func(time.Time, time.Time) bool
}

// SyncManager is a subsystem of the gossiper that manages the gossip syncers
// for peers currently connected. When a new peer is connected, the manager will
// create its accompanying gossip syncer and determine whether it should have an
// ActiveSync or PassiveSync sync type based on how many other gossip syncers
// are currently active. Any ActiveSync gossip syncers are started in a
// round-robin manner to ensure we're not syncing with multiple peers at the
// same time. The first GossipSyncer registered with the SyncManager will
// attempt a historical sync to ensure we have as much of the public channel
// graph as possible.
type SyncManager struct {
        // initialHistoricalSyncCompleted serves as a barrier when initializing
        // new active GossipSyncers. If 0, the initial historical sync has not
        // completed, so we'll defer initializing any active GossipSyncers. If
        // 1, then we can transition the GossipSyncer immediately. We set up
        // this barrier to ensure we have most of the graph before attempting to
        // accept new updates at tip.
        //
        // NOTE: This must be used atomically.
        initialHistoricalSyncCompleted int32

        start sync.Once
        stop  sync.Once

        cfg SyncManagerCfg

        // newSyncers is a channel we'll use to process requests to create
        // GossipSyncers for newly connected peers.
        newSyncers chan *newSyncer

        // staleSyncers is a channel we'll use to process requests to tear down
        // GossipSyncers for disconnected peers.
        staleSyncers chan *staleSyncer

        // syncersMu guards the read and write access to the activeSyncers and
        // inactiveSyncers maps below.
        syncersMu sync.Mutex

        // activeSyncers is the set of all syncers from which we are currently
        // receiving graph updates. The number of possible active syncers is
        // bounded by NumActiveSyncers.
        activeSyncers map[route.Vertex]*GossipSyncer

        // inactiveSyncers is the set of all syncers from which we are not
        // currently receiving new graph updates.
        inactiveSyncers map[route.Vertex]*GossipSyncer

        // pinnedActiveSyncers is the set of all syncers which are pinned into
        // an active sync. Pinned peers perform an initial historical sync on
        // each connection and will continue to receive graph updates for the
        // duration of the connection.
        pinnedActiveSyncers map[route.Vertex]*GossipSyncer

        // gossipFilterSema contains semaphores for the gossip timestamp
        // queries.
        gossipFilterSema chan struct{}

        wg   sync.WaitGroup
        quit chan struct{}
}

// newSyncManager constructs a new SyncManager backed by the given config.
func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
        filterSema := make(chan struct{}, filterSemaSize)
        for i := 0; i < filterSemaSize; i++ {
                filterSema <- struct{}{}
        }

        return &SyncManager{
                cfg:          *cfg,
                newSyncers:   make(chan *newSyncer),
                staleSyncers: make(chan *staleSyncer),
                activeSyncers: make(
                        map[route.Vertex]*GossipSyncer, cfg.NumActiveSyncers,
                ),
                inactiveSyncers: make(map[route.Vertex]*GossipSyncer),
                pinnedActiveSyncers: make(
                        map[route.Vertex]*GossipSyncer, len(cfg.PinnedSyncers),
                ),
                gossipFilterSema: filterSema,
                quit:             make(chan struct{}),
        }
}
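
gossipFilterSema above is a counting semaphore built from a buffered channel that newSyncManager pre-fills with filterSemaSize tokens. A minimal sketch of the acquire/release discipline a holder of the semaphore would follow; the function name and parameters are illustrative, not the actual lnd call site:

// serviceFilterQuery sketches the semaphore discipline: take a token
// before doing rate-limited work, put it back when done. With five
// tokens, at most five gossip filter queries are serviced at once.
func serviceFilterQuery(sema, quit chan struct{}) error {
        select {
        case <-sema: // acquire a token
        case <-quit:
                return ErrSyncManagerExiting
        }
        defer func() { sema <- struct{}{} }() // release the token

        // ... service the gossip timestamp query here ...

        return nil
}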

// Start starts the SyncManager in order to properly carry out its duties.
func (m *SyncManager) Start() {
        m.start.Do(func() {
                m.wg.Add(1)
                go m.syncerHandler()
        })
}

// Stop stops the SyncManager from performing its duties.
func (m *SyncManager) Stop() {
        m.stop.Do(func() {
                log.Debugf("SyncManager is stopping")
                defer log.Debugf("SyncManager stopped")

                close(m.quit)
                m.wg.Wait()

                for _, syncer := range m.inactiveSyncers {
                        syncer.Stop()
                }
                for _, syncer := range m.activeSyncers {
                        syncer.Stop()
                }
        })
}

// syncerHandler is the SyncManager's main event loop responsible for:
//
//  1. Creating and tearing down GossipSyncers for connected/disconnected
//     peers.
//  2. Finding new peers to receive graph updates from to ensure we don't only
//     receive them from the same set of peers.
//  3. Finding new peers to force a historical sync with to ensure we have as
//     much of the public network as possible.
//
// NOTE: This must be run as a goroutine.
func (m *SyncManager) syncerHandler() {
        defer m.wg.Done()

        m.cfg.RotateTicker.Resume()
        defer m.cfg.RotateTicker.Stop()

        defer m.cfg.HistoricalSyncTicker.Stop()

        var (
                // initialHistoricalSyncer is the syncer we are currently
                // performing an initial historical sync with.
                initialHistoricalSyncer *GossipSyncer

                // initialHistoricalSyncSignal is a signal that will fire once
                // the initial historical sync has been completed. This is
                // crucial to ensure that another historical sync isn't
                // attempted just because the initialHistoricalSyncer was
                // disconnected.
                initialHistoricalSyncSignal chan struct{}
        )

        setInitialHistoricalSyncer := func(s *GossipSyncer) {
                initialHistoricalSyncer = s
                initialHistoricalSyncSignal = s.ResetSyncedSignal()

                // Restart the timer for our new historical sync peer. This
                // will ensure that all initial syncers receive an equivalent
                // duration before attempting the next sync. Without doing so
                // we might attempt two historical syncs back to back if a
                // peer disconnects just before the ticker fires.
                m.cfg.HistoricalSyncTicker.Pause()
                m.cfg.HistoricalSyncTicker.Resume()
        }

        for {
                select {
                // A new peer has been connected, so we'll create its
                // accompanying GossipSyncer.
                case newSyncer := <-m.newSyncers:
                        // If we already have a syncer, then we'll exit early as
                        // we don't want to override it.
                        if _, ok := m.GossipSyncer(newSyncer.peer.PubKey()); ok {
                                close(newSyncer.doneChan)
                                continue
                        }

                        s := m.createGossipSyncer(newSyncer.peer)

                        isPinnedSyncer := m.isPinnedSyncer(s)

                        // attemptHistoricalSync determines whether we should
                        // attempt an initial historical sync when a new peer
                        // connects.
                        attemptHistoricalSync := false

                        m.syncersMu.Lock()
                        switch {
                        // For pinned syncers, we will immediately transition
                        // the peer into an active (pinned) sync state.
                        case isPinnedSyncer:
                                attemptHistoricalSync = true
                                s.setSyncType(PinnedSync)
                                s.setSyncState(syncerIdle)
                                m.pinnedActiveSyncers[s.cfg.peerPub] = s

                        // Regardless of whether the initial historical sync
                        // has completed, we'll re-trigger a historical sync if
                        // we no longer have any syncers. This might be
                        // necessary if we lost all our peers at one point, and
                        // now we finally have one again.
                        case len(m.activeSyncers) == 0 &&
                                len(m.inactiveSyncers) == 0:

                                attemptHistoricalSync =
                                        m.cfg.NumActiveSyncers > 0
                                fallthrough

                        // If we've exceeded our total number of active syncers,
                        // we'll initialize this GossipSyncer as passive.
                        case len(m.activeSyncers) >= m.cfg.NumActiveSyncers:
                                fallthrough

                        // If the initial historical sync has yet to complete,
                        // then we'll declare it as passive and attempt to
                        // transition it when the initial historical sync
                        // completes.
                        case !m.IsGraphSynced():
                                s.setSyncType(PassiveSync)
                                m.inactiveSyncers[s.cfg.peerPub] = s

                        // The initial historical sync has completed, so we can
                        // immediately start the GossipSyncer as active.
                        default:
                                s.setSyncType(ActiveSync)
                                m.activeSyncers[s.cfg.peerPub] = s
                        }
                        m.syncersMu.Unlock()

                        s.Start()

                        // Once we create the GossipSyncer, we'll signal to the
                        // caller that they can proceed since the SyncManager's
                        // internal state has been updated.
                        close(newSyncer.doneChan)

                        // We'll force a historical sync with the first peer we
                        // connect to, to ensure we get as much of the graph as
                        // possible.
                        if !attemptHistoricalSync {
                                continue
                        }

                        log.Debugf("Attempting initial historical sync with "+
                                "GossipSyncer(%x)", s.cfg.peerPub)

                        if err := s.historicalSync(); err != nil {
                                log.Errorf("Unable to attempt initial "+
                                        "historical sync with "+
                                        "GossipSyncer(%x): %v", s.cfg.peerPub,
                                        err)
                                continue
                        }

                        // Once the historical sync has started, we'll keep
                        // track of the corresponding syncer to properly handle
                        // disconnects. We'll also use a signal to know when
                        // the historical sync completed.
                        if !isPinnedSyncer {
                                setInitialHistoricalSyncer(s)
                        }

                // An existing peer has disconnected, so we'll tear down its
                // corresponding GossipSyncer.
                case staleSyncer := <-m.staleSyncers:
                        // Once the corresponding GossipSyncer has been stopped
                        // and removed, we'll signal to the caller that they can
                        // proceed since the SyncManager's internal state has
                        // been updated.
                        m.removeGossipSyncer(staleSyncer.peer)
                        close(staleSyncer.doneChan)

                        // If we don't have an initialHistoricalSyncer, or we do
                        // but it is not the peer being disconnected, then we
                        // have nothing left to do and can proceed.
                        switch {
                        case initialHistoricalSyncer == nil:
                                fallthrough
                        case staleSyncer.peer != initialHistoricalSyncer.cfg.peerPub:
                                fallthrough
                        case m.cfg.NumActiveSyncers == 0:
                                continue
                        }

                        // Otherwise, our initialHistoricalSyncer corresponds to
                        // the peer being disconnected, so we'll have to find a
                        // replacement.
                        log.Debug("Finding replacement for initial " +
                                "historical sync")

                        s := m.forceHistoricalSync()
                        if s == nil {
                                log.Debug("No eligible replacement found " +
                                        "for initial historical sync")
                                continue
                        }

                        log.Debugf("Replaced initial historical "+
                                "GossipSyncer(%v) with GossipSyncer(%x)",
                                staleSyncer.peer, s.cfg.peerPub)

                        setInitialHistoricalSyncer(s)

                // Our initial historical sync signal has completed, so we'll
                // nil all of the relevant fields as they're no longer needed.
                case <-initialHistoricalSyncSignal:
                        initialHistoricalSyncer = nil
                        initialHistoricalSyncSignal = nil

                        log.Debug("Initial historical sync completed")

                        // With the initial historical sync complete, we can
                        // begin receiving new graph updates at tip. We'll
                        // determine whether we can have any more active
                        // GossipSyncers. If we do, we'll randomly select some
                        // that are currently passive to transition.
                        m.syncersMu.Lock()
                        numActiveLeft := m.cfg.NumActiveSyncers - len(m.activeSyncers)
                        if numActiveLeft <= 0 {
                                m.syncersMu.Unlock()
                                continue
                        }

                        // We may not even have enough inactive syncers to
                        // transition. In that case, we will transition all of
                        // the inactive syncers.
                        if len(m.inactiveSyncers) < numActiveLeft {
                                numActiveLeft = len(m.inactiveSyncers)
                        }

                        log.Debugf("Attempting to transition %v passive "+
                                "GossipSyncers to active", numActiveLeft)

                        for i := 0; i < numActiveLeft; i++ {
                                chooseRandomSyncer(
                                        m.inactiveSyncers, m.transitionPassiveSyncer,
                                )
                        }

                        m.syncersMu.Unlock()

                // Our RotateTicker has ticked, so we'll attempt to rotate a
                // single active syncer with a passive one.
                case <-m.cfg.RotateTicker.Ticks():
                        m.rotateActiveSyncerCandidate()

                // Our HistoricalSyncTicker has ticked, so we'll randomly select
                // a peer and force a historical sync with them.
                case <-m.cfg.HistoricalSyncTicker.Ticks():
                        // To be extra cautious, gate the forceHistoricalSync
                        // call such that it can only execute if we are
                        // configured to have a non-zero number of sync peers.
                        // This way even if the historical sync ticker manages
                        // to tick we can be sure that a historical sync won't
                        // accidentally begin.
                        if m.cfg.NumActiveSyncers == 0 {
                                continue
                        }

                        // If we don't have a syncer available we have nothing
                        // to do.
                        s := m.forceHistoricalSync()
                        if s == nil {
                                continue
                        }

                        // If we've already completed a historical sync, we'll
                        // skip setting the initial historical syncer.
                        if m.IsGraphSynced() {
                                continue
                        }

                        // Otherwise, we'll track the peer we've performed a
                        // historical sync with in order to handle the case
                        // where our previous historical sync peer did not
                        // respond to our queries and we haven't ingested as
                        // much of the graph as we should.
                        setInitialHistoricalSyncer(s)

                case <-m.quit:
                        return
                }
        }
}
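
setInitialHistoricalSyncer above restarts the historical sync timer simply by pausing and immediately resuming the ticker. A small sketch of that reset trick, assuming lnd's ticker.Ticker behaves as the comment in setInitialHistoricalSyncer describes; the helper name is illustrative:

// resetHistoricalTimer sketches the reset trick: pausing and resuming
// recreates the underlying timer, so the next tick arrives a full
// interval from now rather than at the originally scheduled time.
func resetHistoricalTimer(t ticker.Ticker) {
        t.Pause()  // drop the in-flight interval
        t.Resume() // next tick is now a full interval away
}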

// isPinnedSyncer returns true if the passed GossipSyncer is one of our pinned
// sync peers.
func (m *SyncManager) isPinnedSyncer(s *GossipSyncer) bool {
        _, isPinnedSyncer := m.cfg.PinnedSyncers[s.cfg.peerPub]
        return isPinnedSyncer
}

// createGossipSyncer creates the GossipSyncer for a newly connected peer.
func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
        nodeID := route.Vertex(peer.PubKey())
        log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:])

        encoding := lnwire.EncodingSortedPlain
        s := newGossipSyncer(gossipSyncerCfg{
                chainHash:     m.cfg.ChainHash,
                peerPub:       nodeID,
                channelSeries: m.cfg.ChanSeries,
                encodingType:  encoding,
                chunkSize:     encodingTypeToChunkSize[encoding],
                batchSize:     requestBatchSize,
                sendToPeer: func(msgs ...lnwire.Message) error {
                        return peer.SendMessageLazy(false, msgs...)
                },
                sendToPeerSync: func(msgs ...lnwire.Message) error {
                        return peer.SendMessageLazy(true, msgs...)
                },
                ignoreHistoricalFilters:   m.cfg.IgnoreHistoricalFilters,
                maxUndelayedQueryReplies:  DefaultMaxUndelayedQueryReplies,
                delayedQueryReplyInterval: DefaultDelayedQueryReplyInterval,
                bestHeight:                m.cfg.BestHeight,
                markGraphSynced:           m.markGraphSynced,
                maxQueryChanRangeReplies:  maxQueryChanRangeReplies,
                noTimestampQueryOption:    m.cfg.NoTimestampQueries,
                isStillZombieChannel:      m.cfg.IsStillZombieChannel,
        }, m.gossipFilterSema)

        // Gossip syncers are initialized by default in a PassiveSync type
        // and chansSynced state so that they can reply to any peer queries or
        // handle any sync transitions.
        s.setSyncState(chansSynced)
        s.setSyncType(PassiveSync)

        log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%v",
                s.syncState(), s.SyncType(), peer)

        return s
}

// removeGossipSyncer removes all internal references to the disconnected peer's
// GossipSyncer and stops it. In the event of an active GossipSyncer being
// disconnected, a passive GossipSyncer, if any, will take its place.
func (m *SyncManager) removeGossipSyncer(peer route.Vertex) {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        s, ok := m.gossipSyncer(peer)
        if !ok {
                return
        }

        log.Infof("Removing GossipSyncer for peer=%v", peer)

        // We'll stop the GossipSyncer for the disconnected peer in a goroutine
        // to prevent blocking the SyncManager.
        go s.Stop()

        // If it's a non-active syncer, then we can just exit now.
        if _, ok := m.inactiveSyncers[peer]; ok {
                delete(m.inactiveSyncers, peer)
                return
        }

        // If it's a pinned syncer, then we can just exit as this doesn't
        // affect our active syncer count.
        if _, ok := m.pinnedActiveSyncers[peer]; ok {
                delete(m.pinnedActiveSyncers, peer)
                return
        }

        // Otherwise, we'll need to find a new one to replace it, if any.
        delete(m.activeSyncers, peer)
        newActiveSyncer := chooseRandomSyncer(
                m.inactiveSyncers, m.transitionPassiveSyncer,
        )
        if newActiveSyncer == nil {
                return
        }

        log.Debugf("Replaced active GossipSyncer(%v) with GossipSyncer(%x)",
                peer, newActiveSyncer.cfg.peerPub)
}

// rotateActiveSyncerCandidate rotates a single active syncer. In order to
// achieve this, the active syncer must be in a chansSynced state in order to
// process the sync transition.
func (m *SyncManager) rotateActiveSyncerCandidate() {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        // If we couldn't find an eligible active syncer to rotate, we can
        // return early.
        activeSyncer := chooseRandomSyncer(m.activeSyncers, nil)
        if activeSyncer == nil {
                log.Debug("No eligible active syncer to rotate")
                return
        }

        // Similarly, if we don't have a candidate to rotate with, we can
        // return early as well.
        candidate := chooseRandomSyncer(m.inactiveSyncers, nil)
        if candidate == nil {
                log.Debug("No eligible candidate to rotate active syncer")
                return
        }

        // Otherwise, we'll attempt to transition each syncer to their
        // respective new sync type.
        log.Debugf("Rotating active GossipSyncer(%x) with GossipSyncer(%x)",
                activeSyncer.cfg.peerPub, candidate.cfg.peerPub)

        if err := m.transitionActiveSyncer(activeSyncer); err != nil {
                log.Errorf("Unable to transition active GossipSyncer(%x): %v",
                        activeSyncer.cfg.peerPub, err)
                return
        }

        if err := m.transitionPassiveSyncer(candidate); err != nil {
                log.Errorf("Unable to transition passive GossipSyncer(%x): %v",
                        candidate.cfg.peerPub, err)
                return
        }
}

// transitionActiveSyncer transitions an active syncer to a passive one.
//
// NOTE: This must be called with the syncersMu lock held.
func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error {
        log.Debugf("Transitioning active GossipSyncer(%x) to passive",
                s.cfg.peerPub)

        if err := s.ProcessSyncTransition(PassiveSync); err != nil {
                return err
        }

        delete(m.activeSyncers, s.cfg.peerPub)
        m.inactiveSyncers[s.cfg.peerPub] = s

        return nil
}

// transitionPassiveSyncer transitions a passive syncer to an active one.
//
// NOTE: This must be called with the syncersMu lock held.
func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error {
        log.Debugf("Transitioning passive GossipSyncer(%x) to active",
                s.cfg.peerPub)

        if err := s.ProcessSyncTransition(ActiveSync); err != nil {
                return err
        }

        delete(m.inactiveSyncers, s.cfg.peerPub)
        m.activeSyncers[s.cfg.peerPub] = s

        return nil
}

// forceHistoricalSync chooses a syncer with a remote peer at random and forces
// a historical sync with it.
func (m *SyncManager) forceHistoricalSync() *GossipSyncer {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        // We'll sample from both sets of active and inactive syncers in the
        // event that we don't have any inactive syncers.
        return chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error {
                return s.historicalSync()
        })
}

// chooseRandomSyncer iterates through the set of syncers given and returns the
// first one which was able to successfully perform the action enclosed in the
// function closure.
//
// NOTE: It's possible for a nil value to be returned if there are no eligible
// candidate syncers.
func chooseRandomSyncer(syncers map[route.Vertex]*GossipSyncer,
        action func(*GossipSyncer) error) *GossipSyncer {

        for _, s := range syncers {
                // Only syncers in a chansSynced state are viable for sync
                // transitions, so skip any that aren't.
                if s.syncState() != chansSynced {
                        continue
                }

                if action != nil {
                        if err := action(s); err != nil {
                                log.Debugf("Skipping eligible candidate "+
                                        "GossipSyncer(%x): %v", s.cfg.peerPub,
                                        err)
                                continue
                        }
                }

                return s
        }

        return nil
}
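
chooseRandomSyncer gets its randomness for free: Go's map iteration order is unspecified and deliberately randomized by the runtime, so ranging over the syncers map and returning the first eligible entry amounts to a random pick. A tiny standalone illustration of that property:

// Each range over a map starts at a runtime-chosen offset, so the
// first entry seen differs between calls; chooseRandomSyncer exploits
// this to avoid needing an explicit RNG.
func firstKey(m map[string]int) string {
        for k := range m {
                return k // effectively a random key
        }
        return ""
}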

// InitSyncState is called by outside sub-systems when a connection is
// established to a new peer that understands how to perform channel range
// queries. We'll allocate a new GossipSyncer for it, and start any goroutines
// needed to handle new queries. The first GossipSyncer registered with the
// SyncManager will attempt a historical sync to ensure we have as much of the
// public channel graph as possible.
//
// TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer.
func (m *SyncManager) InitSyncState(peer lnpeer.Peer) error {
        done := make(chan struct{})

        select {
        case m.newSyncers <- &newSyncer{
                peer:     peer,
                doneChan: done,
        }:
        case <-m.quit:
                return ErrSyncManagerExiting
        }

        select {
        case <-done:
                return nil
        case <-m.quit:
                return ErrSyncManagerExiting
        }
}

// PruneSyncState is called by outside sub-systems once a peer that we were
// previously connected to has been disconnected. In this case we can stop the
// existing GossipSyncer assigned to the peer and free up resources.
func (m *SyncManager) PruneSyncState(peer route.Vertex) {
        done := make(chan struct{})

        // We avoid returning an error when the SyncManager is stopped since the
        // GossipSyncer will be stopped then anyway.
        select {
        case m.staleSyncers <- &staleSyncer{
                peer:     peer,
                doneChan: done,
        }:
        case <-m.quit:
                return
        }

        select {
        case <-done:
        case <-m.quit:
        }
}

// GossipSyncer returns the associated gossip syncer of a peer. The boolean
// returned signals whether there exists a gossip syncer for the peer.
func (m *SyncManager) GossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()
        return m.gossipSyncer(peer)
}

// gossipSyncer returns the associated gossip syncer of a peer. The boolean
// returned signals whether there exists a gossip syncer for the peer.
func (m *SyncManager) gossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
        syncer, ok := m.inactiveSyncers[peer]
        if ok {
                return syncer, true
        }
        syncer, ok = m.activeSyncers[peer]
        if ok {
                return syncer, true
        }
        syncer, ok = m.pinnedActiveSyncers[peer]
        if ok {
                return syncer, true
        }
        return nil, false
}

// GossipSyncers returns all of the currently initialized gossip syncers.
func (m *SyncManager) GossipSyncers() map[route.Vertex]*GossipSyncer {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()
        return m.gossipSyncers()
}

// gossipSyncers returns all of the currently initialized gossip syncers.
func (m *SyncManager) gossipSyncers() map[route.Vertex]*GossipSyncer {
        numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers)
        syncers := make(map[route.Vertex]*GossipSyncer, numSyncers)

        for _, syncer := range m.inactiveSyncers {
                syncers[syncer.cfg.peerPub] = syncer
        }
        for _, syncer := range m.activeSyncers {
                syncers[syncer.cfg.peerPub] = syncer
        }

        return syncers
}

// markGraphSynced allows us to report that the initial historical sync has
// completed.
func (m *SyncManager) markGraphSynced() {
        atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 1)
}

// IsGraphSynced determines whether we've completed our initial historical
// sync. The initial historical sync is done to ensure we've ingested as much
// of the public graph as possible.
func (m *SyncManager) IsGraphSynced() bool {
        return atomic.LoadInt32(&m.initialHistoricalSyncCompleted) == 1
}
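
markGraphSynced and IsGraphSynced implement a one-way barrier with sync/atomic on an int32: the flag flips from 0 to 1 exactly once and can be checked from any goroutine without a mutex. A minimal sketch of the same idiom (on Go 1.19+ an atomic.Bool would express this more directly; the int32 form below mirrors the field above, and the type name is illustrative):

// syncBarrier sketches the initialHistoricalSyncCompleted idiom as a
// reusable type, for illustration only.
type syncBarrier struct {
        done int32 // NOTE: must be used atomically.
}

func (b *syncBarrier) mark() {
        atomic.StoreInt32(&b.done, 1)
}

func (b *syncBarrier) isDone() bool {
        return atomic.LoadInt32(&b.done) == 1
}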