• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13035292482

29 Jan 2025 03:59PM UTC coverage: 49.3% (-9.5%) from 58.777%
13035292482

Pull #9456

github

mohamedawnallah
docs: update release-notes-0.19.0.md

In this commit, we warn users about the removal
of RPCs `SendToRoute`, `SendToRouteSync`, `SendPayment`,
and `SendPaymentSync` in the next release 0.20.
Pull Request #9456: lnrpc+docs: deprecate warning `SendToRoute`, `SendToRouteSync`, `SendPayment`, and `SendPaymentSync` in Release 0.19

100634 of 204126 relevant lines covered (49.3%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/autopilot/agent.go
1
package autopilot
2

3
import (
4
        "bytes"
5
        "fmt"
6
        "math/rand"
7
        "net"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/btcutil"
13
        "github.com/davecgh/go-spew/spew"
14
        "github.com/lightningnetwork/lnd/lnwire"
15
)
16

17
// Config couples all the items that an autopilot agent needs to function.
// All items within the struct MUST be populated for the Agent to be able to
// carry out its duties.
type Config struct {
	// Self is the identity public key of the Lightning Network node that
	// is being driven by the agent. This is used to ensure that we don't
	// accidentally attempt to open a channel with ourselves.
	Self *btcec.PublicKey

	// Heuristic is an attachment heuristic which will govern to whom we
	// open channels to, and also what those channels look like in terms of
	// desired capacity. The Heuristic will take into account the current
	// state of the graph, our set of open channels, and the amount of
	// available funds when determining how channels are to be opened.
	// Additionally, a heuristic may also factor in extra-graph
	// information in order to make more pertinent recommendations.
	Heuristic AttachmentHeuristic

	// ChanController is an interface that is able to directly manage the
	// creation, closing and update of channels within the network.
	ChanController ChannelController

	// ConnectToPeer attempts to connect to the peer using one of its
	// advertised addresses. The boolean returned signals whether the peer
	// was already connected.
	ConnectToPeer func(*btcec.PublicKey, []net.Addr) (bool, error)

	// DisconnectPeer attempts to disconnect the peer with the given public
	// key.
	DisconnectPeer func(*btcec.PublicKey) error

	// WalletBalance is a function closure that should return the current
	// available balance of the backing wallet.
	WalletBalance func() (btcutil.Amount, error)

	// Graph is an abstract channel graph that the Heuristic and the Agent
	// will use to make decisions w.r.t channel allocation and placement
	// within the graph.
	Graph ChannelGraph

	// Constraints is the set of constraints the autopilot must adhere to
	// when opening channels.
	Constraints AgentConstraints

	// TODO(roasbeef): add additional signals from fee rates and revenue of
	// currently opened channels
}
64

65
// channelState is a type that represents the set of active channels of the
// backing LN node that the Agent should be aware of. This type contains a few
// helper utility methods. Channels are keyed by their short channel ID.
type channelState map[lnwire.ShortChannelID]LocalChannel
69

70
// Channels returns a slice of all the active channels.
71
func (c channelState) Channels() []LocalChannel {
×
72
        chans := make([]LocalChannel, 0, len(c))
×
73
        for _, channel := range c {
×
74
                chans = append(chans, channel)
×
75
        }
×
76
        return chans
×
77
}
78

79
// ConnectedNodes returns the set of nodes we currently have a channel with.
80
// This information is needed as we want to avoid making repeated channels with
81
// any node.
82
func (c channelState) ConnectedNodes() map[NodeID]struct{} {
×
83
        nodes := make(map[NodeID]struct{})
×
84
        for _, channels := range c {
×
85
                nodes[channels.Node] = struct{}{}
×
86
        }
×
87

88
        // TODO(roasbeef): add outgoing, nodes, allow incoming and outgoing to
89
        // per node
90
        //  * only add node is chan as funding amt set
91

92
        return nodes
×
93
}
94

95
// Agent implements a closed-loop control system which seeks to autonomously
// optimize the allocation of satoshis within channels throughout the network's
// channel graph. An agent is configurable by swapping out different
// AttachmentHeuristic strategies. The agent uses external signals such as the
// wallet balance changing, or new channels being opened/closed for the local
// node as an indicator to re-examine its internal state, and the amount of
// available funds in order to make updated decisions w.r.t the channel graph.
// The Agent will automatically open, close, and splice in/out channel as
// necessary for it to step closer to its optimal state.
//
// TODO(roasbeef): prob re-word
type Agent struct {
	// started/stopped ensure that the bodies of Start and Stop only ever
	// execute once.
	started sync.Once
	stopped sync.Once

	// cfg houses the configuration state of the Agent.
	cfg Config

	// chanState tracks the current set of open channels. All access to it
	// is guarded by chanStateMtx.
	chanState    channelState
	chanStateMtx sync.Mutex

	// stateUpdates is a channel that any external state updates that may
	// affect the heuristics of the agent will be sent over.
	stateUpdates chan interface{}

	// balanceUpdates is a channel where notifications about updates to the
	// wallet's balance will be sent. This channel will be buffered to
	// ensure we have at most one pending update of this type to handle at
	// a given time.
	balanceUpdates chan *balanceUpdate

	// nodeUpdates is a channel that changes to the graph node landscape
	// will be sent over. This channel will be buffered to ensure we have
	// at most one pending update of this type to handle at a given time.
	nodeUpdates chan *nodeUpdates

	// pendingOpenUpdates is a channel where updates about channel pending
	// opening will be sent. This channel will be buffered to ensure we
	// have at most one pending update of this type to handle at a given
	// time.
	pendingOpenUpdates chan *chanPendingOpenUpdate

	// chanOpenFailures is a channel where updates about channel open
	// failures will be sent. This channel will be buffered to ensure we
	// have at most one pending update of this type to handle at a given
	// time.
	chanOpenFailures chan *chanOpenFailureUpdate

	// heuristicUpdates is a channel where updates from active heuristics
	// will be sent.
	heuristicUpdates chan *heuristicUpdate

	// totalBalance is the total number of satoshis the backing wallet is
	// known to control at any given instance. This value will be updated
	// when the agent receives external balance update signals.
	totalBalance btcutil.Amount

	// failedNodes lists nodes that we've previously attempted to initiate
	// channels with, but didn't succeed.
	failedNodes map[NodeID]struct{}

	// pendingConns tracks the nodes that we are attempting to make
	// connections to. This prevents us from making duplicate connection
	// requests to the same node.
	pendingConns map[NodeID]struct{}

	// pendingOpens tracks the channels that we've requested to be
	// initiated, but haven't yet been confirmed as being fully opened.
	// This state is required as otherwise, we may go over our allotted
	// channel limit, or open multiple channels to the same node.
	// pendingOpens, pendingConns and failedNodes are all guarded by
	// pendingMtx.
	pendingOpens map[NodeID]LocalChannel
	pendingMtx   sync.Mutex

	// quit is closed on shutdown to signal all helper goroutines to exit;
	// wg tracks those goroutines so Stop can wait for them.
	quit chan struct{}
	wg   sync.WaitGroup
}
172

173
// New creates a new instance of the Agent instantiated using the passed
174
// configuration and initial channel state. The initial channel state slice
175
// should be populated with the set of Channels that are currently opened by
176
// the backing Lightning Node.
177
func New(cfg Config, initialState []LocalChannel) (*Agent, error) {
×
178
        a := &Agent{
×
179
                cfg:                cfg,
×
180
                chanState:          make(map[lnwire.ShortChannelID]LocalChannel),
×
181
                quit:               make(chan struct{}),
×
182
                stateUpdates:       make(chan interface{}),
×
183
                balanceUpdates:     make(chan *balanceUpdate, 1),
×
184
                nodeUpdates:        make(chan *nodeUpdates, 1),
×
185
                chanOpenFailures:   make(chan *chanOpenFailureUpdate, 1),
×
186
                heuristicUpdates:   make(chan *heuristicUpdate, 1),
×
187
                pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1),
×
188
                failedNodes:        make(map[NodeID]struct{}),
×
189
                pendingConns:       make(map[NodeID]struct{}),
×
190
                pendingOpens:       make(map[NodeID]LocalChannel),
×
191
        }
×
192

×
193
        for _, c := range initialState {
×
194
                a.chanState[c.ChanID] = c
×
195
        }
×
196

197
        return a, nil
×
198
}
199

200
// Start starts the agent along with any goroutines it needs to perform its
201
// normal duties.
202
func (a *Agent) Start() error {
×
203
        var err error
×
204
        a.started.Do(func() {
×
205
                err = a.start()
×
206
        })
×
207
        return err
×
208
}
209

210
func (a *Agent) start() error {
×
211
        rand.Seed(time.Now().Unix())
×
212
        log.Infof("Autopilot Agent starting")
×
213

×
214
        a.wg.Add(1)
×
215
        go a.controller()
×
216

×
217
        return nil
×
218
}
×
219

220
// Stop signals the Agent to gracefully shutdown. This function will block
221
// until all goroutines have exited.
222
func (a *Agent) Stop() error {
×
223
        var err error
×
224
        a.stopped.Do(func() {
×
225
                err = a.stop()
×
226
        })
×
227
        return err
×
228
}
229

230
func (a *Agent) stop() error {
×
231
        log.Infof("Autopilot Agent stopping")
×
232

×
233
        close(a.quit)
×
234
        a.wg.Wait()
×
235

×
236
        return nil
×
237
}
×
238

239
// balanceUpdate is a type of external state update that reflects an
// increase/decrease in the funds currently available to the wallet.
type balanceUpdate struct {
}

// nodeUpdates is a type of external state update that reflects an addition or
// modification in channel graph node membership.
type nodeUpdates struct{}

// chanOpenUpdate is a type of external state update that indicates a new
// channel has been opened, either by the Agent itself (within the main
// controller loop), or by an external user to the system.
type chanOpenUpdate struct {
	// newChan is the newly opened channel to be incorporated into the
	// agent's internal state.
	newChan LocalChannel
}

// chanPendingOpenUpdate is a type of external state update that indicates a new
// channel has been opened, either by the agent itself or an external subsystem,
// but is still pending.
type chanPendingOpenUpdate struct{}

// chanOpenFailureUpdate is a type of external state update that indicates
// a previous channel open failed, and that it might be possible to try again.
type chanOpenFailureUpdate struct{}

// heuristicUpdate is an update sent when one of the autopilot heuristics has
// changed, and prompts the agent to make a new attempt at opening more
// channels.
type heuristicUpdate struct {
	// heuristic is the heuristic that was updated.
	heuristic AttachmentHeuristic
}

// chanCloseUpdate is a type of external state update that indicates that the
// backing Lightning Node has closed a previously open channel.
type chanCloseUpdate struct {
	// closedChans is the set of channels that have been closed.
	closedChans []lnwire.ShortChannelID
}
276

277
// OnBalanceChange is a callback that should be executed each time the balance
// of the backing wallet changes.
func (a *Agent) OnBalanceChange() {
	// Non-blocking send: balanceUpdates is buffered (capacity one), so if
	// an update is already queued this notification is coalesced into it
	// rather than blocking the caller.
	select {
	case a.balanceUpdates <- &balanceUpdate{}:
	default:
	}
}
285

286
// OnNodeUpdates is a callback that should be executed each time our channel
// graph has new nodes or their node announcements are updated.
func (a *Agent) OnNodeUpdates() {
	// Non-blocking send: a queued update already guarantees the controller
	// will re-assess, so additional notifications can be dropped.
	select {
	case a.nodeUpdates <- &nodeUpdates{}:
	default:
	}
}
294

295
// OnChannelOpen is a callback that should be executed each time a new channel
// is manually opened by the user or any system outside the autopilot agent.
func (a *Agent) OnChannelOpen(c LocalChannel) {
	// wg.Add must happen before the goroutine launches so a concurrent
	// Stop waits for this notification to be delivered or abandoned.
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()

		// Block until the controller consumes the update, or bail out
		// if the agent is shutting down.
		select {
		case a.stateUpdates <- &chanOpenUpdate{newChan: c}:
		case <-a.quit:
		}
	}()
}
308

309
// OnChannelPendingOpen is a callback that should be executed each time a new
// channel is opened, either by the agent or an external subsystem, but is
// still pending.
func (a *Agent) OnChannelPendingOpen() {
	// Non-blocking send: the buffered channel coalesces duplicate
	// notifications, so the caller never blocks.
	select {
	case a.pendingOpenUpdates <- &chanPendingOpenUpdate{}:
	default:
	}
}
318

319
// OnChannelOpenFailure is a callback that should be executed when the
// autopilot has attempted to open a channel, but failed. In this case we can
// retry channel creation with a different node.
func (a *Agent) OnChannelOpenFailure() {
	// Non-blocking send: a single queued failure notification is enough
	// to trigger a retry pass in the controller.
	select {
	case a.chanOpenFailures <- &chanOpenFailureUpdate{}:
	default:
	}
}
328

329
// OnChannelClose is a callback that should be executed each time a prior
// channel has been closed for any reason. This includes regular
// closes, force closes, and channel breaches.
func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) {
	// wg.Add must happen before the goroutine launches so a concurrent
	// Stop waits for this notification to be delivered or abandoned.
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()

		// Block until the controller consumes the update, or bail out
		// if the agent is shutting down.
		select {
		case a.stateUpdates <- &chanCloseUpdate{closedChans: closedChans}:
		case <-a.quit:
		}
	}()
}
343

344
// OnHeuristicUpdate is a method called when a heuristic has been updated, to
// trigger the agent to do a new state assessment.
func (a *Agent) OnHeuristicUpdate(h AttachmentHeuristic) {
	// Non-blocking send: heuristicUpdates is buffered with capacity one,
	// so duplicate notifications are coalesced rather than blocking.
	select {
	case a.heuristicUpdates <- &heuristicUpdate{
		heuristic: h,
	}:
	default:
	}
}
354

355
// mergeNodeMaps merges the Agent's set of nodes that it already has active
356
// channels open to, with the other sets of nodes that should be removed from
357
// consideration during heuristic selection. This ensures that the Agent doesn't
358
// attempt to open any "duplicate" channels to the same node.
359
func mergeNodeMaps(c map[NodeID]LocalChannel,
360
        skips ...map[NodeID]struct{}) map[NodeID]struct{} {
×
361

×
362
        numNodes := len(c)
×
363
        for _, skip := range skips {
×
364
                numNodes += len(skip)
×
365
        }
×
366

367
        res := make(map[NodeID]struct{}, numNodes)
×
368
        for nodeID := range c {
×
369
                res[nodeID] = struct{}{}
×
370
        }
×
371
        for _, skip := range skips {
×
372
                for nodeID := range skip {
×
373
                        res[nodeID] = struct{}{}
×
374
                }
×
375
        }
376

377
        return res
×
378
}
379

380
// mergeChanState merges the Agent's set of active channels, with the set of
381
// channels awaiting confirmation. This ensures that the agent doesn't go over
382
// the prescribed channel limit or fund allocation limit.
383
func mergeChanState(pendingChans map[NodeID]LocalChannel,
384
        activeChans channelState) []LocalChannel {
×
385

×
386
        numChans := len(pendingChans) + len(activeChans)
×
387
        totalChans := make([]LocalChannel, 0, numChans)
×
388

×
389
        totalChans = append(totalChans, activeChans.Channels()...)
×
390

×
391
        for _, pendingChan := range pendingChans {
×
392
                totalChans = append(totalChans, pendingChan)
×
393
        }
×
394

395
        return totalChans
×
396
}
397

398
// controller implements the closed-loop control system of the Agent. The
// controller will make a decision w.r.t channel placement within the graph
// based on: its current internal state of the set of active channels open,
// and external state changes as a result of decisions it makes w.r.t channel
// allocation, or attributes affecting its control loop being updated by the
// backing Lightning Node.
//
// NOTE: MUST be run as a goroutine; it only exits when quit is closed.
func (a *Agent) controller() {
	defer a.wg.Done()

	// We'll start off by assigning our starting balance, and injecting
	// that amount as an initial wake up to the main controller goroutine.
	a.OnBalanceChange()

	// TODO(roasbeef): do we in fact need to maintain order?
	//  * use sync.Cond if so
	updateBalance := func() {
		newBalance, err := a.cfg.WalletBalance()
		if err != nil {
			log.Warnf("unable to update wallet balance: %v", err)
			return
		}

		a.totalBalance = newBalance
	}

	// TODO(roasbeef): add 10-minute wake up timer
	for {
		select {
		// A new external signal has arrived. We'll use this to update
		// our internal state, then determine if we should trigger a
		// channel state modification (open/close, splice in/out).
		case signal := <-a.stateUpdates:
			log.Infof("Processing new external signal")

			switch update := signal.(type) {
			// A new channel has been opened successfully. This was
			// either opened by the Agent, or an external system
			// that is able to drive the Lightning Node.
			case *chanOpenUpdate:
				log.Debugf("New channel successfully opened, "+
					"updating state with: %v",
					spew.Sdump(update.newChan))

				newChan := update.newChan
				a.chanStateMtx.Lock()
				a.chanState[newChan.ChanID] = newChan
				a.chanStateMtx.Unlock()

				// The channel is confirmed open, so it no
				// longer counts against the pending budget.
				a.pendingMtx.Lock()
				delete(a.pendingOpens, newChan.Node)
				a.pendingMtx.Unlock()

				updateBalance()
			// A channel has been closed, this may free up an
			// available slot, triggering a new channel update.
			case *chanCloseUpdate:
				log.Debugf("Applying closed channel "+
					"updates: %v",
					spew.Sdump(update.closedChans))

				a.chanStateMtx.Lock()
				for _, closedChan := range update.closedChans {
					delete(a.chanState, closedChan)
				}
				a.chanStateMtx.Unlock()

				updateBalance()
			}

		// A new channel has been opened by the agent or an external
		// subsystem, but is still pending confirmation.
		case <-a.pendingOpenUpdates:
			updateBalance()

		// The balance of the backing wallet has changed, if more funds
		// are now available, we may attempt to open up an additional
		// channel, or splice in funds to an existing one.
		case <-a.balanceUpdates:
			log.Debug("Applying external balance state update")

			updateBalance()

		// The channel we tried to open previously failed for whatever
		// reason.
		case <-a.chanOpenFailures:
			log.Debug("Retrying after previous channel open " +
				"failure.")

			updateBalance()

		// New nodes have been added to the graph or their node
		// announcements have been updated. We will consider opening
		// channels to these nodes if we haven't stabilized.
		case <-a.nodeUpdates:
			log.Debugf("Node updates received, assessing " +
				"need for more channels")

		// Any of the deployed heuristics has been updated, check
		// whether we have new channel candidates available.
		case upd := <-a.heuristicUpdates:
			log.Debugf("Heuristic %v updated, assessing need for "+
				"more channels", upd.heuristic.Name())

		// The agent has been signalled to exit, so we'll bail out
		// immediately.
		case <-a.quit:
			return
		}

		a.pendingMtx.Lock()
		log.Debugf("Pending channels: %v", spew.Sdump(a.pendingOpens))
		a.pendingMtx.Unlock()

		// With all the updates applied, we'll obtain a set of the
		// current active channels (confirmed channels), and also
		// factor in our set of unconfirmed channels.
		//
		// NOTE: chanStateMtx is always acquired before pendingMtx;
		// keep this ordering to avoid deadlock.
		a.chanStateMtx.Lock()
		a.pendingMtx.Lock()
		totalChans := mergeChanState(a.pendingOpens, a.chanState)
		a.pendingMtx.Unlock()
		a.chanStateMtx.Unlock()

		// Now that we've updated our internal state, we'll consult our
		// channel attachment heuristic to determine if we can open
		// up any additional channels while staying within our
		// constraints.
		availableFunds, numChans := a.cfg.Constraints.ChannelBudget(
			totalChans, a.totalBalance,
		)
		switch {
		// No additional channels are permitted by the constraints at
		// the moment.
		case numChans == 0:
			continue

		// If the amount is too small, we don't want to attempt opening
		// another channel.
		case availableFunds == 0:
			continue
		case availableFunds < a.cfg.Constraints.MinChanSize():
			continue
		}

		log.Infof("Triggering attachment directive dispatch, "+
			"total_funds=%v", a.totalBalance)

		err := a.openChans(availableFunds, numChans, totalChans)
		if err != nil {
			log.Errorf("Unable to open channels: %v", err)
		}
	}
}
548

549
// openChans queries the agent's heuristic for a set of channel candidates, and
// attempts to open channels to them. availableFunds caps the total amount
// committed across all new channels, numChans caps how many candidates are
// chosen, and totalChans is the combined confirmed+pending channel set handed
// to the heuristic for scoring.
func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
	totalChans []LocalChannel) error {

	// As channel size we'll use the maximum channel size available.
	chanSize := a.cfg.Constraints.MaxChanSize()
	if availableFunds < chanSize {
		chanSize = availableFunds
	}

	if chanSize < a.cfg.Constraints.MinChanSize() {
		return fmt.Errorf("not enough funds available to open a " +
			"single channel")
	}

	// We're to attempt an attachment so we'll obtain the set of
	// nodes that we currently have channels with so we avoid
	// duplicate edges.
	a.chanStateMtx.Lock()
	connectedNodes := a.chanState.ConnectedNodes()
	a.chanStateMtx.Unlock()

	for nID := range connectedNodes {
		log.Tracef("Skipping node %x with open channel", nID[:])
	}

	// pendingMtx guards pendingOpens, pendingConns and failedNodes for
	// the duration of the skip-set construction below.
	a.pendingMtx.Lock()

	for nID := range a.pendingOpens {
		log.Tracef("Skipping node %x with pending channel open", nID[:])
	}

	for nID := range a.pendingConns {
		log.Tracef("Skipping node %x with pending connection", nID[:])
	}

	for nID := range a.failedNodes {
		log.Tracef("Skipping failed node %v", nID[:])
	}

	nodesToSkip := mergeNodeMaps(a.pendingOpens,
		a.pendingConns, connectedNodes, a.failedNodes,
	)

	a.pendingMtx.Unlock()

	// Gather the set of all nodes in the graph, except those we
	// want to skip.
	selfPubBytes := a.cfg.Self.SerializeCompressed()
	nodes := make(map[NodeID]struct{})
	addresses := make(map[NodeID][]net.Addr)
	if err := a.cfg.Graph.ForEachNode(func(node Node) error {
		nID := NodeID(node.PubKey())

		// If we come across ourselves, then we'll continue in
		// order to avoid attempting to make a channel with
		// ourselves.
		if bytes.Equal(nID[:], selfPubBytes) {
			log.Tracef("Skipping self node %x", nID[:])
			return nil
		}

		// If the node has no known addresses, we cannot connect to it,
		// so we'll skip it.
		addrs := node.Addrs()
		if len(addrs) == 0 {
			log.Tracef("Skipping node %x since no addresses known",
				nID[:])
			return nil
		}
		addresses[nID] = addrs

		// Additionally, if this node is in the blacklist, then
		// we'll skip it.
		if _, ok := nodesToSkip[nID]; ok {
			log.Tracef("Skipping blacklisted node %x", nID[:])
			return nil
		}

		nodes[nID] = struct{}{}
		return nil
	}); err != nil {
		return fmt.Errorf("unable to get graph nodes: %w", err)
	}

	// Use the heuristic to calculate a score for each node in the
	// graph.
	log.Debugf("Scoring %d nodes for chan_size=%v", len(nodes), chanSize)
	scores, err := a.cfg.Heuristic.NodeScores(
		a.cfg.Graph, totalChans, chanSize, nodes,
	)
	if err != nil {
		return fmt.Errorf("unable to calculate node scores : %w", err)
	}

	log.Debugf("Got scores for %d nodes", len(scores))

	// Now use the score to make a weighted choice which nodes to attempt
	// to open channels to.
	scores, err = chooseN(numChans, scores)
	if err != nil {
		return fmt.Errorf("unable to make weighted choice: %w",
			err)
	}

	chanCandidates := make(map[NodeID]*AttachmentDirective)
	for nID := range scores {
		log.Tracef("Creating attachment directive for chosen node %x",
			nID[:])

		// Track the available funds we have left.
		if availableFunds < chanSize {
			chanSize = availableFunds
		}
		availableFunds -= chanSize

		// If we run out of funds, we can break early.
		if chanSize < a.cfg.Constraints.MinChanSize() {
			log.Tracef("Chan size %v too small to satisfy min "+
				"channel size %v, breaking", chanSize,
				a.cfg.Constraints.MinChanSize())
			break
		}

		chanCandidates[nID] = &AttachmentDirective{
			NodeID:  nID,
			ChanAmt: chanSize,
			Addrs:   addresses[nID],
		}
	}

	if len(chanCandidates) == 0 {
		log.Infof("No eligible candidates to connect to")
		return nil
	}

	log.Infof("Attempting to execute channel attachment "+
		"directives: %v", spew.Sdump(chanCandidates))

	// Before proceeding, check to see if we have any slots
	// available to open channels. If there are any, we will attempt
	// to dispatch the retrieved directives since we can't be
	// certain which ones may actually succeed. If too many
	// connections succeed, they will be ignored and made
	// available to future heuristic selections.
	a.pendingMtx.Lock()
	defer a.pendingMtx.Unlock()
	if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens() {
		log.Debugf("Reached cap of %v pending "+
			"channel opens, will retry "+
			"after success/failure",
			a.cfg.Constraints.MaxPendingOpens())
		return nil
	}

	// For each recommended attachment directive, we'll launch a
	// new goroutine to attempt to carry out the directive. If any
	// of these succeed, then we'll receive a new state update,
	// taking us back to the top of our controller loop.
	for _, chanCandidate := range chanCandidates {
		// Skip candidates which we are already trying
		// to establish a connection with.
		nodeID := chanCandidate.NodeID
		if _, ok := a.pendingConns[nodeID]; ok {
			continue
		}
		a.pendingConns[nodeID] = struct{}{}

		a.wg.Add(1)
		go a.executeDirective(*chanCandidate)
	}
	return nil
}
723

724
// executeDirective attempts to connect to the channel candidate specified by
725
// the given attachment directive, and open a channel of the given size.
726
//
727
// NOTE: MUST be run as a goroutine.
728
func (a *Agent) executeDirective(directive AttachmentDirective) {
×
729
        defer a.wg.Done()
×
730

×
731
        // We'll start out by attempting to connect to the peer in order to
×
732
        // begin the funding workflow.
×
733
        nodeID := directive.NodeID
×
734
        pub, err := btcec.ParsePubKey(nodeID[:])
×
735
        if err != nil {
×
736
                log.Errorf("Unable to parse pubkey %x: %v", nodeID, err)
×
737
                return
×
738
        }
×
739

740
        connected := make(chan bool)
×
741
        errChan := make(chan error)
×
742

×
743
        // To ensure a call to ConnectToPeer doesn't block the agent from
×
744
        // shutting down, we'll launch it in a non-waitgrouped goroutine, that
×
745
        // will signal when a result is returned.
×
746
        // TODO(halseth): use DialContext to cancel on transport level.
×
747
        go func() {
×
748
                alreadyConnected, err := a.cfg.ConnectToPeer(
×
749
                        pub, directive.Addrs,
×
750
                )
×
751
                if err != nil {
×
752
                        select {
×
753
                        case errChan <- err:
×
754
                        case <-a.quit:
×
755
                        }
756
                        return
×
757
                }
758

759
                select {
×
760
                case connected <- alreadyConnected:
×
761
                case <-a.quit:
×
762
                        return
×
763
                }
764
        }()
765

766
        var alreadyConnected bool
×
767
        select {
×
768
        case alreadyConnected = <-connected:
×
769
        case err = <-errChan:
×
770
        case <-a.quit:
×
771
                return
×
772
        }
773

774
        if err != nil {
×
775
                log.Warnf("Unable to connect to %x: %v",
×
776
                        pub.SerializeCompressed(), err)
×
777

×
778
                // Since we failed to connect to them, we'll mark them as
×
779
                // failed so that we don't attempt to connect to them again.
×
780
                a.pendingMtx.Lock()
×
781
                delete(a.pendingConns, nodeID)
×
782
                a.failedNodes[nodeID] = struct{}{}
×
783
                a.pendingMtx.Unlock()
×
784

×
785
                // Finally, we'll trigger the agent to select new peers to
×
786
                // connect to.
×
787
                a.OnChannelOpenFailure()
×
788

×
789
                return
×
790
        }
×
791

792
        // The connection was successful, though before progressing we must
793
        // check that we have not already met our quota for max pending open
794
        // channels. This can happen if multiple directives were spawned but
795
        // fewer slots were available, and other successful attempts finished
796
        // first.
797
        a.pendingMtx.Lock()
×
798
        if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens() {
×
799
                // Since we've reached our max number of pending opens, we'll
×
800
                // disconnect this peer and exit. However, if we were
×
801
                // previously connected to them, then we'll make sure to
×
802
                // maintain the connection alive.
×
803
                if alreadyConnected {
×
804
                        // Since we succeeded in connecting, we won't add this
×
805
                        // peer to the failed nodes map, but we will remove it
×
806
                        // from a.pendingConns so that it can be retried in the
×
807
                        // future.
×
808
                        delete(a.pendingConns, nodeID)
×
809
                        a.pendingMtx.Unlock()
×
810
                        return
×
811
                }
×
812

813
                err = a.cfg.DisconnectPeer(pub)
×
814
                if err != nil {
×
815
                        log.Warnf("Unable to disconnect peer %x: %v",
×
816
                                pub.SerializeCompressed(), err)
×
817
                }
×
818

819
                // Now that we have disconnected, we can remove this node from
820
                // our pending conns map, permitting subsequent connection
821
                // attempts.
822
                delete(a.pendingConns, nodeID)
×
823
                a.pendingMtx.Unlock()
×
824
                return
×
825
        }
826

827
        // If we were successful, we'll track this peer in our set of pending
828
        // opens. We do this here to ensure we don't stall on selecting new
829
        // peers if the connection attempt happens to take too long.
830
        delete(a.pendingConns, nodeID)
×
831
        a.pendingOpens[nodeID] = LocalChannel{
×
832
                Balance: directive.ChanAmt,
×
833
                Node:    nodeID,
×
834
        }
×
835
        a.pendingMtx.Unlock()
×
836

×
837
        // We can then begin the funding workflow with this peer.
×
838
        err = a.cfg.ChanController.OpenChannel(pub, directive.ChanAmt)
×
839
        if err != nil {
×
840
                log.Warnf("Unable to open channel to %x of %v: %v",
×
841
                        pub.SerializeCompressed(), directive.ChanAmt, err)
×
842

×
843
                // As the attempt failed, we'll clear the peer from the set of
×
844
                // pending opens and mark them as failed so we don't attempt to
×
845
                // open a channel to them again.
×
846
                a.pendingMtx.Lock()
×
847
                delete(a.pendingOpens, nodeID)
×
848
                a.failedNodes[nodeID] = struct{}{}
×
849
                a.pendingMtx.Unlock()
×
850

×
851
                // Trigger the agent to re-evaluate everything and possibly
×
852
                // retry with a different node.
×
853
                a.OnChannelOpenFailure()
×
854

×
855
                // Finally, we should also disconnect the peer if we weren't
×
856
                // already connected to them beforehand by an external
×
857
                // subsystem.
×
858
                if alreadyConnected {
×
859
                        return
×
860
                }
×
861

862
                err = a.cfg.DisconnectPeer(pub)
×
863
                if err != nil {
×
864
                        log.Warnf("Unable to disconnect peer %x: %v",
×
865
                                pub.SerializeCompressed(), err)
×
866
                }
×
867
        }
868

869
        // Since the channel open was successful and is currently pending,
870
        // we'll trigger the autopilot agent to query for more peers.
871
        // TODO(halseth): this triggers a new loop before all the new channels
872
        // are added to the pending channels map. Should add before executing
873
        // directive in goroutine?
874
        a.OnChannelPendingOpen()
×
875
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc