• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13051234467

30 Jan 2025 11:19AM UTC coverage: 49.289% (-9.5%) from 58.782%
13051234467

Pull #9459

github

ziggie1984
docs: add release-notes.
Pull Request #9459: invoices: amp invoices bugfix.

27 of 54 new or added lines in 4 files covered. (50.0%)

27265 existing lines in 434 files now uncovered.

100654 of 204212 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.88
/discovery/sync_manager.go
1
package discovery
2

3
import (
4
        "errors"
5
        "sync"
6
        "sync/atomic"
7
        "time"
8

9
        "github.com/btcsuite/btcd/chaincfg/chainhash"
10
        "github.com/lightningnetwork/lnd/lnpeer"
11
        "github.com/lightningnetwork/lnd/lnwire"
12
        "github.com/lightningnetwork/lnd/routing/route"
13
        "github.com/lightningnetwork/lnd/ticker"
14
)
15

16
const (
17
        // DefaultSyncerRotationInterval is the default interval in which we'll
18
        // rotate a single active syncer.
19
        DefaultSyncerRotationInterval = 20 * time.Minute
20

21
        // DefaultHistoricalSyncInterval is the default interval in which we'll
22
        // force a historical sync to ensure we have as much of the public
23
        // network as possible.
24
        DefaultHistoricalSyncInterval = time.Hour
25

26
        // filterSemaSize is the capacity of gossipFilterSema.
27
        filterSemaSize = 5
28
)
29

30
var (
31
        // ErrSyncManagerExiting is an error returned when we attempt to
32
        // start/stop a gossip syncer for a connected/disconnected peer, but the
33
        // SyncManager has already been stopped.
34
        ErrSyncManagerExiting = errors.New("sync manager exiting")
35
)
36

37
// newSyncer is an internal message we'll use within the SyncManager to signal
38
// that we should create a GossipSyncer for a newly connected peer.
39
type newSyncer struct {
40
        // peer is the newly connected peer.
41
        peer lnpeer.Peer
42

43
        // doneChan serves as a signal to the caller that the SyncManager's
44
        // internal state has been updated to account for the new peer.
45
        doneChan chan struct{}
46
}
47

48
// staleSyncer is an internal message we'll use within the SyncManager to signal
49
// that a peer has disconnected and its GossipSyncer should be removed.
50
type staleSyncer struct {
51
        // peer is the peer that has disconnected.
52
        peer route.Vertex
53

54
        // doneChan serves as a signal to the caller that the SyncManager's
55
        // internal state correctly reflects the stale active syncer. This is
56
        // needed to ensure we always create a new syncer for a flappy peer
57
        // after they disconnect if they happened to be an active syncer.
58
        doneChan chan struct{}
59
}
60

61
// SyncManagerCfg contains all of the dependencies required for the SyncManager
62
// to carry out its duties.
63
type SyncManagerCfg struct {
64
        // ChainHash is a hash that indicates the specific network of the active
65
        // chain.
66
        ChainHash chainhash.Hash
67

68
        // ChanSeries is an interface that provides access to a time series view
69
        // of the current known channel graph. Each GossipSyncer enabled peer
70
        // will utilize this in order to create and respond to channel graph
71
        // time series queries.
72
        ChanSeries ChannelGraphTimeSeries
73

74
        // NumActiveSyncers is the number of peers for which we should have
75
        // active syncers with. After reaching NumActiveSyncers, any future
76
        // gossip syncers will be passive.
77
        NumActiveSyncers int
78

79
        // NoTimestampQueries will prevent the GossipSyncer from querying
80
        // timestamps of announcement messages from the peer and from responding
81
        // to timestamp queries
82
        NoTimestampQueries bool
83

84
        // RotateTicker is a ticker responsible for notifying the SyncManager
85
        // when it should rotate its active syncers. A single active syncer with
86
        // a chansSynced state will be exchanged for a passive syncer in order
87
        // to ensure we don't keep syncing with the same peers.
88
        RotateTicker ticker.Ticker
89

90
        // HistoricalSyncTicker is a ticker responsible for notifying the
91
        // SyncManager when it should attempt a historical sync with a gossip
92
        // sync peer.
93
        HistoricalSyncTicker ticker.Ticker
94

95
        // IgnoreHistoricalFilters will prevent syncers from replying with
96
        // historical data when the remote peer sets a gossip_timestamp_range.
97
        // This prevents ranges with old start times from causing us to dump the
98
        // graph on connect.
99
        IgnoreHistoricalFilters bool
100

101
        // BestHeight returns the latest height known of the chain.
102
        BestHeight func() uint32
103

104
        // PinnedSyncers is a set of peers that will always transition to
105
        // ActiveSync upon connection. These peers will never transition to
106
        // PassiveSync.
107
        PinnedSyncers PinnedSyncers
108

109
        // IsStillZombieChannel takes the timestamps of the latest channel
110
        // updates for a channel and returns true if the channel should be
111
        // considered a zombie based on these timestamps.
112
        IsStillZombieChannel func(time.Time, time.Time) bool
113
}
114

115
// SyncManager is a subsystem of the gossiper that manages the gossip syncers
116
// for peers currently connected. When a new peer is connected, the manager will
117
// create its accompanying gossip syncer and determine whether it should have an
118
// ActiveSync or PassiveSync sync type based on how many other gossip syncers
119
// are currently active. Any ActiveSync gossip syncers are started in a
120
// round-robin manner to ensure we're not syncing with multiple peers at the
121
// same time. The first GossipSyncer registered with the SyncManager will
122
// attempt a historical sync to ensure we have as much of the public channel
123
// graph as possible.
124
type SyncManager struct {
125
        // initialHistoricalSyncCompleted serves as a barrier when initializing
126
        // new active GossipSyncers. If 0, the initial historical sync has not
127
        // completed, so we'll defer initializing any active GossipSyncers. If
128
        // 1, then we can transition the GossipSyncer immediately. We set up
129
        // this barrier to ensure we have most of the graph before attempting to
130
        // accept new updates at tip.
131
        //
132
        // NOTE: This must be used atomically.
133
        initialHistoricalSyncCompleted int32
134

135
        start sync.Once
136
        stop  sync.Once
137

138
        cfg SyncManagerCfg
139

140
        // newSyncers is a channel we'll use to process requests to create
141
        // GossipSyncers for newly connected peers.
142
        newSyncers chan *newSyncer
143

144
        // staleSyncers is a channel we'll use to process requests to tear down
145
        // GossipSyncers for disconnected peers.
146
        staleSyncers chan *staleSyncer
147

148
        // syncersMu guards the read and write access to the activeSyncers and
149
        // inactiveSyncers maps below.
150
        syncersMu sync.Mutex
151

152
        // activeSyncers is the set of all syncers for which we are currently
153
        // receiving graph updates from. The number of possible active syncers
154
        // is bounded by NumActiveSyncers.
155
        activeSyncers map[route.Vertex]*GossipSyncer
156

157
        // inactiveSyncers is the set of all syncers for which we are not
158
        // currently receiving new graph updates from.
159
        inactiveSyncers map[route.Vertex]*GossipSyncer
160

161
        // pinnedActiveSyncers is the set of all syncers which are pinned into
162
        // an active sync. Pinned peers perform an initial historical sync on
163
        // each connection and will continue to receive graph updates for the
164
        // duration of the connection.
165
        pinnedActiveSyncers map[route.Vertex]*GossipSyncer
166

167
        // gossipFilterSema contains semaphores for the gossip timestamp
168
        // queries.
169
        gossipFilterSema chan struct{}
170

171
        wg   sync.WaitGroup
172
        quit chan struct{}
173
}
174

175
// newSyncManager constructs a new SyncManager backed by the given config.
176
func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
3✔
177

3✔
178
        filterSema := make(chan struct{}, filterSemaSize)
3✔
179
        for i := 0; i < filterSemaSize; i++ {
6✔
180
                filterSema <- struct{}{}
3✔
181
        }
3✔
182

183
        return &SyncManager{
3✔
184
                cfg:          *cfg,
3✔
185
                newSyncers:   make(chan *newSyncer),
3✔
186
                staleSyncers: make(chan *staleSyncer),
3✔
187
                activeSyncers: make(
3✔
188
                        map[route.Vertex]*GossipSyncer, cfg.NumActiveSyncers,
3✔
189
                ),
3✔
190
                inactiveSyncers: make(map[route.Vertex]*GossipSyncer),
3✔
191
                pinnedActiveSyncers: make(
3✔
192
                        map[route.Vertex]*GossipSyncer, len(cfg.PinnedSyncers),
3✔
193
                ),
3✔
194
                gossipFilterSema: filterSema,
3✔
195
                quit:             make(chan struct{}),
3✔
196
        }
3✔
197
}
198

199
// Start starts the SyncManager in order to properly carry out its duties.
200
func (m *SyncManager) Start() {
3✔
201
        m.start.Do(func() {
6✔
202
                m.wg.Add(1)
3✔
203
                go m.syncerHandler()
3✔
204
        })
3✔
205
}
206

207
// Stop stops the SyncManager from performing its duties.
208
func (m *SyncManager) Stop() {
3✔
209
        m.stop.Do(func() {
6✔
210
                log.Debugf("SyncManager is stopping")
3✔
211
                defer log.Debugf("SyncManager stopped")
3✔
212

3✔
213
                close(m.quit)
3✔
214
                m.wg.Wait()
3✔
215

3✔
216
                for _, syncer := range m.inactiveSyncers {
4✔
217
                        syncer.Stop()
1✔
218
                }
1✔
219
                for _, syncer := range m.activeSyncers {
6✔
220
                        syncer.Stop()
3✔
221
                }
3✔
222
        })
223
}
224

225
// syncerHandler is the SyncManager's main event loop responsible for:
226
//
227
//  1. Creating and tearing down GossipSyncers for connected/disconnected peers.
228
//
229
//  2. Finding new peers to receive graph updates from to ensure we don't only
230
//     receive them from the same set of peers.
231
//
232
//  3. Finding new peers to force a historical sync with to ensure we have as
233
//     much of the public network as possible.
234
//
235
// NOTE: This must be run as a goroutine.
236
func (m *SyncManager) syncerHandler() {
3✔
237
        defer m.wg.Done()
3✔
238

3✔
239
        m.cfg.RotateTicker.Resume()
3✔
240
        defer m.cfg.RotateTicker.Stop()
3✔
241

3✔
242
        defer m.cfg.HistoricalSyncTicker.Stop()
3✔
243

3✔
244
        var (
3✔
245
                // initialHistoricalSyncer is the syncer we are currently
3✔
246
                // performing an initial historical sync with.
3✔
247
                initialHistoricalSyncer *GossipSyncer
3✔
248

3✔
249
                // initialHistoricalSyncSignal is a signal that will fire once
3✔
250
                // the initial historical sync has been completed. This is
3✔
251
                // crucial to ensure that another historical sync isn't
3✔
252
                // attempted just because the initialHistoricalSyncer was
3✔
253
                // disconnected.
3✔
254
                initialHistoricalSyncSignal chan struct{}
3✔
255
        )
3✔
256

3✔
257
        setInitialHistoricalSyncer := func(s *GossipSyncer) {
6✔
258
                initialHistoricalSyncer = s
3✔
259
                initialHistoricalSyncSignal = s.ResetSyncedSignal()
3✔
260

3✔
261
                // Restart the timer for our new historical sync peer. This will
3✔
262
                // ensure that all initial syncers receive an equivalent
3✔
263
                // duration before attempting the next sync. Without doing so we
3✔
264
                // might attempt two historical sync back to back if a peer
3✔
265
                // disconnects just before the ticker fires.
3✔
266
                m.cfg.HistoricalSyncTicker.Pause()
3✔
267
                m.cfg.HistoricalSyncTicker.Resume()
3✔
268
        }
3✔
269

270
        for {
6✔
271
                select {
3✔
272
                // A new peer has been connected, so we'll create its
273
                // accompanying GossipSyncer.
274
                case newSyncer := <-m.newSyncers:
3✔
275
                        // If we already have a syncer, then we'll exit early as
3✔
276
                        // we don't want to override it.
3✔
277
                        if _, ok := m.GossipSyncer(newSyncer.peer.PubKey()); ok {
3✔
278
                                close(newSyncer.doneChan)
×
279
                                continue
×
280
                        }
281

282
                        s := m.createGossipSyncer(newSyncer.peer)
3✔
283

3✔
284
                        isPinnedSyncer := m.isPinnedSyncer(s)
3✔
285

3✔
286
                        // attemptHistoricalSync determines whether we should
3✔
287
                        // attempt an initial historical sync when a new peer
3✔
288
                        // connects.
3✔
289
                        attemptHistoricalSync := false
3✔
290

3✔
291
                        m.syncersMu.Lock()
3✔
292
                        switch {
3✔
293
                        // For pinned syncers, we will immediately transition
294
                        // the peer into an active (pinned) sync state.
295
                        case isPinnedSyncer:
3✔
296
                                attemptHistoricalSync = true
3✔
297
                                s.setSyncType(PinnedSync)
3✔
298
                                s.setSyncState(syncerIdle)
3✔
299
                                m.pinnedActiveSyncers[s.cfg.peerPub] = s
3✔
300

301
                        // Regardless of whether the initial historical sync
302
                        // has completed, we'll re-trigger a historical sync if
303
                        // we no longer have any syncers. This might be
304
                        // necessary if we lost all our peers at one point, and
305
                        // now we finally have one again.
306
                        case len(m.activeSyncers) == 0 &&
307
                                len(m.inactiveSyncers) == 0:
3✔
308

3✔
309
                                attemptHistoricalSync =
3✔
310
                                        m.cfg.NumActiveSyncers > 0
3✔
311
                                fallthrough
3✔
312

313
                        // If we've exceeded our total number of active syncers,
314
                        // we'll initialize this GossipSyncer as passive.
315
                        case len(m.activeSyncers) >= m.cfg.NumActiveSyncers:
3✔
316
                                fallthrough
3✔
317

318
                        // If the initial historical sync has yet to complete,
319
                        // then we'll declare it as passive and attempt to
320
                        // transition it when the initial historical sync
321
                        // completes.
322
                        case !m.IsGraphSynced():
3✔
323
                                s.setSyncType(PassiveSync)
3✔
324
                                m.inactiveSyncers[s.cfg.peerPub] = s
3✔
325

326
                        // The initial historical sync has completed, so we can
327
                        // immediately start the GossipSyncer as active.
328
                        default:
3✔
329
                                s.setSyncType(ActiveSync)
3✔
330
                                m.activeSyncers[s.cfg.peerPub] = s
3✔
331
                        }
332
                        m.syncersMu.Unlock()
3✔
333

3✔
334
                        s.Start()
3✔
335

3✔
336
                        // Once we create the GossipSyncer, we'll signal to the
3✔
337
                        // caller that they can proceed since the SyncManager's
3✔
338
                        // internal state has been updated.
3✔
339
                        close(newSyncer.doneChan)
3✔
340

3✔
341
                        // We'll force a historical sync with the first peer we
3✔
342
                        // connect to, to ensure we get as much of the graph as
3✔
343
                        // possible.
3✔
344
                        if !attemptHistoricalSync {
6✔
345
                                continue
3✔
346
                        }
347

348
                        log.Debugf("Attempting initial historical sync with "+
3✔
349
                                "GossipSyncer(%x)", s.cfg.peerPub)
3✔
350

3✔
351
                        if err := s.historicalSync(); err != nil {
3✔
352
                                log.Errorf("Unable to attempt initial "+
×
353
                                        "historical sync with "+
×
354
                                        "GossipSyncer(%x): %v", s.cfg.peerPub,
×
355
                                        err)
×
356
                                continue
×
357
                        }
358

359
                        // Once the historical sync has started, we'll
360
                        // keep track of the corresponding syncer to properly
361
                        // handle disconnects. We'll also use a signal to know
362
                        // when the historical sync completed.
363
                        if !isPinnedSyncer {
6✔
364
                                setInitialHistoricalSyncer(s)
3✔
365
                        }
3✔
366

367
                // An existing peer has disconnected, so we'll tear down its
368
                // corresponding GossipSyncer.
369
                case staleSyncer := <-m.staleSyncers:
3✔
370
                        // Once the corresponding GossipSyncer has been stopped
3✔
371
                        // and removed, we'll signal to the caller that they can
3✔
372
                        // proceed since the SyncManager's internal state has
3✔
373
                        // been updated.
3✔
374
                        m.removeGossipSyncer(staleSyncer.peer)
3✔
375
                        close(staleSyncer.doneChan)
3✔
376

3✔
377
                        // If we don't have an initialHistoricalSyncer, or we do
3✔
378
                        // but it is not the peer being disconnected, then we
3✔
379
                        // have nothing left to do and can proceed.
3✔
380
                        switch {
3✔
381
                        case initialHistoricalSyncer == nil:
3✔
382
                                fallthrough
3✔
383
                        case staleSyncer.peer != initialHistoricalSyncer.cfg.peerPub:
3✔
384
                                fallthrough
3✔
385
                        case m.cfg.NumActiveSyncers == 0:
3✔
386
                                continue
3✔
387
                        }
388

389
                        // Otherwise, our initialHistoricalSyncer corresponds to
390
                        // the peer being disconnected, so we'll have to find a
391
                        // replacement.
UNCOV
392
                        log.Debug("Finding replacement for initial " +
×
UNCOV
393
                                "historical sync")
×
UNCOV
394

×
UNCOV
395
                        s := m.forceHistoricalSync()
×
UNCOV
396
                        if s == nil {
×
397
                                log.Debug("No eligible replacement found " +
×
398
                                        "for initial historical sync")
×
399
                                continue
×
400
                        }
401

UNCOV
402
                        log.Debugf("Replaced initial historical "+
×
UNCOV
403
                                "GossipSyncer(%v) with GossipSyncer(%x)",
×
UNCOV
404
                                staleSyncer.peer, s.cfg.peerPub)
×
UNCOV
405

×
UNCOV
406
                        setInitialHistoricalSyncer(s)
×
407

408
                // Our initial historical sync signal has completed, so we'll
409
                // nil all of the relevant fields as they're no longer needed.
410
                case <-initialHistoricalSyncSignal:
3✔
411
                        initialHistoricalSyncer = nil
3✔
412
                        initialHistoricalSyncSignal = nil
3✔
413

3✔
414
                        log.Debug("Initial historical sync completed")
3✔
415

3✔
416
                        // With the initial historical sync complete, we can
3✔
417
                        // begin receiving new graph updates at tip. We'll
3✔
418
                        // determine whether we can have any more active
3✔
419
                        // GossipSyncers. If we do, we'll randomly select some
3✔
420
                        // that are currently passive to transition.
3✔
421
                        m.syncersMu.Lock()
3✔
422
                        numActiveLeft := m.cfg.NumActiveSyncers - len(m.activeSyncers)
3✔
423
                        if numActiveLeft <= 0 {
3✔
424
                                m.syncersMu.Unlock()
×
425
                                continue
×
426
                        }
427

428
                        // We may not even have enough inactive syncers to be
429
                        // transitted. In that case, we will transit all the
430
                        // inactive syncers.
431
                        if len(m.inactiveSyncers) < numActiveLeft {
6✔
432
                                numActiveLeft = len(m.inactiveSyncers)
3✔
433
                        }
3✔
434

435
                        log.Debugf("Attempting to transition %v passive "+
3✔
436
                                "GossipSyncers to active", numActiveLeft)
3✔
437

3✔
438
                        for i := 0; i < numActiveLeft; i++ {
6✔
439
                                chooseRandomSyncer(
3✔
440
                                        m.inactiveSyncers, m.transitionPassiveSyncer,
3✔
441
                                )
3✔
442
                        }
3✔
443

444
                        m.syncersMu.Unlock()
3✔
445

446
                // Our RotateTicker has ticked, so we'll attempt to rotate a
447
                // single active syncer with a passive one.
UNCOV
448
                case <-m.cfg.RotateTicker.Ticks():
×
UNCOV
449
                        m.rotateActiveSyncerCandidate()
×
450

451
                // Our HistoricalSyncTicker has ticked, so we'll randomly select
452
                // a peer and force a historical sync with them.
UNCOV
453
                case <-m.cfg.HistoricalSyncTicker.Ticks():
×
UNCOV
454
                        // To be extra cautious, gate the forceHistoricalSync
×
UNCOV
455
                        // call such that it can only execute if we are
×
UNCOV
456
                        // configured to have a non-zero number of sync peers.
×
UNCOV
457
                        // This way even if the historical sync ticker manages
×
UNCOV
458
                        // to tick we can be sure that a historical sync won't
×
UNCOV
459
                        // accidentally begin.
×
UNCOV
460
                        if m.cfg.NumActiveSyncers == 0 {
×
UNCOV
461
                                continue
×
462
                        }
463

464
                        // If we don't have a syncer available we have nothing
465
                        // to do.
UNCOV
466
                        s := m.forceHistoricalSync()
×
UNCOV
467
                        if s == nil {
×
468
                                continue
×
469
                        }
470

471
                        // If we've already completed a historical sync, we'll
472
                        // skip setting the initial historical syncer.
UNCOV
473
                        if m.IsGraphSynced() {
×
474
                                continue
×
475
                        }
476

477
                        // Otherwise, we'll track the peer we've performed a
478
                        // historical sync with in order to handle the case
479
                        // where our previous historical sync peer did not
480
                        // respond to our queries and we haven't ingested as
481
                        // much of the graph as we should.
UNCOV
482
                        setInitialHistoricalSyncer(s)
×
483

484
                case <-m.quit:
3✔
485
                        return
3✔
486
                }
487
        }
488
}
489

490
// isPinnedSyncer returns true if the passed GossipSyncer is one of our pinned
491
// sync peers.
492
func (m *SyncManager) isPinnedSyncer(s *GossipSyncer) bool {
3✔
493
        _, isPinnedSyncer := m.cfg.PinnedSyncers[s.cfg.peerPub]
3✔
494
        return isPinnedSyncer
3✔
495
}
3✔
496

497
// createGossipSyncer creates the GossipSyncer for a newly connected peer.
498
func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
3✔
499
        nodeID := route.Vertex(peer.PubKey())
3✔
500
        log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:])
3✔
501

3✔
502
        encoding := lnwire.EncodingSortedPlain
3✔
503
        s := newGossipSyncer(gossipSyncerCfg{
3✔
504
                chainHash:     m.cfg.ChainHash,
3✔
505
                peerPub:       nodeID,
3✔
506
                channelSeries: m.cfg.ChanSeries,
3✔
507
                encodingType:  encoding,
3✔
508
                chunkSize:     encodingTypeToChunkSize[encoding],
3✔
509
                batchSize:     requestBatchSize,
3✔
510
                sendToPeer: func(msgs ...lnwire.Message) error {
6✔
511
                        return peer.SendMessageLazy(false, msgs...)
3✔
512
                },
3✔
513
                sendToPeerSync: func(msgs ...lnwire.Message) error {
3✔
514
                        return peer.SendMessageLazy(true, msgs...)
3✔
515
                },
3✔
516
                ignoreHistoricalFilters:   m.cfg.IgnoreHistoricalFilters,
517
                maxUndelayedQueryReplies:  DefaultMaxUndelayedQueryReplies,
518
                delayedQueryReplyInterval: DefaultDelayedQueryReplyInterval,
519
                bestHeight:                m.cfg.BestHeight,
520
                markGraphSynced:           m.markGraphSynced,
521
                maxQueryChanRangeReplies:  maxQueryChanRangeReplies,
522
                noTimestampQueryOption:    m.cfg.NoTimestampQueries,
523
                isStillZombieChannel:      m.cfg.IsStillZombieChannel,
524
        }, m.gossipFilterSema)
525

526
        // Gossip syncers are initialized by default in a PassiveSync type
527
        // and chansSynced state so that they can reply to any peer queries or
528
        // handle any sync transitions.
529
        s.setSyncState(chansSynced)
3✔
530
        s.setSyncType(PassiveSync)
3✔
531

3✔
532
        log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%x",
3✔
533
                s.syncState(), s.SyncType(), peer.PubKey())
3✔
534

3✔
535
        return s
3✔
536
}
537

538
// removeGossipSyncer removes all internal references to the disconnected peer's
539
// GossipSyncer and stops it. In the event of an active GossipSyncer being
540
// disconnected, a passive GossipSyncer, if any, will take its place.
541
func (m *SyncManager) removeGossipSyncer(peer route.Vertex) {
3✔
542
        m.syncersMu.Lock()
3✔
543
        defer m.syncersMu.Unlock()
3✔
544

3✔
545
        s, ok := m.gossipSyncer(peer)
3✔
546
        if !ok {
3✔
547
                return
×
548
        }
×
549

550
        log.Infof("Removing GossipSyncer for peer=%v", peer)
3✔
551

3✔
552
        // We'll stop the GossipSyncer for the disconnected peer in a goroutine
3✔
553
        // to prevent blocking the SyncManager.
3✔
554
        go s.Stop()
3✔
555

3✔
556
        // If it's a non-active syncer, then we can just exit now.
3✔
557
        if _, ok := m.inactiveSyncers[peer]; ok {
6✔
558
                delete(m.inactiveSyncers, peer)
3✔
559
                return
3✔
560
        }
3✔
561

562
        // If it's a pinned syncer, then we can just exit as this doesn't
563
        // affect our active syncer count.
564
        if _, ok := m.pinnedActiveSyncers[peer]; ok {
6✔
565
                delete(m.pinnedActiveSyncers, peer)
3✔
566
                return
3✔
567
        }
3✔
568

569
        // Otherwise, we'll need find a new one to replace it, if any.
570
        delete(m.activeSyncers, peer)
3✔
571
        newActiveSyncer := chooseRandomSyncer(
3✔
572
                m.inactiveSyncers, m.transitionPassiveSyncer,
3✔
573
        )
3✔
574
        if newActiveSyncer == nil {
6✔
575
                return
3✔
576
        }
3✔
577

578
        log.Debugf("Replaced active GossipSyncer(%v) with GossipSyncer(%x)",
3✔
579
                peer, newActiveSyncer.cfg.peerPub)
3✔
580
}
581

582
// rotateActiveSyncerCandidate rotates a single active syncer. In order to
583
// achieve this, the active syncer must be in a chansSynced state in order to
584
// process the sync transition.
UNCOV
585
func (m *SyncManager) rotateActiveSyncerCandidate() {
×
UNCOV
586
        m.syncersMu.Lock()
×
UNCOV
587
        defer m.syncersMu.Unlock()
×
UNCOV
588

×
UNCOV
589
        // If we couldn't find an eligible active syncer to rotate, we can
×
UNCOV
590
        // return early.
×
UNCOV
591
        activeSyncer := chooseRandomSyncer(m.activeSyncers, nil)
×
UNCOV
592
        if activeSyncer == nil {
×
593
                log.Debug("No eligible active syncer to rotate")
×
594
                return
×
595
        }
×
596

597
        // Similarly, if we don't have a candidate to rotate with, we can return
598
        // early as well.
UNCOV
599
        candidate := chooseRandomSyncer(m.inactiveSyncers, nil)
×
UNCOV
600
        if candidate == nil {
×
UNCOV
601
                log.Debug("No eligible candidate to rotate active syncer")
×
UNCOV
602
                return
×
UNCOV
603
        }
×
604

605
        // Otherwise, we'll attempt to transition each syncer to their
606
        // respective new sync type.
UNCOV
607
        log.Debugf("Rotating active GossipSyncer(%x) with GossipSyncer(%x)",
×
UNCOV
608
                activeSyncer.cfg.peerPub, candidate.cfg.peerPub)
×
UNCOV
609

×
UNCOV
610
        if err := m.transitionActiveSyncer(activeSyncer); err != nil {
×
611
                log.Errorf("Unable to transition active GossipSyncer(%x): %v",
×
612
                        activeSyncer.cfg.peerPub, err)
×
613
                return
×
614
        }
×
615

UNCOV
616
        if err := m.transitionPassiveSyncer(candidate); err != nil {
×
617
                log.Errorf("Unable to transition passive GossipSyncer(%x): %v",
×
618
                        activeSyncer.cfg.peerPub, err)
×
619
                return
×
620
        }
×
621
}
622

623
// transitionActiveSyncer transitions an active syncer to a passive one.
624
//
625
// NOTE: This must be called with the syncersMu lock held.
UNCOV
626
func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error {
×
UNCOV
627
        log.Debugf("Transitioning active GossipSyncer(%x) to passive",
×
UNCOV
628
                s.cfg.peerPub)
×
UNCOV
629

×
UNCOV
630
        if err := s.ProcessSyncTransition(PassiveSync); err != nil {
×
631
                return err
×
632
        }
×
633

UNCOV
634
        delete(m.activeSyncers, s.cfg.peerPub)
×
UNCOV
635
        m.inactiveSyncers[s.cfg.peerPub] = s
×
UNCOV
636

×
UNCOV
637
        return nil
×
638
}
639

640
// transitionPassiveSyncer transitions a passive syncer to an active one.
641
//
642
// NOTE: This must be called with the syncersMu lock held.
643
func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error {
3✔
644
        log.Debugf("Transitioning passive GossipSyncer(%x) to active",
3✔
645
                s.cfg.peerPub)
3✔
646

3✔
647
        if err := s.ProcessSyncTransition(ActiveSync); err != nil {
3✔
648
                return err
×
649
        }
×
650

651
        delete(m.inactiveSyncers, s.cfg.peerPub)
3✔
652
        m.activeSyncers[s.cfg.peerPub] = s
3✔
653

3✔
654
        return nil
3✔
655
}
656

657
// forceHistoricalSync chooses a syncer with a remote peer at random and forces
658
// a historical sync with it.
UNCOV
659
func (m *SyncManager) forceHistoricalSync() *GossipSyncer {
×
UNCOV
660
        m.syncersMu.Lock()
×
UNCOV
661
        defer m.syncersMu.Unlock()
×
UNCOV
662

×
UNCOV
663
        // We'll sample from both sets of active and inactive syncers in the
×
UNCOV
664
        // event that we don't have any inactive syncers.
×
UNCOV
665
        return chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error {
×
UNCOV
666
                return s.historicalSync()
×
UNCOV
667
        })
×
668
}
669

670
// chooseRandomSyncer iterates through the set of syncers given and returns the
671
// first one which was able to successfully perform the action enclosed in the
672
// function closure.
673
//
674
// NOTE: It's possible for a nil value to be returned if there are no eligible
675
// candidate syncers.
676
func chooseRandomSyncer(syncers map[route.Vertex]*GossipSyncer,
677
        action func(*GossipSyncer) error) *GossipSyncer {
3✔
678

3✔
679
        for _, s := range syncers {
6✔
680
                // Only syncers in a chansSynced state are viable for sync
3✔
681
                // transitions, so skip any that aren't.
3✔
682
                if s.syncState() != chansSynced {
3✔
UNCOV
683
                        continue
×
684
                }
685

686
                if action != nil {
6✔
687
                        if err := action(s); err != nil {
3✔
688
                                log.Debugf("Skipping eligible candidate "+
×
689
                                        "GossipSyncer(%x): %v", s.cfg.peerPub,
×
690
                                        err)
×
691
                                continue
×
692
                        }
693
                }
694

695
                return s
3✔
696
        }
697

698
        return nil
3✔
699
}
700

701
// InitSyncState is called by outside sub-systems when a connection is
702
// established to a new peer that understands how to perform channel range
703
// queries. We'll allocate a new GossipSyncer for it, and start any goroutines
704
// needed to handle new queries. The first GossipSyncer registered with the
705
// SyncManager will attempt a historical sync to ensure we have as much of the
706
// public channel graph as possible.
707
//
708
// TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer.
709
func (m *SyncManager) InitSyncState(peer lnpeer.Peer) error {
3✔
710
        done := make(chan struct{})
3✔
711

3✔
712
        select {
3✔
713
        case m.newSyncers <- &newSyncer{
714
                peer:     peer,
715
                doneChan: done,
716
        }:
3✔
717
        case <-m.quit:
×
718
                return ErrSyncManagerExiting
×
719
        }
720

721
        select {
3✔
722
        case <-done:
3✔
723
                return nil
3✔
724
        case <-m.quit:
×
725
                return ErrSyncManagerExiting
×
726
        }
727
}
728

729
// PruneSyncState is called by outside sub-systems once a peer that we were
730
// previously connected to has been disconnected. In this case we can stop the
731
// existing GossipSyncer assigned to the peer and free up resources.
732
func (m *SyncManager) PruneSyncState(peer route.Vertex) {
3✔
733
        done := make(chan struct{})
3✔
734

3✔
735
        // We avoid returning an error when the SyncManager is stopped since the
3✔
736
        // GossipSyncer will be stopped then anyway.
3✔
737
        select {
3✔
738
        case m.staleSyncers <- &staleSyncer{
739
                peer:     peer,
740
                doneChan: done,
741
        }:
3✔
742
        case <-m.quit:
×
743
                return
×
744
        }
745

746
        select {
3✔
747
        case <-done:
3✔
748
        case <-m.quit:
×
749
        }
750
}
751

752
// GossipSyncer returns the associated gossip syncer of a peer. The boolean
753
// returned signals whether there exists a gossip syncer for the peer.
754
func (m *SyncManager) GossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
3✔
755
        m.syncersMu.Lock()
3✔
756
        defer m.syncersMu.Unlock()
3✔
757
        return m.gossipSyncer(peer)
3✔
758
}
3✔
759

760
// gossipSyncer returns the associated gossip syncer of a peer. The boolean
761
// returned signals whether there exists a gossip syncer for the peer.
762
func (m *SyncManager) gossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
3✔
763
        syncer, ok := m.inactiveSyncers[peer]
3✔
764
        if ok {
6✔
765
                return syncer, true
3✔
766
        }
3✔
767
        syncer, ok = m.activeSyncers[peer]
3✔
768
        if ok {
6✔
769
                return syncer, true
3✔
770
        }
3✔
771
        syncer, ok = m.pinnedActiveSyncers[peer]
3✔
772
        if ok {
6✔
773
                return syncer, true
3✔
774
        }
3✔
775
        return nil, false
3✔
776
}
777

778
// GossipSyncers returns all of the currently initialized gossip syncers.
779
func (m *SyncManager) GossipSyncers() map[route.Vertex]*GossipSyncer {
3✔
780
        m.syncersMu.Lock()
3✔
781
        defer m.syncersMu.Unlock()
3✔
782
        return m.gossipSyncers()
3✔
783
}
3✔
784

785
// gossipSyncers returns all of the currently initialized gossip syncers.
786
func (m *SyncManager) gossipSyncers() map[route.Vertex]*GossipSyncer {
3✔
787
        numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers)
3✔
788
        syncers := make(map[route.Vertex]*GossipSyncer, numSyncers)
3✔
789

3✔
790
        for _, syncer := range m.inactiveSyncers {
6✔
791
                syncers[syncer.cfg.peerPub] = syncer
3✔
792
        }
3✔
793
        for _, syncer := range m.activeSyncers {
6✔
794
                syncers[syncer.cfg.peerPub] = syncer
3✔
795
        }
3✔
796

797
        return syncers
3✔
798
}
799

800
// markGraphSynced allows us to report that the initial historical sync has
801
// completed.
802
func (m *SyncManager) markGraphSynced() {
3✔
803
        atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 1)
3✔
804
}
3✔
805

806
// IsGraphSynced determines whether we've completed our initial historical sync.
807
// The initial historical sync is done to ensure we've ingested as much of the
808
// public graph as possible.
809
func (m *SyncManager) IsGraphSynced() bool {
3✔
810
        return atomic.LoadInt32(&m.initialHistoricalSyncCompleted) == 1
3✔
811
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc