• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13566028875

27 Feb 2025 12:09PM UTC coverage: 49.396% (-9.4%) from 58.748%
13566028875

Pull #9555

github

ellemouton
graph/db: populate the graph cache in Start instead of during construction

In this commit, we move the graph cache population logic out of the
ChannelGraph constructor and into its Start method instead.
Pull Request #9555: graph: extract cache from CRUD [6]

34 of 54 new or added lines in 4 files covered. (62.96%)

27464 existing lines in 436 files now uncovered.

101095 of 204664 relevant lines covered (49.4%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.72
/chanbackup/pubsub.go
1
package chanbackup
2

3
import (
4
        "bytes"
5
        "fmt"
6
        "net"
7
        "os"
8
        "sync"
9

10
        "github.com/btcsuite/btcd/wire"
11
        "github.com/lightningnetwork/lnd/channeldb"
12
        "github.com/lightningnetwork/lnd/keychain"
13
        "github.com/lightningnetwork/lnd/lnutils"
14
)
15

16
// Swapper is an interface that allows the chanbackup.SubSwapper to update the
17
// main multi backup location once it learns of new channels or that prior
18
// channels have been closed.
19
type Swapper interface {
20
        // UpdateAndSwap attempts to atomically update the main multi back up
21
        // file location with the new fully packed multi-channel backup.
22
        UpdateAndSwap(newBackup PackedMulti) error
23

24
        // ExtractMulti attempts to obtain and decode the current SCB instance
25
        // stored by the Swapper instance.
26
        ExtractMulti(keychain keychain.KeyRing) (*Multi, error)
27
}
28

29
// ChannelWithAddrs bundles an open channel along with all the addresses for
30
// the channel peer.
31
type ChannelWithAddrs struct {
32
        *channeldb.OpenChannel
33

34
        // Addrs is the set of addresses that we can use to reach the target
35
        // peer.
36
        Addrs []net.Addr
37
}
38

39
// ChannelEvent packages a new update of new channels since subscription, and
40
// channels that have been opened since prior channel event.
41
type ChannelEvent struct {
42
        // ClosedChans are the set of channels that have been closed since the
43
        // last event.
44
        ClosedChans []wire.OutPoint
45

46
        // NewChans is the set of channels that have been opened since the last
47
        // event.
48
        NewChans []ChannelWithAddrs
49
}
50

51
// manualUpdate holds a group of channel state updates and an error channel
52
// to send back an error happened upon update processing or file updating.
53
type manualUpdate struct {
54
        // singles hold channels backups. They can be either new or known
55
        // channels in the Swapper.
56
        singles []Single
57

58
        // errChan is the channel to send an error back. If the update handling
59
        // and the subsequent file updating succeeds, nil is sent.
60
        // The channel must have capacity of 1 to prevent Swapper blocking.
61
        errChan chan error
62
}
63

64
// ChannelSubscription represents an intent to be notified of any updates to
65
// the primary channel state.
66
type ChannelSubscription struct {
67
        // ChanUpdates is a channel that will be sent upon once the primary
68
        // channel state is updated.
69
        ChanUpdates chan ChannelEvent
70

71
        // Cancel is a closure that allows the caller to cancel their
72
        // subscription and free up any resources allocated.
73
        Cancel func()
74
}
75

76
// ChannelNotifier represents a system that allows the chanbackup.SubSwapper to
77
// be notified of any changes to the primary channel state.
78
type ChannelNotifier interface {
79
        // SubscribeChans requests a new channel subscription relative to the
80
        // initial set of known channels. We use the knownChans as a
81
        // synchronization point to ensure that the chanbackup.SubSwapper does
82
        // not miss any channel open or close events in the period between when
83
        // it's created, and when it requests the channel subscription.
84
        SubscribeChans(map[wire.OutPoint]struct{}) (*ChannelSubscription, error)
85
}
86

87
// SubSwapper subscribes to new updates to the open channel state, and then
88
// swaps out the on-disk channel backup state in response.  This sub-system
89
// that will ensure that the multi chan backup file on disk will always be
90
// updated with the latest channel back up state. We'll receive new
91
// opened/closed channels from the ChannelNotifier, then use the Swapper to
92
// update the file state on disk with the new set of open channels.  This can
93
// be used to implement a system that always keeps the multi-chan backup file
94
// on disk in a consistent state for safety purposes.
95
type SubSwapper struct {
96
        started sync.Once
97
        stopped sync.Once
98

99
        // backupState are the set of SCBs for all open channels we know of.
100
        backupState map[wire.OutPoint]Single
101

102
        // chanEvents is an active subscription to receive new channel state
103
        // over.
104
        chanEvents *ChannelSubscription
105

106
        manualUpdates chan manualUpdate
107

108
        // keyRing is the main key ring that will allow us to pack the new
109
        // multi backup.
110
        keyRing keychain.KeyRing
111

112
        Swapper
113

114
        quit chan struct{}
115
        wg   sync.WaitGroup
116
}
117

118
// NewSubSwapper creates a new instance of the SubSwapper given the starting
119
// set of channels, and the required interfaces to be notified of new channel
120
// updates, pack a multi backup, and swap the current best backup from its
121
// storage location.
122
func NewSubSwapper(startingChans []Single, chanNotifier ChannelNotifier,
123
        keyRing keychain.KeyRing, backupSwapper Swapper) (*SubSwapper, error) {
3✔
124

3✔
125
        // First, we'll subscribe to the latest set of channel updates given
3✔
126
        // the set of channels we already know of.
3✔
127
        knownChans := make(map[wire.OutPoint]struct{})
3✔
128
        for _, chanBackup := range startingChans {
6✔
129
                knownChans[chanBackup.FundingOutpoint] = struct{}{}
3✔
130
        }
3✔
131
        chanEvents, err := chanNotifier.SubscribeChans(knownChans)
3✔
132
        if err != nil {
3✔
UNCOV
133
                return nil, err
×
UNCOV
134
        }
×
135

136
        // Next, we'll construct our own backup state so we can add/remove
137
        // channels that have been opened and closed.
138
        backupState := make(map[wire.OutPoint]Single)
3✔
139
        for _, chanBackup := range startingChans {
6✔
140
                backupState[chanBackup.FundingOutpoint] = chanBackup
3✔
141
        }
3✔
142

143
        return &SubSwapper{
3✔
144
                backupState:   backupState,
3✔
145
                chanEvents:    chanEvents,
3✔
146
                keyRing:       keyRing,
3✔
147
                Swapper:       backupSwapper,
3✔
148
                quit:          make(chan struct{}),
3✔
149
                manualUpdates: make(chan manualUpdate),
3✔
150
        }, nil
3✔
151
}
152

153
// Start starts the chanbackup.SubSwapper.
154
func (s *SubSwapper) Start() error {
3✔
155
        var startErr error
3✔
156
        s.started.Do(func() {
6✔
157
                log.Infof("chanbackup.SubSwapper starting")
3✔
158

3✔
159
                // Before we enter our main loop, we'll update the on-disk
3✔
160
                // state with the latest Single state, as nodes may have new
3✔
161
                // advertised addresses.
3✔
162
                if err := s.updateBackupFile(); err != nil {
3✔
163
                        startErr = fmt.Errorf("unable to refresh backup "+
×
164
                                "file: %v", err)
×
165
                        return
×
166
                }
×
167

168
                s.wg.Add(1)
3✔
169
                go s.backupUpdater()
3✔
170
        })
171

172
        return startErr
3✔
173
}
174

175
// Stop signals the SubSwapper to being a graceful shutdown.
176
func (s *SubSwapper) Stop() error {
3✔
177
        s.stopped.Do(func() {
6✔
178
                log.Infof("chanbackup.SubSwapper shutting down...")
3✔
179
                defer log.Debug("chanbackup.SubSwapper shutdown complete")
3✔
180

3✔
181
                close(s.quit)
3✔
182
                s.wg.Wait()
3✔
183
        })
3✔
184
        return nil
3✔
185
}
186

187
// ManualUpdate inserts/updates channel states into the swapper. The updates
188
// are processed in another goroutine. The method waits for the updates to be
189
// fully processed and the file to be updated on-disk before returning.
190
func (s *SubSwapper) ManualUpdate(singles []Single) error {
3✔
191
        // Create the channel to send an error back. If the update handling
3✔
192
        // and the subsequent file updating succeeds, nil is sent.
3✔
193
        // The channel must have capacity of 1 to prevent Swapper blocking.
3✔
194
        errChan := make(chan error, 1)
3✔
195

3✔
196
        // Create the update object to insert into the processing loop.
3✔
197
        update := manualUpdate{
3✔
198
                singles: singles,
3✔
199
                errChan: errChan,
3✔
200
        }
3✔
201

3✔
202
        select {
3✔
203
        case s.manualUpdates <- update:
3✔
204
        case <-s.quit:
×
205
                return fmt.Errorf("swapper stopped when sending manual update")
×
206
        }
207

208
        // Wait for processing, block on errChan.
209
        select {
3✔
210
        case err := <-errChan:
3✔
211
                if err != nil {
6✔
212
                        return fmt.Errorf("processing of manual update "+
3✔
213
                                "failed: %w", err)
3✔
214
                }
3✔
215

216
        case <-s.quit:
×
217
                return fmt.Errorf("swapper stopped when waiting for outcome")
×
218
        }
219

220
        // Success.
221
        return nil
3✔
222
}
223

224
// updateBackupFile updates the backup file in place given the current state of
225
// the SubSwapper. We accept the set of channels that were closed between this
226
// update and the last to make sure we leave them out of our backup set union.
227
func (s *SubSwapper) updateBackupFile(closedChans ...wire.OutPoint) error {
3✔
228
        // Before we pack the new set of SCBs, we'll first decode what we
3✔
229
        // already have on-disk, to make sure we can decode it (proper seed)
3✔
230
        // and that we're able to combine it with our new data.
3✔
231
        diskMulti, err := s.Swapper.ExtractMulti(s.keyRing)
3✔
232

3✔
233
        // If the file doesn't exist on disk, then that's OK as it was never
3✔
234
        // created. In this case we'll continue onwards as it isn't a critical
3✔
235
        // error.
3✔
236
        if err != nil && !os.IsNotExist(err) {
3✔
237
                return fmt.Errorf("unable to extract on disk encrypted "+
×
238
                        "SCB: %v", err)
×
239
        }
×
240

241
        // Now that we have channels stored on-disk, we'll create a new set of
242
        // the combined old and new channels to make sure we retain what's
243
        // already on-disk.
244
        //
245
        // NOTE: The ordering of this operations means that our in-memory
246
        // structure will replace what we read from disk.
247
        combinedBackup := make(map[wire.OutPoint]Single)
3✔
248
        if diskMulti != nil {
6✔
249
                for _, diskChannel := range diskMulti.StaticBackups {
6✔
250
                        chanPoint := diskChannel.FundingOutpoint
3✔
251
                        combinedBackup[chanPoint] = diskChannel
3✔
252
                }
3✔
253
        }
254
        for _, memChannel := range s.backupState {
6✔
255
                chanPoint := memChannel.FundingOutpoint
3✔
256
                if _, ok := combinedBackup[chanPoint]; ok {
6✔
257
                        log.Warnf("Replacing disk backup for ChannelPoint(%v) "+
3✔
258
                                "w/ newer version", chanPoint)
3✔
259
                }
3✔
260

261
                combinedBackup[chanPoint] = memChannel
3✔
262
        }
263

264
        // Remove the set of closed channels from the final set of backups.
265
        for _, closedChan := range closedChans {
6✔
266
                delete(combinedBackup, closedChan)
3✔
267
        }
3✔
268

269
        // With our updated channel state obtained, we'll create a new multi
270
        // from our series of singles.
271
        var newMulti Multi
3✔
272
        for _, backup := range combinedBackup {
6✔
273
                newMulti.StaticBackups = append(
3✔
274
                        newMulti.StaticBackups, backup,
3✔
275
                )
3✔
276
        }
3✔
277

278
        // Now that our multi has been assembled, we'll attempt to pack
279
        // (encrypt+encode) the new channel state to our target reader.
280
        var b bytes.Buffer
3✔
281
        err = newMulti.PackToWriter(&b, s.keyRing)
3✔
282
        if err != nil {
3✔
283
                return fmt.Errorf("unable to pack multi backup: %w", err)
×
284
        }
×
285

286
        // Finally, we'll swap out the old backup for this new one in a single
287
        // atomic step, combining the file already on-disk with this set of new
288
        // channels.
289
        err = s.Swapper.UpdateAndSwap(PackedMulti(b.Bytes()))
3✔
290
        if err != nil {
6✔
291
                return fmt.Errorf("unable to update multi backup: %w", err)
3✔
292
        }
3✔
293

294
        return nil
3✔
295
}
296

297
// backupFileUpdater is the primary goroutine of the SubSwapper which is
298
// responsible for listening for changes to the channel, and updating the
299
// persistent multi backup state with a new packed multi of the latest channel
300
// state.
301
func (s *SubSwapper) backupUpdater() {
3✔
302
        // Ensure that once we exit, we'll cancel our active channel
3✔
303
        // subscription.
3✔
304
        defer s.chanEvents.Cancel()
3✔
305
        defer s.wg.Done()
3✔
306

3✔
307
        log.Debugf("SubSwapper's backupUpdater is active!")
3✔
308

3✔
309
        for {
6✔
310
                select {
3✔
311
                // The channel state has been modified! We'll evaluate all
312
                // changes, and swap out the old packed multi with a new one
313
                // with the latest channel state.
314
                case chanUpdate := <-s.chanEvents.ChanUpdates:
3✔
315
                        oldStateSize := len(s.backupState)
3✔
316

3✔
317
                        // For all new open channels, we'll create a new SCB
3✔
318
                        // given the required information.
3✔
319
                        for _, newChan := range chanUpdate.NewChans {
6✔
320
                                log.Debugf("Adding channel %v to backup state",
3✔
321
                                        newChan.FundingOutpoint)
3✔
322

3✔
323
                                single := NewSingle(
3✔
324
                                        newChan.OpenChannel, newChan.Addrs,
3✔
325
                                )
3✔
326
                                s.backupState[newChan.FundingOutpoint] = single
3✔
327
                        }
3✔
328

329
                        // For all closed channels, we'll remove the prior
330
                        // backup state.
331
                        closedChans := make(
3✔
332
                                []wire.OutPoint, 0, len(chanUpdate.ClosedChans),
3✔
333
                        )
3✔
334
                        for i, closedChan := range chanUpdate.ClosedChans {
6✔
335
                                log.Debugf("Removing channel %v from backup "+
3✔
336
                                        "state", lnutils.NewLogClosure(
3✔
337
                                        chanUpdate.ClosedChans[i].String))
3✔
338

3✔
339
                                delete(s.backupState, closedChan)
3✔
340

3✔
341
                                closedChans = append(closedChans, closedChan)
3✔
342
                        }
3✔
343

344
                        newStateSize := len(s.backupState)
3✔
345

3✔
346
                        log.Infof("Updating on-disk multi SCB backup: "+
3✔
347
                                "num_old_chans=%v, num_new_chans=%v",
3✔
348
                                oldStateSize, newStateSize)
3✔
349

3✔
350
                        // Without new state constructed, we'll, atomically
3✔
351
                        // update the on-disk backup state.
3✔
352
                        if err := s.updateBackupFile(closedChans...); err != nil {
3✔
353
                                log.Errorf("unable to update backup file: %v",
×
354
                                        err)
×
355
                        }
×
356

357
                // We received a manual update. Handle it and update the file.
358
                case manualUpdate := <-s.manualUpdates:
3✔
359
                        oldStateSize := len(s.backupState)
3✔
360

3✔
361
                        // For all open channels, we'll create a new SCB given
3✔
362
                        // the required information.
3✔
363
                        for _, single := range manualUpdate.singles {
6✔
364
                                log.Debugf("Manual update of channel %v",
3✔
365
                                        single.FundingOutpoint)
3✔
366

3✔
367
                                s.backupState[single.FundingOutpoint] = single
3✔
368
                        }
3✔
369

370
                        newStateSize := len(s.backupState)
3✔
371

3✔
372
                        log.Infof("Updating on-disk multi SCB backup: "+
3✔
373
                                "num_old_chans=%v, num_new_chans=%v",
3✔
374
                                oldStateSize, newStateSize)
3✔
375

3✔
376
                        // Without new state constructed, we'll, atomically
3✔
377
                        // update the on-disk backup state.
3✔
378
                        err := s.updateBackupFile()
3✔
379
                        if err != nil {
6✔
380
                                log.Errorf("unable to update backup file: %v",
3✔
381
                                        err)
3✔
382
                        }
3✔
383

384
                        // Send the error (or nil) to the caller of
385
                        // ManualUpdate. The error channel must have capacity of
386
                        // 1 not to block here.
387
                        manualUpdate.errChan <- err
3✔
388

389
                // TODO(roasbeef): refresh periodically on a time basis due to
390
                // possible addr changes from node
391

392
                // Exit at once if a quit signal is detected.
393
                case <-s.quit:
3✔
394
                        return
3✔
395
                }
396
        }
397
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc