• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16181619122

09 Jul 2025 10:33PM UTC coverage: 55.326% (-2.3%) from 57.611%
16181619122

Pull #10060

github

web-flow
Merge d15e8671f into 0e830da9d
Pull Request #10060: sweep: fix expected spending events being missed

9 of 26 new or added lines in 2 files covered. (34.62%)

23695 existing lines in 280 files now uncovered.

108518 of 196143 relevant lines covered (55.33%)

22354.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/autopilot/manager.go
1
package autopilot
2

3
import (
4
        "context"
5
        "fmt"
6
        "sync"
7

8
        "github.com/btcsuite/btcd/btcec/v2"
9
        "github.com/btcsuite/btcd/wire"
10
        graphdb "github.com/lightningnetwork/lnd/graph/db"
11
        "github.com/lightningnetwork/lnd/lnwallet"
12
        "github.com/lightningnetwork/lnd/lnwire"
13
)
14

15
// ManagerCfg houses a set of values and methods that is passed to the Manager
16
// for it to properly manage its autopilot agent.
17
type ManagerCfg struct {
18
        // Self is the public key of the lnd instance. It is used to making
19
        // sure the autopilot is not opening channels to itself.
20
        Self *btcec.PublicKey
21

22
        // PilotCfg is the config of the autopilot agent managed by the
23
        // Manager.
24
        PilotCfg *Config
25

26
        // ChannelState is a function closure that returns the current set of
27
        // channels managed by this node.
28
        ChannelState func() ([]LocalChannel, error)
29

30
        // ChannelInfo is a function closure that returns the channel managed
31
        // by the node given by the passed channel point.
32
        ChannelInfo func(wire.OutPoint) (*LocalChannel, error)
33

34
        // SubscribeTransactions is used to get a subscription for transactions
35
        // relevant to this node's wallet.
36
        SubscribeTransactions func() (lnwallet.TransactionSubscription, error)
37

38
        // SubscribeTopology is used to get a subscription for topology changes
39
        // on the network.
40
        SubscribeTopology func() (*graphdb.TopologyClient, error)
41
}
42

43
// Manager is struct that manages an autopilot agent, making it possible to
44
// enable and disable it at will, and hand it relevant external information.
45
// It implements the autopilot grpc service, which is used to get data about
46
// the running autopilot, and gives it relevant information.
47
type Manager struct {
48
        started sync.Once
49
        stopped sync.Once
50

51
        cfg *ManagerCfg
52

53
        // pilot is the current autopilot agent. It will be nil if the agent is
54
        // disabled.
55
        pilot *Agent
56

57
        quit chan struct{}
58
        wg   sync.WaitGroup
59
        sync.Mutex
60
}
61

62
// NewManager creates a new instance of the Manager from the passed config.
UNCOV
63
func NewManager(cfg *ManagerCfg) (*Manager, error) {
×
UNCOV
64
        return &Manager{
×
UNCOV
65
                cfg:  cfg,
×
UNCOV
66
                quit: make(chan struct{}),
×
UNCOV
67
        }, nil
×
UNCOV
68
}
×
69

70
// Start starts the Manager.
UNCOV
71
func (m *Manager) Start() error {
×
UNCOV
72
        m.started.Do(func() {})
×
UNCOV
73
        return nil
×
74
}
75

76
// Stop stops the Manager. If an autopilot agent is active, it will also be
77
// stopped.
UNCOV
78
func (m *Manager) Stop() error {
×
UNCOV
79
        m.stopped.Do(func() {
×
UNCOV
80
                if err := m.StopAgent(); err != nil {
×
81
                        log.Errorf("Unable to stop pilot: %v", err)
×
82
                }
×
83

UNCOV
84
                close(m.quit)
×
UNCOV
85
                m.wg.Wait()
×
86
        })
UNCOV
87
        return nil
×
88
}
89

90
// IsActive returns whether the autopilot agent is currently active.
UNCOV
91
func (m *Manager) IsActive() bool {
×
UNCOV
92
        m.Lock()
×
UNCOV
93
        defer m.Unlock()
×
UNCOV
94

×
UNCOV
95
        return m.pilot != nil
×
UNCOV
96
}
×
97

98
// StartAgent creates and starts an autopilot agent from the Manager's
99
// config.
100
func (m *Manager) StartAgent() error {
×
101
        m.Lock()
×
102
        defer m.Unlock()
×
103

×
104
        // Already active.
×
105
        if m.pilot != nil {
×
106
                return nil
×
107
        }
×
108

109
        // Next, we'll fetch the current state of open channels from the
110
        // database to use as initial state for the auto-pilot agent.
111
        initialChanState, err := m.cfg.ChannelState()
×
112
        if err != nil {
×
113
                return err
×
114
        }
×
115

116
        // Now that we have all the initial dependencies, we can create the
117
        // auto-pilot instance itself.
118
        pilot, err := New(*m.cfg.PilotCfg, initialChanState)
×
119
        if err != nil {
×
120
                return err
×
121
        }
×
122

123
        if err := pilot.Start(); err != nil {
×
124
                return err
×
125
        }
×
126

127
        // Finally, we'll need to subscribe to two things: incoming
128
        // transactions that modify the wallet's balance, and also any graph
129
        // topology updates.
130
        txnSubscription, err := m.cfg.SubscribeTransactions()
×
131
        if err != nil {
×
132
                pilot.Stop()
×
133
                return err
×
134
        }
×
135
        graphSubscription, err := m.cfg.SubscribeTopology()
×
136
        if err != nil {
×
137
                txnSubscription.Cancel()
×
138
                pilot.Stop()
×
139
                return err
×
140
        }
×
141

142
        m.pilot = pilot
×
143

×
144
        // We'll launch a goroutine to provide the agent with notifications
×
145
        // whenever the balance of the wallet changes.
×
146
        // TODO(halseth): can lead to panic if in process of shutting down.
×
147
        m.wg.Add(1)
×
148
        go func() {
×
149
                defer txnSubscription.Cancel()
×
150
                defer m.wg.Done()
×
151

×
152
                for {
×
153
                        select {
×
154
                        case <-txnSubscription.ConfirmedTransactions():
×
155
                                pilot.OnBalanceChange()
×
156

157
                        // We won't act upon new unconfirmed transaction, as
158
                        // we'll only use confirmed outputs when funding.
159
                        // However, we will still drain this request in order
160
                        // to avoid goroutine leaks, and ensure we promptly
161
                        // read from the channel if available.
162
                        case <-txnSubscription.UnconfirmedTransactions():
×
163
                        case <-pilot.quit:
×
164
                                return
×
165
                        case <-m.quit:
×
166
                                return
×
167
                        }
168
                }
169

170
        }()
171

172
        // We'll also launch a goroutine to provide the agent with
173
        // notifications for when the graph topology controlled by the node
174
        // changes.
175
        m.wg.Add(1)
×
176
        go func() {
×
177
                defer graphSubscription.Cancel()
×
178
                defer m.wg.Done()
×
179

×
180
                for {
×
181
                        select {
×
182
                        case topChange, ok := <-graphSubscription.TopologyChanges:
×
183
                                // If the router is shutting down, then we will
×
184
                                // as well.
×
185
                                if !ok {
×
186
                                        return
×
187
                                }
×
188

189
                                for _, edgeUpdate := range topChange.ChannelEdgeUpdates {
×
190
                                        // If this isn't an advertisement by
×
191
                                        // the backing lnd node, then we'll
×
192
                                        // continue as we only want to add
×
193
                                        // channels that we've created
×
194
                                        // ourselves.
×
195
                                        if !edgeUpdate.AdvertisingNode.IsEqual(m.cfg.Self) {
×
196
                                                continue
×
197
                                        }
198

199
                                        // If this is indeed a channel we
200
                                        // opened, then we'll convert it to the
201
                                        // autopilot.Channel format, and notify
202
                                        // the pilot of the new channel.
203
                                        cp := edgeUpdate.ChanPoint
×
204
                                        edge, err := m.cfg.ChannelInfo(cp)
×
205
                                        if err != nil {
×
206
                                                log.Errorf("Unable to fetch "+
×
207
                                                        "channel info for %v: "+
×
208
                                                        "%v", cp, err)
×
209
                                                continue
×
210
                                        }
211

212
                                        pilot.OnChannelOpen(*edge)
×
213
                                }
214

215
                                // For each closed channel, we'll obtain
216
                                // the chanID of the closed channel and send it
217
                                // to the pilot.
218
                                for _, chanClose := range topChange.ClosedChannels {
×
219
                                        chanID := lnwire.NewShortChanIDFromInt(
×
220
                                                chanClose.ChanID,
×
221
                                        )
×
222

×
223
                                        pilot.OnChannelClose(chanID)
×
224
                                }
×
225

226
                                // If new nodes were added to the graph, or
227
                                // node information has changed, we'll poke
228
                                // autopilot to see if it can make use of them.
229
                                if len(topChange.NodeUpdates) > 0 {
×
230
                                        pilot.OnNodeUpdates()
×
231
                                }
×
232

233
                        case <-pilot.quit:
×
234
                                return
×
235
                        case <-m.quit:
×
236
                                return
×
237
                        }
238
                }
239
        }()
240

241
        log.Debugf("Manager started autopilot agent")
×
242

×
243
        return nil
×
244
}
245

246
// StopAgent stops any active autopilot agent.
UNCOV
247
func (m *Manager) StopAgent() error {
×
UNCOV
248
        m.Lock()
×
UNCOV
249
        defer m.Unlock()
×
UNCOV
250

×
UNCOV
251
        // Not active, so we can return early.
×
UNCOV
252
        if m.pilot == nil {
×
UNCOV
253
                return nil
×
UNCOV
254
        }
×
255

256
        if err := m.pilot.Stop(); err != nil {
×
257
                return err
×
258
        }
×
259

260
        // Make sure to nil the current agent, indicating it is no longer
261
        // active.
262
        m.pilot = nil
×
263

×
264
        log.Debugf("Manager stopped autopilot agent")
×
265

×
266
        return nil
×
267
}
268

269
// QueryHeuristics queries the available autopilot heuristics for node scores.
270
func (m *Manager) QueryHeuristics(ctx context.Context, nodes []NodeID,
271
        localState bool) (HeuristicScores, error) {
×
272

×
273
        m.Lock()
×
274
        defer m.Unlock()
×
275

×
276
        n := make(map[NodeID]struct{})
×
277
        for _, node := range nodes {
×
278
                n[node] = struct{}{}
×
279
        }
×
280

281
        log.Debugf("Querying heuristics for %d nodes", len(n))
×
282

×
283
        return m.queryHeuristics(ctx, n, localState)
×
284
}
285

286
// HeuristicScores is an alias for a map that maps heuristic names to a map of
287
// scores for pubkeys.
288
type HeuristicScores map[string]map[NodeID]float64
289

290
// queryHeuristics gets node scores from all available simple heuristics, and
291
// the agent's current active heuristic.
292
//
293
// NOTE: Must be called with the manager's lock.
294
func (m *Manager) queryHeuristics(ctx context.Context,
295
        nodes map[NodeID]struct{}, localState bool) (HeuristicScores, error) {
×
296

×
297
        // If we want to take the local state into action when querying the
×
298
        // heuristics, we fetch it. If not we'll just pass an empty slice to
×
299
        // the heuristic.
×
300
        var totalChans []LocalChannel
×
301
        var err error
×
302
        if localState {
×
303
                // Fetch the current set of channels.
×
304
                totalChans, err = m.cfg.ChannelState()
×
305
                if err != nil {
×
306
                        return nil, err
×
307
                }
×
308

309
                // If the agent is active, we can merge the channel state with
310
                // the channels pending open.
311
                if m.pilot != nil {
×
312
                        m.pilot.chanStateMtx.Lock()
×
313
                        m.pilot.pendingMtx.Lock()
×
314
                        totalChans = mergeChanState(
×
315
                                m.pilot.pendingOpens, m.pilot.chanState,
×
316
                        )
×
317
                        m.pilot.pendingMtx.Unlock()
×
318
                        m.pilot.chanStateMtx.Unlock()
×
319
                }
×
320
        }
321

322
        // As channel size we'll use the maximum size.
323
        chanSize := m.cfg.PilotCfg.Constraints.MaxChanSize()
×
324

×
325
        // We'll start by getting the scores from each available sub-heuristic,
×
326
        // in addition the current agent heuristic.
×
327
        var heuristics []AttachmentHeuristic
×
328
        heuristics = append(heuristics, availableHeuristics...)
×
329
        heuristics = append(heuristics, m.cfg.PilotCfg.Heuristic)
×
330

×
331
        report := make(HeuristicScores)
×
332
        for _, h := range heuristics {
×
333
                name := h.Name()
×
334

×
335
                // If the agent heuristic is among the simple heuristics it
×
336
                // might get queried more than once. As an optimization we'll
×
337
                // just skip it the second time.
×
338
                if _, ok := report[name]; ok {
×
339
                        continue
×
340
                }
341

342
                s, err := h.NodeScores(
×
343
                        ctx, m.cfg.PilotCfg.Graph, totalChans, chanSize, nodes,
×
344
                )
×
345
                if err != nil {
×
346
                        return nil, fmt.Errorf("unable to get sub score: %w",
×
347
                                err)
×
348
                }
×
349

350
                log.Debugf("Heuristic \"%v\" scored %d nodes", name, len(s))
×
351

×
352
                scores := make(map[NodeID]float64)
×
353
                for nID, score := range s {
×
354
                        scores[nID] = score.Score
×
355
                }
×
356

357
                report[name] = scores
×
358
        }
359

360
        return report, nil
×
361
}
362

363
// SetNodeScores is used to set the scores of the given heuristic, if it is
364
// active, and ScoreSettable.
365
func (m *Manager) SetNodeScores(name string, scores map[NodeID]float64) error {
×
366
        m.Lock()
×
367
        defer m.Unlock()
×
368

×
369
        // It must be ScoreSettable to be available for external
×
370
        // scores.
×
371
        s, ok := m.cfg.PilotCfg.Heuristic.(ScoreSettable)
×
372
        if !ok {
×
373
                return fmt.Errorf("current heuristic doesn't support " +
×
374
                        "external scoring")
×
375
        }
×
376

377
        // Heuristic was found, set its node scores.
378
        applied, err := s.SetNodeScores(name, scores)
×
379
        if err != nil {
×
380
                return err
×
381
        }
×
382

383
        if !applied {
×
384
                return fmt.Errorf("heuristic with name %v not found", name)
×
385
        }
×
386

387
        // If the autopilot agent is active, notify about the updated
388
        // heuristic.
389
        if m.pilot != nil {
×
390
                m.pilot.OnHeuristicUpdate(m.cfg.PilotCfg.Heuristic)
×
391
        }
×
392

393
        return nil
×
394
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc