• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11292787765

11 Oct 2024 12:58PM UTC coverage: 49.179% (-9.5%) from 58.716%
11292787765

push

github

web-flow
Merge pull request #9168 from feelancer21/fix-lncli-wallet-proto

lnrpc: fix lncli documentation tags in walletkit.proto

97369 of 197987 relevant lines covered (49.18%)

1.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.73
/chanbackup/pubsub.go
1
package chanbackup
2

3
import (
4
        "bytes"
5
        "fmt"
6
        "net"
7
        "os"
8
        "sync"
9

10
        "github.com/btcsuite/btcd/wire"
11
        "github.com/lightningnetwork/lnd/channeldb"
12
        "github.com/lightningnetwork/lnd/keychain"
13
        "github.com/lightningnetwork/lnd/lnutils"
14
)
15

16
// Swapper is an interface that allows the chanbackup.SubSwapper to update the
17
// main multi backup location once it learns of new channels or that prior
18
// channels have been closed.
19
type Swapper interface {
20
        // UpdateAndSwap attempts to atomically update the main multi back up
21
        // file location with the new fully packed multi-channel backup.
22
        UpdateAndSwap(newBackup PackedMulti) error
23

24
        // ExtractMulti attempts to obtain and decode the current SCB instance
25
        // stored by the Swapper instance.
26
        ExtractMulti(keychain keychain.KeyRing) (*Multi, error)
27
}
28

29
// ChannelWithAddrs bundles an open channel along with all the addresses for
30
// the channel peer.
31
type ChannelWithAddrs struct {
32
        *channeldb.OpenChannel
33

34
        // Addrs is the set of addresses that we can use to reach the target
35
        // peer.
36
        Addrs []net.Addr
37
}
38

39
// ChannelEvent packages a new update of new channels since subscription, and
40
// channels that have been opened since prior channel event.
41
type ChannelEvent struct {
42
        // ClosedChans are the set of channels that have been closed since the
43
        // last event.
44
        ClosedChans []wire.OutPoint
45

46
        // NewChans is the set of channels that have been opened since the last
47
        // event.
48
        NewChans []ChannelWithAddrs
49
}
50

51
// ChannelSubscription represents an intent to be notified of any updates to
52
// the primary channel state.
53
type ChannelSubscription struct {
54
        // ChanUpdates is a channel that will be sent upon once the primary
55
        // channel state is updated.
56
        ChanUpdates chan ChannelEvent
57

58
        // Cancel is a closure that allows the caller to cancel their
59
        // subscription and free up any resources allocated.
60
        Cancel func()
61
}
62

63
// ChannelNotifier represents a system that allows the chanbackup.SubSwapper to
64
// be notified of any changes to the primary channel state.
65
type ChannelNotifier interface {
66
        // SubscribeChans requests a new channel subscription relative to the
67
        // initial set of known channels. We use the knownChans as a
68
        // synchronization point to ensure that the chanbackup.SubSwapper does
69
        // not miss any channel open or close events in the period between when
70
        // it's created, and when it requests the channel subscription.
71
        SubscribeChans(map[wire.OutPoint]struct{}) (*ChannelSubscription, error)
72
}
73

74
// SubSwapper subscribes to new updates to the open channel state, and then
75
// swaps out the on-disk channel backup state in response.  This sub-system
76
// that will ensure that the multi chan backup file on disk will always be
77
// updated with the latest channel back up state. We'll receive new
78
// opened/closed channels from the ChannelNotifier, then use the Swapper to
79
// update the file state on disk with the new set of open channels.  This can
80
// be used to implement a system that always keeps the multi-chan backup file
81
// on disk in a consistent state for safety purposes.
82
type SubSwapper struct {
83
        started sync.Once
84
        stopped sync.Once
85

86
        // backupState are the set of SCBs for all open channels we know of.
87
        backupState map[wire.OutPoint]Single
88

89
        // chanEvents is an active subscription to receive new channel state
90
        // over.
91
        chanEvents *ChannelSubscription
92

93
        // keyRing is the main key ring that will allow us to pack the new
94
        // multi backup.
95
        keyRing keychain.KeyRing
96

97
        Swapper
98

99
        quit chan struct{}
100
        wg   sync.WaitGroup
101
}
102

103
// NewSubSwapper creates a new instance of the SubSwapper given the starting
104
// set of channels, and the required interfaces to be notified of new channel
105
// updates, pack a multi backup, and swap the current best backup from its
106
// storage location.
107
func NewSubSwapper(startingChans []Single, chanNotifier ChannelNotifier,
108
        keyRing keychain.KeyRing, backupSwapper Swapper) (*SubSwapper, error) {
2✔
109

2✔
110
        // First, we'll subscribe to the latest set of channel updates given
2✔
111
        // the set of channels we already know of.
2✔
112
        knownChans := make(map[wire.OutPoint]struct{})
2✔
113
        for _, chanBackup := range startingChans {
4✔
114
                knownChans[chanBackup.FundingOutpoint] = struct{}{}
2✔
115
        }
2✔
116
        chanEvents, err := chanNotifier.SubscribeChans(knownChans)
2✔
117
        if err != nil {
2✔
118
                return nil, err
×
119
        }
×
120

121
        // Next, we'll construct our own backup state so we can add/remove
122
        // channels that have been opened and closed.
123
        backupState := make(map[wire.OutPoint]Single)
2✔
124
        for _, chanBackup := range startingChans {
4✔
125
                backupState[chanBackup.FundingOutpoint] = chanBackup
2✔
126
        }
2✔
127

128
        return &SubSwapper{
2✔
129
                backupState: backupState,
2✔
130
                chanEvents:  chanEvents,
2✔
131
                keyRing:     keyRing,
2✔
132
                Swapper:     backupSwapper,
2✔
133
                quit:        make(chan struct{}),
2✔
134
        }, nil
2✔
135
}
136

137
// Start starts the chanbackup.SubSwapper.
138
func (s *SubSwapper) Start() error {
2✔
139
        var startErr error
2✔
140
        s.started.Do(func() {
4✔
141
                log.Infof("chanbackup.SubSwapper starting")
2✔
142

2✔
143
                // Before we enter our main loop, we'll update the on-disk
2✔
144
                // state with the latest Single state, as nodes may have new
2✔
145
                // advertised addresses.
2✔
146
                if err := s.updateBackupFile(); err != nil {
2✔
147
                        startErr = fmt.Errorf("unable to refresh backup "+
×
148
                                "file: %v", err)
×
149
                        return
×
150
                }
×
151

152
                s.wg.Add(1)
2✔
153
                go s.backupUpdater()
2✔
154
        })
155

156
        return startErr
2✔
157
}
158

159
// Stop signals the SubSwapper to being a graceful shutdown.
160
func (s *SubSwapper) Stop() error {
2✔
161
        s.stopped.Do(func() {
4✔
162
                log.Infof("chanbackup.SubSwapper shutting down...")
2✔
163
                defer log.Debug("chanbackup.SubSwapper shutdown complete")
2✔
164

2✔
165
                close(s.quit)
2✔
166
                s.wg.Wait()
2✔
167
        })
2✔
168
        return nil
2✔
169
}
170

171
// updateBackupFile updates the backup file in place given the current state of
172
// the SubSwapper. We accept the set of channels that were closed between this
173
// update and the last to make sure we leave them out of our backup set union.
174
func (s *SubSwapper) updateBackupFile(closedChans ...wire.OutPoint) error {
2✔
175
        // Before we pack the new set of SCBs, we'll first decode what we
2✔
176
        // already have on-disk, to make sure we can decode it (proper seed)
2✔
177
        // and that we're able to combine it with our new data.
2✔
178
        diskMulti, err := s.Swapper.ExtractMulti(s.keyRing)
2✔
179

2✔
180
        // If the file doesn't exist on disk, then that's OK as it was never
2✔
181
        // created. In this case we'll continue onwards as it isn't a critical
2✔
182
        // error.
2✔
183
        if err != nil && !os.IsNotExist(err) {
2✔
184
                return fmt.Errorf("unable to extract on disk encrypted "+
×
185
                        "SCB: %v", err)
×
186
        }
×
187

188
        // Now that we have channels stored on-disk, we'll create a new set of
189
        // the combined old and new channels to make sure we retain what's
190
        // already on-disk.
191
        //
192
        // NOTE: The ordering of this operations means that our in-memory
193
        // structure will replace what we read from disk.
194
        combinedBackup := make(map[wire.OutPoint]Single)
2✔
195
        if diskMulti != nil {
4✔
196
                for _, diskChannel := range diskMulti.StaticBackups {
4✔
197
                        chanPoint := diskChannel.FundingOutpoint
2✔
198
                        combinedBackup[chanPoint] = diskChannel
2✔
199
                }
2✔
200
        }
201
        for _, memChannel := range s.backupState {
4✔
202
                chanPoint := memChannel.FundingOutpoint
2✔
203
                if _, ok := combinedBackup[chanPoint]; ok {
4✔
204
                        log.Warnf("Replacing disk backup for ChannelPoint(%v) "+
2✔
205
                                "w/ newer version", chanPoint)
2✔
206
                }
2✔
207

208
                combinedBackup[chanPoint] = memChannel
2✔
209
        }
210

211
        // Remove the set of closed channels from the final set of backups.
212
        for _, closedChan := range closedChans {
4✔
213
                delete(combinedBackup, closedChan)
2✔
214
        }
2✔
215

216
        // With our updated channel state obtained, we'll create a new multi
217
        // from our series of singles.
218
        var newMulti Multi
2✔
219
        for _, backup := range combinedBackup {
4✔
220
                newMulti.StaticBackups = append(
2✔
221
                        newMulti.StaticBackups, backup,
2✔
222
                )
2✔
223
        }
2✔
224

225
        // Now that our multi has been assembled, we'll attempt to pack
226
        // (encrypt+encode) the new channel state to our target reader.
227
        var b bytes.Buffer
2✔
228
        err = newMulti.PackToWriter(&b, s.keyRing)
2✔
229
        if err != nil {
2✔
230
                return fmt.Errorf("unable to pack multi backup: %w", err)
×
231
        }
×
232

233
        // Finally, we'll swap out the old backup for this new one in a single
234
        // atomic step, combining the file already on-disk with this set of new
235
        // channels.
236
        err = s.Swapper.UpdateAndSwap(PackedMulti(b.Bytes()))
2✔
237
        if err != nil {
2✔
238
                return fmt.Errorf("unable to update multi backup: %w", err)
×
239
        }
×
240

241
        return nil
2✔
242
}
243

244
// backupFileUpdater is the primary goroutine of the SubSwapper which is
245
// responsible for listening for changes to the channel, and updating the
246
// persistent multi backup state with a new packed multi of the latest channel
247
// state.
248
func (s *SubSwapper) backupUpdater() {
2✔
249
        // Ensure that once we exit, we'll cancel our active channel
2✔
250
        // subscription.
2✔
251
        defer s.chanEvents.Cancel()
2✔
252
        defer s.wg.Done()
2✔
253

2✔
254
        log.Debugf("SubSwapper's backupUpdater is active!")
2✔
255

2✔
256
        for {
4✔
257
                select {
2✔
258
                // The channel state has been modified! We'll evaluate all
259
                // changes, and swap out the old packed multi with a new one
260
                // with the latest channel state.
261
                case chanUpdate := <-s.chanEvents.ChanUpdates:
2✔
262
                        oldStateSize := len(s.backupState)
2✔
263

2✔
264
                        // For all new open channels, we'll create a new SCB
2✔
265
                        // given the required information.
2✔
266
                        for _, newChan := range chanUpdate.NewChans {
4✔
267
                                log.Debugf("Adding channel %v to backup state",
2✔
268
                                        newChan.FundingOutpoint)
2✔
269

2✔
270
                                s.backupState[newChan.FundingOutpoint] = NewSingle(
2✔
271
                                        newChan.OpenChannel, newChan.Addrs,
2✔
272
                                )
2✔
273
                        }
2✔
274

275
                        // For all closed channels, we'll remove the prior
276
                        // backup state.
277
                        closedChans := make(
2✔
278
                                []wire.OutPoint, 0, len(chanUpdate.ClosedChans),
2✔
279
                        )
2✔
280
                        for i, closedChan := range chanUpdate.ClosedChans {
4✔
281
                                log.Debugf("Removing channel %v from backup "+
2✔
282
                                        "state", lnutils.NewLogClosure(
2✔
283
                                        chanUpdate.ClosedChans[i].String))
2✔
284

2✔
285
                                delete(s.backupState, closedChan)
2✔
286

2✔
287
                                closedChans = append(closedChans, closedChan)
2✔
288
                        }
2✔
289

290
                        newStateSize := len(s.backupState)
2✔
291

2✔
292
                        log.Infof("Updating on-disk multi SCB backup: "+
2✔
293
                                "num_old_chans=%v, num_new_chans=%v",
2✔
294
                                oldStateSize, newStateSize)
2✔
295

2✔
296
                        // With out new state constructed, we'll, atomically
2✔
297
                        // update the on-disk backup state.
2✔
298
                        if err := s.updateBackupFile(closedChans...); err != nil {
2✔
299
                                log.Errorf("unable to update backup file: %v",
×
300
                                        err)
×
301
                        }
×
302

303
                // TODO(roasbeef): refresh periodically on a time basis due to
304
                // possible addr changes from node
305

306
                // Exit at once if a quit signal is detected.
307
                case <-s.quit:
2✔
308
                        return
2✔
309
                }
310
        }
311
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc