• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12199391122

06 Dec 2024 01:10PM UTC coverage: 49.807% (-9.1%) from 58.933%
12199391122

push

github

web-flow
Merge pull request #9337 from Guayaba221/patch-1

chore: fix typo in ruby.md

100137 of 201051 relevant lines covered (49.81%)

2.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.72
/chanbackup/pubsub.go
1
package chanbackup
2

3
import (
4
        "bytes"
5
        "fmt"
6
        "net"
7
        "os"
8
        "sync"
9

10
        "github.com/btcsuite/btcd/wire"
11
        "github.com/lightningnetwork/lnd/channeldb"
12
        "github.com/lightningnetwork/lnd/keychain"
13
        "github.com/lightningnetwork/lnd/lnutils"
14
)
15

16
// Swapper is an interface that allows the chanbackup.SubSwapper to update the
17
// main multi backup location once it learns of new channels or that prior
18
// channels have been closed.
19
type Swapper interface {
20
        // UpdateAndSwap attempts to atomically update the main multi back up
21
        // file location with the new fully packed multi-channel backup.
22
        UpdateAndSwap(newBackup PackedMulti) error
23

24
        // ExtractMulti attempts to obtain and decode the current SCB instance
25
        // stored by the Swapper instance.
26
        ExtractMulti(keychain keychain.KeyRing) (*Multi, error)
27
}
28

29
// ChannelWithAddrs bundles an open channel along with all the addresses for
30
// the channel peer.
31
type ChannelWithAddrs struct {
32
        *channeldb.OpenChannel
33

34
        // Addrs is the set of addresses that we can use to reach the target
35
        // peer.
36
        Addrs []net.Addr
37
}
38

39
// ChannelEvent packages a new update of new channels since subscription, and
40
// channels that have been opened since prior channel event.
41
type ChannelEvent struct {
42
        // ClosedChans are the set of channels that have been closed since the
43
        // last event.
44
        ClosedChans []wire.OutPoint
45

46
        // NewChans is the set of channels that have been opened since the last
47
        // event.
48
        NewChans []ChannelWithAddrs
49
}
50

51
// manualUpdate holds a group of channel state updates and an error channel
52
// to send back an error happened upon update processing or file updating.
53
type manualUpdate struct {
54
        // singles hold channels backups. They can be either new or known
55
        // channels in the Swapper.
56
        singles []Single
57

58
        // errChan is the channel to send an error back. If the update handling
59
        // and the subsequent file updating succeeds, nil is sent.
60
        // The channel must have capacity of 1 to prevent Swapper blocking.
61
        errChan chan error
62
}
63

64
// ChannelSubscription represents an intent to be notified of any updates to
65
// the primary channel state.
66
type ChannelSubscription struct {
67
        // ChanUpdates is a channel that will be sent upon once the primary
68
        // channel state is updated.
69
        ChanUpdates chan ChannelEvent
70

71
        // Cancel is a closure that allows the caller to cancel their
72
        // subscription and free up any resources allocated.
73
        Cancel func()
74
}
75

76
// ChannelNotifier represents a system that allows the chanbackup.SubSwapper to
77
// be notified of any changes to the primary channel state.
78
type ChannelNotifier interface {
79
        // SubscribeChans requests a new channel subscription relative to the
80
        // initial set of known channels. We use the knownChans as a
81
        // synchronization point to ensure that the chanbackup.SubSwapper does
82
        // not miss any channel open or close events in the period between when
83
        // it's created, and when it requests the channel subscription.
84
        SubscribeChans(map[wire.OutPoint]struct{}) (*ChannelSubscription, error)
85
}
86

87
// SubSwapper subscribes to new updates to the open channel state, and then
88
// swaps out the on-disk channel backup state in response.  This sub-system
89
// that will ensure that the multi chan backup file on disk will always be
90
// updated with the latest channel back up state. We'll receive new
91
// opened/closed channels from the ChannelNotifier, then use the Swapper to
92
// update the file state on disk with the new set of open channels.  This can
93
// be used to implement a system that always keeps the multi-chan backup file
94
// on disk in a consistent state for safety purposes.
95
type SubSwapper struct {
96
        started sync.Once
97
        stopped sync.Once
98

99
        // backupState are the set of SCBs for all open channels we know of.
100
        backupState map[wire.OutPoint]Single
101

102
        // chanEvents is an active subscription to receive new channel state
103
        // over.
104
        chanEvents *ChannelSubscription
105

106
        manualUpdates chan manualUpdate
107

108
        // keyRing is the main key ring that will allow us to pack the new
109
        // multi backup.
110
        keyRing keychain.KeyRing
111

112
        Swapper
113

114
        quit chan struct{}
115
        wg   sync.WaitGroup
116
}
117

118
// NewSubSwapper creates a new instance of the SubSwapper given the starting
119
// set of channels, and the required interfaces to be notified of new channel
120
// updates, pack a multi backup, and swap the current best backup from its
121
// storage location.
122
func NewSubSwapper(startingChans []Single, chanNotifier ChannelNotifier,
123
        keyRing keychain.KeyRing, backupSwapper Swapper) (*SubSwapper, error) {
4✔
124

4✔
125
        // First, we'll subscribe to the latest set of channel updates given
4✔
126
        // the set of channels we already know of.
4✔
127
        knownChans := make(map[wire.OutPoint]struct{})
4✔
128
        for _, chanBackup := range startingChans {
8✔
129
                knownChans[chanBackup.FundingOutpoint] = struct{}{}
4✔
130
        }
4✔
131
        chanEvents, err := chanNotifier.SubscribeChans(knownChans)
4✔
132
        if err != nil {
4✔
133
                return nil, err
×
134
        }
×
135

136
        // Next, we'll construct our own backup state so we can add/remove
137
        // channels that have been opened and closed.
138
        backupState := make(map[wire.OutPoint]Single)
4✔
139
        for _, chanBackup := range startingChans {
8✔
140
                backupState[chanBackup.FundingOutpoint] = chanBackup
4✔
141
        }
4✔
142

143
        return &SubSwapper{
4✔
144
                backupState:   backupState,
4✔
145
                chanEvents:    chanEvents,
4✔
146
                keyRing:       keyRing,
4✔
147
                Swapper:       backupSwapper,
4✔
148
                quit:          make(chan struct{}),
4✔
149
                manualUpdates: make(chan manualUpdate),
4✔
150
        }, nil
4✔
151
}
152

153
// Start starts the chanbackup.SubSwapper.
154
func (s *SubSwapper) Start() error {
4✔
155
        var startErr error
4✔
156
        s.started.Do(func() {
8✔
157
                log.Infof("chanbackup.SubSwapper starting")
4✔
158

4✔
159
                // Before we enter our main loop, we'll update the on-disk
4✔
160
                // state with the latest Single state, as nodes may have new
4✔
161
                // advertised addresses.
4✔
162
                if err := s.updateBackupFile(); err != nil {
4✔
163
                        startErr = fmt.Errorf("unable to refresh backup "+
×
164
                                "file: %v", err)
×
165
                        return
×
166
                }
×
167

168
                s.wg.Add(1)
4✔
169
                go s.backupUpdater()
4✔
170
        })
171

172
        return startErr
4✔
173
}
174

175
// Stop signals the SubSwapper to being a graceful shutdown.
176
func (s *SubSwapper) Stop() error {
4✔
177
        s.stopped.Do(func() {
8✔
178
                log.Infof("chanbackup.SubSwapper shutting down...")
4✔
179
                defer log.Debug("chanbackup.SubSwapper shutdown complete")
4✔
180

4✔
181
                close(s.quit)
4✔
182
                s.wg.Wait()
4✔
183
        })
4✔
184
        return nil
4✔
185
}
186

187
// ManualUpdate inserts/updates channel states into the swapper. The updates
188
// are processed in another goroutine. The method waits for the updates to be
189
// fully processed and the file to be updated on-disk before returning.
190
func (s *SubSwapper) ManualUpdate(singles []Single) error {
4✔
191
        // Create the channel to send an error back. If the update handling
4✔
192
        // and the subsequent file updating succeeds, nil is sent.
4✔
193
        // The channel must have capacity of 1 to prevent Swapper blocking.
4✔
194
        errChan := make(chan error, 1)
4✔
195

4✔
196
        // Create the update object to insert into the processing loop.
4✔
197
        update := manualUpdate{
4✔
198
                singles: singles,
4✔
199
                errChan: errChan,
4✔
200
        }
4✔
201

4✔
202
        select {
4✔
203
        case s.manualUpdates <- update:
4✔
204
        case <-s.quit:
×
205
                return fmt.Errorf("swapper stopped when sending manual update")
×
206
        }
207

208
        // Wait for processing, block on errChan.
209
        select {
4✔
210
        case err := <-errChan:
4✔
211
                if err != nil {
8✔
212
                        return fmt.Errorf("processing of manual update "+
4✔
213
                                "failed: %w", err)
4✔
214
                }
4✔
215

216
        case <-s.quit:
×
217
                return fmt.Errorf("swapper stopped when waiting for outcome")
×
218
        }
219

220
        // Success.
221
        return nil
4✔
222
}
223

224
// updateBackupFile updates the backup file in place given the current state of
225
// the SubSwapper. We accept the set of channels that were closed between this
226
// update and the last to make sure we leave them out of our backup set union.
227
func (s *SubSwapper) updateBackupFile(closedChans ...wire.OutPoint) error {
4✔
228
        // Before we pack the new set of SCBs, we'll first decode what we
4✔
229
        // already have on-disk, to make sure we can decode it (proper seed)
4✔
230
        // and that we're able to combine it with our new data.
4✔
231
        diskMulti, err := s.Swapper.ExtractMulti(s.keyRing)
4✔
232

4✔
233
        // If the file doesn't exist on disk, then that's OK as it was never
4✔
234
        // created. In this case we'll continue onwards as it isn't a critical
4✔
235
        // error.
4✔
236
        if err != nil && !os.IsNotExist(err) {
4✔
237
                return fmt.Errorf("unable to extract on disk encrypted "+
×
238
                        "SCB: %v", err)
×
239
        }
×
240

241
        // Now that we have channels stored on-disk, we'll create a new set of
242
        // the combined old and new channels to make sure we retain what's
243
        // already on-disk.
244
        //
245
        // NOTE: The ordering of this operations means that our in-memory
246
        // structure will replace what we read from disk.
247
        combinedBackup := make(map[wire.OutPoint]Single)
4✔
248
        if diskMulti != nil {
8✔
249
                for _, diskChannel := range diskMulti.StaticBackups {
8✔
250
                        chanPoint := diskChannel.FundingOutpoint
4✔
251
                        combinedBackup[chanPoint] = diskChannel
4✔
252
                }
4✔
253
        }
254
        for _, memChannel := range s.backupState {
8✔
255
                chanPoint := memChannel.FundingOutpoint
4✔
256
                if _, ok := combinedBackup[chanPoint]; ok {
8✔
257
                        log.Warnf("Replacing disk backup for ChannelPoint(%v) "+
4✔
258
                                "w/ newer version", chanPoint)
4✔
259
                }
4✔
260

261
                combinedBackup[chanPoint] = memChannel
4✔
262
        }
263

264
        // Remove the set of closed channels from the final set of backups.
265
        for _, closedChan := range closedChans {
8✔
266
                delete(combinedBackup, closedChan)
4✔
267
        }
4✔
268

269
        // With our updated channel state obtained, we'll create a new multi
270
        // from our series of singles.
271
        var newMulti Multi
4✔
272
        for _, backup := range combinedBackup {
8✔
273
                newMulti.StaticBackups = append(
4✔
274
                        newMulti.StaticBackups, backup,
4✔
275
                )
4✔
276
        }
4✔
277

278
        // Now that our multi has been assembled, we'll attempt to pack
279
        // (encrypt+encode) the new channel state to our target reader.
280
        var b bytes.Buffer
4✔
281
        err = newMulti.PackToWriter(&b, s.keyRing)
4✔
282
        if err != nil {
4✔
283
                return fmt.Errorf("unable to pack multi backup: %w", err)
×
284
        }
×
285

286
        // Finally, we'll swap out the old backup for this new one in a single
287
        // atomic step, combining the file already on-disk with this set of new
288
        // channels.
289
        err = s.Swapper.UpdateAndSwap(PackedMulti(b.Bytes()))
4✔
290
        if err != nil {
8✔
291
                return fmt.Errorf("unable to update multi backup: %w", err)
4✔
292
        }
4✔
293

294
        return nil
4✔
295
}
296

297
// backupFileUpdater is the primary goroutine of the SubSwapper which is
298
// responsible for listening for changes to the channel, and updating the
299
// persistent multi backup state with a new packed multi of the latest channel
300
// state.
301
func (s *SubSwapper) backupUpdater() {
4✔
302
        // Ensure that once we exit, we'll cancel our active channel
4✔
303
        // subscription.
4✔
304
        defer s.chanEvents.Cancel()
4✔
305
        defer s.wg.Done()
4✔
306

4✔
307
        log.Debugf("SubSwapper's backupUpdater is active!")
4✔
308

4✔
309
        for {
8✔
310
                select {
4✔
311
                // The channel state has been modified! We'll evaluate all
312
                // changes, and swap out the old packed multi with a new one
313
                // with the latest channel state.
314
                case chanUpdate := <-s.chanEvents.ChanUpdates:
4✔
315
                        oldStateSize := len(s.backupState)
4✔
316

4✔
317
                        // For all new open channels, we'll create a new SCB
4✔
318
                        // given the required information.
4✔
319
                        for _, newChan := range chanUpdate.NewChans {
8✔
320
                                log.Debugf("Adding channel %v to backup state",
4✔
321
                                        newChan.FundingOutpoint)
4✔
322

4✔
323
                                single := NewSingle(
4✔
324
                                        newChan.OpenChannel, newChan.Addrs,
4✔
325
                                )
4✔
326
                                s.backupState[newChan.FundingOutpoint] = single
4✔
327
                        }
4✔
328

329
                        // For all closed channels, we'll remove the prior
330
                        // backup state.
331
                        closedChans := make(
4✔
332
                                []wire.OutPoint, 0, len(chanUpdate.ClosedChans),
4✔
333
                        )
4✔
334
                        for i, closedChan := range chanUpdate.ClosedChans {
8✔
335
                                log.Debugf("Removing channel %v from backup "+
4✔
336
                                        "state", lnutils.NewLogClosure(
4✔
337
                                        chanUpdate.ClosedChans[i].String))
4✔
338

4✔
339
                                delete(s.backupState, closedChan)
4✔
340

4✔
341
                                closedChans = append(closedChans, closedChan)
4✔
342
                        }
4✔
343

344
                        newStateSize := len(s.backupState)
4✔
345

4✔
346
                        log.Infof("Updating on-disk multi SCB backup: "+
4✔
347
                                "num_old_chans=%v, num_new_chans=%v",
4✔
348
                                oldStateSize, newStateSize)
4✔
349

4✔
350
                        // Without new state constructed, we'll, atomically
4✔
351
                        // update the on-disk backup state.
4✔
352
                        if err := s.updateBackupFile(closedChans...); err != nil {
4✔
353
                                log.Errorf("unable to update backup file: %v",
×
354
                                        err)
×
355
                        }
×
356

357
                // We received a manual update. Handle it and update the file.
358
                case manualUpdate := <-s.manualUpdates:
4✔
359
                        oldStateSize := len(s.backupState)
4✔
360

4✔
361
                        // For all open channels, we'll create a new SCB given
4✔
362
                        // the required information.
4✔
363
                        for _, single := range manualUpdate.singles {
8✔
364
                                log.Debugf("Manual update of channel %v",
4✔
365
                                        single.FundingOutpoint)
4✔
366

4✔
367
                                s.backupState[single.FundingOutpoint] = single
4✔
368
                        }
4✔
369

370
                        newStateSize := len(s.backupState)
4✔
371

4✔
372
                        log.Infof("Updating on-disk multi SCB backup: "+
4✔
373
                                "num_old_chans=%v, num_new_chans=%v",
4✔
374
                                oldStateSize, newStateSize)
4✔
375

4✔
376
                        // Without new state constructed, we'll, atomically
4✔
377
                        // update the on-disk backup state.
4✔
378
                        err := s.updateBackupFile()
4✔
379
                        if err != nil {
8✔
380
                                log.Errorf("unable to update backup file: %v",
4✔
381
                                        err)
4✔
382
                        }
4✔
383

384
                        // Send the error (or nil) to the caller of
385
                        // ManualUpdate. The error channel must have capacity of
386
                        // 1 not to block here.
387
                        manualUpdate.errChan <- err
4✔
388

389
                // TODO(roasbeef): refresh periodically on a time basis due to
390
                // possible addr changes from node
391

392
                // Exit at once if a quit signal is detected.
393
                case <-s.quit:
4✔
394
                        return
4✔
395
                }
396
        }
397
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc