• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 9915780197

13 Jul 2024 12:30AM UTC coverage: 49.268% (-9.1%) from 58.413%
9915780197

push

github

web-flow
Merge pull request #8653 from ProofOfKeags/fn-prim

DynComms [0/n]: `fn` package additions

92837 of 188433 relevant lines covered (49.27%)

1.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.65
/chanbackup/pubsub.go
1
package chanbackup
2

3
import (
	"bytes"
	"errors"
	"fmt"
	"net"
	"os"
	"sync"

	"github.com/btcsuite/btcd/wire"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/keychain"
)
14

15
// Swapper is an interface that allows the chanbackup.SubSwapper to update the
16
// main multi backup location once it learns of new channels or that prior
17
// channels have been closed.
18
type Swapper interface {
19
        // UpdateAndSwap attempts to atomically update the main multi back up
20
        // file location with the new fully packed multi-channel backup.
21
        UpdateAndSwap(newBackup PackedMulti) error
22

23
        // ExtractMulti attempts to obtain and decode the current SCB instance
24
        // stored by the Swapper instance.
25
        ExtractMulti(keychain keychain.KeyRing) (*Multi, error)
26
}
27

28
// ChannelWithAddrs bundles an open channel along with all the addresses for
29
// the channel peer.
30
type ChannelWithAddrs struct {
31
        *channeldb.OpenChannel
32

33
        // Addrs is the set of addresses that we can use to reach the target
34
        // peer.
35
        Addrs []net.Addr
36
}
37

38
// ChannelEvent packages a new update of new channels since subscription, and
39
// channels that have been opened since prior channel event.
40
type ChannelEvent struct {
41
        // ClosedChans are the set of channels that have been closed since the
42
        // last event.
43
        ClosedChans []wire.OutPoint
44

45
        // NewChans is the set of channels that have been opened since the last
46
        // event.
47
        NewChans []ChannelWithAddrs
48
}
49

50
// ChannelSubscription represents an intent to be notified of any updates to
51
// the primary channel state.
52
type ChannelSubscription struct {
53
        // ChanUpdates is a channel that will be sent upon once the primary
54
        // channel state is updated.
55
        ChanUpdates chan ChannelEvent
56

57
        // Cancel is a closure that allows the caller to cancel their
58
        // subscription and free up any resources allocated.
59
        Cancel func()
60
}
61

62
// ChannelNotifier represents a system that allows the chanbackup.SubSwapper to
63
// be notified of any changes to the primary channel state.
64
type ChannelNotifier interface {
65
        // SubscribeChans requests a new channel subscription relative to the
66
        // initial set of known channels. We use the knownChans as a
67
        // synchronization point to ensure that the chanbackup.SubSwapper does
68
        // not miss any channel open or close events in the period between when
69
        // it's created, and when it requests the channel subscription.
70
        SubscribeChans(map[wire.OutPoint]struct{}) (*ChannelSubscription, error)
71
}
72

73
// SubSwapper subscribes to new updates to the open channel state, and then
74
// swaps out the on-disk channel backup state in response.  This sub-system
75
// that will ensure that the multi chan backup file on disk will always be
76
// updated with the latest channel back up state. We'll receive new
77
// opened/closed channels from the ChannelNotifier, then use the Swapper to
78
// update the file state on disk with the new set of open channels.  This can
79
// be used to implement a system that always keeps the multi-chan backup file
80
// on disk in a consistent state for safety purposes.
81
type SubSwapper struct {
82
        started sync.Once
83
        stopped sync.Once
84

85
        // backupState are the set of SCBs for all open channels we know of.
86
        backupState map[wire.OutPoint]Single
87

88
        // chanEvents is an active subscription to receive new channel state
89
        // over.
90
        chanEvents *ChannelSubscription
91

92
        // keyRing is the main key ring that will allow us to pack the new
93
        // multi backup.
94
        keyRing keychain.KeyRing
95

96
        Swapper
97

98
        quit chan struct{}
99
        wg   sync.WaitGroup
100
}
101

102
// NewSubSwapper creates a new instance of the SubSwapper given the starting
103
// set of channels, and the required interfaces to be notified of new channel
104
// updates, pack a multi backup, and swap the current best backup from its
105
// storage location.
106
func NewSubSwapper(startingChans []Single, chanNotifier ChannelNotifier,
107
        keyRing keychain.KeyRing, backupSwapper Swapper) (*SubSwapper, error) {
3✔
108

3✔
109
        // First, we'll subscribe to the latest set of channel updates given
3✔
110
        // the set of channels we already know of.
3✔
111
        knownChans := make(map[wire.OutPoint]struct{})
3✔
112
        for _, chanBackup := range startingChans {
6✔
113
                knownChans[chanBackup.FundingOutpoint] = struct{}{}
3✔
114
        }
3✔
115
        chanEvents, err := chanNotifier.SubscribeChans(knownChans)
3✔
116
        if err != nil {
3✔
117
                return nil, err
×
118
        }
×
119

120
        // Next, we'll construct our own backup state so we can add/remove
121
        // channels that have been opened and closed.
122
        backupState := make(map[wire.OutPoint]Single)
3✔
123
        for _, chanBackup := range startingChans {
6✔
124
                backupState[chanBackup.FundingOutpoint] = chanBackup
3✔
125
        }
3✔
126

127
        return &SubSwapper{
3✔
128
                backupState: backupState,
3✔
129
                chanEvents:  chanEvents,
3✔
130
                keyRing:     keyRing,
3✔
131
                Swapper:     backupSwapper,
3✔
132
                quit:        make(chan struct{}),
3✔
133
        }, nil
3✔
134
}
135

136
// Start starts the chanbackup.SubSwapper.
137
func (s *SubSwapper) Start() error {
3✔
138
        var startErr error
3✔
139
        s.started.Do(func() {
6✔
140
                log.Infof("chanbackup.SubSwapper starting")
3✔
141

3✔
142
                // Before we enter our main loop, we'll update the on-disk
3✔
143
                // state with the latest Single state, as nodes may have new
3✔
144
                // advertised addresses.
3✔
145
                if err := s.updateBackupFile(); err != nil {
3✔
146
                        startErr = fmt.Errorf("unable to refresh backup "+
×
147
                                "file: %v", err)
×
148
                        return
×
149
                }
×
150

151
                s.wg.Add(1)
3✔
152
                go s.backupUpdater()
3✔
153
        })
154

155
        return startErr
3✔
156
}
157

158
// Stop signals the SubSwapper to being a graceful shutdown.
159
func (s *SubSwapper) Stop() error {
3✔
160
        s.stopped.Do(func() {
6✔
161
                log.Infof("chanbackup.SubSwapper shutting down...")
3✔
162
                defer log.Debug("chanbackup.SubSwapper shutdown complete")
3✔
163

3✔
164
                close(s.quit)
3✔
165
                s.wg.Wait()
3✔
166
        })
3✔
167
        return nil
3✔
168
}
169

170
// updateBackupFile updates the backup file in place given the current state of
171
// the SubSwapper. We accept the set of channels that were closed between this
172
// update and the last to make sure we leave them out of our backup set union.
173
func (s *SubSwapper) updateBackupFile(closedChans ...wire.OutPoint) error {
3✔
174
        // Before we pack the new set of SCBs, we'll first decode what we
3✔
175
        // already have on-disk, to make sure we can decode it (proper seed)
3✔
176
        // and that we're able to combine it with our new data.
3✔
177
        diskMulti, err := s.Swapper.ExtractMulti(s.keyRing)
3✔
178

3✔
179
        // If the file doesn't exist on disk, then that's OK as it was never
3✔
180
        // created. In this case we'll continue onwards as it isn't a critical
3✔
181
        // error.
3✔
182
        if err != nil && !os.IsNotExist(err) {
3✔
183
                return fmt.Errorf("unable to extract on disk encrypted "+
×
184
                        "SCB: %v", err)
×
185
        }
×
186

187
        // Now that we have channels stored on-disk, we'll create a new set of
188
        // the combined old and new channels to make sure we retain what's
189
        // already on-disk.
190
        //
191
        // NOTE: The ordering of this operations means that our in-memory
192
        // structure will replace what we read from disk.
193
        combinedBackup := make(map[wire.OutPoint]Single)
3✔
194
        if diskMulti != nil {
6✔
195
                for _, diskChannel := range diskMulti.StaticBackups {
6✔
196
                        chanPoint := diskChannel.FundingOutpoint
3✔
197
                        combinedBackup[chanPoint] = diskChannel
3✔
198
                }
3✔
199
        }
200
        for _, memChannel := range s.backupState {
6✔
201
                chanPoint := memChannel.FundingOutpoint
3✔
202
                if _, ok := combinedBackup[chanPoint]; ok {
6✔
203
                        log.Warnf("Replacing disk backup for ChannelPoint(%v) "+
3✔
204
                                "w/ newer version", chanPoint)
3✔
205
                }
3✔
206

207
                combinedBackup[chanPoint] = memChannel
3✔
208
        }
209

210
        // Remove the set of closed channels from the final set of backups.
211
        for _, closedChan := range closedChans {
6✔
212
                delete(combinedBackup, closedChan)
3✔
213
        }
3✔
214

215
        // With our updated channel state obtained, we'll create a new multi
216
        // from our series of singles.
217
        var newMulti Multi
3✔
218
        for _, backup := range combinedBackup {
6✔
219
                newMulti.StaticBackups = append(
3✔
220
                        newMulti.StaticBackups, backup,
3✔
221
                )
3✔
222
        }
3✔
223

224
        // Now that our multi has been assembled, we'll attempt to pack
225
        // (encrypt+encode) the new channel state to our target reader.
226
        var b bytes.Buffer
3✔
227
        err = newMulti.PackToWriter(&b, s.keyRing)
3✔
228
        if err != nil {
3✔
229
                return fmt.Errorf("unable to pack multi backup: %w", err)
×
230
        }
×
231

232
        // Finally, we'll swap out the old backup for this new one in a single
233
        // atomic step, combining the file already on-disk with this set of new
234
        // channels.
235
        err = s.Swapper.UpdateAndSwap(PackedMulti(b.Bytes()))
3✔
236
        if err != nil {
3✔
237
                return fmt.Errorf("unable to update multi backup: %w", err)
×
238
        }
×
239

240
        return nil
3✔
241
}
242

243
// backupFileUpdater is the primary goroutine of the SubSwapper which is
244
// responsible for listening for changes to the channel, and updating the
245
// persistent multi backup state with a new packed multi of the latest channel
246
// state.
247
func (s *SubSwapper) backupUpdater() {
3✔
248
        // Ensure that once we exit, we'll cancel our active channel
3✔
249
        // subscription.
3✔
250
        defer s.chanEvents.Cancel()
3✔
251
        defer s.wg.Done()
3✔
252

3✔
253
        log.Debugf("SubSwapper's backupUpdater is active!")
3✔
254

3✔
255
        for {
6✔
256
                select {
3✔
257
                // The channel state has been modified! We'll evaluate all
258
                // changes, and swap out the old packed multi with a new one
259
                // with the latest channel state.
260
                case chanUpdate := <-s.chanEvents.ChanUpdates:
3✔
261
                        oldStateSize := len(s.backupState)
3✔
262

3✔
263
                        // For all new open channels, we'll create a new SCB
3✔
264
                        // given the required information.
3✔
265
                        for _, newChan := range chanUpdate.NewChans {
6✔
266
                                log.Debugf("Adding channel %v to backup state",
3✔
267
                                        newChan.FundingOutpoint)
3✔
268

3✔
269
                                s.backupState[newChan.FundingOutpoint] = NewSingle(
3✔
270
                                        newChan.OpenChannel, newChan.Addrs,
3✔
271
                                )
3✔
272
                        }
3✔
273

274
                        // For all closed channels, we'll remove the prior
275
                        // backup state.
276
                        closedChans := make(
3✔
277
                                []wire.OutPoint, 0, len(chanUpdate.ClosedChans),
3✔
278
                        )
3✔
279
                        for i, closedChan := range chanUpdate.ClosedChans {
6✔
280
                                log.Debugf("Removing channel %v from backup "+
3✔
281
                                        "state", newLogClosure(func() string {
6✔
282
                                        return chanUpdate.ClosedChans[i].String()
3✔
283
                                }))
3✔
284

285
                                delete(s.backupState, closedChan)
3✔
286

3✔
287
                                closedChans = append(closedChans, closedChan)
3✔
288
                        }
289

290
                        newStateSize := len(s.backupState)
3✔
291

3✔
292
                        log.Infof("Updating on-disk multi SCB backup: "+
3✔
293
                                "num_old_chans=%v, num_new_chans=%v",
3✔
294
                                oldStateSize, newStateSize)
3✔
295

3✔
296
                        // With out new state constructed, we'll, atomically
3✔
297
                        // update the on-disk backup state.
3✔
298
                        if err := s.updateBackupFile(closedChans...); err != nil {
3✔
299
                                log.Errorf("unable to update backup file: %v",
×
300
                                        err)
×
301
                        }
×
302

303
                // TODO(roasbeef): refresh periodically on a time basis due to
304
                // possible addr changes from node
305

306
                // Exit at once if a quit signal is detected.
307
                case <-s.quit:
3✔
308
                        return
3✔
309
                }
310
        }
311
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc