• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13536249039

26 Feb 2025 03:42AM UTC coverage: 57.462% (-1.4%) from 58.835%
13536249039

Pull #8453

github

Roasbeef
peer: update chooseDeliveryScript to gen script if needed

In this commit, we update `chooseDeliveryScript` to generate a new
script if needed. This allows us to fold in a few other lines that
always followed this function into this expanded function.

The tests have been updated accordingly.
Pull Request #8453: [4/4] - multi: integrate new rbf coop close FSM into the existing peer flow

275 of 1318 new or added lines in 22 files covered. (20.86%)

19521 existing lines in 257 files now uncovered.

103858 of 180741 relevant lines covered (57.46%)

24750.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.92
/discovery/sync_manager.go
1
package discovery
2

3
import (
4
        "errors"
5
        "sync"
6
        "sync/atomic"
7
        "time"
8

9
        "github.com/btcsuite/btcd/chaincfg/chainhash"
10
        "github.com/lightningnetwork/lnd/lnpeer"
11
        "github.com/lightningnetwork/lnd/lnwire"
12
        "github.com/lightningnetwork/lnd/routing/route"
13
        "github.com/lightningnetwork/lnd/ticker"
14
)
15

16
const (
17
        // DefaultSyncerRotationInterval is the default interval in which we'll
18
        // rotate a single active syncer.
19
        DefaultSyncerRotationInterval = 20 * time.Minute
20

21
        // DefaultHistoricalSyncInterval is the default interval in which we'll
22
        // force a historical sync to ensure we have as much of the public
23
        // network as possible.
24
        DefaultHistoricalSyncInterval = time.Hour
25

26
        // filterSemaSize is the capacity of gossipFilterSema.
27
        filterSemaSize = 5
28
)
29

30
var (
31
        // ErrSyncManagerExiting is an error returned when we attempt to
32
        // start/stop a gossip syncer for a connected/disconnected peer, but the
33
        // SyncManager has already been stopped.
34
        ErrSyncManagerExiting = errors.New("sync manager exiting")
35
)
36

37
// newSyncer is an internal message we'll use within the SyncManager to signal
38
// that we should create a GossipSyncer for a newly connected peer.
39
type newSyncer struct {
40
        // peer is the newly connected peer.
41
        peer lnpeer.Peer
42

43
        // doneChan serves as a signal to the caller that the SyncManager's
44
        // internal state correctly reflects the newly created syncer.
45
        doneChan chan struct{}
46
}
47

48
// staleSyncer is an internal message we'll use within the SyncManager to signal
49
// that a peer has disconnected and its GossipSyncer should be removed.
50
type staleSyncer struct {
51
        // peer is the peer that has disconnected.
52
        peer route.Vertex
53

54
        // doneChan serves as a signal to the caller that the SyncManager's
55
        // internal state correctly reflects the stale active syncer. This is
56
        // needed to ensure we always create a new syncer for a flappy peer
57
        // after they disconnect if they happened to be an active syncer.
58
        doneChan chan struct{}
59
}
60

61
// SyncManagerCfg contains all of the dependencies required for the SyncManager
62
// to carry out its duties.
63
type SyncManagerCfg struct {
64
        // ChainHash is a hash that indicates the specific network of the active
65
        // chain.
66
        ChainHash chainhash.Hash
67

68
        // ChanSeries is an interface that provides access to a time series view
69
        // of the current known channel graph. Each GossipSyncer enabled peer
70
        // will utilize this in order to create and respond to channel graph
71
        // time series queries.
72
        ChanSeries ChannelGraphTimeSeries
73

74
        // NumActiveSyncers is the number of peers for which we should have
75
        // active syncers. After reaching NumActiveSyncers, any future
76
        // gossip syncers will be passive.
77
        NumActiveSyncers int
78

79
        // NoTimestampQueries will prevent the GossipSyncer from querying
80
        // timestamps of announcement messages from the peer and from responding
81
        // to timestamp queries.
82
        NoTimestampQueries bool
83

84
        // RotateTicker is a ticker responsible for notifying the SyncManager
85
        // when it should rotate its active syncers. A single active syncer with
86
        // a chansSynced state will be exchanged for a passive syncer in order
87
        // to ensure we don't keep syncing with the same peers.
88
        RotateTicker ticker.Ticker
89

90
        // HistoricalSyncTicker is a ticker responsible for notifying the
91
        // SyncManager when it should attempt a historical sync with a gossip
92
        // sync peer.
93
        HistoricalSyncTicker ticker.Ticker
94

95
        // IgnoreHistoricalFilters will prevent syncers from replying with
96
        // historical data when the remote peer sets a gossip_timestamp_range.
97
        // This prevents ranges with old start times from causing us to dump the
98
        // graph on connect.
99
        IgnoreHistoricalFilters bool
100

101
        // BestHeight returns the latest height known of the chain.
102
        BestHeight func() uint32
103

104
        // PinnedSyncers is a set of peers that will always transition to
105
        // ActiveSync upon connection. These peers will never transition to
106
        // PassiveSync.
107
        PinnedSyncers PinnedSyncers
108

109
        // IsStillZombieChannel takes the timestamps of the latest channel
110
        // updates for a channel and returns true if the channel should be
111
        // considered a zombie based on these timestamps.
112
        IsStillZombieChannel func(time.Time, time.Time) bool
113
}
114

115
// SyncManager is a subsystem of the gossiper that manages the gossip syncers
116
// for peers currently connected. When a new peer is connected, the manager will
117
// create its accompanying gossip syncer and determine whether it should have an
118
// ActiveSync or PassiveSync sync type based on how many other gossip syncers
119
// are currently active. Any ActiveSync gossip syncers are started in a
120
// round-robin manner to ensure we're not syncing with multiple peers at the
121
// same time. The first GossipSyncer registered with the SyncManager will
122
// attempt a historical sync to ensure we have as much of the public channel
123
// graph as possible.
124
type SyncManager struct {
125
        // initialHistoricalSyncCompleted serves as a barrier when initializing
126
        // new active GossipSyncers. If 0, the initial historical sync has not
127
        // completed, so we'll defer initializing any active GossipSyncers. If
128
        // 1, then we can transition the GossipSyncer immediately. We set up
129
        // this barrier to ensure we have most of the graph before attempting to
130
        // accept new updates at tip.
131
        //
132
        // NOTE: This must be used atomically.
133
        initialHistoricalSyncCompleted int32
134

135
        start sync.Once
136
        stop  sync.Once
137

138
        cfg SyncManagerCfg
139

140
        // newSyncers is a channel we'll use to process requests to create
141
        // GossipSyncers for newly connected peers.
142
        newSyncers chan *newSyncer
143

144
        // staleSyncers is a channel we'll use to process requests to tear down
145
        // GossipSyncers for disconnected peers.
146
        staleSyncers chan *staleSyncer
147

148
        // syncersMu guards the read and write access to the activeSyncers and
149
        // inactiveSyncers maps below.
150
        syncersMu sync.Mutex
151

152
        // activeSyncers is the set of all syncers for which we are currently
153
        // receiving graph updates. The number of possible active syncers
154
        // is bounded by NumActiveSyncers.
155
        activeSyncers map[route.Vertex]*GossipSyncer
156

157
        // inactiveSyncers is the set of all syncers for which we are not
158
        // currently receiving new graph updates.
159
        inactiveSyncers map[route.Vertex]*GossipSyncer
160

161
        // pinnedActiveSyncers is the set of all syncers which are pinned into
162
        // an active sync. Pinned peers perform an initial historical sync on
163
        // each connection and will continue to receive graph updates for the
164
        // duration of the connection.
165
        pinnedActiveSyncers map[route.Vertex]*GossipSyncer
166

167
        // gossipFilterSema contains semaphores for the gossip timestamp
168
        // queries.
169
        gossipFilterSema chan struct{}
170

171
        wg   sync.WaitGroup
172
        quit chan struct{}
173
}
174

175
// newSyncManager constructs a new SyncManager backed by the given config.
176
func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
38✔
177

38✔
178
        filterSema := make(chan struct{}, filterSemaSize)
38✔
179
        for i := 0; i < filterSemaSize; i++ {
228✔
180
                filterSema <- struct{}{}
190✔
181
        }
190✔
182

183
        return &SyncManager{
38✔
184
                cfg:          *cfg,
38✔
185
                newSyncers:   make(chan *newSyncer),
38✔
186
                staleSyncers: make(chan *staleSyncer),
38✔
187
                activeSyncers: make(
38✔
188
                        map[route.Vertex]*GossipSyncer, cfg.NumActiveSyncers,
38✔
189
                ),
38✔
190
                inactiveSyncers: make(map[route.Vertex]*GossipSyncer),
38✔
191
                pinnedActiveSyncers: make(
38✔
192
                        map[route.Vertex]*GossipSyncer, len(cfg.PinnedSyncers),
38✔
193
                ),
38✔
194
                gossipFilterSema: filterSema,
38✔
195
                quit:             make(chan struct{}),
38✔
196
        }
38✔
197
}
198

199
// Start starts the SyncManager in order to properly carry out its duties.
200
func (m *SyncManager) Start() {
38✔
201
        m.start.Do(func() {
76✔
202
                m.wg.Add(1)
38✔
203
                go m.syncerHandler()
38✔
204
        })
38✔
205
}
206

207
// Stop stops the SyncManager from performing its duties.
208
func (m *SyncManager) Stop() {
38✔
209
        m.stop.Do(func() {
76✔
210
                log.Debugf("SyncManager is stopping")
38✔
211
                defer log.Debugf("SyncManager stopped")
38✔
212

38✔
213
                close(m.quit)
38✔
214
                m.wg.Wait()
38✔
215

38✔
216
                for _, syncer := range m.inactiveSyncers {
44✔
217
                        syncer.Stop()
6✔
218
                }
6✔
219
                for _, syncer := range m.activeSyncers {
48✔
220
                        syncer.Stop()
10✔
221
                }
10✔
222
        })
223
}
224

225
// syncerHandler is the SyncManager's main event loop responsible for:
226
//
227
// 1. Creating and tearing down GossipSyncers for connected/disconnected peers.
228
//
229
// 2. Finding new peers to receive graph updates from to ensure we don't only
230
//    receive them from the same set of peers.
231
//
232
// 3. Finding new peers to force a historical sync with to ensure we have as
233
//    much of the public network as possible.
234
//
235
// NOTE: This must be run as a goroutine.
236
func (m *SyncManager) syncerHandler() {
38✔
237
        defer m.wg.Done()
38✔
238

38✔
239
        m.cfg.RotateTicker.Resume()
38✔
240
        defer m.cfg.RotateTicker.Stop()
38✔
241

38✔
242
        defer m.cfg.HistoricalSyncTicker.Stop()
38✔
243

38✔
244
        var (
38✔
245
                // initialHistoricalSyncer is the syncer we are currently
38✔
246
                // performing an initial historical sync with.
38✔
247
                initialHistoricalSyncer *GossipSyncer
38✔
248

38✔
249
                // initialHistoricalSyncSignal is a signal that will fire once
38✔
250
                // the initial historical sync has been completed. This is
38✔
251
                // crucial to ensure that another historical sync isn't
38✔
252
                // attempted just because the initialHistoricalSyncer was
38✔
253
                // disconnected.
38✔
254
                initialHistoricalSyncSignal chan struct{}
38✔
255
        )
38✔
256

38✔
257
        setInitialHistoricalSyncer := func(s *GossipSyncer) {
50✔
258
                initialHistoricalSyncer = s
12✔
259
                initialHistoricalSyncSignal = s.ResetSyncedSignal()
12✔
260

12✔
261
                // Restart the timer for our new historical sync peer. This will
12✔
262
                // ensure that all initial syncers receive an equivalent
12✔
263
                // duration before attempting the next sync. Without doing so we
12✔
264
                // might attempt two historical sync back to back if a peer
12✔
265
                // disconnects just before the ticker fires.
12✔
266
                m.cfg.HistoricalSyncTicker.Pause()
12✔
267
                m.cfg.HistoricalSyncTicker.Resume()
12✔
268
        }
12✔
269

270
        for {
120✔
271
                select {
82✔
272
                // A new peer has been connected, so we'll create its
273
                // accompanying GossipSyncer.
274
                case newSyncer := <-m.newSyncers:
25✔
275
                        // If we already have a syncer, then we'll exit early as
25✔
276
                        // we don't want to override it.
25✔
277
                        if _, ok := m.GossipSyncer(newSyncer.peer.PubKey()); ok {
25✔
278
                                close(newSyncer.doneChan)
×
279
                                continue
×
280
                        }
281

282
                        s := m.createGossipSyncer(newSyncer.peer)
25✔
283

25✔
284
                        isPinnedSyncer := m.isPinnedSyncer(s)
25✔
285

25✔
286
                        // attemptHistoricalSync determines whether we should
25✔
287
                        // attempt an initial historical sync when a new peer
25✔
288
                        // connects.
25✔
289
                        attemptHistoricalSync := false
25✔
290

25✔
291
                        m.syncersMu.Lock()
25✔
292
                        switch {
25✔
293
                        // For pinned syncers, we will immediately transition
294
                        // the peer into an active (pinned) sync state.
295
                        case isPinnedSyncer:
3✔
296
                                attemptHistoricalSync = true
3✔
297
                                s.setSyncType(PinnedSync)
3✔
298
                                s.setSyncState(syncerIdle)
3✔
299
                                m.pinnedActiveSyncers[s.cfg.peerPub] = s
3✔
300

301
                        // Regardless of whether the initial historical sync
302
                        // has completed, we'll re-trigger a historical sync if
303
                        // we no longer have any syncers. This might be
304
                        // necessary if we lost all our peers at one point, and
305
                        // now we finally have one again.
306
                        case len(m.activeSyncers) == 0 &&
307
                                len(m.inactiveSyncers) == 0:
10✔
308

10✔
309
                                attemptHistoricalSync =
10✔
310
                                        m.cfg.NumActiveSyncers > 0
10✔
311
                                fallthrough
10✔
312

313
                        // If we've exceeded our total number of active syncers,
314
                        // we'll initialize this GossipSyncer as passive.
315
                        case len(m.activeSyncers) >= m.cfg.NumActiveSyncers:
14✔
316
                                fallthrough
14✔
317

318
                        // If the initial historical sync has yet to complete,
319
                        // then we'll declare it as passive and attempt to
320
                        // transition it when the initial historical sync
321
                        // completes.
322
                        case !m.IsGraphSynced():
18✔
323
                                s.setSyncType(PassiveSync)
18✔
324
                                m.inactiveSyncers[s.cfg.peerPub] = s
18✔
325

326
                        // The initial historical sync has completed, so we can
327
                        // immediately start the GossipSyncer as active.
328
                        default:
4✔
329
                                s.setSyncType(ActiveSync)
4✔
330
                                m.activeSyncers[s.cfg.peerPub] = s
4✔
331
                        }
332
                        m.syncersMu.Unlock()
25✔
333

25✔
334
                        s.Start()
25✔
335

25✔
336
                        // Once we create the GossipSyncer, we'll signal to the
25✔
337
                        // caller that they can proceed since the SyncManager's
25✔
338
                        // internal state has been updated.
25✔
339
                        close(newSyncer.doneChan)
25✔
340

25✔
341
                        // We'll force a historical sync with the first peer we
25✔
342
                        // connect to, to ensure we get as much of the graph as
25✔
343
                        // possible.
25✔
344
                        if !attemptHistoricalSync {
38✔
345
                                continue
13✔
346
                        }
347

348
                        log.Debugf("Attempting initial historical sync with "+
12✔
349
                                "GossipSyncer(%x)", s.cfg.peerPub)
12✔
350

12✔
351
                        if err := s.historicalSync(); err != nil {
12✔
352
                                log.Errorf("Unable to attempt initial "+
×
353
                                        "historical sync with "+
×
354
                                        "GossipSyncer(%x): %v", s.cfg.peerPub,
×
355
                                        err)
×
356
                                continue
×
357
                        }
358

359
                        // Once the historical sync has started, we'll
360
                        // keep track of the corresponding syncer to properly
361
                        // handle disconnects. We'll also use a signal to know
362
                        // when the historical sync completed.
363
                        if !isPinnedSyncer {
21✔
364
                                setInitialHistoricalSyncer(s)
9✔
365
                        }
9✔
366

367
                // An existing peer has disconnected, so we'll tear down its
368
                // corresponding GossipSyncer.
369
                case staleSyncer := <-m.staleSyncers:
6✔
370
                        // Once the corresponding GossipSyncer has been stopped
6✔
371
                        // and removed, we'll signal to the caller that they can
6✔
372
                        // proceed since the SyncManager's internal state has
6✔
373
                        // been updated.
6✔
374
                        m.removeGossipSyncer(staleSyncer.peer)
6✔
375
                        close(staleSyncer.doneChan)
6✔
376

6✔
377
                        // If we don't have an initialHistoricalSyncer, or we do
6✔
378
                        // but it is not the peer being disconnected, then we
6✔
379
                        // have nothing left to do and can proceed.
6✔
380
                        switch {
6✔
381
                        case initialHistoricalSyncer == nil:
4✔
382
                                fallthrough
4✔
383
                        case staleSyncer.peer != initialHistoricalSyncer.cfg.peerPub:
5✔
384
                                fallthrough
5✔
385
                        case m.cfg.NumActiveSyncers == 0:
5✔
386
                                continue
5✔
387
                        }
388

389
                        // Otherwise, our initialHistoricalSyncer corresponds to
390
                        // the peer being disconnected, so we'll have to find a
391
                        // replacement.
392
                        log.Debug("Finding replacement for initial " +
1✔
393
                                "historical sync")
1✔
394

1✔
395
                        s := m.forceHistoricalSync()
1✔
396
                        if s == nil {
1✔
397
                                log.Debug("No eligible replacement found " +
×
398
                                        "for initial historical sync")
×
399
                                continue
×
400
                        }
401

402
                        log.Debugf("Replaced initial historical "+
1✔
403
                                "GossipSyncer(%v) with GossipSyncer(%x)",
1✔
404
                                staleSyncer.peer, s.cfg.peerPub)
1✔
405

1✔
406
                        setInitialHistoricalSyncer(s)
1✔
407

408
                // Our initial historical sync signal has completed, so we'll
409
                // nil all of the relevant fields as they're no longer needed.
410
                case <-initialHistoricalSyncSignal:
8✔
411
                        initialHistoricalSyncer = nil
8✔
412
                        initialHistoricalSyncSignal = nil
8✔
413

8✔
414
                        log.Debug("Initial historical sync completed")
8✔
415

8✔
416
                        // With the initial historical sync complete, we can
8✔
417
                        // begin receiving new graph updates at tip. We'll
8✔
418
                        // determine whether we can have any more active
8✔
419
                        // GossipSyncers. If we do, we'll randomly select some
8✔
420
                        // that are currently passive to transition.
8✔
421
                        m.syncersMu.Lock()
8✔
422
                        numActiveLeft := m.cfg.NumActiveSyncers - len(m.activeSyncers)
8✔
423
                        if numActiveLeft <= 0 {
8✔
424
                                m.syncersMu.Unlock()
×
425
                                continue
×
426
                        }
427

428
                        // We may not even have enough inactive syncers to be
429
                        // transitioned. In that case, we will transition all the
430
                        // inactive syncers.
431
                        if len(m.inactiveSyncers) < numActiveLeft {
13✔
432
                                numActiveLeft = len(m.inactiveSyncers)
5✔
433
                        }
5✔
434

435
                        log.Debugf("Attempting to transition %v passive "+
8✔
436
                                "GossipSyncers to active", numActiveLeft)
8✔
437

8✔
438
                        for i := 0; i < numActiveLeft; i++ {
16✔
439
                                chooseRandomSyncer(
8✔
440
                                        m.inactiveSyncers, m.transitionPassiveSyncer,
8✔
441
                                )
8✔
442
                        }
8✔
443

444
                        m.syncersMu.Unlock()
8✔
445

446
                // Our RotateTicker has ticked, so we'll attempt to rotate a
447
                // single active syncer with a passive one.
448
                case <-m.cfg.RotateTicker.Ticks():
2✔
449
                        m.rotateActiveSyncerCandidate()
2✔
450

451
                // Our HistoricalSyncTicker has ticked, so we'll randomly select
452
                // a peer and force a historical sync with them.
453
                case <-m.cfg.HistoricalSyncTicker.Ticks():
3✔
454
                        // To be extra cautious, gate the forceHistoricalSync
3✔
455
                        // call such that it can only execute if we are
3✔
456
                        // configured to have a non-zero number of sync peers.
3✔
457
                        // This way even if the historical sync ticker manages
3✔
458
                        // to tick we can be sure that a historical sync won't
3✔
459
                        // accidentally begin.
3✔
460
                        if m.cfg.NumActiveSyncers == 0 {
4✔
461
                                continue
1✔
462
                        }
463

464
                        // If we don't have a syncer available we have nothing
465
                        // to do.
466
                        s := m.forceHistoricalSync()
2✔
467
                        if s == nil {
2✔
468
                                continue
×
469
                        }
470

471
                        // If we've already completed a historical sync, we'll
472
                        // skip setting the initial historical syncer.
473
                        if m.IsGraphSynced() {
2✔
474
                                continue
×
475
                        }
476

477
                        // Otherwise, we'll track the peer we've performed a
478
                        // historical sync with in order to handle the case
479
                        // where our previous historical sync peer did not
480
                        // respond to our queries and we haven't ingested as
481
                        // much of the graph as we should.
482
                        setInitialHistoricalSyncer(s)
2✔
483

484
                case <-m.quit:
38✔
485
                        return
38✔
486
                }
487
        }
488
}
489

490
// isPinnedSyncer returns true if the passed GossipSyncer is one of our pinned
491
// sync peers.
492
func (m *SyncManager) isPinnedSyncer(s *GossipSyncer) bool {
25✔
493
        _, isPinnedSyncer := m.cfg.PinnedSyncers[s.cfg.peerPub]
25✔
494
        return isPinnedSyncer
25✔
495
}
25✔
496

497
// createGossipSyncer creates the GossipSyncer for a newly connected peer.
498
func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
25✔
499
        nodeID := route.Vertex(peer.PubKey())
25✔
500
        log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:])
25✔
501

25✔
502
        encoding := lnwire.EncodingSortedPlain
25✔
503
        s := newGossipSyncer(gossipSyncerCfg{
25✔
504
                chainHash:     m.cfg.ChainHash,
25✔
505
                peerPub:       nodeID,
25✔
506
                channelSeries: m.cfg.ChanSeries,
25✔
507
                encodingType:  encoding,
25✔
508
                chunkSize:     encodingTypeToChunkSize[encoding],
25✔
509
                batchSize:     requestBatchSize,
25✔
510
                sendToPeer: func(msgs ...lnwire.Message) error {
59✔
511
                        return peer.SendMessageLazy(false, msgs...)
34✔
512
                },
34✔
UNCOV
513
                sendToPeerSync: func(msgs ...lnwire.Message) error {
×
UNCOV
514
                        return peer.SendMessageLazy(true, msgs...)
×
UNCOV
515
                },
×
516
                ignoreHistoricalFilters:   m.cfg.IgnoreHistoricalFilters,
517
                maxUndelayedQueryReplies:  DefaultMaxUndelayedQueryReplies,
518
                delayedQueryReplyInterval: DefaultDelayedQueryReplyInterval,
519
                bestHeight:                m.cfg.BestHeight,
520
                markGraphSynced:           m.markGraphSynced,
521
                maxQueryChanRangeReplies:  maxQueryChanRangeReplies,
522
                noTimestampQueryOption:    m.cfg.NoTimestampQueries,
523
                isStillZombieChannel:      m.cfg.IsStillZombieChannel,
524
        }, m.gossipFilterSema)
525

526
        // Gossip syncers are initialized by default in a PassiveSync type
527
        // and chansSynced state so that they can reply to any peer queries or
528
        // handle any sync transitions.
529
        s.setSyncState(chansSynced)
25✔
530
        s.setSyncType(PassiveSync)
25✔
531

25✔
532
        log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%x",
25✔
533
                s.syncState(), s.SyncType(), peer.PubKey())
25✔
534

25✔
535
        return s
25✔
536
}
537

538
// removeGossipSyncer removes all internal references to the disconnected peer's
539
// GossipSyncer and stops it. In the event of an active GossipSyncer being
540
// disconnected, a passive GossipSyncer, if any, will take its place.
541
func (m *SyncManager) removeGossipSyncer(peer route.Vertex) {
6✔
542
        m.syncersMu.Lock()
6✔
543
        defer m.syncersMu.Unlock()
6✔
544

6✔
545
        s, ok := m.gossipSyncer(peer)
6✔
546
        if !ok {
6✔
547
                return
×
548
        }
×
549

550
        log.Infof("Removing GossipSyncer for peer=%v", peer)
6✔
551

6✔
552
        // We'll stop the GossipSyncer for the disconnected peer in a goroutine
6✔
553
        // to prevent blocking the SyncManager.
6✔
554
        go s.Stop()
6✔
555

6✔
556
        // If it's a non-active syncer, then we can just exit now.
6✔
557
        if _, ok := m.inactiveSyncers[peer]; ok {
8✔
558
                delete(m.inactiveSyncers, peer)
2✔
559
                return
2✔
560
        }
2✔
561

562
        // If it's a pinned syncer, then we can just exit as this doesn't
563
        // affect our active syncer count.
564
        if _, ok := m.pinnedActiveSyncers[peer]; ok {
4✔
UNCOV
565
                delete(m.pinnedActiveSyncers, peer)
×
UNCOV
566
                return
×
UNCOV
567
        }
×
568

569
        // Otherwise, we'll need to find a new one to replace it, if any.
570
        delete(m.activeSyncers, peer)
4✔
571
        newActiveSyncer := chooseRandomSyncer(
4✔
572
                m.inactiveSyncers, m.transitionPassiveSyncer,
4✔
573
        )
4✔
574
        if newActiveSyncer == nil {
6✔
575
                return
2✔
576
        }
2✔
577

578
        log.Debugf("Replaced active GossipSyncer(%v) with GossipSyncer(%x)",
2✔
579
                peer, newActiveSyncer.cfg.peerPub)
2✔
580
}
581

582
// rotateActiveSyncerCandidate rotates a single active syncer. In order to
583
// achieve this, the active syncer must be in a chansSynced state in order to
584
// process the sync transition.
585
func (m *SyncManager) rotateActiveSyncerCandidate() {
2✔
586
        m.syncersMu.Lock()
2✔
587
        defer m.syncersMu.Unlock()
2✔
588

2✔
589
        // If we couldn't find an eligible active syncer to rotate, we can
2✔
590
        // return early.
2✔
591
        activeSyncer := chooseRandomSyncer(m.activeSyncers, nil)
2✔
592
        if activeSyncer == nil {
2✔
593
                log.Debug("No eligible active syncer to rotate")
×
594
                return
×
595
        }
×
596

597
        // Similarly, if we don't have a candidate to rotate with, we can return
598
        // early as well.
599
        candidate := chooseRandomSyncer(m.inactiveSyncers, nil)
2✔
600
        if candidate == nil {
3✔
601
                log.Debug("No eligible candidate to rotate active syncer")
1✔
602
                return
1✔
603
        }
1✔
604

605
        // Otherwise, we'll attempt to transition each syncer to their
606
        // respective new sync type.
607
        log.Debugf("Rotating active GossipSyncer(%x) with GossipSyncer(%x)",
1✔
608
                activeSyncer.cfg.peerPub, candidate.cfg.peerPub)
1✔
609

1✔
610
        if err := m.transitionActiveSyncer(activeSyncer); err != nil {
1✔
611
                log.Errorf("Unable to transition active GossipSyncer(%x): %v",
×
612
                        activeSyncer.cfg.peerPub, err)
×
613
                return
×
614
        }
×
615

616
        if err := m.transitionPassiveSyncer(candidate); err != nil {
1✔
617
                log.Errorf("Unable to transition passive GossipSyncer(%x): %v",
×
618
                        activeSyncer.cfg.peerPub, err)
×
619
                return
×
620
        }
×
621
}
622

623
// transitionActiveSyncer transitions an active syncer to a passive one.
624
//
625
// NOTE: This must be called with the syncersMu lock held.
626
func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error {
1✔
627
        log.Debugf("Transitioning active GossipSyncer(%x) to passive",
1✔
628
                s.cfg.peerPub)
1✔
629

1✔
630
        if err := s.ProcessSyncTransition(PassiveSync); err != nil {
1✔
631
                return err
×
632
        }
×
633

634
        delete(m.activeSyncers, s.cfg.peerPub)
1✔
635
        m.inactiveSyncers[s.cfg.peerPub] = s
1✔
636

1✔
637
        return nil
1✔
638
}
639

640
// transitionPassiveSyncer transitions a passive syncer to an active one.
641
//
642
// NOTE: This must be called with the syncersMu lock held.
643
func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error {
11✔
644
        log.Debugf("Transitioning passive GossipSyncer(%x) to active",
11✔
645
                s.cfg.peerPub)
11✔
646

11✔
647
        if err := s.ProcessSyncTransition(ActiveSync); err != nil {
11✔
648
                return err
×
649
        }
×
650

651
        delete(m.inactiveSyncers, s.cfg.peerPub)
11✔
652
        m.activeSyncers[s.cfg.peerPub] = s
11✔
653

11✔
654
        return nil
11✔
655
}
656

657
// forceHistoricalSync chooses a syncer with a remote peer at random and forces
658
// a historical sync with it.
659
func (m *SyncManager) forceHistoricalSync() *GossipSyncer {
3✔
660
        m.syncersMu.Lock()
3✔
661
        defer m.syncersMu.Unlock()
3✔
662

3✔
663
        // We'll sample from both sets of active and inactive syncers in the
3✔
664
        // event that we don't have any inactive syncers.
3✔
665
        return chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error {
6✔
666
                return s.historicalSync()
3✔
667
        })
3✔
668
}
669

670
// chooseRandomSyncer iterates through the set of syncers given and returns the
671
// first one which was able to successfully perform the action enclosed in the
672
// function closure.
673
//
674
// NOTE: It's possible for a nil value to be returned if there are no eligible
675
// candidate syncers.
676
func chooseRandomSyncer(syncers map[route.Vertex]*GossipSyncer,
677
        action func(*GossipSyncer) error) *GossipSyncer {
19✔
678

19✔
679
        for _, s := range syncers {
37✔
680
                // Only syncers in a chansSynced state are viable for sync
18✔
681
                // transitions, so skip any that aren't.
18✔
682
                if s.syncState() != chansSynced {
20✔
683
                        continue
2✔
684
                }
685

686
                if action != nil {
29✔
687
                        if err := action(s); err != nil {
13✔
688
                                log.Debugf("Skipping eligible candidate "+
×
689
                                        "GossipSyncer(%x): %v", s.cfg.peerPub,
×
690
                                        err)
×
691
                                continue
×
692
                        }
693
                }
694

695
                return s
16✔
696
        }
697

698
        return nil
3✔
699
}
700

701
// InitSyncState is called by outside sub-systems when a connection is
702
// established to a new peer that understands how to perform channel range
703
// queries. We'll allocate a new GossipSyncer for it, and start any goroutines
704
// needed to handle new queries. The first GossipSyncer registered with the
705
// SyncManager will attempt a historical sync to ensure we have as much of the
706
// public channel graph as possible.
707
//
708
// TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer.
709
func (m *SyncManager) InitSyncState(peer lnpeer.Peer) error {
25✔
710
        done := make(chan struct{})
25✔
711

25✔
712
        select {
25✔
713
        case m.newSyncers <- &newSyncer{
714
                peer:     peer,
715
                doneChan: done,
716
        }:
25✔
717
        case <-m.quit:
×
718
                return ErrSyncManagerExiting
×
719
        }
720

721
        select {
25✔
722
        case <-done:
25✔
723
                return nil
25✔
724
        case <-m.quit:
×
725
                return ErrSyncManagerExiting
×
726
        }
727
}
728

729
// PruneSyncState is called by outside sub-systems once a peer that we were
730
// previously connected to has been disconnected. In this case we can stop the
731
// existing GossipSyncer assigned to the peer and free up resources.
732
func (m *SyncManager) PruneSyncState(peer route.Vertex) {
6✔
733
        done := make(chan struct{})
6✔
734

6✔
735
        // We avoid returning an error when the SyncManager is stopped since the
6✔
736
        // GossipSyncer will be stopped then anyway.
6✔
737
        select {
6✔
738
        case m.staleSyncers <- &staleSyncer{
739
                peer:     peer,
740
                doneChan: done,
741
        }:
6✔
742
        case <-m.quit:
×
743
                return
×
744
        }
745

746
        select {
6✔
747
        case <-done:
6✔
748
        case <-m.quit:
×
749
        }
750
}
751

752
// GossipSyncer returns the associated gossip syncer of a peer. The boolean
753
// returned signals whether there exists a gossip syncer for the peer.
754
func (m *SyncManager) GossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
45✔
755
        m.syncersMu.Lock()
45✔
756
        defer m.syncersMu.Unlock()
45✔
757
        return m.gossipSyncer(peer)
45✔
758
}
45✔
759

760
// gossipSyncer returns the associated gossip syncer of a peer. The boolean
761
// returned signals whether there exists a gossip syncer for the peer.
762
func (m *SyncManager) gossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
51✔
763
        syncer, ok := m.inactiveSyncers[peer]
51✔
764
        if ok {
66✔
765
                return syncer, true
15✔
766
        }
15✔
767
        syncer, ok = m.activeSyncers[peer]
36✔
768
        if ok {
44✔
769
                return syncer, true
8✔
770
        }
8✔
771
        syncer, ok = m.pinnedActiveSyncers[peer]
28✔
772
        if ok {
31✔
773
                return syncer, true
3✔
774
        }
3✔
775
        return nil, false
25✔
776
}
777

778
// GossipSyncers returns all of the currently initialized gossip syncers.
779
func (m *SyncManager) GossipSyncers() map[route.Vertex]*GossipSyncer {
31✔
780
        m.syncersMu.Lock()
31✔
781
        defer m.syncersMu.Unlock()
31✔
782
        return m.gossipSyncers()
31✔
783
}
31✔
784

785
// gossipSyncers returns all of the currently initialized gossip syncers.
786
func (m *SyncManager) gossipSyncers() map[route.Vertex]*GossipSyncer {
34✔
787
        numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers)
34✔
788
        syncers := make(map[route.Vertex]*GossipSyncer, numSyncers)
34✔
789

34✔
790
        for _, syncer := range m.inactiveSyncers {
39✔
791
                syncers[syncer.cfg.peerPub] = syncer
5✔
792
        }
5✔
793
        for _, syncer := range m.activeSyncers {
34✔
UNCOV
794
                syncers[syncer.cfg.peerPub] = syncer
×
UNCOV
795
        }
×
796

797
        return syncers
34✔
798
}
799

800
// markGraphSynced allows us to report that the initial historical sync has
801
// completed.
802
func (m *SyncManager) markGraphSynced() {
41✔
803
        atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 1)
41✔
804
}
41✔
805

806
// IsGraphSynced determines whether we've completed our initial historical sync.
807
// The initial historical sync is done to ensure we've ingested as much of the
808
// public graph as possible.
809
func (m *SyncManager) IsGraphSynced() bool {
288✔
810
        return atomic.LoadInt32(&m.initialHistoricalSyncCompleted) == 1
288✔
811
}
288✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc