• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15561477203

10 Jun 2025 01:54PM UTC coverage: 58.351% (-10.1%) from 68.487%
15561477203

Pull #9356

github

web-flow
Merge 6440b25db into c6d6d4c0b
Pull Request #9356: lnrpc: add incoming/outgoing channel ids filter to forwarding history request

33 of 36 new or added lines in 2 files covered. (91.67%)

28366 existing lines in 455 files now uncovered.

97715 of 167461 relevant lines covered (58.35%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/autopilot/agent.go
1
package autopilot
2

3
import (
4
        "bytes"
5
        "context"
6
        "fmt"
7
        "math/rand"
8
        "net"
9
        "sync"
10
        "time"
11

12
        "github.com/btcsuite/btcd/btcec/v2"
13
        "github.com/btcsuite/btcd/btcutil"
14
        "github.com/davecgh/go-spew/spew"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/lnwire"
17
)
18

19
// Config couples all the items that an autopilot agent needs to function.
// All items within the struct MUST be populated for the Agent to be able to
// carry out its duties.
type Config struct {
	// Self is the identity public key of the Lightning Network node that
	// is being driven by the agent. This is used to ensure that we don't
	// accidentally attempt to open a channel with ourselves.
	Self *btcec.PublicKey

	// Heuristic is an attachment heuristic which will govern to whom we
	// open channels, and also what those channels look like in terms of
	// desired capacity. The Heuristic will take into account the current
	// state of the graph, our set of open channels, and the amount of
	// available funds when determining how channels are to be opened.
	// Additionally, a heuristic may also factor in extra-graph
	// information in order to make more pertinent recommendations.
	Heuristic AttachmentHeuristic

	// ChanController is an interface that is able to directly manage the
	// creation, closing and update of channels within the network.
	ChanController ChannelController

	// ConnectToPeer attempts to connect to the peer using one of its
	// advertised addresses. The boolean returned signals whether the peer
	// was already connected.
	ConnectToPeer func(*btcec.PublicKey, []net.Addr) (bool, error)

	// DisconnectPeer attempts to disconnect the peer with the given public
	// key.
	DisconnectPeer func(*btcec.PublicKey) error

	// WalletBalance is a function closure that should return the current
	// available balance of the backing wallet.
	WalletBalance func() (btcutil.Amount, error)

	// Graph is an abstract channel graph that the Heuristic and the Agent
	// will use to make decisions w.r.t channel allocation and placement
	// within the graph.
	Graph ChannelGraph

	// Constraints is the set of constraints the autopilot must adhere to
	// when opening channels.
	Constraints AgentConstraints

	// TODO(roasbeef): add additional signals from fee rates and revenue of
	// currently opened channels
}
66

67
// channelState is a type that represents the set of active channels of the
// backing LN node that the Agent should be aware of, keyed by short channel
// ID. This type contains a few helper utility methods.
type channelState map[lnwire.ShortChannelID]LocalChannel
71

72
// Channels returns a slice of all the active channels.
UNCOV
73
func (c channelState) Channels() []LocalChannel {
×
UNCOV
74
        chans := make([]LocalChannel, 0, len(c))
×
UNCOV
75
        for _, channel := range c {
×
UNCOV
76
                chans = append(chans, channel)
×
UNCOV
77
        }
×
UNCOV
78
        return chans
×
79
}
80

81
// ConnectedNodes returns the set of nodes we currently have a channel with.
82
// This information is needed as we want to avoid making repeated channels with
83
// any node.
UNCOV
84
func (c channelState) ConnectedNodes() map[NodeID]struct{} {
×
UNCOV
85
        nodes := make(map[NodeID]struct{})
×
UNCOV
86
        for _, channels := range c {
×
87
                nodes[channels.Node] = struct{}{}
×
88
        }
×
89

90
        // TODO(roasbeef): add outgoing, nodes, allow incoming and outgoing to
91
        // per node
92
        //  * only add node is chan as funding amt set
93

UNCOV
94
        return nodes
×
95
}
96

97
// Agent implements a closed-loop control system which seeks to autonomously
// optimize the allocation of satoshis within channels throughout the network's
// channel graph. An agent is configurable by swapping out different
// AttachmentHeuristic strategies. The agent uses external signals such as the
// wallet balance changing, or new channels being opened/closed for the local
// node as an indicator to re-examine its internal state, and the amount of
// available funds in order to make updated decisions w.r.t the channel graph.
// The Agent will automatically open, close, and splice in/out channel as
// necessary for it to step closer to its optimal state.
//
// TODO(roasbeef): prob re-word
type Agent struct {
	// started and stopped ensure that the start and stop logic each run
	// at most once, regardless of how often Start/Stop are called.
	started sync.Once
	stopped sync.Once

	// cfg houses the configuration state of the Agent.
	cfg Config

	// chanState tracks the current set of open channels. Access is
	// guarded by chanStateMtx.
	chanState    channelState
	chanStateMtx sync.Mutex

	// stateUpdates is a channel that any external state updates that may
	// affect the heuristics of the agent will be sent over.
	stateUpdates chan interface{}

	// balanceUpdates is a channel where notifications about updates to the
	// wallet's balance will be sent. This channel will be buffered to
	// ensure we have at most one pending update of this type to handle at
	// a given time.
	balanceUpdates chan *balanceUpdate

	// nodeUpdates is a channel that changes to the graph node landscape
	// will be sent over. This channel will be buffered to ensure we have
	// at most one pending update of this type to handle at a given time.
	nodeUpdates chan *nodeUpdates

	// pendingOpenUpdates is a channel where updates about channel pending
	// opening will be sent. This channel will be buffered to ensure we
	// have at most one pending update of this type to handle at a given
	// time.
	pendingOpenUpdates chan *chanPendingOpenUpdate

	// chanOpenFailures is a channel where updates about channel open
	// failures will be sent. This channel will be buffered to ensure we
	// have at most one pending update of this type to handle at a given
	// time.
	chanOpenFailures chan *chanOpenFailureUpdate

	// heuristicUpdates is a channel where updates from active heuristics
	// will be sent.
	heuristicUpdates chan *heuristicUpdate

	// totalBalance is the total number of satoshis the backing wallet is
	// known to control at any given instance. This value will be updated
	// when the agent receives external balance update signals.
	totalBalance btcutil.Amount

	// failedNodes lists nodes that we've previously attempted to initiate
	// channels with, but didn't succeed.
	failedNodes map[NodeID]struct{}

	// pendingConns tracks the nodes that we are attempting to make
	// connections to. This prevents us from making duplicate connection
	// requests to the same node.
	pendingConns map[NodeID]struct{}

	// pendingOpens tracks the channels that we've requested to be
	// initiated, but haven't yet been confirmed as being fully opened.
	// This state is required as otherwise, we may go over our allotted
	// channel limit, or open multiple channels to the same node.
	pendingOpens map[NodeID]LocalChannel

	// pendingMtx is held while the pending-channel bookkeeping above
	// (pendingOpens, pendingConns) is read or modified.
	pendingMtx sync.Mutex

	// quit is closed when the agent is stopped, signalling all of its
	// goroutines to exit.
	quit chan struct{}

	// wg tracks the goroutines launched by the agent so that stopping can
	// block until they have all returned.
	wg sync.WaitGroup

	// cancel holds the cancel function of the context created in Start,
	// if the agent has been started. It is invoked when the agent stops.
	cancel fn.Option[context.CancelFunc]
}
175

176
// New creates a new instance of the Agent instantiated using the passed
177
// configuration and initial channel state. The initial channel state slice
178
// should be populated with the set of Channels that are currently opened by
179
// the backing Lightning Node.
UNCOV
180
func New(cfg Config, initialState []LocalChannel) (*Agent, error) {
×
UNCOV
181
        a := &Agent{
×
UNCOV
182
                cfg:                cfg,
×
UNCOV
183
                chanState:          make(map[lnwire.ShortChannelID]LocalChannel),
×
UNCOV
184
                quit:               make(chan struct{}),
×
UNCOV
185
                stateUpdates:       make(chan interface{}),
×
UNCOV
186
                balanceUpdates:     make(chan *balanceUpdate, 1),
×
UNCOV
187
                nodeUpdates:        make(chan *nodeUpdates, 1),
×
UNCOV
188
                chanOpenFailures:   make(chan *chanOpenFailureUpdate, 1),
×
UNCOV
189
                heuristicUpdates:   make(chan *heuristicUpdate, 1),
×
UNCOV
190
                pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1),
×
UNCOV
191
                failedNodes:        make(map[NodeID]struct{}),
×
UNCOV
192
                pendingConns:       make(map[NodeID]struct{}),
×
UNCOV
193
                pendingOpens:       make(map[NodeID]LocalChannel),
×
UNCOV
194
        }
×
UNCOV
195

×
UNCOV
196
        for _, c := range initialState {
×
UNCOV
197
                a.chanState[c.ChanID] = c
×
UNCOV
198
        }
×
199

UNCOV
200
        return a, nil
×
201
}
202

203
// Start starts the agent along with any goroutines it needs to perform its
204
// normal duties.
UNCOV
205
func (a *Agent) Start() error {
×
UNCOV
206
        var err error
×
UNCOV
207
        a.started.Do(func() {
×
UNCOV
208
                ctx, cancel := context.WithCancel(context.Background())
×
UNCOV
209
                a.cancel = fn.Some(cancel)
×
UNCOV
210

×
UNCOV
211
                err = a.start(ctx)
×
UNCOV
212
        })
×
UNCOV
213
        return err
×
214
}
215

UNCOV
216
func (a *Agent) start(ctx context.Context) error {
×
UNCOV
217
        rand.Seed(time.Now().Unix())
×
UNCOV
218
        log.Infof("Autopilot Agent starting")
×
UNCOV
219

×
UNCOV
220
        a.wg.Add(1)
×
UNCOV
221
        go a.controller(ctx)
×
UNCOV
222

×
UNCOV
223
        return nil
×
UNCOV
224
}
×
225

226
// Stop signals the Agent to gracefully shutdown. This function will block
227
// until all goroutines have exited.
UNCOV
228
func (a *Agent) Stop() error {
×
UNCOV
229
        var err error
×
UNCOV
230
        a.stopped.Do(func() {
×
UNCOV
231
                err = a.stop()
×
UNCOV
232
        })
×
UNCOV
233
        return err
×
234
}
235

UNCOV
236
func (a *Agent) stop() error {
×
UNCOV
237
        log.Infof("Autopilot Agent stopping")
×
UNCOV
238

×
UNCOV
239
        a.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
×
UNCOV
240
        close(a.quit)
×
UNCOV
241
        a.wg.Wait()
×
UNCOV
242

×
UNCOV
243
        return nil
×
244
}
245

246
// balanceUpdate is a type of external state update that reflects an
// increase/decrease in the funds currently available to the wallet. It
// carries no payload and only acts as a wake-up signal.
type balanceUpdate struct {
}
250

251
// nodeUpdates is a type of external state update that reflects an addition or
// modification in channel graph node membership. It carries no payload and
// only acts as a wake-up signal.
type nodeUpdates struct{}
254

255
// chanOpenUpdate is a type of external state update that indicates a new
// channel has been opened, either by the Agent itself (within the main
// controller loop), or by an external user to the system.
type chanOpenUpdate struct {
	// newChan is the channel that was opened.
	newChan LocalChannel
}
261

262
// chanPendingOpenUpdate is a type of external state update that indicates a
// new channel has been opened, either by the agent itself or an external
// subsystem, but is still pending. It carries no payload and only acts as a
// wake-up signal.
type chanPendingOpenUpdate struct{}
266

267
// chanOpenFailureUpdate is a type of external state update that indicates
// a previous channel open failed, and that it might be possible to try again.
// It carries no payload and only acts as a wake-up signal.
type chanOpenFailureUpdate struct{}
270

271
// heuristicUpdate is an update sent when one of the autopilot heuristics has
// changed, and prompts the agent to make a new attempt at opening more
// channels.
type heuristicUpdate struct {
	// heuristic is the heuristic that was updated.
	heuristic AttachmentHeuristic
}
277

278
// chanCloseUpdate is a type of external state update that indicates that the
// backing Lightning Node has closed a previously open channel.
type chanCloseUpdate struct {
	// closedChans lists the short channel IDs of the channels that were
	// closed.
	closedChans []lnwire.ShortChannelID
}
283

284
// OnBalanceChange is a callback that should be executed each time the balance
285
// of the backing wallet changes.
UNCOV
286
func (a *Agent) OnBalanceChange() {
×
UNCOV
287
        select {
×
UNCOV
288
        case a.balanceUpdates <- &balanceUpdate{}:
×
289
        default:
×
290
        }
291
}
292

293
// OnNodeUpdates is a callback that should be executed each time our channel
294
// graph has new nodes or their node announcements are updated.
UNCOV
295
func (a *Agent) OnNodeUpdates() {
×
UNCOV
296
        select {
×
UNCOV
297
        case a.nodeUpdates <- &nodeUpdates{}:
×
298
        default:
×
299
        }
300
}
301

302
// OnChannelOpen is a callback that should be executed each time a new channel
303
// is manually opened by the user or any system outside the autopilot agent.
UNCOV
304
func (a *Agent) OnChannelOpen(c LocalChannel) {
×
UNCOV
305
        a.wg.Add(1)
×
UNCOV
306
        go func() {
×
UNCOV
307
                defer a.wg.Done()
×
UNCOV
308

×
UNCOV
309
                select {
×
UNCOV
310
                case a.stateUpdates <- &chanOpenUpdate{newChan: c}:
×
311
                case <-a.quit:
×
312
                }
313
        }()
314
}
315

316
// OnChannelPendingOpen is a callback that should be executed each time a new
317
// channel is opened, either by the agent or an external subsystems, but is
318
// still pending.
UNCOV
319
func (a *Agent) OnChannelPendingOpen() {
×
UNCOV
320
        select {
×
UNCOV
321
        case a.pendingOpenUpdates <- &chanPendingOpenUpdate{}:
×
UNCOV
322
        default:
×
323
        }
324
}
325

326
// OnChannelOpenFailure is a callback that should be executed when the
327
// autopilot has attempted to open a channel, but failed. In this case we can
328
// retry channel creation with a different node.
UNCOV
329
func (a *Agent) OnChannelOpenFailure() {
×
UNCOV
330
        select {
×
UNCOV
331
        case a.chanOpenFailures <- &chanOpenFailureUpdate{}:
×
332
        default:
×
333
        }
334
}
335

336
// OnChannelClose is a callback that should be executed each time a prior
337
// channel has been closed for any reason. This includes regular
338
// closes, force closes, and channel breaches.
UNCOV
339
func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) {
×
UNCOV
340
        a.wg.Add(1)
×
UNCOV
341
        go func() {
×
UNCOV
342
                defer a.wg.Done()
×
UNCOV
343

×
UNCOV
344
                select {
×
UNCOV
345
                case a.stateUpdates <- &chanCloseUpdate{closedChans: closedChans}:
×
346
                case <-a.quit:
×
347
                }
348
        }()
349
}
350

351
// OnHeuristicUpdate is a method called when a heuristic has been updated, to
352
// trigger the agent to do a new state assessment.
UNCOV
353
func (a *Agent) OnHeuristicUpdate(h AttachmentHeuristic) {
×
UNCOV
354
        select {
×
355
        case a.heuristicUpdates <- &heuristicUpdate{
356
                heuristic: h,
UNCOV
357
        }:
×
358
        default:
×
359
        }
360
}
361

362
// mergeNodeMaps merges the Agent's set of nodes that it already has active
363
// channels open to, with the other sets of nodes that should be removed from
364
// consideration during heuristic selection. This ensures that the Agent doesn't
365
// attempt to open any "duplicate" channels to the same node.
366
func mergeNodeMaps(c map[NodeID]LocalChannel,
UNCOV
367
        skips ...map[NodeID]struct{}) map[NodeID]struct{} {
×
UNCOV
368

×
UNCOV
369
        numNodes := len(c)
×
UNCOV
370
        for _, skip := range skips {
×
UNCOV
371
                numNodes += len(skip)
×
UNCOV
372
        }
×
373

UNCOV
374
        res := make(map[NodeID]struct{}, numNodes)
×
UNCOV
375
        for nodeID := range c {
×
UNCOV
376
                res[nodeID] = struct{}{}
×
UNCOV
377
        }
×
UNCOV
378
        for _, skip := range skips {
×
UNCOV
379
                for nodeID := range skip {
×
UNCOV
380
                        res[nodeID] = struct{}{}
×
UNCOV
381
                }
×
382
        }
383

UNCOV
384
        return res
×
385
}
386

387
// mergeChanState merges the Agent's set of active channels, with the set of
388
// channels awaiting confirmation. This ensures that the agent doesn't go over
389
// the prescribed channel limit or fund allocation limit.
390
func mergeChanState(pendingChans map[NodeID]LocalChannel,
UNCOV
391
        activeChans channelState) []LocalChannel {
×
UNCOV
392

×
UNCOV
393
        numChans := len(pendingChans) + len(activeChans)
×
UNCOV
394
        totalChans := make([]LocalChannel, 0, numChans)
×
UNCOV
395

×
UNCOV
396
        totalChans = append(totalChans, activeChans.Channels()...)
×
UNCOV
397

×
UNCOV
398
        for _, pendingChan := range pendingChans {
×
UNCOV
399
                totalChans = append(totalChans, pendingChan)
×
UNCOV
400
        }
×
401

UNCOV
402
        return totalChans
×
403
}
404

405
// controller implements the closed-loop control system of the Agent. The
// controller will make a decision w.r.t channel placement within the graph
// based on: its current internal state of the set of active channels open,
// and external state changes as a result of decisions it makes w.r.t channel
// allocation, or attributes affecting its control loop being updated by the
// backing Lightning Node.
//
// Each loop iteration blocks until one signal arrives, applies it to the
// agent's state, and then asks the configured constraints whether more
// channels may be opened. The loop exits when the quit channel is closed or
// ctx is cancelled.
//
// NOTE: MUST be run as a goroutine; it calls a.wg.Done on exit.
func (a *Agent) controller(ctx context.Context) {
	defer a.wg.Done()

	// We'll start off by queueing a balance update, which acts as the
	// initial wake up for the loop below and causes the starting balance
	// to be fetched.
	a.OnBalanceChange()

	// TODO(roasbeef): do we in fact need to maintain order?
	//  * use sync.Cond if so

	// updateBalance refreshes the cached wallet balance. On failure the
	// previous value is kept and a warning is logged.
	updateBalance := func() {
		newBalance, err := a.cfg.WalletBalance()
		if err != nil {
			log.Warnf("unable to update wallet balance: %v", err)
			return
		}

		a.totalBalance = newBalance
	}

	// TODO(roasbeef): add 10-minute wake up timer
	for {
		select {
		// A new external signal has arrived. We'll use this to update
		// our internal state, then determine if we should trigger a
		// channel state modification (open/close, splice in/out).
		case signal := <-a.stateUpdates:
			log.Infof("Processing new external signal")

			switch update := signal.(type) {
			// A new channel has been opened successfully. This was
			// either opened by the Agent, or an external system
			// that is able to drive the Lightning Node.
			case *chanOpenUpdate:
				log.Debugf("New channel successfully opened, "+
					"updating state with: %v",
					spew.Sdump(update.newChan))

				newChan := update.newChan
				a.chanStateMtx.Lock()
				a.chanState[newChan.ChanID] = newChan
				a.chanStateMtx.Unlock()

				// The channel is now open, so it no longer
				// counts as pending for its node.
				a.pendingMtx.Lock()
				delete(a.pendingOpens, newChan.Node)
				a.pendingMtx.Unlock()

				updateBalance()
			// A channel has been closed, this may free up an
			// available slot, triggering a new channel update.
			case *chanCloseUpdate:
				log.Debugf("Applying closed channel "+
					"updates: %v",
					spew.Sdump(update.closedChans))

				a.chanStateMtx.Lock()
				for _, closedChan := range update.closedChans {
					delete(a.chanState, closedChan)
				}
				a.chanStateMtx.Unlock()

				updateBalance()
			}

		// A new channel has been opened by the agent or an external
		// subsystem, but is still pending confirmation.
		case <-a.pendingOpenUpdates:
			updateBalance()

		// The balance of the backing wallet has changed, if more funds
		// are now available, we may attempt to open up an additional
		// channel, or splice in funds to an existing one.
		case <-a.balanceUpdates:
			log.Debug("Applying external balance state update")

			updateBalance()

		// The channel we tried to open previously failed for whatever
		// reason.
		case <-a.chanOpenFailures:
			log.Debug("Retrying after previous channel open " +
				"failure.")

			updateBalance()

		// New nodes have been added to the graph or their node
		// announcements have been updated. We will consider opening
		// channels to these nodes if we haven't stabilized.
		case <-a.nodeUpdates:
			log.Debugf("Node updates received, assessing " +
				"need for more channels")

		// Any of the deployed heuristics has been updated, check
		// whether we have new channel candidates available.
		case upd := <-a.heuristicUpdates:
			log.Debugf("Heuristic %v updated, assessing need for "+
				"more channels", upd.heuristic.Name())

		// The agent has been signalled to exit, so we'll bail out
		// immediately.
		case <-a.quit:
			return

		// The agent's context has been cancelled, so we'll exit as
		// well.
		case <-ctx.Done():
			return
		}

		a.pendingMtx.Lock()
		log.Debugf("Pending channels: %v", spew.Sdump(a.pendingOpens))
		a.pendingMtx.Unlock()

		// With all the updates applied, we'll obtain a set of the
		// current active channels (confirmed channels), and also
		// factor in our set of unconfirmed channels. Both mutexes are
		// held so that the merged view is taken from a consistent
		// snapshot.
		a.chanStateMtx.Lock()
		a.pendingMtx.Lock()
		totalChans := mergeChanState(a.pendingOpens, a.chanState)
		a.pendingMtx.Unlock()
		a.chanStateMtx.Unlock()

		// Now that we've updated our internal state, we'll consult our
		// channel attachment heuristic to determine if we can open
		// up any additional channels while staying within our
		// constraints.
		availableFunds, numChans := a.cfg.Constraints.ChannelBudget(
			totalChans, a.totalBalance,
		)
		switch {
		// No channels may be opened, so wait for the next signal.
		case numChans == 0:
			continue

		// If the amount is too small, we don't want to attempt opening
		// another channel.
		case availableFunds == 0:
			continue
		case availableFunds < a.cfg.Constraints.MinChanSize():
			continue
		}

		log.Infof("Triggering attachment directive dispatch, "+
			"total_funds=%v", a.totalBalance)

		// A failure here is only logged; the loop keeps running and
		// will try again on the next signal.
		err := a.openChans(ctx, availableFunds, numChans, totalChans)
		if err != nil {
			log.Errorf("Unable to open channels: %v", err)
		}
	}
}
558

559
// openChans queries the agent's heuristic for a set of channel candidates, and
560
// attempts to open channels to them.
561
func (a *Agent) openChans(ctx context.Context, availableFunds btcutil.Amount,
UNCOV
562
        numChans uint32, totalChans []LocalChannel) error {
×
UNCOV
563

×
UNCOV
564
        // As channel size we'll use the maximum channel size available.
×
UNCOV
565
        chanSize := a.cfg.Constraints.MaxChanSize()
×
UNCOV
566
        if availableFunds < chanSize {
×
567
                chanSize = availableFunds
×
568
        }
×
569

UNCOV
570
        if chanSize < a.cfg.Constraints.MinChanSize() {
×
571
                return fmt.Errorf("not enough funds available to open a " +
×
572
                        "single channel")
×
573
        }
×
574

575
        // We're to attempt an attachment so we'll obtain the set of
576
        // nodes that we currently have channels with so we avoid
577
        // duplicate edges.
UNCOV
578
        a.chanStateMtx.Lock()
×
UNCOV
579
        connectedNodes := a.chanState.ConnectedNodes()
×
UNCOV
580
        a.chanStateMtx.Unlock()
×
UNCOV
581

×
UNCOV
582
        for nID := range connectedNodes {
×
583
                log.Tracef("Skipping node %x with open channel", nID[:])
×
584
        }
×
585

UNCOV
586
        a.pendingMtx.Lock()
×
UNCOV
587

×
UNCOV
588
        for nID := range a.pendingOpens {
×
UNCOV
589
                log.Tracef("Skipping node %x with pending channel open", nID[:])
×
UNCOV
590
        }
×
591

UNCOV
592
        for nID := range a.pendingConns {
×
UNCOV
593
                log.Tracef("Skipping node %x with pending connection", nID[:])
×
UNCOV
594
        }
×
595

UNCOV
596
        for nID := range a.failedNodes {
×
UNCOV
597
                log.Tracef("Skipping failed node %v", nID[:])
×
UNCOV
598
        }
×
599

UNCOV
600
        nodesToSkip := mergeNodeMaps(a.pendingOpens,
×
UNCOV
601
                a.pendingConns, connectedNodes, a.failedNodes,
×
UNCOV
602
        )
×
UNCOV
603

×
UNCOV
604
        a.pendingMtx.Unlock()
×
UNCOV
605

×
UNCOV
606
        // Gather the set of all nodes in the graph, except those we
×
UNCOV
607
        // want to skip.
×
UNCOV
608
        selfPubBytes := a.cfg.Self.SerializeCompressed()
×
UNCOV
609
        nodes := make(map[NodeID]struct{})
×
UNCOV
610
        addresses := make(map[NodeID][]net.Addr)
×
UNCOV
611
        if err := a.cfg.Graph.ForEachNode(ctx, func(_ context.Context,
×
UNCOV
612
                node Node) error {
×
UNCOV
613

×
UNCOV
614
                nID := NodeID(node.PubKey())
×
UNCOV
615

×
UNCOV
616
                // If we come across ourselves, them we'll continue in
×
UNCOV
617
                // order to avoid attempting to make a channel with
×
UNCOV
618
                // ourselves.
×
UNCOV
619
                if bytes.Equal(nID[:], selfPubBytes) {
×
620
                        log.Tracef("Skipping self node %x", nID[:])
×
621
                        return nil
×
622
                }
×
623

624
                // If the node has no known addresses, we cannot connect to it,
625
                // so we'll skip it.
UNCOV
626
                addrs := node.Addrs()
×
UNCOV
627
                if len(addrs) == 0 {
×
628
                        log.Tracef("Skipping node %x since no addresses known",
×
629
                                nID[:])
×
630
                        return nil
×
631
                }
×
UNCOV
632
                addresses[nID] = addrs
×
UNCOV
633

×
UNCOV
634
                // Additionally, if this node is in the blacklist, then
×
UNCOV
635
                // we'll skip it.
×
UNCOV
636
                if _, ok := nodesToSkip[nID]; ok {
×
UNCOV
637
                        log.Tracef("Skipping blacklisted node %x", nID[:])
×
UNCOV
638
                        return nil
×
UNCOV
639
                }
×
640

UNCOV
641
                nodes[nID] = struct{}{}
×
UNCOV
642
                return nil
×
643
        }); err != nil {
×
644
                return fmt.Errorf("unable to get graph nodes: %w", err)
×
645
        }
×
646

647
        // Use the heuristic to calculate a score for each node in the
648
        // graph.
UNCOV
649
        log.Debugf("Scoring %d nodes for chan_size=%v", len(nodes), chanSize)
×
UNCOV
650
        scores, err := a.cfg.Heuristic.NodeScores(
×
UNCOV
651
                ctx, a.cfg.Graph, totalChans, chanSize, nodes,
×
UNCOV
652
        )
×
UNCOV
653
        if err != nil {
×
UNCOV
654
                return fmt.Errorf("unable to calculate node scores : %w", err)
×
UNCOV
655
        }
×
656

UNCOV
657
        log.Debugf("Got scores for %d nodes", len(scores))
×
UNCOV
658

×
UNCOV
659
        // Now use the score to make a weighted choice which nodes to attempt
×
UNCOV
660
        // to open channels to.
×
UNCOV
661
        scores, err = chooseN(numChans, scores)
×
UNCOV
662
        if err != nil {
×
663
                return fmt.Errorf("unable to make weighted choice: %w",
×
664
                        err)
×
665
        }
×
666

UNCOV
667
        chanCandidates := make(map[NodeID]*AttachmentDirective)
×
UNCOV
668
        for nID := range scores {
×
UNCOV
669
                log.Tracef("Creating attachment directive for chosen node %x",
×
UNCOV
670
                        nID[:])
×
UNCOV
671

×
UNCOV
672
                // Track the available funds we have left.
×
UNCOV
673
                if availableFunds < chanSize {
×
UNCOV
674
                        chanSize = availableFunds
×
UNCOV
675
                }
×
UNCOV
676
                availableFunds -= chanSize
×
UNCOV
677

×
UNCOV
678
                // If we run out of funds, we can break early.
×
UNCOV
679
                if chanSize < a.cfg.Constraints.MinChanSize() {
×
UNCOV
680
                        log.Tracef("Chan size %v too small to satisfy min "+
×
UNCOV
681
                                "channel size %v, breaking", chanSize,
×
UNCOV
682
                                a.cfg.Constraints.MinChanSize())
×
UNCOV
683
                        break
×
684
                }
685

UNCOV
686
                chanCandidates[nID] = &AttachmentDirective{
×
UNCOV
687
                        NodeID:  nID,
×
UNCOV
688
                        ChanAmt: chanSize,
×
UNCOV
689
                        Addrs:   addresses[nID],
×
UNCOV
690
                }
×
691
        }
692

UNCOV
693
        if len(chanCandidates) == 0 {
×
UNCOV
694
                log.Infof("No eligible candidates to connect to")
×
UNCOV
695
                return nil
×
UNCOV
696
        }
×
697

UNCOV
698
        log.Infof("Attempting to execute channel attachment "+
×
UNCOV
699
                "directives: %v", spew.Sdump(chanCandidates))
×
UNCOV
700

×
UNCOV
701
        // Before proceeding, check to see if we have any slots
×
UNCOV
702
        // available to open channels. If there are any, we will attempt
×
UNCOV
703
        // to dispatch the retrieved directives since we can't be
×
UNCOV
704
        // certain which ones may actually succeed. If too many
×
UNCOV
705
        // connections succeed, they will be ignored and made
×
UNCOV
706
        // available to future heuristic selections.
×
UNCOV
707
        a.pendingMtx.Lock()
×
UNCOV
708
        defer a.pendingMtx.Unlock()
×
UNCOV
709
        if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens() {
×
710
                log.Debugf("Reached cap of %v pending "+
×
711
                        "channel opens, will retry "+
×
712
                        "after success/failure",
×
713
                        a.cfg.Constraints.MaxPendingOpens())
×
714
                return nil
×
715
        }
×
716

717
        // For each recommended attachment directive, we'll launch a
718
        // new goroutine to attempt to carry out the directive. If any
719
        // of these succeed, then we'll receive a new state update,
720
        // taking us back to the top of our controller loop.
UNCOV
721
        for _, chanCandidate := range chanCandidates {
×
UNCOV
722
                // Skip candidates which we are already trying
×
UNCOV
723
                // to establish a connection with.
×
UNCOV
724
                nodeID := chanCandidate.NodeID
×
UNCOV
725
                if _, ok := a.pendingConns[nodeID]; ok {
×
726
                        continue
×
727
                }
UNCOV
728
                a.pendingConns[nodeID] = struct{}{}
×
UNCOV
729

×
UNCOV
730
                a.wg.Add(1)
×
UNCOV
731
                go a.executeDirective(*chanCandidate)
×
732
        }
UNCOV
733
        return nil
×
734
}
735

736
// executeDirective attempts to connect to the channel candidate specified by
737
// the given attachment directive, and open a channel of the given size.
738
//
739
// NOTE: MUST be run as a goroutine.
UNCOV
740
func (a *Agent) executeDirective(directive AttachmentDirective) {
×
UNCOV
741
        defer a.wg.Done()
×
UNCOV
742

×
UNCOV
743
        // We'll start out by attempting to connect to the peer in order to
×
UNCOV
744
        // begin the funding workflow.
×
UNCOV
745
        nodeID := directive.NodeID
×
UNCOV
746
        pub, err := btcec.ParsePubKey(nodeID[:])
×
UNCOV
747
        if err != nil {
×
748
                log.Errorf("Unable to parse pubkey %x: %v", nodeID, err)
×
749
                return
×
750
        }
×
751

UNCOV
752
        connected := make(chan bool)
×
UNCOV
753
        errChan := make(chan error)
×
UNCOV
754

×
UNCOV
755
        // To ensure a call to ConnectToPeer doesn't block the agent from
×
UNCOV
756
        // shutting down, we'll launch it in a non-waitgrouped goroutine, that
×
UNCOV
757
        // will signal when a result is returned.
×
UNCOV
758
        // TODO(halseth): use DialContext to cancel on transport level.
×
UNCOV
759
        go func() {
×
UNCOV
760
                alreadyConnected, err := a.cfg.ConnectToPeer(
×
UNCOV
761
                        pub, directive.Addrs,
×
UNCOV
762
                )
×
UNCOV
763
                if err != nil {
×
UNCOV
764
                        select {
×
UNCOV
765
                        case errChan <- err:
×
UNCOV
766
                        case <-a.quit:
×
767
                        }
UNCOV
768
                        return
×
769
                }
770

UNCOV
771
                select {
×
UNCOV
772
                case connected <- alreadyConnected:
×
773
                case <-a.quit:
×
774
                        return
×
775
                }
776
        }()
777

UNCOV
778
        var alreadyConnected bool
×
UNCOV
779
        select {
×
UNCOV
780
        case alreadyConnected = <-connected:
×
UNCOV
781
        case err = <-errChan:
×
UNCOV
782
        case <-a.quit:
×
UNCOV
783
                return
×
784
        }
785

UNCOV
786
        if err != nil {
×
UNCOV
787
                log.Warnf("Unable to connect to %x: %v",
×
UNCOV
788
                        pub.SerializeCompressed(), err)
×
UNCOV
789

×
UNCOV
790
                // Since we failed to connect to them, we'll mark them as
×
UNCOV
791
                // failed so that we don't attempt to connect to them again.
×
UNCOV
792
                a.pendingMtx.Lock()
×
UNCOV
793
                delete(a.pendingConns, nodeID)
×
UNCOV
794
                a.failedNodes[nodeID] = struct{}{}
×
UNCOV
795
                a.pendingMtx.Unlock()
×
UNCOV
796

×
UNCOV
797
                // Finally, we'll trigger the agent to select new peers to
×
UNCOV
798
                // connect to.
×
UNCOV
799
                a.OnChannelOpenFailure()
×
UNCOV
800

×
UNCOV
801
                return
×
UNCOV
802
        }
×
803

804
        // The connection was successful, though before progressing we must
805
        // check that we have not already met our quota for max pending open
806
        // channels. This can happen if multiple directives were spawned but
807
        // fewer slots were available, and other successful attempts finished
808
        // first.
UNCOV
809
        a.pendingMtx.Lock()
×
UNCOV
810
        if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens() {
×
811
                // Since we've reached our max number of pending opens, we'll
×
812
                // disconnect this peer and exit. However, if we were
×
813
                // previously connected to them, then we'll make sure to
×
814
                // maintain the connection alive.
×
815
                if alreadyConnected {
×
816
                        // Since we succeeded in connecting, we won't add this
×
817
                        // peer to the failed nodes map, but we will remove it
×
818
                        // from a.pendingConns so that it can be retried in the
×
819
                        // future.
×
820
                        delete(a.pendingConns, nodeID)
×
821
                        a.pendingMtx.Unlock()
×
822
                        return
×
823
                }
×
824

825
                err = a.cfg.DisconnectPeer(pub)
×
826
                if err != nil {
×
827
                        log.Warnf("Unable to disconnect peer %x: %v",
×
828
                                pub.SerializeCompressed(), err)
×
829
                }
×
830

831
                // Now that we have disconnected, we can remove this node from
832
                // our pending conns map, permitting subsequent connection
833
                // attempts.
834
                delete(a.pendingConns, nodeID)
×
835
                a.pendingMtx.Unlock()
×
836
                return
×
837
        }
838

839
        // If we were successful, we'll track this peer in our set of pending
840
        // opens. We do this here to ensure we don't stall on selecting new
841
        // peers if the connection attempt happens to take too long.
UNCOV
842
        delete(a.pendingConns, nodeID)
×
UNCOV
843
        a.pendingOpens[nodeID] = LocalChannel{
×
UNCOV
844
                Balance: directive.ChanAmt,
×
UNCOV
845
                Node:    nodeID,
×
UNCOV
846
        }
×
UNCOV
847
        a.pendingMtx.Unlock()
×
UNCOV
848

×
UNCOV
849
        // We can then begin the funding workflow with this peer.
×
UNCOV
850
        err = a.cfg.ChanController.OpenChannel(pub, directive.ChanAmt)
×
UNCOV
851
        if err != nil {
×
852
                log.Warnf("Unable to open channel to %x of %v: %v",
×
853
                        pub.SerializeCompressed(), directive.ChanAmt, err)
×
854

×
855
                // As the attempt failed, we'll clear the peer from the set of
×
856
                // pending opens and mark them as failed so we don't attempt to
×
857
                // open a channel to them again.
×
858
                a.pendingMtx.Lock()
×
859
                delete(a.pendingOpens, nodeID)
×
860
                a.failedNodes[nodeID] = struct{}{}
×
861
                a.pendingMtx.Unlock()
×
862

×
863
                // Trigger the agent to re-evaluate everything and possibly
×
864
                // retry with a different node.
×
865
                a.OnChannelOpenFailure()
×
866

×
867
                // Finally, we should also disconnect the peer if we weren't
×
868
                // already connected to them beforehand by an external
×
869
                // subsystem.
×
870
                if alreadyConnected {
×
871
                        return
×
872
                }
×
873

874
                err = a.cfg.DisconnectPeer(pub)
×
875
                if err != nil {
×
876
                        log.Warnf("Unable to disconnect peer %x: %v",
×
877
                                pub.SerializeCompressed(), err)
×
878
                }
×
879
        }
880

881
        // Since the channel open was successful and is currently pending,
882
        // we'll trigger the autopilot agent to query for more peers.
883
        // TODO(halseth): this triggers a new loop before all the new channels
884
        // are added to the pending channels map. Should add before executing
885
        // directive in goroutine?
UNCOV
886
        a.OnChannelPendingOpen()
×
887
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc