• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14350633513

09 Apr 2025 06:37AM UTC coverage: 58.642%. First build
14350633513

Pull #9690

github

web-flow
Merge 9f7b6f71c into ac052988c
Pull Request #9690: autopilot: thread contexts through in preparation for GraphSource methods taking a context

1 of 83 new or added lines in 13 files covered. (1.2%)

97190 of 165734 relevant lines covered (58.64%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.16
/autopilot/manager.go
1
package autopilot
2

3
import (
4
        "context"
5
        "fmt"
6
        "sync"
7

8
        "github.com/btcsuite/btcd/btcec/v2"
9
        "github.com/btcsuite/btcd/wire"
10
        "github.com/lightningnetwork/lnd/fn/v2"
11
        graphdb "github.com/lightningnetwork/lnd/graph/db"
12
        "github.com/lightningnetwork/lnd/lnwallet"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
)
15

16
// ManagerCfg houses a set of values and methods that is passed to the Manager
17
// for it to properly manage its autopilot agent.
18
type ManagerCfg struct {
19
        // Self is the public key of the lnd instance. It is used to making
20
        // sure the autopilot is not opening channels to itself.
21
        Self *btcec.PublicKey
22

23
        // PilotCfg is the config of the autopilot agent managed by the
24
        // Manager.
25
        PilotCfg *Config
26

27
        // ChannelState is a function closure that returns the current set of
28
        // channels managed by this node.
29
        ChannelState func() ([]LocalChannel, error)
30

31
        // ChannelInfo is a function closure that returns the channel managed
32
        // by the node given by the passed channel point.
33
        ChannelInfo func(wire.OutPoint) (*LocalChannel, error)
34

35
        // SubscribeTransactions is used to get a subscription for transactions
36
        // relevant to this node's wallet.
37
        SubscribeTransactions func() (lnwallet.TransactionSubscription, error)
38

39
        // SubscribeTopology is used to get a subscription for topology changes
40
        // on the network.
41
        SubscribeTopology func() (*graphdb.TopologyClient, error)
42
}
43

44
// Manager is struct that manages an autopilot agent, making it possible to
45
// enable and disable it at will, and hand it relevant external information.
46
// It implements the autopilot grpc service, which is used to get data about
47
// the running autopilot, and gives it relevant information.
48
type Manager struct {
49
        started sync.Once
50
        stopped sync.Once
51

52
        cfg *ManagerCfg
53

54
        // pilot is the current autopilot agent. It will be nil if the agent is
55
        // disabled.
56
        pilot *Agent
57

58
        quit   chan struct{}
59
        wg     sync.WaitGroup
60
        cancel fn.Option[context.CancelFunc]
61
        sync.Mutex
62
}
63

64
// NewManager creates a new instance of the Manager from the passed config.
65
func NewManager(cfg *ManagerCfg) (*Manager, error) {
3✔
66
        return &Manager{
3✔
67
                cfg:  cfg,
3✔
68
                quit: make(chan struct{}),
3✔
69
        }, nil
3✔
70
}
3✔
71

72
// Start starts the Manager.
73
func (m *Manager) Start() error {
3✔
74
        m.started.Do(func() {})
6✔
75
        return nil
3✔
76
}
77

78
// Stop stops the Manager. If an autopilot agent is active, it will also be
79
// stopped.
80
func (m *Manager) Stop() error {
3✔
81
        m.stopped.Do(func() {
6✔
82
                if err := m.StopAgent(); err != nil {
3✔
83
                        log.Errorf("Unable to stop pilot: %v", err)
×
84
                }
×
85

86
                m.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
3✔
87
                close(m.quit)
3✔
88
                m.wg.Wait()
3✔
89
        })
90
        return nil
3✔
91
}
92

93
// IsActive returns whether the autopilot agent is currently active.
94
func (m *Manager) IsActive() bool {
3✔
95
        m.Lock()
3✔
96
        defer m.Unlock()
3✔
97

3✔
98
        return m.pilot != nil
3✔
99
}
3✔
100

101
// StartAgent creates and starts an autopilot agent from the Manager's
102
// config.
NEW
103
func (m *Manager) StartAgent(ctx context.Context) error {
×
104
        m.Lock()
×
105
        defer m.Unlock()
×
106

×
107
        // Already active.
×
108
        if m.pilot != nil {
×
109
                return nil
×
110
        }
×
NEW
111
        ctx, cancel := context.WithCancel(ctx)
×
NEW
112
        m.cancel = fn.Some(cancel)
×
113

×
114
        // Next, we'll fetch the current state of open channels from the
×
115
        // database to use as initial state for the auto-pilot agent.
×
116
        initialChanState, err := m.cfg.ChannelState()
×
117
        if err != nil {
×
118
                return err
×
119
        }
×
120

121
        // Now that we have all the initial dependencies, we can create the
122
        // auto-pilot instance itself.
123
        pilot, err := New(*m.cfg.PilotCfg, initialChanState)
×
124
        if err != nil {
×
125
                return err
×
126
        }
×
127

NEW
128
        if err := pilot.Start(ctx); err != nil {
×
129
                return err
×
130
        }
×
131

132
        // Finally, we'll need to subscribe to two things: incoming
133
        // transactions that modify the wallet's balance, and also any graph
134
        // topology updates.
135
        txnSubscription, err := m.cfg.SubscribeTransactions()
×
136
        if err != nil {
×
137
                pilot.Stop()
×
138
                return err
×
139
        }
×
140
        graphSubscription, err := m.cfg.SubscribeTopology()
×
141
        if err != nil {
×
142
                txnSubscription.Cancel()
×
143
                pilot.Stop()
×
144
                return err
×
145
        }
×
146

147
        m.pilot = pilot
×
148

×
149
        // We'll launch a goroutine to provide the agent with notifications
×
150
        // whenever the balance of the wallet changes.
×
151
        // TODO(halseth): can lead to panic if in process of shutting down.
×
152
        m.wg.Add(1)
×
153
        go func() {
×
154
                defer txnSubscription.Cancel()
×
155
                defer m.wg.Done()
×
156

×
157
                for {
×
158
                        select {
×
159
                        case <-txnSubscription.ConfirmedTransactions():
×
160
                                pilot.OnBalanceChange()
×
161

162
                        // We won't act upon new unconfirmed transaction, as
163
                        // we'll only use confirmed outputs when funding.
164
                        // However, we will still drain this request in order
165
                        // to avoid goroutine leaks, and ensure we promptly
166
                        // read from the channel if available.
167
                        case <-txnSubscription.UnconfirmedTransactions():
×
168
                        case <-pilot.quit:
×
169
                                return
×
170
                        case <-m.quit:
×
171
                                return
×
172
                        }
173
                }
174

175
        }()
176

177
        // We'll also launch a goroutine to provide the agent with
178
        // notifications for when the graph topology controlled by the node
179
        // changes.
180
        m.wg.Add(1)
×
181
        go func() {
×
182
                defer graphSubscription.Cancel()
×
183
                defer m.wg.Done()
×
184

×
185
                for {
×
186
                        select {
×
187
                        case topChange, ok := <-graphSubscription.TopologyChanges:
×
188
                                // If the router is shutting down, then we will
×
189
                                // as well.
×
190
                                if !ok {
×
191
                                        return
×
192
                                }
×
193

194
                                for _, edgeUpdate := range topChange.ChannelEdgeUpdates {
×
195
                                        // If this isn't an advertisement by
×
196
                                        // the backing lnd node, then we'll
×
197
                                        // continue as we only want to add
×
198
                                        // channels that we've created
×
199
                                        // ourselves.
×
200
                                        if !edgeUpdate.AdvertisingNode.IsEqual(m.cfg.Self) {
×
201
                                                continue
×
202
                                        }
203

204
                                        // If this is indeed a channel we
205
                                        // opened, then we'll convert it to the
206
                                        // autopilot.Channel format, and notify
207
                                        // the pilot of the new channel.
208
                                        cp := edgeUpdate.ChanPoint
×
209
                                        edge, err := m.cfg.ChannelInfo(cp)
×
210
                                        if err != nil {
×
211
                                                log.Errorf("Unable to fetch "+
×
212
                                                        "channel info for %v: "+
×
213
                                                        "%v", cp, err)
×
214
                                                continue
×
215
                                        }
216

217
                                        pilot.OnChannelOpen(*edge)
×
218
                                }
219

220
                                // For each closed channel, we'll obtain
221
                                // the chanID of the closed channel and send it
222
                                // to the pilot.
223
                                for _, chanClose := range topChange.ClosedChannels {
×
224
                                        chanID := lnwire.NewShortChanIDFromInt(
×
225
                                                chanClose.ChanID,
×
226
                                        )
×
227

×
228
                                        pilot.OnChannelClose(chanID)
×
229
                                }
×
230

231
                                // If new nodes were added to the graph, or
232
                                // node information has changed, we'll poke
233
                                // autopilot to see if it can make use of them.
234
                                if len(topChange.NodeUpdates) > 0 {
×
235
                                        pilot.OnNodeUpdates()
×
236
                                }
×
237

238
                        case <-pilot.quit:
×
239
                                return
×
240
                        case <-m.quit:
×
241
                                return
×
242
                        }
243
                }
244
        }()
245

246
        log.Debugf("Manager started autopilot agent")
×
247

×
248
        return nil
×
249
}
250

251
// StopAgent stops any active autopilot agent.
252
func (m *Manager) StopAgent() error {
3✔
253
        m.Lock()
3✔
254
        defer m.Unlock()
3✔
255

3✔
256
        // Not active, so we can return early.
3✔
257
        if m.pilot == nil {
6✔
258
                return nil
3✔
259
        }
3✔
260

261
        if err := m.pilot.Stop(); err != nil {
×
262
                return err
×
263
        }
×
264

265
        // Make sure to nil the current agent, indicating it is no longer
266
        // active.
267
        m.pilot = nil
×
268

×
269
        log.Debugf("Manager stopped autopilot agent")
×
270

×
271
        return nil
×
272
}
273

274
// QueryHeuristics queries the available autopilot heuristics for node scores.
275
func (m *Manager) QueryHeuristics(ctx context.Context, nodes []NodeID,
NEW
276
        localState bool) (HeuristicScores, error) {
×
277

×
278
        m.Lock()
×
279
        defer m.Unlock()
×
280

×
281
        n := make(map[NodeID]struct{})
×
282
        for _, node := range nodes {
×
283
                n[node] = struct{}{}
×
284
        }
×
285

286
        log.Debugf("Querying heuristics for %d nodes", len(n))
×
NEW
287

×
NEW
288
        return m.queryHeuristics(ctx, n, localState)
×
289
}
290

291
// HeuristicScores is an alias for a map that maps heuristic names to a map of
292
// scores for pubkeys.
293
type HeuristicScores map[string]map[NodeID]float64
294

295
// queryHeuristics gets node scores from all available simple heuristics, and
296
// the agent's current active heuristic.
297
//
298
// NOTE: Must be called with the manager's lock.
299
func (m *Manager) queryHeuristics(ctx context.Context,
NEW
300
        nodes map[NodeID]struct{}, localState bool) (HeuristicScores, error) {
×
301

×
302
        // If we want to take the local state into action when querying the
×
303
        // heuristics, we fetch it. If not we'll just pass an empty slice to
×
304
        // the heuristic.
×
305
        var totalChans []LocalChannel
×
306
        var err error
×
307
        if localState {
×
308
                // Fetch the current set of channels.
×
309
                totalChans, err = m.cfg.ChannelState()
×
310
                if err != nil {
×
311
                        return nil, err
×
312
                }
×
313

314
                // If the agent is active, we can merge the channel state with
315
                // the channels pending open.
316
                if m.pilot != nil {
×
317
                        m.pilot.chanStateMtx.Lock()
×
318
                        m.pilot.pendingMtx.Lock()
×
319
                        totalChans = mergeChanState(
×
320
                                m.pilot.pendingOpens, m.pilot.chanState,
×
321
                        )
×
322
                        m.pilot.pendingMtx.Unlock()
×
323
                        m.pilot.chanStateMtx.Unlock()
×
324
                }
×
325
        }
326

327
        // As channel size we'll use the maximum size.
328
        chanSize := m.cfg.PilotCfg.Constraints.MaxChanSize()
×
329

×
330
        // We'll start by getting the scores from each available sub-heuristic,
×
331
        // in addition the current agent heuristic.
×
332
        var heuristics []AttachmentHeuristic
×
333
        heuristics = append(heuristics, availableHeuristics...)
×
334
        heuristics = append(heuristics, m.cfg.PilotCfg.Heuristic)
×
335

×
336
        report := make(HeuristicScores)
×
337
        for _, h := range heuristics {
×
338
                name := h.Name()
×
339

×
340
                // If the agent heuristic is among the simple heuristics it
×
341
                // might get queried more than once. As an optimization we'll
×
342
                // just skip it the second time.
×
343
                if _, ok := report[name]; ok {
×
344
                        continue
×
345
                }
346

347
                s, err := h.NodeScores(
×
NEW
348
                        ctx, m.cfg.PilotCfg.Graph, totalChans, chanSize, nodes,
×
349
                )
×
350
                if err != nil {
×
351
                        return nil, fmt.Errorf("unable to get sub score: %w",
×
352
                                err)
×
353
                }
×
354

355
                log.Debugf("Heuristic \"%v\" scored %d nodes", name, len(s))
×
356

×
357
                scores := make(map[NodeID]float64)
×
358
                for nID, score := range s {
×
359
                        scores[nID] = score.Score
×
360
                }
×
361

362
                report[name] = scores
×
363
        }
364

365
        return report, nil
×
366
}
367

368
// SetNodeScores is used to set the scores of the given heuristic, if it is
369
// active, and ScoreSettable.
370
func (m *Manager) SetNodeScores(name string, scores map[NodeID]float64) error {
×
371
        m.Lock()
×
372
        defer m.Unlock()
×
373

×
374
        // It must be ScoreSettable to be available for external
×
375
        // scores.
×
376
        s, ok := m.cfg.PilotCfg.Heuristic.(ScoreSettable)
×
377
        if !ok {
×
378
                return fmt.Errorf("current heuristic doesn't support " +
×
379
                        "external scoring")
×
380
        }
×
381

382
        // Heuristic was found, set its node scores.
383
        applied, err := s.SetNodeScores(name, scores)
×
384
        if err != nil {
×
385
                return err
×
386
        }
×
387

388
        if !applied {
×
389
                return fmt.Errorf("heuristic with name %v not found", name)
×
390
        }
×
391

392
        // If the autopilot agent is active, notify about the updated
393
        // heuristic.
394
        if m.pilot != nil {
×
395
                m.pilot.OnHeuristicUpdate(m.cfg.PilotCfg.Heuristic)
×
396
        }
×
397

398
        return nil
×
399
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc