lightningnetwork / lnd · build 10207481183

01 Aug 2024 11:52PM UTC · coverage: 58.679% (+0.09%) from 58.591%

push · github · web-flow

Merge pull request #8836 from hieblmi/payment-failure-reason-cancel

routing: add payment failure reason `FailureReasonCancel`

7 of 30 new or added lines in 5 files covered (23.33%).

1662 existing lines in 21 files now uncovered.

125454 of 213798 relevant lines covered (58.68%).

28679.1 hits per line

Source File: /discovery/sync_manager.go (91.69% covered)
package discovery

import (
        "errors"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/ticker"
)

const (
        // DefaultSyncerRotationInterval is the default interval in which we'll
        // rotate a single active syncer.
        DefaultSyncerRotationInterval = 20 * time.Minute

        // DefaultHistoricalSyncInterval is the default interval in which we'll
        // force a historical sync to ensure we have as much of the public
        // network as possible.
        DefaultHistoricalSyncInterval = time.Hour
)

var (
        // ErrSyncManagerExiting is an error returned when we attempt to
        // start/stop a gossip syncer for a connected/disconnected peer, but the
        // SyncManager has already been stopped.
        ErrSyncManagerExiting = errors.New("sync manager exiting")
)

// newSyncer is an internal message we'll use within the SyncManager to signal
// that we should create a GossipSyncer for a newly connected peer.
type newSyncer struct {
        // peer is the newly connected peer.
        peer lnpeer.Peer

        // doneChan serves as a signal to the caller that the SyncManager's
        // internal state correctly reflects the newly created syncer.
        doneChan chan struct{}
}

// staleSyncer is an internal message we'll use within the SyncManager to signal
// that a peer has disconnected and its GossipSyncer should be removed.
type staleSyncer struct {
        // peer is the peer that has disconnected.
        peer route.Vertex

        // doneChan serves as a signal to the caller that the SyncManager's
        // internal state correctly reflects the stale active syncer. This is
        // needed to ensure we always create a new syncer for a flappy peer
        // after they disconnect if they happened to be an active syncer.
        doneChan chan struct{}
}

// SyncManagerCfg contains all of the dependencies required for the SyncManager
// to carry out its duties.
type SyncManagerCfg struct {
        // ChainHash is a hash that indicates the specific network of the active
        // chain.
        ChainHash chainhash.Hash

        // ChanSeries is an interface that provides access to a time series view
        // of the current known channel graph. Each GossipSyncer-enabled peer
        // will utilize this in order to create and respond to channel graph
        // time series queries.
        ChanSeries ChannelGraphTimeSeries

        // NumActiveSyncers is the number of peers we should have active
        // syncers with. After reaching NumActiveSyncers, any future gossip
        // syncers will be passive.
        NumActiveSyncers int

        // NoTimestampQueries will prevent the GossipSyncer from querying
        // timestamps of announcement messages from the peer, and from
        // responding to timestamp queries.
        NoTimestampQueries bool

        // RotateTicker is a ticker responsible for notifying the SyncManager
        // when it should rotate its active syncers. A single active syncer with
        // a chansSynced state will be exchanged for a passive syncer in order
        // to ensure we don't keep syncing with the same peers.
        RotateTicker ticker.Ticker

        // HistoricalSyncTicker is a ticker responsible for notifying the
        // SyncManager when it should attempt a historical sync with a gossip
        // sync peer.
        HistoricalSyncTicker ticker.Ticker

        // IgnoreHistoricalFilters will prevent syncers from replying with
        // historical data when the remote peer sets a gossip_timestamp_range.
        // This prevents ranges with old start times from causing us to dump the
        // graph on connect.
        IgnoreHistoricalFilters bool

        // BestHeight returns the latest height known of the chain.
        BestHeight func() uint32

        // PinnedSyncers is a set of peers that will always transition to
        // ActiveSync upon connection. These peers will never transition to
        // PassiveSync.
        PinnedSyncers PinnedSyncers

        // IsStillZombieChannel takes the timestamps of the latest channel
        // updates for a channel and returns true if the channel should be
        // considered a zombie based on these timestamps.
        IsStillZombieChannel func(time.Time, time.Time) bool
}

// SyncManager is a subsystem of the gossiper that manages the gossip syncers
// for peers currently connected. When a new peer is connected, the manager will
// create its accompanying gossip syncer and determine whether it should have an
// ActiveSync or PassiveSync sync type based on how many other gossip syncers
// are currently active. Any ActiveSync gossip syncers are started in a
// round-robin manner to ensure we're not syncing with multiple peers at the
// same time. The first GossipSyncer registered with the SyncManager will
// attempt a historical sync to ensure we have as much of the public channel
// graph as possible.
type SyncManager struct {
        // initialHistoricalSyncCompleted serves as a barrier when initializing
        // new active GossipSyncers. If 0, the initial historical sync has not
        // completed, so we'll defer initializing any active GossipSyncers. If
        // 1, then we can transition the GossipSyncer immediately. We set up
        // this barrier to ensure we have most of the graph before attempting to
        // accept new updates at tip.
        //
        // NOTE: This must be used atomically.
        initialHistoricalSyncCompleted int32

        start sync.Once
        stop  sync.Once

        cfg SyncManagerCfg

        // newSyncers is a channel we'll use to process requests to create
        // GossipSyncers for newly connected peers.
        newSyncers chan *newSyncer

        // staleSyncers is a channel we'll use to process requests to tear down
        // GossipSyncers for disconnected peers.
        staleSyncers chan *staleSyncer

        // syncersMu guards the read and write access to the activeSyncers and
        // inactiveSyncers maps below.
        syncersMu sync.Mutex

        // activeSyncers is the set of all syncers we are currently receiving
        // graph updates from. The number of possible active syncers is bounded
        // by NumActiveSyncers.
        activeSyncers map[route.Vertex]*GossipSyncer

        // inactiveSyncers is the set of all syncers we are not currently
        // receiving new graph updates from.
        inactiveSyncers map[route.Vertex]*GossipSyncer

        // pinnedActiveSyncers is the set of all syncers which are pinned into
        // an active sync. Pinned peers perform an initial historical sync on
        // each connection and will continue to receive graph updates for the
        // duration of the connection.
        pinnedActiveSyncers map[route.Vertex]*GossipSyncer

        wg   sync.WaitGroup
        quit chan struct{}
}

// newSyncManager constructs a new SyncManager backed by the given config.
func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
        return &SyncManager{
                cfg:          *cfg,
                newSyncers:   make(chan *newSyncer),
                staleSyncers: make(chan *staleSyncer),
                activeSyncers: make(
                        map[route.Vertex]*GossipSyncer, cfg.NumActiveSyncers,
                ),
                inactiveSyncers: make(map[route.Vertex]*GossipSyncer),
                pinnedActiveSyncers: make(
                        map[route.Vertex]*GossipSyncer, len(cfg.PinnedSyncers),
                ),
                quit: make(chan struct{}),
        }
}

// Start starts the SyncManager in order to properly carry out its duties.
func (m *SyncManager) Start() {
        m.start.Do(func() {
                m.wg.Add(1)
                go m.syncerHandler()
        })
}

// Stop stops the SyncManager from performing its duties.
func (m *SyncManager) Stop() {
        m.stop.Do(func() {
                log.Debugf("SyncManager is stopping")
                defer log.Debugf("SyncManager stopped")

                close(m.quit)
                m.wg.Wait()

                for _, syncer := range m.inactiveSyncers {
                        syncer.Stop()
                }
                for _, syncer := range m.activeSyncers {
                        syncer.Stop()
                }
        })
}

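// For orientation, a minimal sketch of how a caller might wire up and run a
// SyncManager. The chanSeries and bestHeight values are assumed to be
// supplied by the caller (they are not defined in this file); ticker.New is
// the constructor from lnd's ticker package:
//
//        cfg := &SyncManagerCfg{
//                ChainHash:            *chaincfg.MainNetParams.GenesisHash,
//                ChanSeries:           chanSeries, // a ChannelGraphTimeSeries impl
//                NumActiveSyncers:     3,
//                RotateTicker:         ticker.New(DefaultSyncerRotationInterval),
//                HistoricalSyncTicker: ticker.New(DefaultHistoricalSyncInterval),
//                BestHeight:           bestHeight, // func() uint32
//        }
//        m := newSyncManager(cfg)
//        m.Start()
//        defer m.Stop()
//
// Each connected peer is then registered via InitSyncState(peer), and torn
// down with PruneSyncState(peer) on disconnect (see below).
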
// syncerHandler is the SyncManager's main event loop responsible for:
//
//  1. Creating and tearing down GossipSyncers for connected/disconnected
//     peers.
//  2. Finding new peers to receive graph updates from to ensure we don't only
//     receive them from the same set of peers.
//  3. Finding new peers to force a historical sync with to ensure we have as
//     much of the public network as possible.
//
// NOTE: This must be run as a goroutine.
func (m *SyncManager) syncerHandler() {
        defer m.wg.Done()

        m.cfg.RotateTicker.Resume()
        defer m.cfg.RotateTicker.Stop()

        defer m.cfg.HistoricalSyncTicker.Stop()

        var (
                // initialHistoricalSyncer is the syncer we are currently
                // performing an initial historical sync with.
                initialHistoricalSyncer *GossipSyncer

                // initialHistoricalSyncSignal is a signal that will fire once
                // the initial historical sync has been completed. This is
                // crucial to ensure that another historical sync isn't
                // attempted just because the initialHistoricalSyncer was
                // disconnected.
                initialHistoricalSyncSignal chan struct{}
        )

        setInitialHistoricalSyncer := func(s *GossipSyncer) {
                initialHistoricalSyncer = s
                initialHistoricalSyncSignal = s.ResetSyncedSignal()

                // Restart the timer for our new historical sync peer. This will
                // ensure that all initial syncers receive an equivalent
                // duration before attempting the next sync. Without doing so we
                // might attempt two historical syncs back to back if a peer
                // disconnects just before the ticker fires.
                m.cfg.HistoricalSyncTicker.Pause()
                m.cfg.HistoricalSyncTicker.Resume()
        }

        for {
                select {
                // A new peer has been connected, so we'll create its
                // accompanying GossipSyncer.
                case newSyncer := <-m.newSyncers:
                        // If we already have a syncer, then we'll exit early as
                        // we don't want to override it.
                        if _, ok := m.GossipSyncer(newSyncer.peer.PubKey()); ok {
                                close(newSyncer.doneChan)
                                continue
                        }

                        s := m.createGossipSyncer(newSyncer.peer)

                        isPinnedSyncer := m.isPinnedSyncer(s)

                        // attemptHistoricalSync determines whether we should
                        // attempt an initial historical sync when a new peer
                        // connects.
                        attemptHistoricalSync := false

                        m.syncersMu.Lock()
                        switch {
                        // For pinned syncers, we will immediately transition
                        // the peer into an active (pinned) sync state.
                        case isPinnedSyncer:
                                attemptHistoricalSync = true
                                s.setSyncType(PinnedSync)
                                s.setSyncState(syncerIdle)
                                m.pinnedActiveSyncers[s.cfg.peerPub] = s

                        // Regardless of whether the initial historical sync
                        // has completed, we'll re-trigger a historical sync if
                        // we no longer have any syncers. This might be
                        // necessary if we lost all our peers at one point, and
                        // now we finally have one again.
                        case len(m.activeSyncers) == 0 &&
                                len(m.inactiveSyncers) == 0:

                                attemptHistoricalSync =
                                        m.cfg.NumActiveSyncers > 0
                                fallthrough

                        // If we've exceeded our total number of active syncers,
                        // we'll initialize this GossipSyncer as passive.
                        case len(m.activeSyncers) >= m.cfg.NumActiveSyncers:
                                fallthrough

                        // If the initial historical sync has yet to complete,
                        // then we'll declare it as passive and attempt to
                        // transition it when the initial historical sync
                        // completes.
                        case !m.IsGraphSynced():
                                s.setSyncType(PassiveSync)
                                m.inactiveSyncers[s.cfg.peerPub] = s

                        // The initial historical sync has completed, so we can
                        // immediately start the GossipSyncer as active.
                        default:
                                s.setSyncType(ActiveSync)
                                m.activeSyncers[s.cfg.peerPub] = s
                        }
                        m.syncersMu.Unlock()

                        s.Start()

                        // Once we create the GossipSyncer, we'll signal to the
                        // caller that they can proceed since the SyncManager's
                        // internal state has been updated.
                        close(newSyncer.doneChan)

                        // We'll force a historical sync with the first peer we
                        // connect to, to ensure we get as much of the graph as
                        // possible.
                        if !attemptHistoricalSync {
                                continue
                        }

                        log.Debugf("Attempting initial historical sync with "+
                                "GossipSyncer(%x)", s.cfg.peerPub)

                        if err := s.historicalSync(); err != nil {
                                log.Errorf("Unable to attempt initial "+
                                        "historical sync with "+
                                        "GossipSyncer(%x): %v", s.cfg.peerPub,
                                        err)
                                continue
                        }

                        // Once the historical sync has started, we'll keep
                        // track of the corresponding syncer to properly handle
                        // disconnects. We'll also use a signal to know when
                        // the historical sync completed.
                        if !isPinnedSyncer {
                                setInitialHistoricalSyncer(s)
                        }

                // An existing peer has disconnected, so we'll tear down its
                // corresponding GossipSyncer.
                case staleSyncer := <-m.staleSyncers:
                        // Once the corresponding GossipSyncer has been stopped
                        // and removed, we'll signal to the caller that they can
                        // proceed since the SyncManager's internal state has
                        // been updated.
                        m.removeGossipSyncer(staleSyncer.peer)
                        close(staleSyncer.doneChan)

                        // If we don't have an initialHistoricalSyncer, or we do
                        // but it is not the peer being disconnected, then we
                        // have nothing left to do and can proceed.
                        switch {
                        case initialHistoricalSyncer == nil:
                                fallthrough
                        case staleSyncer.peer != initialHistoricalSyncer.cfg.peerPub:
                                fallthrough
                        case m.cfg.NumActiveSyncers == 0:
                                continue
                        }

                        // Otherwise, our initialHistoricalSyncer corresponds to
                        // the peer being disconnected, so we'll have to find a
                        // replacement.
                        log.Debug("Finding replacement for initial " +
                                "historical sync")

                        s := m.forceHistoricalSync()
                        if s == nil {
                                log.Debug("No eligible replacement found " +
                                        "for initial historical sync")
                                continue
                        }

                        log.Debugf("Replaced initial historical "+
                                "GossipSyncer(%v) with GossipSyncer(%x)",
                                staleSyncer.peer, s.cfg.peerPub)

                        setInitialHistoricalSyncer(s)

                // Our initial historical sync signal has completed, so we'll
                // nil all of the relevant fields as they're no longer needed.
                case <-initialHistoricalSyncSignal:
                        initialHistoricalSyncer = nil
                        initialHistoricalSyncSignal = nil

                        log.Debug("Initial historical sync completed")

                        // With the initial historical sync complete, we can
                        // begin receiving new graph updates at tip. We'll
                        // determine whether we can have any more active
                        // GossipSyncers. If we do, we'll randomly select some
                        // that are currently passive to transition.
                        m.syncersMu.Lock()
                        numActiveLeft := m.cfg.NumActiveSyncers - len(m.activeSyncers)
                        if numActiveLeft <= 0 {
                                m.syncersMu.Unlock()
                                continue
                        }

                        // We may not even have enough inactive syncers to
                        // transition. In that case, we'll transition all of
                        // the inactive syncers we have.
                        if len(m.inactiveSyncers) < numActiveLeft {
                                numActiveLeft = len(m.inactiveSyncers)
                        }

                        log.Debugf("Attempting to transition %v passive "+
                                "GossipSyncers to active", numActiveLeft)

                        for i := 0; i < numActiveLeft; i++ {
                                chooseRandomSyncer(
                                        m.inactiveSyncers, m.transitionPassiveSyncer,
                                )
                        }

                        m.syncersMu.Unlock()

                // Our RotateTicker has ticked, so we'll attempt to rotate a
                // single active syncer with a passive one.
                case <-m.cfg.RotateTicker.Ticks():
                        m.rotateActiveSyncerCandidate()

                // Our HistoricalSyncTicker has ticked, so we'll randomly select
                // a peer and force a historical sync with them.
                case <-m.cfg.HistoricalSyncTicker.Ticks():
                        // To be extra cautious, gate the forceHistoricalSync
                        // call such that it can only execute if we are
                        // configured to have a non-zero number of sync peers.
                        // This way, even if the historical sync ticker manages
                        // to tick, we can be sure that a historical sync won't
                        // accidentally begin.
                        if m.cfg.NumActiveSyncers == 0 {
                                continue
                        }

                        // If we don't have a syncer available, we have nothing
                        // to do.
                        s := m.forceHistoricalSync()
                        if s == nil {
                                continue
                        }

                        // If we've already completed a historical sync, we'll
                        // skip setting the initial historical syncer.
                        if m.IsGraphSynced() {
                                continue
                        }

                        // Otherwise, we'll track the peer we've performed a
                        // historical sync with in order to handle the case
                        // where our previous historical sync peer did not
                        // respond to our queries and we haven't ingested as
                        // much of the graph as we should.
                        setInitialHistoricalSyncer(s)

                case <-m.quit:
                        return
                }
        }
}

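// A note on the Pause/Resume pair in setInitialHistoricalSyncer above: as the
// closure's comment says, pausing and immediately resuming the ticker
// restarts its interval countdown, which is what guarantees each new
// historical sync peer a full interval. With a raw standard-library ticker,
// the equivalent (Go 1.15+) would be a Reset; a hedged sketch:
//
//        t := time.NewTicker(DefaultHistoricalSyncInterval)
//        // ... on choosing a new historical sync peer:
//        t.Reset(DefaultHistoricalSyncInterval) // restart the countdown
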
// isPinnedSyncer returns true if the passed GossipSyncer is one of our pinned
// sync peers.
func (m *SyncManager) isPinnedSyncer(s *GossipSyncer) bool {
        _, isPinnedSyncer := m.cfg.PinnedSyncers[s.cfg.peerPub]
        return isPinnedSyncer
}

// createGossipSyncer creates the GossipSyncer for a newly connected peer.
func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
        nodeID := route.Vertex(peer.PubKey())
        log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:])

        encoding := lnwire.EncodingSortedPlain
        s := newGossipSyncer(gossipSyncerCfg{
                chainHash:     m.cfg.ChainHash,
                peerPub:       nodeID,
                channelSeries: m.cfg.ChanSeries,
                encodingType:  encoding,
                chunkSize:     encodingTypeToChunkSize[encoding],
                batchSize:     requestBatchSize,
                sendToPeer: func(msgs ...lnwire.Message) error {
                        return peer.SendMessageLazy(false, msgs...)
                },
                sendToPeerSync: func(msgs ...lnwire.Message) error {
                        return peer.SendMessageLazy(true, msgs...)
                },
                ignoreHistoricalFilters:   m.cfg.IgnoreHistoricalFilters,
                maxUndelayedQueryReplies:  DefaultMaxUndelayedQueryReplies,
                delayedQueryReplyInterval: DefaultDelayedQueryReplyInterval,
                bestHeight:                m.cfg.BestHeight,
                markGraphSynced:           m.markGraphSynced,
                maxQueryChanRangeReplies:  maxQueryChanRangeReplies,
                noTimestampQueryOption:    m.cfg.NoTimestampQueries,
                isStillZombieChannel:      m.cfg.IsStillZombieChannel,
        })

        // Gossip syncers are initialized by default in a PassiveSync type
        // and chansSynced state so that they can reply to any peer queries or
        // handle any sync transitions.
        s.setSyncState(chansSynced)
        s.setSyncType(PassiveSync)

        log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%v",
                s.syncState(), s.SyncType(), peer)

        return s
}

// removeGossipSyncer removes all internal references to the disconnected peer's
// GossipSyncer and stops it. In the event of an active GossipSyncer being
// disconnected, a passive GossipSyncer, if any, will take its place.
func (m *SyncManager) removeGossipSyncer(peer route.Vertex) {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        s, ok := m.gossipSyncer(peer)
        if !ok {
                return
        }

        log.Infof("Removing GossipSyncer for peer=%v", peer)

        // We'll stop the GossipSyncer for the disconnected peer in a goroutine
        // to prevent blocking the SyncManager.
        go s.Stop()

        // If it's a non-active syncer, then we can just exit now.
        if _, ok := m.inactiveSyncers[peer]; ok {
                delete(m.inactiveSyncers, peer)
                return
        }

        // If it's a pinned syncer, then we can just exit as this doesn't
        // affect our active syncer count.
        if _, ok := m.pinnedActiveSyncers[peer]; ok {
                delete(m.pinnedActiveSyncers, peer)
                return
        }

        // Otherwise, we'll need to find a new one to replace it, if any.
        delete(m.activeSyncers, peer)
        newActiveSyncer := chooseRandomSyncer(
                m.inactiveSyncers, m.transitionPassiveSyncer,
        )
        if newActiveSyncer == nil {
                return
        }

        log.Debugf("Replaced active GossipSyncer(%x) with GossipSyncer(%x)",
                peer, newActiveSyncer.cfg.peerPub)
}

// rotateActiveSyncerCandidate rotates a single active syncer. To achieve this,
// the active syncer must be in a chansSynced state so that it can process the
// sync transition.
func (m *SyncManager) rotateActiveSyncerCandidate() {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        // If we couldn't find an eligible active syncer to rotate, we can
        // return early.
        activeSyncer := chooseRandomSyncer(m.activeSyncers, nil)
        if activeSyncer == nil {
                log.Debug("No eligible active syncer to rotate")
                return
        }

        // Similarly, if we don't have a candidate to rotate with, we can return
        // early as well.
        candidate := chooseRandomSyncer(m.inactiveSyncers, nil)
        if candidate == nil {
                log.Debug("No eligible candidate to rotate active syncer")
                return
        }

        // Otherwise, we'll attempt to transition each syncer to its respective
        // new sync type.
        log.Debugf("Rotating active GossipSyncer(%x) with GossipSyncer(%x)",
                activeSyncer.cfg.peerPub, candidate.cfg.peerPub)

        if err := m.transitionActiveSyncer(activeSyncer); err != nil {
                log.Errorf("Unable to transition active GossipSyncer(%x): %v",
                        activeSyncer.cfg.peerPub, err)
                return
        }

        if err := m.transitionPassiveSyncer(candidate); err != nil {
                log.Errorf("Unable to transition passive GossipSyncer(%x): %v",
                        candidate.cfg.peerPub, err)
                return
        }
}

// transitionActiveSyncer transitions an active syncer to a passive one.
//
// NOTE: This must be called with the syncersMu lock held.
func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error {
        log.Debugf("Transitioning active GossipSyncer(%x) to passive",
                s.cfg.peerPub)

        if err := s.ProcessSyncTransition(PassiveSync); err != nil {
                return err
        }

        delete(m.activeSyncers, s.cfg.peerPub)
        m.inactiveSyncers[s.cfg.peerPub] = s

        return nil
}

// transitionPassiveSyncer transitions a passive syncer to an active one.
//
// NOTE: This must be called with the syncersMu lock held.
func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error {
        log.Debugf("Transitioning passive GossipSyncer(%x) to active",
                s.cfg.peerPub)

        if err := s.ProcessSyncTransition(ActiveSync); err != nil {
                return err
        }

        delete(m.inactiveSyncers, s.cfg.peerPub)
        m.activeSyncers[s.cfg.peerPub] = s

        return nil
}

// forceHistoricalSync chooses a syncer with a remote peer at random and forces
// a historical sync with it.
func (m *SyncManager) forceHistoricalSync() *GossipSyncer {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()

        // We'll sample from both sets of active and inactive syncers in the
        // event that we don't have any inactive syncers.
        return chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error {
                return s.historicalSync()
        })
}

// chooseRandomSyncer iterates through the set of syncers given and returns the
// first one which was able to successfully perform the action enclosed in the
// function closure.
//
// NOTE: It's possible for a nil value to be returned if there are no eligible
// candidate syncers.
func chooseRandomSyncer(syncers map[route.Vertex]*GossipSyncer,
        action func(*GossipSyncer) error) *GossipSyncer {

        for _, s := range syncers {
                // Only syncers in a chansSynced state are viable for sync
                // transitions, so skip any that aren't.
                if s.syncState() != chansSynced {
                        continue
                }

                if action != nil {
                        if err := action(s); err != nil {
                                log.Debugf("Skipping eligible candidate "+
                                        "GossipSyncer(%x): %v", s.cfg.peerPub,
                                        err)
                                continue
                        }
                }

                return s
        }

        return nil
}

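// A note on chooseRandomSyncer above: the randomness comes from Go itself.
// Iteration order over a map is unspecified and varies from run to run, so
// taking the first eligible entry of a range loop amounts to a random pick
// with no extra bookkeeping. A self-contained illustration (names are
// illustrative only, not part of this package):
//
//        peers := map[string]bool{"alice": true, "bob": true, "carol": true}
//        for peer := range peers {
//                fmt.Println("picked:", peer) // typically differs across runs
//                break
//        }
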
// InitSyncState is called by outside sub-systems when a connection is
// established to a new peer that understands how to perform channel range
// queries. We'll allocate a new GossipSyncer for it, and start any goroutines
// needed to handle new queries. The first GossipSyncer registered with the
// SyncManager will attempt a historical sync to ensure we have as much of the
// public channel graph as possible.
//
// TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer.
func (m *SyncManager) InitSyncState(peer lnpeer.Peer) error {
        done := make(chan struct{})

        select {
        case m.newSyncers <- &newSyncer{
                peer:     peer,
                doneChan: done,
        }:
        case <-m.quit:
                return ErrSyncManagerExiting
        }

        select {
        case <-done:
                return nil
        case <-m.quit:
                return ErrSyncManagerExiting
        }
}

// PruneSyncState is called by outside sub-systems once a peer that we were
// previously connected to has been disconnected. In this case we can stop the
// existing GossipSyncer assigned to the peer and free up resources.
func (m *SyncManager) PruneSyncState(peer route.Vertex) {
        done := make(chan struct{})

        // We avoid returning an error when the SyncManager is stopped since the
        // GossipSyncer will be stopped then anyway.
        select {
        case m.staleSyncers <- &staleSyncer{
                peer:     peer,
                doneChan: done,
        }:
        case <-m.quit:
                return
        }

        select {
        case <-done:
        case <-m.quit:
        }
}

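// Both InitSyncState and PruneSyncState follow the same request/response
// idiom: hand the event loop a message carrying a done channel, then block on
// either that channel or quit so that a shutdown can never deadlock the
// caller. The same pattern in miniature (a sketch with illustrative names,
// not part of this package's API):
//
//        type request struct{ done chan struct{} }
//
//        func submit(reqs chan<- *request, quit <-chan struct{}) error {
//                req := &request{done: make(chan struct{})}
//                select {
//                case reqs <- req:
//                case <-quit:
//                        return ErrSyncManagerExiting
//                }
//                select {
//                case <-req.done: // handler close()s this once state is updated
//                        return nil
//                case <-quit:
//                        return ErrSyncManagerExiting
//                }
//        }
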
// GossipSyncer returns the associated gossip syncer of a peer. The boolean
// returned signals whether there exists a gossip syncer for the peer.
func (m *SyncManager) GossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()
        return m.gossipSyncer(peer)
}

// gossipSyncer returns the associated gossip syncer of a peer. The boolean
// returned signals whether there exists a gossip syncer for the peer.
func (m *SyncManager) gossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
        syncer, ok := m.inactiveSyncers[peer]
        if ok {
                return syncer, true
        }
        syncer, ok = m.activeSyncers[peer]
        if ok {
                return syncer, true
        }
        syncer, ok = m.pinnedActiveSyncers[peer]
        if ok {
                return syncer, true
        }
        return nil, false
}

// GossipSyncers returns all of the currently initialized gossip syncers.
func (m *SyncManager) GossipSyncers() map[route.Vertex]*GossipSyncer {
        m.syncersMu.Lock()
        defer m.syncersMu.Unlock()
        return m.gossipSyncers()
}

// gossipSyncers returns all of the currently initialized gossip syncers.
func (m *SyncManager) gossipSyncers() map[route.Vertex]*GossipSyncer {
        numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers)
        syncers := make(map[route.Vertex]*GossipSyncer, numSyncers)

        for _, syncer := range m.inactiveSyncers {
                syncers[syncer.cfg.peerPub] = syncer
        }
        for _, syncer := range m.activeSyncers {
                syncers[syncer.cfg.peerPub] = syncer
        }

        return syncers
}

// markGraphSynced allows us to report that the initial historical sync has
// completed.
func (m *SyncManager) markGraphSynced() {
        atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 1)
}

// IsGraphSynced determines whether we've completed our initial historical sync.
// The initial historical sync is done to ensure we've ingested as much of the
// public graph as possible.
func (m *SyncManager) IsGraphSynced() bool {
        return atomic.LoadInt32(&m.initialHistoricalSyncCompleted) == 1
}