• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15736109134

18 Jun 2025 02:46PM UTC coverage: 58.197% (-10.1%) from 68.248%
15736109134

Pull #9752

github

web-flow
Merge d2634a68c into 31c74f20f
Pull Request #9752: routerrpc: reject payment to invoice that don't have payment secret or blinded paths

6 of 13 new or added lines in 2 files covered. (46.15%)

28331 existing lines in 455 files now uncovered.

97860 of 168153 relevant lines covered (58.2%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.72
/chanbackup/pubsub.go
1
package chanbackup
2

3
import (
4
        "bytes"
5
        "context"
6
        "fmt"
7
        "net"
8
        "os"
9
        "sync"
10

11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/lightningnetwork/lnd/channeldb"
13
        "github.com/lightningnetwork/lnd/keychain"
14
        "github.com/lightningnetwork/lnd/lnutils"
15
)
16

17
// Swapper is an interface that allows the chanbackup.SubSwapper to update the
18
// main multi backup location once it learns of new channels or that prior
19
// channels have been closed.
20
type Swapper interface {
21
        // UpdateAndSwap attempts to atomically update the main multi back up
22
        // file location with the new fully packed multi-channel backup.
23
        UpdateAndSwap(newBackup PackedMulti) error
24

25
        // ExtractMulti attempts to obtain and decode the current SCB instance
26
        // stored by the Swapper instance.
27
        ExtractMulti(keychain keychain.KeyRing) (*Multi, error)
28
}
29

30
// ChannelWithAddrs bundles an open channel along with all the addresses for
31
// the channel peer.
32
type ChannelWithAddrs struct {
33
        *channeldb.OpenChannel
34

35
        // Addrs is the set of addresses that we can use to reach the target
36
        // peer.
37
        Addrs []net.Addr
38
}
39

40
// ChannelEvent packages a new update of new channels since subscription, and
41
// channels that have been opened since prior channel event.
42
type ChannelEvent struct {
43
        // ClosedChans are the set of channels that have been closed since the
44
        // last event.
45
        ClosedChans []wire.OutPoint
46

47
        // NewChans is the set of channels that have been opened since the last
48
        // event.
49
        NewChans []ChannelWithAddrs
50
}
51

52
// manualUpdate holds a group of channel state updates and an error channel
53
// to send back an error happened upon update processing or file updating.
54
type manualUpdate struct {
55
        // singles hold channels backups. They can be either new or known
56
        // channels in the Swapper.
57
        singles []Single
58

59
        // errChan is the channel to send an error back. If the update handling
60
        // and the subsequent file updating succeeds, nil is sent.
61
        // The channel must have capacity of 1 to prevent Swapper blocking.
62
        errChan chan error
63
}
64

65
// ChannelSubscription represents an intent to be notified of any updates to
66
// the primary channel state.
67
type ChannelSubscription struct {
68
        // ChanUpdates is a channel that will be sent upon once the primary
69
        // channel state is updated.
70
        ChanUpdates chan ChannelEvent
71

72
        // Cancel is a closure that allows the caller to cancel their
73
        // subscription and free up any resources allocated.
74
        Cancel func()
75
}
76

77
// ChannelNotifier represents a system that allows the chanbackup.SubSwapper to
78
// be notified of any changes to the primary channel state.
79
type ChannelNotifier interface {
80
        // SubscribeChans requests a new channel subscription relative to the
81
        // initial set of known channels. We use the knownChans as a
82
        // synchronization point to ensure that the chanbackup.SubSwapper does
83
        // not miss any channel open or close events in the period between when
84
        // it's created, and when it requests the channel subscription.
85
        SubscribeChans(context.Context,
86
                map[wire.OutPoint]struct{}) (*ChannelSubscription, error)
87
}
88

89
// SubSwapper subscribes to new updates to the open channel state, and then
90
// swaps out the on-disk channel backup state in response.  This sub-system
91
// that will ensure that the multi chan backup file on disk will always be
92
// updated with the latest channel back up state. We'll receive new
93
// opened/closed channels from the ChannelNotifier, then use the Swapper to
94
// update the file state on disk with the new set of open channels.  This can
95
// be used to implement a system that always keeps the multi-chan backup file
96
// on disk in a consistent state for safety purposes.
97
type SubSwapper struct {
98
        started sync.Once
99
        stopped sync.Once
100

101
        // backupState are the set of SCBs for all open channels we know of.
102
        backupState map[wire.OutPoint]Single
103

104
        // chanEvents is an active subscription to receive new channel state
105
        // over.
106
        chanEvents *ChannelSubscription
107

108
        manualUpdates chan manualUpdate
109

110
        // keyRing is the main key ring that will allow us to pack the new
111
        // multi backup.
112
        keyRing keychain.KeyRing
113

114
        Swapper
115

116
        quit chan struct{}
117
        wg   sync.WaitGroup
118
}
119

120
// NewSubSwapper creates a new instance of the SubSwapper given the starting
121
// set of channels, and the required interfaces to be notified of new channel
122
// updates, pack a multi backup, and swap the current best backup from its
123
// storage location.
124
func NewSubSwapper(ctx context.Context, startingChans []Single,
125
        chanNotifier ChannelNotifier, keyRing keychain.KeyRing,
126
        backupSwapper Swapper) (*SubSwapper, error) {
3✔
127

3✔
128
        // First, we'll subscribe to the latest set of channel updates given
3✔
129
        // the set of channels we already know of.
3✔
130
        knownChans := make(map[wire.OutPoint]struct{})
3✔
131
        for _, chanBackup := range startingChans {
6✔
132
                knownChans[chanBackup.FundingOutpoint] = struct{}{}
3✔
133
        }
3✔
134
        chanEvents, err := chanNotifier.SubscribeChans(ctx, knownChans)
3✔
135
        if err != nil {
3✔
UNCOV
136
                return nil, err
×
UNCOV
137
        }
×
138

139
        // Next, we'll construct our own backup state so we can add/remove
140
        // channels that have been opened and closed.
141
        backupState := make(map[wire.OutPoint]Single)
3✔
142
        for _, chanBackup := range startingChans {
6✔
143
                backupState[chanBackup.FundingOutpoint] = chanBackup
3✔
144
        }
3✔
145

146
        return &SubSwapper{
3✔
147
                backupState:   backupState,
3✔
148
                chanEvents:    chanEvents,
3✔
149
                keyRing:       keyRing,
3✔
150
                Swapper:       backupSwapper,
3✔
151
                quit:          make(chan struct{}),
3✔
152
                manualUpdates: make(chan manualUpdate),
3✔
153
        }, nil
3✔
154
}
155

156
// Start starts the chanbackup.SubSwapper.
157
func (s *SubSwapper) Start() error {
3✔
158
        var startErr error
3✔
159
        s.started.Do(func() {
6✔
160
                log.Infof("chanbackup.SubSwapper starting")
3✔
161

3✔
162
                // Before we enter our main loop, we'll update the on-disk
3✔
163
                // state with the latest Single state, as nodes may have new
3✔
164
                // advertised addresses.
3✔
165
                if err := s.updateBackupFile(); err != nil {
3✔
166
                        startErr = fmt.Errorf("unable to refresh backup "+
×
167
                                "file: %v", err)
×
168
                        return
×
169
                }
×
170

171
                s.wg.Add(1)
3✔
172
                go s.backupUpdater()
3✔
173
        })
174

175
        return startErr
3✔
176
}
177

178
// Stop signals the SubSwapper to being a graceful shutdown.
179
func (s *SubSwapper) Stop() error {
3✔
180
        s.stopped.Do(func() {
6✔
181
                log.Infof("chanbackup.SubSwapper shutting down...")
3✔
182
                defer log.Debug("chanbackup.SubSwapper shutdown complete")
3✔
183

3✔
184
                close(s.quit)
3✔
185
                s.wg.Wait()
3✔
186
        })
3✔
187
        return nil
3✔
188
}
189

190
// ManualUpdate inserts/updates channel states into the swapper. The updates
191
// are processed in another goroutine. The method waits for the updates to be
192
// fully processed and the file to be updated on-disk before returning.
193
func (s *SubSwapper) ManualUpdate(singles []Single) error {
3✔
194
        // Create the channel to send an error back. If the update handling
3✔
195
        // and the subsequent file updating succeeds, nil is sent.
3✔
196
        // The channel must have capacity of 1 to prevent Swapper blocking.
3✔
197
        errChan := make(chan error, 1)
3✔
198

3✔
199
        // Create the update object to insert into the processing loop.
3✔
200
        update := manualUpdate{
3✔
201
                singles: singles,
3✔
202
                errChan: errChan,
3✔
203
        }
3✔
204

3✔
205
        select {
3✔
206
        case s.manualUpdates <- update:
3✔
207
        case <-s.quit:
×
208
                return fmt.Errorf("swapper stopped when sending manual update")
×
209
        }
210

211
        // Wait for processing, block on errChan.
212
        select {
3✔
213
        case err := <-errChan:
3✔
214
                if err != nil {
6✔
215
                        return fmt.Errorf("processing of manual update "+
3✔
216
                                "failed: %w", err)
3✔
217
                }
3✔
218

219
        case <-s.quit:
×
220
                return fmt.Errorf("swapper stopped when waiting for outcome")
×
221
        }
222

223
        // Success.
224
        return nil
3✔
225
}
226

227
// updateBackupFile updates the backup file in place given the current state of
228
// the SubSwapper. We accept the set of channels that were closed between this
229
// update and the last to make sure we leave them out of our backup set union.
230
func (s *SubSwapper) updateBackupFile(closedChans ...wire.OutPoint) error {
3✔
231
        // Before we pack the new set of SCBs, we'll first decode what we
3✔
232
        // already have on-disk, to make sure we can decode it (proper seed)
3✔
233
        // and that we're able to combine it with our new data.
3✔
234
        diskMulti, err := s.Swapper.ExtractMulti(s.keyRing)
3✔
235

3✔
236
        // If the file doesn't exist on disk, then that's OK as it was never
3✔
237
        // created. In this case we'll continue onwards as it isn't a critical
3✔
238
        // error.
3✔
239
        if err != nil && !os.IsNotExist(err) {
3✔
240
                return fmt.Errorf("unable to extract on disk encrypted "+
×
241
                        "SCB: %v", err)
×
242
        }
×
243

244
        // Now that we have channels stored on-disk, we'll create a new set of
245
        // the combined old and new channels to make sure we retain what's
246
        // already on-disk.
247
        //
248
        // NOTE: The ordering of this operations means that our in-memory
249
        // structure will replace what we read from disk.
250
        combinedBackup := make(map[wire.OutPoint]Single)
3✔
251
        if diskMulti != nil {
6✔
252
                for _, diskChannel := range diskMulti.StaticBackups {
6✔
253
                        chanPoint := diskChannel.FundingOutpoint
3✔
254
                        combinedBackup[chanPoint] = diskChannel
3✔
255
                }
3✔
256
        }
257
        for _, memChannel := range s.backupState {
6✔
258
                chanPoint := memChannel.FundingOutpoint
3✔
259
                if _, ok := combinedBackup[chanPoint]; ok {
6✔
260
                        log.Warnf("Replacing disk backup for ChannelPoint(%v) "+
3✔
261
                                "w/ newer version", chanPoint)
3✔
262
                }
3✔
263

264
                combinedBackup[chanPoint] = memChannel
3✔
265
        }
266

267
        // Remove the set of closed channels from the final set of backups.
268
        for _, closedChan := range closedChans {
6✔
269
                delete(combinedBackup, closedChan)
3✔
270
        }
3✔
271

272
        // With our updated channel state obtained, we'll create a new multi
273
        // from our series of singles.
274
        var newMulti Multi
3✔
275
        for _, backup := range combinedBackup {
6✔
276
                newMulti.StaticBackups = append(
3✔
277
                        newMulti.StaticBackups, backup,
3✔
278
                )
3✔
279
        }
3✔
280

281
        // Now that our multi has been assembled, we'll attempt to pack
282
        // (encrypt+encode) the new channel state to our target reader.
283
        var b bytes.Buffer
3✔
284
        err = newMulti.PackToWriter(&b, s.keyRing)
3✔
285
        if err != nil {
3✔
286
                return fmt.Errorf("unable to pack multi backup: %w", err)
×
287
        }
×
288

289
        // Finally, we'll swap out the old backup for this new one in a single
290
        // atomic step, combining the file already on-disk with this set of new
291
        // channels.
292
        err = s.Swapper.UpdateAndSwap(PackedMulti(b.Bytes()))
3✔
293
        if err != nil {
6✔
294
                return fmt.Errorf("unable to update multi backup: %w", err)
3✔
295
        }
3✔
296

297
        return nil
3✔
298
}
299

300
// backupFileUpdater is the primary goroutine of the SubSwapper which is
301
// responsible for listening for changes to the channel, and updating the
302
// persistent multi backup state with a new packed multi of the latest channel
303
// state.
304
func (s *SubSwapper) backupUpdater() {
3✔
305
        // Ensure that once we exit, we'll cancel our active channel
3✔
306
        // subscription.
3✔
307
        defer s.chanEvents.Cancel()
3✔
308
        defer s.wg.Done()
3✔
309

3✔
310
        log.Debugf("SubSwapper's backupUpdater is active!")
3✔
311

3✔
312
        for {
6✔
313
                select {
3✔
314
                // The channel state has been modified! We'll evaluate all
315
                // changes, and swap out the old packed multi with a new one
316
                // with the latest channel state.
317
                case chanUpdate := <-s.chanEvents.ChanUpdates:
3✔
318
                        oldStateSize := len(s.backupState)
3✔
319

3✔
320
                        // For all new open channels, we'll create a new SCB
3✔
321
                        // given the required information.
3✔
322
                        for _, newChan := range chanUpdate.NewChans {
6✔
323
                                log.Debugf("Adding channel %v to backup state",
3✔
324
                                        newChan.FundingOutpoint)
3✔
325

3✔
326
                                single := NewSingle(
3✔
327
                                        newChan.OpenChannel, newChan.Addrs,
3✔
328
                                )
3✔
329
                                s.backupState[newChan.FundingOutpoint] = single
3✔
330
                        }
3✔
331

332
                        // For all closed channels, we'll remove the prior
333
                        // backup state.
334
                        closedChans := make(
3✔
335
                                []wire.OutPoint, 0, len(chanUpdate.ClosedChans),
3✔
336
                        )
3✔
337
                        for i, closedChan := range chanUpdate.ClosedChans {
6✔
338
                                log.Debugf("Removing channel %v from backup "+
3✔
339
                                        "state", lnutils.NewLogClosure(
3✔
340
                                        chanUpdate.ClosedChans[i].String))
3✔
341

3✔
342
                                delete(s.backupState, closedChan)
3✔
343

3✔
344
                                closedChans = append(closedChans, closedChan)
3✔
345
                        }
3✔
346

347
                        newStateSize := len(s.backupState)
3✔
348

3✔
349
                        log.Infof("Updating on-disk multi SCB backup: "+
3✔
350
                                "num_old_chans=%v, num_new_chans=%v",
3✔
351
                                oldStateSize, newStateSize)
3✔
352

3✔
353
                        // Without new state constructed, we'll, atomically
3✔
354
                        // update the on-disk backup state.
3✔
355
                        if err := s.updateBackupFile(closedChans...); err != nil {
3✔
356
                                log.Errorf("unable to update backup file: %v",
×
357
                                        err)
×
358
                        }
×
359

360
                // We received a manual update. Handle it and update the file.
361
                case manualUpdate := <-s.manualUpdates:
3✔
362
                        oldStateSize := len(s.backupState)
3✔
363

3✔
364
                        // For all open channels, we'll create a new SCB given
3✔
365
                        // the required information.
3✔
366
                        for _, single := range manualUpdate.singles {
6✔
367
                                log.Debugf("Manual update of channel %v",
3✔
368
                                        single.FundingOutpoint)
3✔
369

3✔
370
                                s.backupState[single.FundingOutpoint] = single
3✔
371
                        }
3✔
372

373
                        newStateSize := len(s.backupState)
3✔
374

3✔
375
                        log.Infof("Updating on-disk multi SCB backup: "+
3✔
376
                                "num_old_chans=%v, num_new_chans=%v",
3✔
377
                                oldStateSize, newStateSize)
3✔
378

3✔
379
                        // Without new state constructed, we'll, atomically
3✔
380
                        // update the on-disk backup state.
3✔
381
                        err := s.updateBackupFile()
3✔
382
                        if err != nil {
6✔
383
                                log.Errorf("unable to update backup file: %v",
3✔
384
                                        err)
3✔
385
                        }
3✔
386

387
                        // Send the error (or nil) to the caller of
388
                        // ManualUpdate. The error channel must have capacity of
389
                        // 1 not to block here.
390
                        manualUpdate.errChan <- err
3✔
391

392
                // TODO(roasbeef): refresh periodically on a time basis due to
393
                // possible addr changes from node
394

395
                // Exit at once if a quit signal is detected.
396
                case <-s.quit:
3✔
397
                        return
3✔
398
                }
399
        }
400
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc