• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%
11216766535

Pull #9148

github

ProofOfKeags
lnwire: remove kickoff feerate from propose/commit
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

571 of 879 new or added lines in 16 files covered. (64.96%)

23253 existing lines in 251 files now uncovered.

99022 of 171268 relevant lines covered (57.82%)

38420.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/autopilot/manager.go
1
package autopilot
2

3
import (
4
        "fmt"
5
        "sync"
6

7
        "github.com/btcsuite/btcd/btcec/v2"
8
        "github.com/btcsuite/btcd/wire"
9
        "github.com/lightningnetwork/lnd/graph"
10
        "github.com/lightningnetwork/lnd/lnwallet"
11
        "github.com/lightningnetwork/lnd/lnwire"
12
)
13

14
// ManagerCfg houses a set of values and methods that is passed to the Manager
15
// for it to properly manage its autopilot agent.
16
type ManagerCfg struct {
17
        // Self is the public key of the lnd instance. It is used to making
18
        // sure the autopilot is not opening channels to itself.
19
        Self *btcec.PublicKey
20

21
        // PilotCfg is the config of the autopilot agent managed by the
22
        // Manager.
23
        PilotCfg *Config
24

25
        // ChannelState is a function closure that returns the current set of
26
        // channels managed by this node.
27
        ChannelState func() ([]LocalChannel, error)
28

29
        // ChannelInfo is a function closure that returns the channel managed
30
        // by the node given by the passed channel point.
31
        ChannelInfo func(wire.OutPoint) (*LocalChannel, error)
32

33
        // SubscribeTransactions is used to get a subscription for transactions
34
        // relevant to this node's wallet.
35
        SubscribeTransactions func() (lnwallet.TransactionSubscription, error)
36

37
        // SubscribeTopology is used to get a subscription for topology changes
38
        // on the network.
39
        SubscribeTopology func() (*graph.TopologyClient, error)
40
}
41

42
// Manager is struct that manages an autopilot agent, making it possible to
43
// enable and disable it at will, and hand it relevant external information.
44
// It implements the autopilot grpc service, which is used to get data about
45
// the running autopilot, and gives it relevant information.
46
type Manager struct {
47
        started sync.Once
48
        stopped sync.Once
49

50
        cfg *ManagerCfg
51

52
        // pilot is the current autopilot agent. It will be nil if the agent is
53
        // disabled.
54
        pilot *Agent
55

56
        quit chan struct{}
57
        wg   sync.WaitGroup
58
        sync.Mutex
59
}
60

61
// NewManager creates a new instance of the Manager from the passed config.
UNCOV
62
func NewManager(cfg *ManagerCfg) (*Manager, error) {
×
UNCOV
63
        return &Manager{
×
UNCOV
64
                cfg:  cfg,
×
UNCOV
65
                quit: make(chan struct{}),
×
UNCOV
66
        }, nil
×
UNCOV
67
}
×
68

69
// Start starts the Manager.
UNCOV
70
func (m *Manager) Start() error {
×
UNCOV
71
        m.started.Do(func() {})
×
UNCOV
72
        return nil
×
73
}
74

75
// Stop stops the Manager. If an autopilot agent is active, it will also be
76
// stopped.
UNCOV
77
func (m *Manager) Stop() error {
×
UNCOV
78
        m.stopped.Do(func() {
×
UNCOV
79
                if err := m.StopAgent(); err != nil {
×
80
                        log.Errorf("Unable to stop pilot: %v", err)
×
81
                }
×
82

UNCOV
83
                close(m.quit)
×
UNCOV
84
                m.wg.Wait()
×
85
        })
UNCOV
86
        return nil
×
87
}
88

89
// IsActive returns whether the autopilot agent is currently active.
UNCOV
90
func (m *Manager) IsActive() bool {
×
UNCOV
91
        m.Lock()
×
UNCOV
92
        defer m.Unlock()
×
UNCOV
93

×
UNCOV
94
        return m.pilot != nil
×
UNCOV
95
}
×
96

97
// StartAgent creates and starts an autopilot agent from the Manager's
98
// config.
99
func (m *Manager) StartAgent() error {
×
100
        m.Lock()
×
101
        defer m.Unlock()
×
102

×
103
        // Already active.
×
104
        if m.pilot != nil {
×
105
                return nil
×
106
        }
×
107

108
        // Next, we'll fetch the current state of open channels from the
109
        // database to use as initial state for the auto-pilot agent.
110
        initialChanState, err := m.cfg.ChannelState()
×
111
        if err != nil {
×
112
                return err
×
113
        }
×
114

115
        // Now that we have all the initial dependencies, we can create the
116
        // auto-pilot instance itself.
117
        pilot, err := New(*m.cfg.PilotCfg, initialChanState)
×
118
        if err != nil {
×
119
                return err
×
120
        }
×
121

122
        if err := pilot.Start(); err != nil {
×
123
                return err
×
124
        }
×
125

126
        // Finally, we'll need to subscribe to two things: incoming
127
        // transactions that modify the wallet's balance, and also any graph
128
        // topology updates.
129
        txnSubscription, err := m.cfg.SubscribeTransactions()
×
130
        if err != nil {
×
131
                pilot.Stop()
×
132
                return err
×
133
        }
×
134
        graphSubscription, err := m.cfg.SubscribeTopology()
×
135
        if err != nil {
×
136
                txnSubscription.Cancel()
×
137
                pilot.Stop()
×
138
                return err
×
139
        }
×
140

141
        m.pilot = pilot
×
142

×
143
        // We'll launch a goroutine to provide the agent with notifications
×
144
        // whenever the balance of the wallet changes.
×
145
        // TODO(halseth): can lead to panic if in process of shutting down.
×
146
        m.wg.Add(1)
×
147
        go func() {
×
148
                defer txnSubscription.Cancel()
×
149
                defer m.wg.Done()
×
150

×
151
                for {
×
152
                        select {
×
153
                        case <-txnSubscription.ConfirmedTransactions():
×
154
                                pilot.OnBalanceChange()
×
155

156
                        // We won't act upon new unconfirmed transaction, as
157
                        // we'll only use confirmed outputs when funding.
158
                        // However, we will still drain this request in order
159
                        // to avoid goroutine leaks, and ensure we promptly
160
                        // read from the channel if available.
161
                        case <-txnSubscription.UnconfirmedTransactions():
×
162
                        case <-pilot.quit:
×
163
                                return
×
164
                        case <-m.quit:
×
165
                                return
×
166
                        }
167
                }
168

169
        }()
170

171
        // We'll also launch a goroutine to provide the agent with
172
        // notifications for when the graph topology controlled by the node
173
        // changes.
174
        m.wg.Add(1)
×
175
        go func() {
×
176
                defer graphSubscription.Cancel()
×
177
                defer m.wg.Done()
×
178

×
179
                for {
×
180
                        select {
×
181
                        case topChange, ok := <-graphSubscription.TopologyChanges:
×
182
                                // If the router is shutting down, then we will
×
183
                                // as well.
×
184
                                if !ok {
×
185
                                        return
×
186
                                }
×
187

188
                                for _, edgeUpdate := range topChange.ChannelEdgeUpdates {
×
189
                                        // If this isn't an advertisement by
×
190
                                        // the backing lnd node, then we'll
×
191
                                        // continue as we only want to add
×
192
                                        // channels that we've created
×
193
                                        // ourselves.
×
194
                                        if !edgeUpdate.AdvertisingNode.IsEqual(m.cfg.Self) {
×
195
                                                continue
×
196
                                        }
197

198
                                        // If this is indeed a channel we
199
                                        // opened, then we'll convert it to the
200
                                        // autopilot.Channel format, and notify
201
                                        // the pilot of the new channel.
202
                                        cp := edgeUpdate.ChanPoint
×
203
                                        edge, err := m.cfg.ChannelInfo(cp)
×
204
                                        if err != nil {
×
205
                                                log.Errorf("Unable to fetch "+
×
206
                                                        "channel info for %v: "+
×
207
                                                        "%v", cp, err)
×
208
                                                continue
×
209
                                        }
210

211
                                        pilot.OnChannelOpen(*edge)
×
212
                                }
213

214
                                // For each closed channel, we'll obtain
215
                                // the chanID of the closed channel and send it
216
                                // to the pilot.
217
                                for _, chanClose := range topChange.ClosedChannels {
×
218
                                        chanID := lnwire.NewShortChanIDFromInt(
×
219
                                                chanClose.ChanID,
×
220
                                        )
×
221

×
222
                                        pilot.OnChannelClose(chanID)
×
223
                                }
×
224

225
                                // If new nodes were added to the graph, or
226
                                // node information has changed, we'll poke
227
                                // autopilot to see if it can make use of them.
228
                                if len(topChange.NodeUpdates) > 0 {
×
229
                                        pilot.OnNodeUpdates()
×
230
                                }
×
231

232
                        case <-pilot.quit:
×
233
                                return
×
234
                        case <-m.quit:
×
235
                                return
×
236
                        }
237
                }
238
        }()
239

240
        log.Debugf("Manager started autopilot agent")
×
241

×
242
        return nil
×
243
}
244

245
// StopAgent stops any active autopilot agent.
UNCOV
246
func (m *Manager) StopAgent() error {
×
UNCOV
247
        m.Lock()
×
UNCOV
248
        defer m.Unlock()
×
UNCOV
249

×
UNCOV
250
        // Not active, so we can return early.
×
UNCOV
251
        if m.pilot == nil {
×
UNCOV
252
                return nil
×
UNCOV
253
        }
×
254

255
        if err := m.pilot.Stop(); err != nil {
×
256
                return err
×
257
        }
×
258

259
        // Make sure to nil the current agent, indicating it is no longer
260
        // active.
261
        m.pilot = nil
×
262

×
263
        log.Debugf("Manager stopped autopilot agent")
×
264

×
265
        return nil
×
266
}
267

268
// QueryHeuristics queries the available autopilot heuristics for node scores.
269
func (m *Manager) QueryHeuristics(nodes []NodeID, localState bool) (
270
        HeuristicScores, error) {
×
271

×
272
        m.Lock()
×
273
        defer m.Unlock()
×
274

×
275
        n := make(map[NodeID]struct{})
×
276
        for _, node := range nodes {
×
277
                n[node] = struct{}{}
×
278
        }
×
279

280
        log.Debugf("Querying heuristics for %d nodes", len(n))
×
281
        return m.queryHeuristics(n, localState)
×
282
}
283

284
// HeuristicScores is an alias for a map that maps heuristic names to a map of
285
// scores for pubkeys.
286
type HeuristicScores map[string]map[NodeID]float64
287

288
// queryHeuristics gets node scores from all available simple heuristics, and
289
// the agent's current active heuristic.
290
//
291
// NOTE: Must be called with the manager's lock.
292
func (m *Manager) queryHeuristics(nodes map[NodeID]struct{}, localState bool) (
293
        HeuristicScores, error) {
×
294

×
295
        // If we want to take the local state into action when querying the
×
296
        // heuristics, we fetch it. If not we'll just pass an empty slice to
×
297
        // the heuristic.
×
298
        var totalChans []LocalChannel
×
299
        var err error
×
300
        if localState {
×
301
                // Fetch the current set of channels.
×
302
                totalChans, err = m.cfg.ChannelState()
×
303
                if err != nil {
×
304
                        return nil, err
×
305
                }
×
306

307
                // If the agent is active, we can merge the channel state with
308
                // the channels pending open.
309
                if m.pilot != nil {
×
310
                        m.pilot.chanStateMtx.Lock()
×
311
                        m.pilot.pendingMtx.Lock()
×
312
                        totalChans = mergeChanState(
×
313
                                m.pilot.pendingOpens, m.pilot.chanState,
×
314
                        )
×
315
                        m.pilot.pendingMtx.Unlock()
×
316
                        m.pilot.chanStateMtx.Unlock()
×
317
                }
×
318
        }
319

320
        // As channel size we'll use the maximum size.
321
        chanSize := m.cfg.PilotCfg.Constraints.MaxChanSize()
×
322

×
323
        // We'll start by getting the scores from each available sub-heuristic,
×
324
        // in addition the current agent heuristic.
×
325
        var heuristics []AttachmentHeuristic
×
326
        heuristics = append(heuristics, availableHeuristics...)
×
327
        heuristics = append(heuristics, m.cfg.PilotCfg.Heuristic)
×
328

×
329
        report := make(HeuristicScores)
×
330
        for _, h := range heuristics {
×
331
                name := h.Name()
×
332

×
333
                // If the agent heuristic is among the simple heuristics it
×
334
                // might get queried more than once. As an optimization we'll
×
335
                // just skip it the second time.
×
336
                if _, ok := report[name]; ok {
×
337
                        continue
×
338
                }
339

340
                s, err := h.NodeScores(
×
341
                        m.cfg.PilotCfg.Graph, totalChans, chanSize, nodes,
×
342
                )
×
343
                if err != nil {
×
344
                        return nil, fmt.Errorf("unable to get sub score: %w",
×
345
                                err)
×
346
                }
×
347

348
                log.Debugf("Heuristic \"%v\" scored %d nodes", name, len(s))
×
349

×
350
                scores := make(map[NodeID]float64)
×
351
                for nID, score := range s {
×
352
                        scores[nID] = score.Score
×
353
                }
×
354

355
                report[name] = scores
×
356
        }
357

358
        return report, nil
×
359
}
360

361
// SetNodeScores is used to set the scores of the given heuristic, if it is
362
// active, and ScoreSettable.
363
func (m *Manager) SetNodeScores(name string, scores map[NodeID]float64) error {
×
364
        m.Lock()
×
365
        defer m.Unlock()
×
366

×
367
        // It must be ScoreSettable to be available for external
×
368
        // scores.
×
369
        s, ok := m.cfg.PilotCfg.Heuristic.(ScoreSettable)
×
370
        if !ok {
×
371
                return fmt.Errorf("current heuristic doesn't support " +
×
372
                        "external scoring")
×
373
        }
×
374

375
        // Heuristic was found, set its node scores.
376
        applied, err := s.SetNodeScores(name, scores)
×
377
        if err != nil {
×
378
                return err
×
379
        }
×
380

381
        if !applied {
×
382
                return fmt.Errorf("heuristic with name %v not found", name)
×
383
        }
×
384

385
        // If the autopilot agent is active, notify about the updated
386
        // heuristic.
387
        if m.pilot != nil {
×
388
                m.pilot.OnHeuristicUpdate(m.cfg.PilotCfg.Heuristic)
×
389
        }
×
390

391
        return nil
×
392
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc