• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12312390362

13 Dec 2024 08:44AM UTC coverage: 57.458% (+8.5%) from 48.92%
12312390362

Pull #9343

github

ellemouton
fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is re-worked such that the
context that its new main WithCtx method provides is cancelled in sync
with a parent context being cancelled or with its quit channel being
closed. Tests are added to assert the behaviour. In order for the
close of the quit channel to be consistent with the cancelling of the
derived context, the quit channel _must_ be contained internal to the
ContextGuard so that callers are only able to close the channel via the
exposed Quit method which will then take care to first cancel any
derived contexts that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests

101853 of 177264 relevant lines covered (57.46%)

24972.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/autopilot/manager.go
1
package autopilot
2

3
import (
4
        "fmt"
5
        "sync"
6

7
        "github.com/btcsuite/btcd/btcec/v2"
8
        "github.com/btcsuite/btcd/wire"
9
        "github.com/lightningnetwork/lnd/graph"
10
        "github.com/lightningnetwork/lnd/lnwallet"
11
        "github.com/lightningnetwork/lnd/lnwire"
12
)
13

14
// ManagerCfg houses a set of values and methods that is passed to the Manager
15
// for it to properly manage its autopilot agent.
16
type ManagerCfg struct {
17
        // Self is the public key of the lnd instance. It is used to making
18
        // sure the autopilot is not opening channels to itself.
19
        Self *btcec.PublicKey
20

21
        // PilotCfg is the config of the autopilot agent managed by the
22
        // Manager.
23
        PilotCfg *Config
24

25
        // ChannelState is a function closure that returns the current set of
26
        // channels managed by this node.
27
        ChannelState func() ([]LocalChannel, error)
28

29
        // ChannelInfo is a function closure that returns the channel managed
30
        // by the node given by the passed channel point.
31
        ChannelInfo func(wire.OutPoint) (*LocalChannel, error)
32

33
        // SubscribeTransactions is used to get a subscription for transactions
34
        // relevant to this node's wallet.
35
        SubscribeTransactions func() (lnwallet.TransactionSubscription, error)
36

37
        // SubscribeTopology is used to get a subscription for topology changes
38
        // on the network.
39
        SubscribeTopology func() (*graph.TopologyClient, error)
40
}
41

42
// Manager is struct that manages an autopilot agent, making it possible to
43
// enable and disable it at will, and hand it relevant external information.
44
// It implements the autopilot grpc service, which is used to get data about
45
// the running autopilot, and gives it relevant information.
46
type Manager struct {
47
        started sync.Once
48
        stopped sync.Once
49

50
        cfg *ManagerCfg
51

52
        // pilot is the current autopilot agent. It will be nil if the agent is
53
        // disabled.
54
        pilot *Agent
55

56
        quit chan struct{}
57
        wg   sync.WaitGroup
58
        sync.Mutex
59
}
60

61
// NewManager creates a new instance of the Manager from the passed config.
62
func NewManager(cfg *ManagerCfg) (*Manager, error) {
×
63
        return &Manager{
×
64
                cfg:  cfg,
×
65
                quit: make(chan struct{}),
×
66
        }, nil
×
67
}
×
68

69
// Start starts the Manager.
70
func (m *Manager) Start() error {
×
71
        m.started.Do(func() {})
×
72
        return nil
×
73
}
74

75
// Stop stops the Manager. If an autopilot agent is active, it will also be
76
// stopped.
77
func (m *Manager) Stop() error {
×
78
        m.stopped.Do(func() {
×
79
                if err := m.StopAgent(); err != nil {
×
80
                        log.Errorf("Unable to stop pilot: %v", err)
×
81
                }
×
82

83
                close(m.quit)
×
84
                m.wg.Wait()
×
85
        })
86
        return nil
×
87
}
88

89
// IsActive returns whether the autopilot agent is currently active.
90
func (m *Manager) IsActive() bool {
×
91
        m.Lock()
×
92
        defer m.Unlock()
×
93

×
94
        return m.pilot != nil
×
95
}
×
96

97
// StartAgent creates and starts an autopilot agent from the Manager's
98
// config.
99
func (m *Manager) StartAgent() error {
×
100
        m.Lock()
×
101
        defer m.Unlock()
×
102

×
103
        // Already active.
×
104
        if m.pilot != nil {
×
105
                return nil
×
106
        }
×
107

108
        // Next, we'll fetch the current state of open channels from the
109
        // database to use as initial state for the auto-pilot agent.
110
        initialChanState, err := m.cfg.ChannelState()
×
111
        if err != nil {
×
112
                return err
×
113
        }
×
114

115
        // Now that we have all the initial dependencies, we can create the
116
        // auto-pilot instance itself.
117
        pilot, err := New(*m.cfg.PilotCfg, initialChanState)
×
118
        if err != nil {
×
119
                return err
×
120
        }
×
121

122
        if err := pilot.Start(); err != nil {
×
123
                return err
×
124
        }
×
125

126
        // Finally, we'll need to subscribe to two things: incoming
127
        // transactions that modify the wallet's balance, and also any graph
128
        // topology updates.
129
        txnSubscription, err := m.cfg.SubscribeTransactions()
×
130
        if err != nil {
×
131
                pilot.Stop()
×
132
                return err
×
133
        }
×
134
        graphSubscription, err := m.cfg.SubscribeTopology()
×
135
        if err != nil {
×
136
                txnSubscription.Cancel()
×
137
                pilot.Stop()
×
138
                return err
×
139
        }
×
140

141
        m.pilot = pilot
×
142

×
143
        // We'll launch a goroutine to provide the agent with notifications
×
144
        // whenever the balance of the wallet changes.
×
145
        // TODO(halseth): can lead to panic if in process of shutting down.
×
146
        m.wg.Add(1)
×
147
        go func() {
×
148
                defer txnSubscription.Cancel()
×
149
                defer m.wg.Done()
×
150

×
151
                for {
×
152
                        select {
×
153
                        case <-txnSubscription.ConfirmedTransactions():
×
154
                                pilot.OnBalanceChange()
×
155

156
                        // We won't act upon new unconfirmed transaction, as
157
                        // we'll only use confirmed outputs when funding.
158
                        // However, we will still drain this request in order
159
                        // to avoid goroutine leaks, and ensure we promptly
160
                        // read from the channel if available.
161
                        case <-txnSubscription.UnconfirmedTransactions():
×
162
                        case <-pilot.quit:
×
163
                                return
×
164
                        case <-m.quit:
×
165
                                return
×
166
                        }
167
                }
168

169
        }()
170

171
        // We'll also launch a goroutine to provide the agent with
172
        // notifications for when the graph topology controlled by the node
173
        // changes.
174
        m.wg.Add(1)
×
175
        go func() {
×
176
                defer graphSubscription.Cancel()
×
177
                defer m.wg.Done()
×
178

×
179
                for {
×
180
                        select {
×
181
                        case topChange, ok := <-graphSubscription.TopologyChanges:
×
182
                                // If the router is shutting down, then we will
×
183
                                // as well.
×
184
                                if !ok {
×
185
                                        return
×
186
                                }
×
187

188
                                for _, edgeUpdate := range topChange.ChannelEdgeUpdates {
×
189
                                        // If this isn't an advertisement by
×
190
                                        // the backing lnd node, then we'll
×
191
                                        // continue as we only want to add
×
192
                                        // channels that we've created
×
193
                                        // ourselves.
×
194
                                        if !edgeUpdate.AdvertisingNode.IsEqual(m.cfg.Self) {
×
195
                                                continue
×
196
                                        }
197

198
                                        // If this is indeed a channel we
199
                                        // opened, then we'll convert it to the
200
                                        // autopilot.Channel format, and notify
201
                                        // the pilot of the new channel.
202
                                        cp := edgeUpdate.ChanPoint
×
203
                                        edge, err := m.cfg.ChannelInfo(cp)
×
204
                                        if err != nil {
×
205
                                                log.Errorf("Unable to fetch "+
×
206
                                                        "channel info for %v: "+
×
207
                                                        "%v", cp, err)
×
208
                                                continue
×
209
                                        }
210

211
                                        pilot.OnChannelOpen(*edge)
×
212
                                }
213

214
                                // For each closed channel, we'll obtain
215
                                // the chanID of the closed channel and send it
216
                                // to the pilot.
217
                                for _, chanClose := range topChange.ClosedChannels {
×
218
                                        chanID := lnwire.NewShortChanIDFromInt(
×
219
                                                chanClose.ChanID,
×
220
                                        )
×
221

×
222
                                        pilot.OnChannelClose(chanID)
×
223
                                }
×
224

225
                                // If new nodes were added to the graph, or
226
                                // node information has changed, we'll poke
227
                                // autopilot to see if it can make use of them.
228
                                if len(topChange.NodeUpdates) > 0 {
×
229
                                        pilot.OnNodeUpdates()
×
230
                                }
×
231

232
                        case <-pilot.quit:
×
233
                                return
×
234
                        case <-m.quit:
×
235
                                return
×
236
                        }
237
                }
238
        }()
239

240
        log.Debugf("Manager started autopilot agent")
×
241

×
242
        return nil
×
243
}
244

245
// StopAgent stops any active autopilot agent.
246
func (m *Manager) StopAgent() error {
×
247
        m.Lock()
×
248
        defer m.Unlock()
×
249

×
250
        // Not active, so we can return early.
×
251
        if m.pilot == nil {
×
252
                return nil
×
253
        }
×
254

255
        if err := m.pilot.Stop(); err != nil {
×
256
                return err
×
257
        }
×
258

259
        // Make sure to nil the current agent, indicating it is no longer
260
        // active.
261
        m.pilot = nil
×
262

×
263
        log.Debugf("Manager stopped autopilot agent")
×
264

×
265
        return nil
×
266
}
267

268
// QueryHeuristics queries the available autopilot heuristics for node scores.
269
func (m *Manager) QueryHeuristics(nodes []NodeID, localState bool) (
270
        HeuristicScores, error) {
×
271

×
272
        m.Lock()
×
273
        defer m.Unlock()
×
274

×
275
        n := make(map[NodeID]struct{})
×
276
        for _, node := range nodes {
×
277
                n[node] = struct{}{}
×
278
        }
×
279

280
        log.Debugf("Querying heuristics for %d nodes", len(n))
×
281
        return m.queryHeuristics(n, localState)
×
282
}
283

284
// HeuristicScores is an alias for a map that maps heuristic names to a map of
285
// scores for pubkeys.
286
type HeuristicScores map[string]map[NodeID]float64
287

288
// queryHeuristics gets node scores from all available simple heuristics, and
289
// the agent's current active heuristic.
290
//
291
// NOTE: Must be called with the manager's lock.
292
func (m *Manager) queryHeuristics(nodes map[NodeID]struct{}, localState bool) (
293
        HeuristicScores, error) {
×
294

×
295
        // If we want to take the local state into action when querying the
×
296
        // heuristics, we fetch it. If not we'll just pass an empty slice to
×
297
        // the heuristic.
×
298
        var totalChans []LocalChannel
×
299
        var err error
×
300
        if localState {
×
301
                // Fetch the current set of channels.
×
302
                totalChans, err = m.cfg.ChannelState()
×
303
                if err != nil {
×
304
                        return nil, err
×
305
                }
×
306

307
                // If the agent is active, we can merge the channel state with
308
                // the channels pending open.
309
                if m.pilot != nil {
×
310
                        m.pilot.chanStateMtx.Lock()
×
311
                        m.pilot.pendingMtx.Lock()
×
312
                        totalChans = mergeChanState(
×
313
                                m.pilot.pendingOpens, m.pilot.chanState,
×
314
                        )
×
315
                        m.pilot.pendingMtx.Unlock()
×
316
                        m.pilot.chanStateMtx.Unlock()
×
317
                }
×
318
        }
319

320
        // As channel size we'll use the maximum size.
321
        chanSize := m.cfg.PilotCfg.Constraints.MaxChanSize()
×
322

×
323
        // We'll start by getting the scores from each available sub-heuristic,
×
324
        // in addition the current agent heuristic.
×
325
        var heuristics []AttachmentHeuristic
×
326
        heuristics = append(heuristics, availableHeuristics...)
×
327
        heuristics = append(heuristics, m.cfg.PilotCfg.Heuristic)
×
328

×
329
        report := make(HeuristicScores)
×
330
        for _, h := range heuristics {
×
331
                name := h.Name()
×
332

×
333
                // If the agent heuristic is among the simple heuristics it
×
334
                // might get queried more than once. As an optimization we'll
×
335
                // just skip it the second time.
×
336
                if _, ok := report[name]; ok {
×
337
                        continue
×
338
                }
339

340
                s, err := h.NodeScores(
×
341
                        m.cfg.PilotCfg.Graph, totalChans, chanSize, nodes,
×
342
                )
×
343
                if err != nil {
×
344
                        return nil, fmt.Errorf("unable to get sub score: %w",
×
345
                                err)
×
346
                }
×
347

348
                log.Debugf("Heuristic \"%v\" scored %d nodes", name, len(s))
×
349

×
350
                scores := make(map[NodeID]float64)
×
351
                for nID, score := range s {
×
352
                        scores[nID] = score.Score
×
353
                }
×
354

355
                report[name] = scores
×
356
        }
357

358
        return report, nil
×
359
}
360

361
// SetNodeScores is used to set the scores of the given heuristic, if it is
362
// active, and ScoreSettable.
363
func (m *Manager) SetNodeScores(name string, scores map[NodeID]float64) error {
×
364
        m.Lock()
×
365
        defer m.Unlock()
×
366

×
367
        // It must be ScoreSettable to be available for external
×
368
        // scores.
×
369
        s, ok := m.cfg.PilotCfg.Heuristic.(ScoreSettable)
×
370
        if !ok {
×
371
                return fmt.Errorf("current heuristic doesn't support " +
×
372
                        "external scoring")
×
373
        }
×
374

375
        // Heuristic was found, set its node scores.
376
        applied, err := s.SetNodeScores(name, scores)
×
377
        if err != nil {
×
378
                return err
×
379
        }
×
380

381
        if !applied {
×
382
                return fmt.Errorf("heuristic with name %v not found", name)
×
383
        }
×
384

385
        // If the autopilot agent is active, notify about the updated
386
        // heuristic.
387
        if m.pilot != nil {
×
388
                m.pilot.OnHeuristicUpdate(m.cfg.PilotCfg.Heuristic)
×
389
        }
×
390

391
        return nil
×
392
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc