lightningnetwork / lnd / build 16111722900

07 Jul 2025 08:20AM UTC coverage: 67.557% (+9.8%) from 57.803%

Pull Request #10044: Fix Shutdown deadlock in some scenarios
Merge 3c468e02c into ff32e90d1 (github / web-flow)

24 of 32 new or added lines in 1 file covered (75.0%).
8 existing lines in 3 files now uncovered.
135120 of 200010 relevant lines covered (67.56%).
21847.47 hits per line.

Source File: /chanbackup/pubsub.go (90.15% covered)

package chanbackup

import (
	"bytes"
	"context"
	"fmt"
	"net"
	"os"
	"sync"
	"sync/atomic"

	"github.com/btcsuite/btcd/wire"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/keychain"
	"github.com/lightningnetwork/lnd/lnutils"
)

// Swapper is an interface that allows the chanbackup.SubSwapper to update the
// main multi backup location once it learns of new channels or that prior
// channels have been closed.
type Swapper interface {
	// UpdateAndSwap attempts to atomically update the main multi backup
	// file location with the new fully packed multi-channel backup.
	UpdateAndSwap(newBackup PackedMulti) error

	// ExtractMulti attempts to obtain and decode the current SCB instance
	// stored by the Swapper instance.
	ExtractMulti(keychain keychain.KeyRing) (*Multi, error)
}

// ChannelWithAddrs bundles an open channel along with all the addresses for
// the channel peer.
type ChannelWithAddrs struct {
	*channeldb.OpenChannel

	// Addrs is the set of addresses that we can use to reach the target
	// peer.
	Addrs []net.Addr
}

// ChannelEvent packages the channels that have been opened and the channels
// that have been closed since the prior channel event.
type ChannelEvent struct {
	// ClosedChans are the set of channels that have been closed since the
	// last event.
	ClosedChans []wire.OutPoint

	// NewChans is the set of channels that have been opened since the last
	// event.
	NewChans []ChannelWithAddrs
}

// manualUpdate holds a group of channel state updates and an error channel
// used to send back any error that happened while processing the update or
// updating the file.
type manualUpdate struct {
	// singles holds channel backups. They can be either new or known
	// channels in the Swapper.
	singles []Single

	// errChan is the channel to send an error back. If the update handling
	// and the subsequent file updating succeeds, nil is sent.
	// The channel must have a capacity of 1 to prevent the Swapper from
	// blocking.
	errChan chan error
}

// ChannelSubscription represents an intent to be notified of any updates to
// the primary channel state.
type ChannelSubscription struct {
	// ChanUpdates is a channel that will be sent upon once the primary
	// channel state is updated.
	ChanUpdates chan ChannelEvent

	// Cancel is a closure that allows the caller to cancel their
	// subscription and free up any resources allocated.
	Cancel func()
}

// ChannelNotifier represents a system that allows the chanbackup.SubSwapper to
// be notified of any changes to the primary channel state.
type ChannelNotifier interface {
	// SubscribeChans requests a new channel subscription relative to the
	// initial set of known channels. We use the knownChans as a
	// synchronization point to ensure that the chanbackup.SubSwapper does
	// not miss any channel open or close events in the period between when
	// it's created, and when it requests the channel subscription.
	SubscribeChans(context.Context,
		map[wire.OutPoint]struct{}) (*ChannelSubscription, error)
}

// SubSwapper subscribes to new updates to the open channel state, and then
// swaps out the on-disk channel backup state in response. This sub-system
// ensures that the multi chan backup file on disk will always be updated with
// the latest channel backup state. We'll receive new opened/closed channels
// from the ChannelNotifier, then use the Swapper to update the file state on
// disk with the new set of open channels. This can be used to implement a
// system that always keeps the multi-chan backup file on disk in a consistent
// state for safety purposes.
type SubSwapper struct {
	// started tracks whether the SubSwapper has been started and ensures
	// it can only be started once.
	started atomic.Bool

	// stopped tracks whether the SubSwapper has been stopped and ensures
	// it can only be stopped once.
	stopped atomic.Bool

	// backupState is the set of SCBs for all open channels we know of.
	backupState map[wire.OutPoint]Single

	// chanEvents is an active subscription to receive new channel state
	// over.
	chanEvents *ChannelSubscription

	// manualUpdates is the channel over which ManualUpdate hands new
	// channel backups to the backupUpdater goroutine.
	manualUpdates chan manualUpdate

	// keyRing is the main key ring that will allow us to pack the new
	// multi backup.
	keyRing keychain.KeyRing

	Swapper

	quit chan struct{}
	wg   sync.WaitGroup
}

// NewSubSwapper creates a new instance of the SubSwapper given the starting
// set of channels, and the required interfaces to be notified of new channel
// updates, pack a multi backup, and swap the current best backup from its
// storage location.
func NewSubSwapper(ctx context.Context, startingChans []Single,
	chanNotifier ChannelNotifier, keyRing keychain.KeyRing,
	backupSwapper Swapper) (*SubSwapper, error) {

	// First, we'll subscribe to the latest set of channel updates given
	// the set of channels we already know of.
	knownChans := make(map[wire.OutPoint]struct{})
	for _, chanBackup := range startingChans {
		knownChans[chanBackup.FundingOutpoint] = struct{}{}
	}
	chanEvents, err := chanNotifier.SubscribeChans(ctx, knownChans)
	if err != nil {
		return nil, err
	}

	// Next, we'll construct our own backup state so we can add/remove
	// channels that have been opened and closed.
	backupState := make(map[wire.OutPoint]Single)
	for _, chanBackup := range startingChans {
		backupState[chanBackup.FundingOutpoint] = chanBackup
	}

	return &SubSwapper{
		backupState:   backupState,
		chanEvents:    chanEvents,
		keyRing:       keyRing,
		Swapper:       backupSwapper,
		quit:          make(chan struct{}),
		manualUpdates: make(chan manualUpdate),
	}, nil
}

// Start starts the chanbackup.SubSwapper.
func (s *SubSwapper) Start() error {
	// Ensure we only start the SubSwapper once.
	if !s.started.CompareAndSwap(false, true) {
		return fmt.Errorf("SubSwapper already started")
	}

	log.Infof("chanbackup.SubSwapper starting")

	// Before we enter our main loop, we'll update the on-disk
	// state with the latest Single state, as nodes may have new
	// advertised addresses.
	if err := s.updateBackupFile(); err != nil {
		// Reset the running state since we failed to start.
		s.started.Store(false)
		return fmt.Errorf("unable to refresh backup "+
			"file: %v", err)
	}

	s.wg.Add(1)
	go s.backupUpdater()

	return nil
}

// Stop signals the SubSwapper to begin a graceful shutdown.
func (s *SubSwapper) Stop() error {
	// Ensure we only stop once using atomic compare-and-swap.
	if !s.stopped.CompareAndSwap(false, true) {
		return fmt.Errorf("SubSwapper already stopped")
	}

	log.Infof("chanbackup.SubSwapper shutting down...")
	defer log.Debug("chanbackup.SubSwapper shutdown complete")

	close(s.quit)
	s.wg.Wait()

	return nil
}

// ManualUpdate inserts/updates channel states into the swapper. The updates
// are processed in another goroutine. The method waits for the updates to be
// fully processed and the file to be updated on-disk before returning.
func (s *SubSwapper) ManualUpdate(singles []Single) error {
	// Make sure the SubSwapper is running.
	if !s.started.Load() || s.stopped.Load() {
		return fmt.Errorf("SubSwapper is not running, cannot " +
			"perform manual update")
	}

	// Create the channel to send an error back. If the update handling
	// and the subsequent file updating succeeds, nil is sent.
	// The channel must have capacity of 1 to prevent Swapper blocking.
	errChan := make(chan error, 1)

	// Create the update object to insert into the processing loop.
	update := manualUpdate{
		singles: singles,
		errChan: errChan,
	}

	select {
	case s.manualUpdates <- update:
	case <-s.quit:
		return fmt.Errorf("swapper stopped when sending manual update")
	}

	// Wait for processing, block on errChan.
	select {
	case err := <-errChan:
		if err != nil {
			return fmt.Errorf("processing of manual update "+
				"failed: %w", err)
		}

	case <-s.quit:
		return fmt.Errorf("swapper stopped when waiting for outcome")
	}

	// Success.
	return nil
}

// updateBackupFile updates the backup file in place given the current state of
// the SubSwapper. We accept the set of channels that were closed between this
// update and the last to make sure we leave them out of our backup set union.
func (s *SubSwapper) updateBackupFile(closedChans ...wire.OutPoint) error {
	// Before we pack the new set of SCBs, we'll first decode what we
	// already have on-disk, to make sure we can decode it (proper seed)
	// and that we're able to combine it with our new data.
	diskMulti, err := s.Swapper.ExtractMulti(s.keyRing)

	// If the file doesn't exist on disk, then that's OK as it was never
	// created. In this case we'll continue onwards as it isn't a critical
	// error.
	if err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("unable to extract on disk encrypted "+
			"SCB: %v", err)
	}

	// Now that we have channels stored on-disk, we'll create a new set of
	// the combined old and new channels to make sure we retain what's
	// already on-disk.
	//
	// NOTE: The ordering of these operations means that our in-memory
	// structure will replace what we read from disk.
	combinedBackup := make(map[wire.OutPoint]Single)
	if diskMulti != nil {
		for _, diskChannel := range diskMulti.StaticBackups {
			chanPoint := diskChannel.FundingOutpoint
			combinedBackup[chanPoint] = diskChannel
		}
	}
	for _, memChannel := range s.backupState {
		chanPoint := memChannel.FundingOutpoint
		if _, ok := combinedBackup[chanPoint]; ok {
			log.Warnf("Replacing disk backup for ChannelPoint(%v) "+
				"w/ newer version", chanPoint)
		}

		combinedBackup[chanPoint] = memChannel
	}

	// Remove the set of closed channels from the final set of backups.
	for _, closedChan := range closedChans {
		delete(combinedBackup, closedChan)
	}

	// With our updated channel state obtained, we'll create a new multi
	// from our series of singles.
	var newMulti Multi
	for _, backup := range combinedBackup {
		newMulti.StaticBackups = append(
			newMulti.StaticBackups, backup,
		)
	}

	// Now that our multi has been assembled, we'll attempt to pack
	// (encrypt+encode) the new channel state to our target reader.
	var b bytes.Buffer
	err = newMulti.PackToWriter(&b, s.keyRing)
	if err != nil {
		return fmt.Errorf("unable to pack multi backup: %w", err)
	}

	// Finally, we'll swap out the old backup for this new one in a single
	// atomic step, combining the file already on-disk with this set of new
	// channels.
	err = s.Swapper.UpdateAndSwap(PackedMulti(b.Bytes()))
	if err != nil {
		return fmt.Errorf("unable to update multi backup: %w", err)
	}

	return nil
}

// backupUpdater is the primary goroutine of the SubSwapper which is
// responsible for listening for changes to the channel state, and updating the
// persistent multi backup state with a new packed multi of the latest channel
// state.
func (s *SubSwapper) backupUpdater() {
	// Ensure that once we exit, we'll cancel our active channel
	// subscription.
	defer s.chanEvents.Cancel()
	defer s.wg.Done()

	log.Debugf("SubSwapper's backupUpdater is active!")

	for {
		select {
		// The channel state has been modified! We'll evaluate all
		// changes, and swap out the old packed multi with a new one
		// with the latest channel state.
		case chanUpdate := <-s.chanEvents.ChanUpdates:
			oldStateSize := len(s.backupState)

			// For all new open channels, we'll create a new SCB
			// given the required information.
			for _, newChan := range chanUpdate.NewChans {
				log.Debugf("Adding channel %v to backup state",
					newChan.FundingOutpoint)

				single := NewSingle(
					newChan.OpenChannel, newChan.Addrs,
				)
				s.backupState[newChan.FundingOutpoint] = single
			}

			// For all closed channels, we'll remove the prior
			// backup state.
			closedChans := make(
				[]wire.OutPoint, 0, len(chanUpdate.ClosedChans),
			)
			for i, closedChan := range chanUpdate.ClosedChans {
				log.Debugf("Removing channel %v from backup "+
					"state", lnutils.NewLogClosure(
					chanUpdate.ClosedChans[i].String))

				delete(s.backupState, closedChan)

				closedChans = append(closedChans, closedChan)
			}

			newStateSize := len(s.backupState)

			log.Infof("Updating on-disk multi SCB backup: "+
				"num_old_chans=%v, num_new_chans=%v",
				oldStateSize, newStateSize)

			// With our new state constructed, we'll atomically
			// update the on-disk backup state.
			if err := s.updateBackupFile(closedChans...); err != nil {
				log.Errorf("unable to update backup file: %v",
					err)
			}

		// We received a manual update. Handle it and update the file.
		case manualUpdate := <-s.manualUpdates:
			oldStateSize := len(s.backupState)

			// For all open channels, we'll create a new SCB given
			// the required information.
			for _, single := range manualUpdate.singles {
				log.Debugf("Manual update of channel %v",
					single.FundingOutpoint)

				s.backupState[single.FundingOutpoint] = single
			}

			newStateSize := len(s.backupState)

			log.Infof("Updating on-disk multi SCB backup: "+
				"num_old_chans=%v, num_new_chans=%v",
				oldStateSize, newStateSize)

			// With our new state constructed, we'll atomically
			// update the on-disk backup state.
			err := s.updateBackupFile()
			if err != nil {
				log.Errorf("unable to update backup file: %v",
					err)
			}

			// Send the error (or nil) to the caller of
			// ManualUpdate. The error channel must have a capacity
			// of 1 so that sending here never blocks.
			manualUpdate.errChan <- err

		// TODO(roasbeef): refresh periodically on a time basis due to
		// possible addr changes from node

		// Exit at once if a quit signal is detected.
		case <-s.quit:
			return
		}
	}
}
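
For context on how the pieces above fit together, here is a minimal wiring sketch. It is not part of pubsub.go: the package and function names (example, runSubSwapper) and the updates channel are hypothetical, and the notifier, keyRing and swapper values are assumed to be supplied by the caller as concrete implementations of the interfaces defined in this file. It only shows the intended call order: NewSubSwapper, Start, ManualUpdate as fresh backups arrive, and Stop on shutdown.

package example // hypothetical wiring sketch, not part of the chanbackup package

import (
	"context"

	"github.com/lightningnetwork/lnd/chanbackup"
	"github.com/lightningnetwork/lnd/keychain"
)

// runSubSwapper drives a SubSwapper for the lifetime of the passed context,
// forwarding externally produced channel backups into it.
func runSubSwapper(ctx context.Context, startingChans []chanbackup.Single,
	notifier chanbackup.ChannelNotifier, keyRing keychain.KeyRing,
	swapper chanbackup.Swapper,
	updates <-chan []chanbackup.Single) error {

	// NewSubSwapper subscribes to channel events immediately, using
	// startingChans as the synchronization point.
	subSwapper, err := chanbackup.NewSubSwapper(
		ctx, startingChans, notifier, keyRing, swapper,
	)
	if err != nil {
		return err
	}

	// Start refreshes the backup file once, then launches the
	// backupUpdater goroutine.
	if err := subSwapper.Start(); err != nil {
		return err
	}

	// Stop closes the quit channel and waits for backupUpdater to exit.
	defer subSwapper.Stop()

	for {
		select {
		// ManualUpdate blocks until the on-disk multi backup has been
		// rewritten with the given singles, or an error occurred.
		case singles := <-updates:
			if err := subSwapper.ManualUpdate(singles); err != nil {
				return err
			}

		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

Note that ManualUpdate waits for backupUpdater to report the outcome over errChan; because that channel is created with a capacity of one, the updater's send never blocks even if the caller has already given up on the quit signal.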