• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13725358077

07 Mar 2025 04:51PM UTC coverage: 58.224% (-10.4%) from 68.615%
13725358077

Pull #9458

github

web-flow
Merge bf4c6625f into ab2dc09eb
Pull Request #9458: multi+server.go: add initial permissions for some peers

346 of 549 new or added lines in 10 files covered. (63.02%)

27466 existing lines in 443 files now uncovered.

94609 of 162492 relevant lines covered (58.22%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/autopilot/agent.go
1
package autopilot
2

3
import (
4
        "bytes"
5
        "fmt"
6
        "math/rand"
7
        "net"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/btcutil"
13
        "github.com/davecgh/go-spew/spew"
14
        "github.com/lightningnetwork/lnd/lnwire"
15
)
16

17
// Config couples all the items that an autopilot agent needs to function.
18
// All items within the struct MUST be populated for the Agent to be able to
19
// carry out its duties.
20
type Config struct {
21
        // Self is the identity public key of the Lightning Network node that
22
        // is being driven by the agent. This is used to ensure that we don't
23
        // accidentally attempt to open a channel with ourselves.
24
        Self *btcec.PublicKey
25

26
        // Heuristic is an attachment heuristic which will govern to whom we
27
        // open channels to, and also what those channels look like in terms of
28
        // desired capacity. The Heuristic will take into account the current
29
        // state of the graph, our set of open channels, and the amount of
30
        // available funds when determining how channels are to be opened.
31
        // Additionally, a heuristic make also factor in extra-graph
32
        // information in order to make more pertinent recommendations.
33
        Heuristic AttachmentHeuristic
34

35
        // ChanController is an interface that is able to directly manage the
36
        // creation, closing and update of channels within the network.
37
        ChanController ChannelController
38

39
        // ConnectToPeer attempts to connect to the peer using one of its
40
        // advertised addresses. The boolean returned signals whether the peer
41
        // was already connected.
42
        ConnectToPeer func(*btcec.PublicKey, []net.Addr) (bool, error)
43

44
        // DisconnectPeer attempts to disconnect the peer with the given public
45
        // key.
46
        DisconnectPeer func(*btcec.PublicKey) error
47

48
        // WalletBalance is a function closure that should return the current
49
        // available balance of the backing wallet.
50
        WalletBalance func() (btcutil.Amount, error)
51

52
        // Graph is an abstract channel graph that the Heuristic and the Agent
53
        // will use to make decisions w.r.t channel allocation and placement
54
        // within the graph.
55
        Graph ChannelGraph
56

57
        // Constraints is the set of constraints the autopilot must adhere to
58
        // when opening channels.
59
        Constraints AgentConstraints
60

61
        // TODO(roasbeef): add additional signals from fee rates and revenue of
62
        // currently opened channels
63
}
64

65
// channelState is a type that represents the set of active channels of the
66
// backing LN node that the Agent should be aware of. This type contains a few
67
// helper utility methods.
68
type channelState map[lnwire.ShortChannelID]LocalChannel
69

70
// Channels returns a slice of all the active channels.
UNCOV
71
func (c channelState) Channels() []LocalChannel {
×
UNCOV
72
        chans := make([]LocalChannel, 0, len(c))
×
UNCOV
73
        for _, channel := range c {
×
UNCOV
74
                chans = append(chans, channel)
×
UNCOV
75
        }
×
UNCOV
76
        return chans
×
77
}
78

79
// ConnectedNodes returns the set of nodes we currently have a channel with.
80
// This information is needed as we want to avoid making repeated channels with
81
// any node.
UNCOV
82
func (c channelState) ConnectedNodes() map[NodeID]struct{} {
×
UNCOV
83
        nodes := make(map[NodeID]struct{})
×
UNCOV
84
        for _, channels := range c {
×
85
                nodes[channels.Node] = struct{}{}
×
86
        }
×
87

88
        // TODO(roasbeef): add outgoing, nodes, allow incoming and outgoing to
89
        // per node
90
        //  * only add node is chan as funding amt set
91

UNCOV
92
        return nodes
×
93
}
94

95
// Agent implements a closed-loop control system which seeks to autonomously
96
// optimize the allocation of satoshis within channels throughput the network's
97
// channel graph. An agent is configurable by swapping out different
98
// AttachmentHeuristic strategies. The agent uses external signals such as the
99
// wallet balance changing, or new channels being opened/closed for the local
100
// node as an indicator to re-examine its internal state, and the amount of
101
// available funds in order to make updated decisions w.r.t the channel graph.
102
// The Agent will automatically open, close, and splice in/out channel as
103
// necessary for it to step closer to its optimal state.
104
//
105
// TODO(roasbeef): prob re-word
106
type Agent struct {
107
        started sync.Once
108
        stopped sync.Once
109

110
        // cfg houses the configuration state of the Ant.
111
        cfg Config
112

113
        // chanState tracks the current set of open channels.
114
        chanState    channelState
115
        chanStateMtx sync.Mutex
116

117
        // stateUpdates is a channel that any external state updates that may
118
        // affect the heuristics of the agent will be sent over.
119
        stateUpdates chan interface{}
120

121
        // balanceUpdates is a channel where notifications about updates to the
122
        // wallet's balance will be sent. This channel will be buffered to
123
        // ensure we have at most one pending update of this type to handle at
124
        // a given time.
125
        balanceUpdates chan *balanceUpdate
126

127
        // nodeUpdates is a channel that changes to the graph node landscape
128
        // will be sent over. This channel will be buffered to ensure we have
129
        // at most one pending update of this type to handle at a given time.
130
        nodeUpdates chan *nodeUpdates
131

132
        // pendingOpenUpdates is a channel where updates about channel pending
133
        // opening will be sent. This channel will be buffered to ensure we
134
        // have at most one pending update of this type to handle at a given
135
        // time.
136
        pendingOpenUpdates chan *chanPendingOpenUpdate
137

138
        // chanOpenFailures is a channel where updates about channel open
139
        // failures will be sent. This channel will be buffered to ensure we
140
        // have at most one pending update of this type to handle at a given
141
        // time.
142
        chanOpenFailures chan *chanOpenFailureUpdate
143

144
        // heuristicUpdates is a channel where updates from active heuristics
145
        // will be sent.
146
        heuristicUpdates chan *heuristicUpdate
147

148
        // totalBalance is the total number of satoshis the backing wallet is
149
        // known to control at any given instance. This value will be updated
150
        // when the agent receives external balance update signals.
151
        totalBalance btcutil.Amount
152

153
        // failedNodes lists nodes that we've previously attempted to initiate
154
        // channels with, but didn't succeed.
155
        failedNodes map[NodeID]struct{}
156

157
        // pendingConns tracks the nodes that we are attempting to make
158
        // connections to. This prevents us from making duplicate connection
159
        // requests to the same node.
160
        pendingConns map[NodeID]struct{}
161

162
        // pendingOpens tracks the channels that we've requested to be
163
        // initiated, but haven't yet been confirmed as being fully opened.
164
        // This state is required as otherwise, we may go over our allotted
165
        // channel limit, or open multiple channels to the same node.
166
        pendingOpens map[NodeID]LocalChannel
167
        pendingMtx   sync.Mutex
168

169
        quit chan struct{}
170
        wg   sync.WaitGroup
171
}
172

173
// New creates a new instance of the Agent instantiated using the passed
174
// configuration and initial channel state. The initial channel state slice
175
// should be populated with the set of Channels that are currently opened by
176
// the backing Lightning Node.
UNCOV
177
func New(cfg Config, initialState []LocalChannel) (*Agent, error) {
×
UNCOV
178
        a := &Agent{
×
UNCOV
179
                cfg:                cfg,
×
UNCOV
180
                chanState:          make(map[lnwire.ShortChannelID]LocalChannel),
×
UNCOV
181
                quit:               make(chan struct{}),
×
UNCOV
182
                stateUpdates:       make(chan interface{}),
×
UNCOV
183
                balanceUpdates:     make(chan *balanceUpdate, 1),
×
UNCOV
184
                nodeUpdates:        make(chan *nodeUpdates, 1),
×
UNCOV
185
                chanOpenFailures:   make(chan *chanOpenFailureUpdate, 1),
×
UNCOV
186
                heuristicUpdates:   make(chan *heuristicUpdate, 1),
×
UNCOV
187
                pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1),
×
UNCOV
188
                failedNodes:        make(map[NodeID]struct{}),
×
UNCOV
189
                pendingConns:       make(map[NodeID]struct{}),
×
UNCOV
190
                pendingOpens:       make(map[NodeID]LocalChannel),
×
UNCOV
191
        }
×
UNCOV
192

×
UNCOV
193
        for _, c := range initialState {
×
UNCOV
194
                a.chanState[c.ChanID] = c
×
UNCOV
195
        }
×
196

UNCOV
197
        return a, nil
×
198
}
199

200
// Start starts the agent along with any goroutines it needs to perform its
201
// normal duties.
UNCOV
202
func (a *Agent) Start() error {
×
UNCOV
203
        var err error
×
UNCOV
204
        a.started.Do(func() {
×
UNCOV
205
                err = a.start()
×
UNCOV
206
        })
×
UNCOV
207
        return err
×
208
}
209

UNCOV
210
func (a *Agent) start() error {
×
UNCOV
211
        rand.Seed(time.Now().Unix())
×
UNCOV
212
        log.Infof("Autopilot Agent starting")
×
UNCOV
213

×
UNCOV
214
        a.wg.Add(1)
×
UNCOV
215
        go a.controller()
×
UNCOV
216

×
UNCOV
217
        return nil
×
UNCOV
218
}
×
219

220
// Stop signals the Agent to gracefully shutdown. This function will block
221
// until all goroutines have exited.
UNCOV
222
func (a *Agent) Stop() error {
×
UNCOV
223
        var err error
×
UNCOV
224
        a.stopped.Do(func() {
×
UNCOV
225
                err = a.stop()
×
UNCOV
226
        })
×
UNCOV
227
        return err
×
228
}
229

UNCOV
230
func (a *Agent) stop() error {
×
UNCOV
231
        log.Infof("Autopilot Agent stopping")
×
UNCOV
232

×
UNCOV
233
        close(a.quit)
×
UNCOV
234
        a.wg.Wait()
×
UNCOV
235

×
UNCOV
236
        return nil
×
UNCOV
237
}
×
238

239
// balanceUpdate is a type of external state update that reflects an
240
// increase/decrease in the funds currently available to the wallet.
241
type balanceUpdate struct {
242
}
243

244
// nodeUpdates is a type of external state update that reflects an addition or
245
// modification in channel graph node membership.
246
type nodeUpdates struct{}
247

248
// chanOpenUpdate is a type of external state update that indicates a new
249
// channel has been opened, either by the Agent itself (within the main
250
// controller loop), or by an external user to the system.
251
type chanOpenUpdate struct {
252
        newChan LocalChannel
253
}
254

255
// chanPendingOpenUpdate is a type of external state update that indicates a new
256
// channel has been opened, either by the agent itself or an external subsystem,
257
// but is still pending.
258
type chanPendingOpenUpdate struct{}
259

260
// chanOpenFailureUpdate is a type of external state update that indicates
261
// a previous channel open failed, and that it might be possible to try again.
262
type chanOpenFailureUpdate struct{}
263

264
// heuristicUpdate is an update sent when one of the autopilot heuristics has
265
// changed, and prompts the agent to make a new attempt at opening more
266
// channels.
267
type heuristicUpdate struct {
268
        heuristic AttachmentHeuristic
269
}
270

271
// chanCloseUpdate is a type of external state update that indicates that the
272
// backing Lightning Node has closed a previously open channel.
273
type chanCloseUpdate struct {
274
        closedChans []lnwire.ShortChannelID
275
}
276

277
// OnBalanceChange is a callback that should be executed each time the balance
278
// of the backing wallet changes.
UNCOV
279
func (a *Agent) OnBalanceChange() {
×
UNCOV
280
        select {
×
UNCOV
281
        case a.balanceUpdates <- &balanceUpdate{}:
×
282
        default:
×
283
        }
284
}
285

286
// OnNodeUpdates is a callback that should be executed each time our channel
287
// graph has new nodes or their node announcements are updated.
UNCOV
288
func (a *Agent) OnNodeUpdates() {
×
UNCOV
289
        select {
×
UNCOV
290
        case a.nodeUpdates <- &nodeUpdates{}:
×
291
        default:
×
292
        }
293
}
294

295
// OnChannelOpen is a callback that should be executed each time a new channel
296
// is manually opened by the user or any system outside the autopilot agent.
UNCOV
297
func (a *Agent) OnChannelOpen(c LocalChannel) {
×
UNCOV
298
        a.wg.Add(1)
×
UNCOV
299
        go func() {
×
UNCOV
300
                defer a.wg.Done()
×
UNCOV
301

×
UNCOV
302
                select {
×
UNCOV
303
                case a.stateUpdates <- &chanOpenUpdate{newChan: c}:
×
304
                case <-a.quit:
×
305
                }
306
        }()
307
}
308

309
// OnChannelPendingOpen is a callback that should be executed each time a new
310
// channel is opened, either by the agent or an external subsystems, but is
311
// still pending.
UNCOV
312
func (a *Agent) OnChannelPendingOpen() {
×
UNCOV
313
        select {
×
UNCOV
314
        case a.pendingOpenUpdates <- &chanPendingOpenUpdate{}:
×
UNCOV
315
        default:
×
316
        }
317
}
318

319
// OnChannelOpenFailure is a callback that should be executed when the
320
// autopilot has attempted to open a channel, but failed. In this case we can
321
// retry channel creation with a different node.
UNCOV
322
func (a *Agent) OnChannelOpenFailure() {
×
UNCOV
323
        select {
×
UNCOV
324
        case a.chanOpenFailures <- &chanOpenFailureUpdate{}:
×
325
        default:
×
326
        }
327
}
328

329
// OnChannelClose is a callback that should be executed each time a prior
330
// channel has been closed for any reason. This includes regular
331
// closes, force closes, and channel breaches.
UNCOV
332
func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) {
×
UNCOV
333
        a.wg.Add(1)
×
UNCOV
334
        go func() {
×
UNCOV
335
                defer a.wg.Done()
×
UNCOV
336

×
UNCOV
337
                select {
×
UNCOV
338
                case a.stateUpdates <- &chanCloseUpdate{closedChans: closedChans}:
×
339
                case <-a.quit:
×
340
                }
341
        }()
342
}
343

344
// OnHeuristicUpdate is a method called when a heuristic has been updated, to
345
// trigger the agent to do a new state assessment.
UNCOV
346
func (a *Agent) OnHeuristicUpdate(h AttachmentHeuristic) {
×
UNCOV
347
        select {
×
348
        case a.heuristicUpdates <- &heuristicUpdate{
349
                heuristic: h,
UNCOV
350
        }:
×
351
        default:
×
352
        }
353
}
354

355
// mergeNodeMaps merges the Agent's set of nodes that it already has active
356
// channels open to, with the other sets of nodes that should be removed from
357
// consideration during heuristic selection. This ensures that the Agent doesn't
358
// attempt to open any "duplicate" channels to the same node.
359
func mergeNodeMaps(c map[NodeID]LocalChannel,
UNCOV
360
        skips ...map[NodeID]struct{}) map[NodeID]struct{} {
×
UNCOV
361

×
UNCOV
362
        numNodes := len(c)
×
UNCOV
363
        for _, skip := range skips {
×
UNCOV
364
                numNodes += len(skip)
×
UNCOV
365
        }
×
366

UNCOV
367
        res := make(map[NodeID]struct{}, numNodes)
×
UNCOV
368
        for nodeID := range c {
×
UNCOV
369
                res[nodeID] = struct{}{}
×
UNCOV
370
        }
×
UNCOV
371
        for _, skip := range skips {
×
UNCOV
372
                for nodeID := range skip {
×
UNCOV
373
                        res[nodeID] = struct{}{}
×
UNCOV
374
                }
×
375
        }
376

UNCOV
377
        return res
×
378
}
379

380
// mergeChanState merges the Agent's set of active channels, with the set of
381
// channels awaiting confirmation. This ensures that the agent doesn't go over
382
// the prescribed channel limit or fund allocation limit.
383
func mergeChanState(pendingChans map[NodeID]LocalChannel,
UNCOV
384
        activeChans channelState) []LocalChannel {
×
UNCOV
385

×
UNCOV
386
        numChans := len(pendingChans) + len(activeChans)
×
UNCOV
387
        totalChans := make([]LocalChannel, 0, numChans)
×
UNCOV
388

×
UNCOV
389
        totalChans = append(totalChans, activeChans.Channels()...)
×
UNCOV
390

×
UNCOV
391
        for _, pendingChan := range pendingChans {
×
UNCOV
392
                totalChans = append(totalChans, pendingChan)
×
UNCOV
393
        }
×
394

UNCOV
395
        return totalChans
×
396
}
397

398
// controller implements the closed-loop control system of the Agent. The
399
// controller will make a decision w.r.t channel placement within the graph
400
// based on: its current internal state of the set of active channels open,
401
// and external state changes as a result of decisions it makes w.r.t channel
402
// allocation, or attributes affecting its control loop being updated by the
403
// backing Lightning Node.
UNCOV
404
func (a *Agent) controller() {
×
UNCOV
405
        defer a.wg.Done()
×
UNCOV
406

×
UNCOV
407
        // We'll start off by assigning our starting balance, and injecting
×
UNCOV
408
        // that amount as an initial wake up to the main controller goroutine.
×
UNCOV
409
        a.OnBalanceChange()
×
UNCOV
410

×
UNCOV
411
        // TODO(roasbeef): do we in fact need to maintain order?
×
UNCOV
412
        //  * use sync.Cond if so
×
UNCOV
413
        updateBalance := func() {
×
UNCOV
414
                newBalance, err := a.cfg.WalletBalance()
×
UNCOV
415
                if err != nil {
×
416
                        log.Warnf("unable to update wallet balance: %v", err)
×
417
                        return
×
418
                }
×
419

UNCOV
420
                a.totalBalance = newBalance
×
421
        }
422

423
        // TODO(roasbeef): add 10-minute wake up timer
UNCOV
424
        for {
×
UNCOV
425
                select {
×
426
                // A new external signal has arrived. We'll use this to update
427
                // our internal state, then determine if we should trigger a
428
                // channel state modification (open/close, splice in/out).
UNCOV
429
                case signal := <-a.stateUpdates:
×
UNCOV
430
                        log.Infof("Processing new external signal")
×
UNCOV
431

×
UNCOV
432
                        switch update := signal.(type) {
×
433
                        // A new channel has been opened successfully. This was
434
                        // either opened by the Agent, or an external system
435
                        // that is able to drive the Lightning Node.
UNCOV
436
                        case *chanOpenUpdate:
×
UNCOV
437
                                log.Debugf("New channel successfully opened, "+
×
UNCOV
438
                                        "updating state with: %v",
×
UNCOV
439
                                        spew.Sdump(update.newChan))
×
UNCOV
440

×
UNCOV
441
                                newChan := update.newChan
×
UNCOV
442
                                a.chanStateMtx.Lock()
×
UNCOV
443
                                a.chanState[newChan.ChanID] = newChan
×
UNCOV
444
                                a.chanStateMtx.Unlock()
×
UNCOV
445

×
UNCOV
446
                                a.pendingMtx.Lock()
×
UNCOV
447
                                delete(a.pendingOpens, newChan.Node)
×
UNCOV
448
                                a.pendingMtx.Unlock()
×
UNCOV
449

×
UNCOV
450
                                updateBalance()
×
451
                        // A channel has been closed, this may free up an
452
                        // available slot, triggering a new channel update.
UNCOV
453
                        case *chanCloseUpdate:
×
UNCOV
454
                                log.Debugf("Applying closed channel "+
×
UNCOV
455
                                        "updates: %v",
×
UNCOV
456
                                        spew.Sdump(update.closedChans))
×
UNCOV
457

×
UNCOV
458
                                a.chanStateMtx.Lock()
×
UNCOV
459
                                for _, closedChan := range update.closedChans {
×
UNCOV
460
                                        delete(a.chanState, closedChan)
×
UNCOV
461
                                }
×
UNCOV
462
                                a.chanStateMtx.Unlock()
×
UNCOV
463

×
UNCOV
464
                                updateBalance()
×
465
                        }
466

467
                // A new channel has been opened by the agent or an external
468
                // subsystem, but is still pending confirmation.
UNCOV
469
                case <-a.pendingOpenUpdates:
×
UNCOV
470
                        updateBalance()
×
471

472
                // The balance of the backing wallet has changed, if more funds
473
                // are now available, we may attempt to open up an additional
474
                // channel, or splice in funds to an existing one.
UNCOV
475
                case <-a.balanceUpdates:
×
UNCOV
476
                        log.Debug("Applying external balance state update")
×
UNCOV
477

×
UNCOV
478
                        updateBalance()
×
479

480
                // The channel we tried to open previously failed for whatever
481
                // reason.
UNCOV
482
                case <-a.chanOpenFailures:
×
UNCOV
483
                        log.Debug("Retrying after previous channel open " +
×
UNCOV
484
                                "failure.")
×
UNCOV
485

×
UNCOV
486
                        updateBalance()
×
487

488
                // New nodes have been added to the graph or their node
489
                // announcements have been updated. We will consider opening
490
                // channels to these nodes if we haven't stabilized.
UNCOV
491
                case <-a.nodeUpdates:
×
UNCOV
492
                        log.Debugf("Node updates received, assessing " +
×
UNCOV
493
                                "need for more channels")
×
494

495
                // Any of the deployed heuristics has been updated, check
496
                // whether we have new channel candidates available.
UNCOV
497
                case upd := <-a.heuristicUpdates:
×
UNCOV
498
                        log.Debugf("Heuristic %v updated, assessing need for "+
×
UNCOV
499
                                "more channels", upd.heuristic.Name())
×
500

501
                // The agent has been signalled to exit, so we'll bail out
502
                // immediately.
UNCOV
503
                case <-a.quit:
×
UNCOV
504
                        return
×
505
                }
506

UNCOV
507
                a.pendingMtx.Lock()
×
UNCOV
508
                log.Debugf("Pending channels: %v", spew.Sdump(a.pendingOpens))
×
UNCOV
509
                a.pendingMtx.Unlock()
×
UNCOV
510

×
UNCOV
511
                // With all the updates applied, we'll obtain a set of the
×
UNCOV
512
                // current active channels (confirmed channels), and also
×
UNCOV
513
                // factor in our set of unconfirmed channels.
×
UNCOV
514
                a.chanStateMtx.Lock()
×
UNCOV
515
                a.pendingMtx.Lock()
×
UNCOV
516
                totalChans := mergeChanState(a.pendingOpens, a.chanState)
×
UNCOV
517
                a.pendingMtx.Unlock()
×
UNCOV
518
                a.chanStateMtx.Unlock()
×
UNCOV
519

×
UNCOV
520
                // Now that we've updated our internal state, we'll consult our
×
UNCOV
521
                // channel attachment heuristic to determine if we can open
×
UNCOV
522
                // up any additional channels while staying within our
×
UNCOV
523
                // constraints.
×
UNCOV
524
                availableFunds, numChans := a.cfg.Constraints.ChannelBudget(
×
UNCOV
525
                        totalChans, a.totalBalance,
×
UNCOV
526
                )
×
UNCOV
527
                switch {
×
UNCOV
528
                case numChans == 0:
×
UNCOV
529
                        continue
×
530

531
                // If the amount is too small, we don't want to attempt opening
532
                // another channel.
533
                case availableFunds == 0:
×
534
                        continue
×
535
                case availableFunds < a.cfg.Constraints.MinChanSize():
×
536
                        continue
×
537
                }
538

UNCOV
539
                log.Infof("Triggering attachment directive dispatch, "+
×
UNCOV
540
                        "total_funds=%v", a.totalBalance)
×
UNCOV
541

×
UNCOV
542
                err := a.openChans(availableFunds, numChans, totalChans)
×
UNCOV
543
                if err != nil {
×
UNCOV
544
                        log.Errorf("Unable to open channels: %v", err)
×
UNCOV
545
                }
×
546
        }
547
}
548

549
// openChans queries the agent's heuristic for a set of channel candidates, and
550
// attempts to open channels to them.
551
func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
UNCOV
552
        totalChans []LocalChannel) error {
×
UNCOV
553

×
UNCOV
554
        // As channel size we'll use the maximum channel size available.
×
UNCOV
555
        chanSize := a.cfg.Constraints.MaxChanSize()
×
UNCOV
556
        if availableFunds < chanSize {
×
557
                chanSize = availableFunds
×
558
        }
×
559

UNCOV
560
        if chanSize < a.cfg.Constraints.MinChanSize() {
×
561
                return fmt.Errorf("not enough funds available to open a " +
×
562
                        "single channel")
×
563
        }
×
564

565
        // We're to attempt an attachment so we'll obtain the set of
566
        // nodes that we currently have channels with so we avoid
567
        // duplicate edges.
UNCOV
568
        a.chanStateMtx.Lock()
×
UNCOV
569
        connectedNodes := a.chanState.ConnectedNodes()
×
UNCOV
570
        a.chanStateMtx.Unlock()
×
UNCOV
571

×
UNCOV
572
        for nID := range connectedNodes {
×
573
                log.Tracef("Skipping node %x with open channel", nID[:])
×
574
        }
×
575

UNCOV
576
        a.pendingMtx.Lock()
×
UNCOV
577

×
UNCOV
578
        for nID := range a.pendingOpens {
×
UNCOV
579
                log.Tracef("Skipping node %x with pending channel open", nID[:])
×
UNCOV
580
        }
×
581

UNCOV
582
        for nID := range a.pendingConns {
×
UNCOV
583
                log.Tracef("Skipping node %x with pending connection", nID[:])
×
UNCOV
584
        }
×
585

UNCOV
586
        for nID := range a.failedNodes {
×
UNCOV
587
                log.Tracef("Skipping failed node %v", nID[:])
×
UNCOV
588
        }
×
589

UNCOV
590
        nodesToSkip := mergeNodeMaps(a.pendingOpens,
×
UNCOV
591
                a.pendingConns, connectedNodes, a.failedNodes,
×
UNCOV
592
        )
×
UNCOV
593

×
UNCOV
594
        a.pendingMtx.Unlock()
×
UNCOV
595

×
UNCOV
596
        // Gather the set of all nodes in the graph, except those we
×
UNCOV
597
        // want to skip.
×
UNCOV
598
        selfPubBytes := a.cfg.Self.SerializeCompressed()
×
UNCOV
599
        nodes := make(map[NodeID]struct{})
×
UNCOV
600
        addresses := make(map[NodeID][]net.Addr)
×
UNCOV
601
        if err := a.cfg.Graph.ForEachNode(func(node Node) error {
×
UNCOV
602
                nID := NodeID(node.PubKey())
×
UNCOV
603

×
UNCOV
604
                // If we come across ourselves, them we'll continue in
×
UNCOV
605
                // order to avoid attempting to make a channel with
×
UNCOV
606
                // ourselves.
×
UNCOV
607
                if bytes.Equal(nID[:], selfPubBytes) {
×
608
                        log.Tracef("Skipping self node %x", nID[:])
×
609
                        return nil
×
610
                }
×
611

612
                // If the node has no known addresses, we cannot connect to it,
613
                // so we'll skip it.
UNCOV
614
                addrs := node.Addrs()
×
UNCOV
615
                if len(addrs) == 0 {
×
616
                        log.Tracef("Skipping node %x since no addresses known",
×
617
                                nID[:])
×
618
                        return nil
×
619
                }
×
UNCOV
620
                addresses[nID] = addrs
×
UNCOV
621

×
UNCOV
622
                // Additionally, if this node is in the blacklist, then
×
UNCOV
623
                // we'll skip it.
×
UNCOV
624
                if _, ok := nodesToSkip[nID]; ok {
×
UNCOV
625
                        log.Tracef("Skipping blacklisted node %x", nID[:])
×
UNCOV
626
                        return nil
×
UNCOV
627
                }
×
628

UNCOV
629
                nodes[nID] = struct{}{}
×
UNCOV
630
                return nil
×
631
        }); err != nil {
×
632
                return fmt.Errorf("unable to get graph nodes: %w", err)
×
633
        }
×
634

635
        // Use the heuristic to calculate a score for each node in the
636
        // graph.
UNCOV
637
        log.Debugf("Scoring %d nodes for chan_size=%v", len(nodes), chanSize)
×
UNCOV
638
        scores, err := a.cfg.Heuristic.NodeScores(
×
UNCOV
639
                a.cfg.Graph, totalChans, chanSize, nodes,
×
UNCOV
640
        )
×
UNCOV
641
        if err != nil {
×
UNCOV
642
                return fmt.Errorf("unable to calculate node scores : %w", err)
×
UNCOV
643
        }
×
644

UNCOV
645
        log.Debugf("Got scores for %d nodes", len(scores))
×
UNCOV
646

×
UNCOV
647
        // Now use the score to make a weighted choice which nodes to attempt
×
UNCOV
648
        // to open channels to.
×
UNCOV
649
        scores, err = chooseN(numChans, scores)
×
UNCOV
650
        if err != nil {
×
651
                return fmt.Errorf("unable to make weighted choice: %w",
×
652
                        err)
×
653
        }
×
654

UNCOV
655
        chanCandidates := make(map[NodeID]*AttachmentDirective)
×
UNCOV
656
        for nID := range scores {
×
UNCOV
657
                log.Tracef("Creating attachment directive for chosen node %x",
×
UNCOV
658
                        nID[:])
×
UNCOV
659

×
UNCOV
660
                // Track the available funds we have left.
×
UNCOV
661
                if availableFunds < chanSize {
×
UNCOV
662
                        chanSize = availableFunds
×
UNCOV
663
                }
×
UNCOV
664
                availableFunds -= chanSize
×
UNCOV
665

×
UNCOV
666
                // If we run out of funds, we can break early.
×
UNCOV
667
                if chanSize < a.cfg.Constraints.MinChanSize() {
×
UNCOV
668
                        log.Tracef("Chan size %v too small to satisfy min "+
×
UNCOV
669
                                "channel size %v, breaking", chanSize,
×
UNCOV
670
                                a.cfg.Constraints.MinChanSize())
×
UNCOV
671
                        break
×
672
                }
673

UNCOV
674
                chanCandidates[nID] = &AttachmentDirective{
×
UNCOV
675
                        NodeID:  nID,
×
UNCOV
676
                        ChanAmt: chanSize,
×
UNCOV
677
                        Addrs:   addresses[nID],
×
UNCOV
678
                }
×
679
        }
680

UNCOV
681
        if len(chanCandidates) == 0 {
×
UNCOV
682
                log.Infof("No eligible candidates to connect to")
×
UNCOV
683
                return nil
×
UNCOV
684
        }
×
685

UNCOV
686
        log.Infof("Attempting to execute channel attachment "+
×
UNCOV
687
                "directives: %v", spew.Sdump(chanCandidates))
×
UNCOV
688

×
UNCOV
689
        // Before proceeding, check to see if we have any slots
×
UNCOV
690
        // available to open channels. If there are any, we will attempt
×
UNCOV
691
        // to dispatch the retrieved directives since we can't be
×
UNCOV
692
        // certain which ones may actually succeed. If too many
×
UNCOV
693
        // connections succeed, they will be ignored and made
×
UNCOV
694
        // available to future heuristic selections.
×
UNCOV
695
        a.pendingMtx.Lock()
×
UNCOV
696
        defer a.pendingMtx.Unlock()
×
UNCOV
697
        if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens() {
×
698
                log.Debugf("Reached cap of %v pending "+
×
699
                        "channel opens, will retry "+
×
700
                        "after success/failure",
×
701
                        a.cfg.Constraints.MaxPendingOpens())
×
702
                return nil
×
703
        }
×
704

705
        // For each recommended attachment directive, we'll launch a
706
        // new goroutine to attempt to carry out the directive. If any
707
        // of these succeed, then we'll receive a new state update,
708
        // taking us back to the top of our controller loop.
UNCOV
709
        for _, chanCandidate := range chanCandidates {
×
UNCOV
710
                // Skip candidates which we are already trying
×
UNCOV
711
                // to establish a connection with.
×
UNCOV
712
                nodeID := chanCandidate.NodeID
×
UNCOV
713
                if _, ok := a.pendingConns[nodeID]; ok {
×
714
                        continue
×
715
                }
UNCOV
716
                a.pendingConns[nodeID] = struct{}{}
×
UNCOV
717

×
UNCOV
718
                a.wg.Add(1)
×
UNCOV
719
                go a.executeDirective(*chanCandidate)
×
720
        }
UNCOV
721
        return nil
×
722
}
723

724
// executeDirective attempts to connect to the channel candidate specified by
725
// the given attachment directive, and open a channel of the given size.
726
//
727
// NOTE: MUST be run as a goroutine.
UNCOV
728
func (a *Agent) executeDirective(directive AttachmentDirective) {
×
UNCOV
729
        defer a.wg.Done()
×
UNCOV
730

×
UNCOV
731
        // We'll start out by attempting to connect to the peer in order to
×
UNCOV
732
        // begin the funding workflow.
×
UNCOV
733
        nodeID := directive.NodeID
×
UNCOV
734
        pub, err := btcec.ParsePubKey(nodeID[:])
×
UNCOV
735
        if err != nil {
×
736
                log.Errorf("Unable to parse pubkey %x: %v", nodeID, err)
×
737
                return
×
738
        }
×
739

UNCOV
740
        connected := make(chan bool)
×
UNCOV
741
        errChan := make(chan error)
×
UNCOV
742

×
UNCOV
743
        // To ensure a call to ConnectToPeer doesn't block the agent from
×
UNCOV
744
        // shutting down, we'll launch it in a non-waitgrouped goroutine, that
×
UNCOV
745
        // will signal when a result is returned.
×
UNCOV
746
        // TODO(halseth): use DialContext to cancel on transport level.
×
UNCOV
747
        go func() {
×
UNCOV
748
                alreadyConnected, err := a.cfg.ConnectToPeer(
×
UNCOV
749
                        pub, directive.Addrs,
×
UNCOV
750
                )
×
UNCOV
751
                if err != nil {
×
UNCOV
752
                        select {
×
UNCOV
753
                        case errChan <- err:
×
UNCOV
754
                        case <-a.quit:
×
755
                        }
UNCOV
756
                        return
×
757
                }
758

UNCOV
759
                select {
×
UNCOV
760
                case connected <- alreadyConnected:
×
761
                case <-a.quit:
×
762
                        return
×
763
                }
764
        }()
765

UNCOV
766
        var alreadyConnected bool
×
UNCOV
767
        select {
×
UNCOV
768
        case alreadyConnected = <-connected:
×
UNCOV
769
        case err = <-errChan:
×
UNCOV
770
        case <-a.quit:
×
UNCOV
771
                return
×
772
        }
773

UNCOV
774
        if err != nil {
×
UNCOV
775
                log.Warnf("Unable to connect to %x: %v",
×
UNCOV
776
                        pub.SerializeCompressed(), err)
×
UNCOV
777

×
UNCOV
778
                // Since we failed to connect to them, we'll mark them as
×
UNCOV
779
                // failed so that we don't attempt to connect to them again.
×
UNCOV
780
                a.pendingMtx.Lock()
×
UNCOV
781
                delete(a.pendingConns, nodeID)
×
UNCOV
782
                a.failedNodes[nodeID] = struct{}{}
×
UNCOV
783
                a.pendingMtx.Unlock()
×
UNCOV
784

×
UNCOV
785
                // Finally, we'll trigger the agent to select new peers to
×
UNCOV
786
                // connect to.
×
UNCOV
787
                a.OnChannelOpenFailure()
×
UNCOV
788

×
UNCOV
789
                return
×
UNCOV
790
        }
×
791

792
        // The connection was successful, though before progressing we must
793
        // check that we have not already met our quota for max pending open
794
        // channels. This can happen if multiple directives were spawned but
795
        // fewer slots were available, and other successful attempts finished
796
        // first.
UNCOV
797
        a.pendingMtx.Lock()
×
UNCOV
798
        if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens() {
×
799
                // Since we've reached our max number of pending opens, we'll
×
800
                // disconnect this peer and exit. However, if we were
×
801
                // previously connected to them, then we'll make sure to
×
802
                // maintain the connection alive.
×
803
                if alreadyConnected {
×
804
                        // Since we succeeded in connecting, we won't add this
×
805
                        // peer to the failed nodes map, but we will remove it
×
806
                        // from a.pendingConns so that it can be retried in the
×
807
                        // future.
×
808
                        delete(a.pendingConns, nodeID)
×
809
                        a.pendingMtx.Unlock()
×
810
                        return
×
811
                }
×
812

813
                err = a.cfg.DisconnectPeer(pub)
×
814
                if err != nil {
×
815
                        log.Warnf("Unable to disconnect peer %x: %v",
×
816
                                pub.SerializeCompressed(), err)
×
817
                }
×
818

819
                // Now that we have disconnected, we can remove this node from
820
                // our pending conns map, permitting subsequent connection
821
                // attempts.
822
                delete(a.pendingConns, nodeID)
×
823
                a.pendingMtx.Unlock()
×
824
                return
×
825
        }
826

827
        // If we were successful, we'll track this peer in our set of pending
828
        // opens. We do this here to ensure we don't stall on selecting new
829
        // peers if the connection attempt happens to take too long.
UNCOV
830
        delete(a.pendingConns, nodeID)
×
UNCOV
831
        a.pendingOpens[nodeID] = LocalChannel{
×
UNCOV
832
                Balance: directive.ChanAmt,
×
UNCOV
833
                Node:    nodeID,
×
UNCOV
834
        }
×
UNCOV
835
        a.pendingMtx.Unlock()
×
UNCOV
836

×
UNCOV
837
        // We can then begin the funding workflow with this peer.
×
UNCOV
838
        err = a.cfg.ChanController.OpenChannel(pub, directive.ChanAmt)
×
UNCOV
839
        if err != nil {
×
840
                log.Warnf("Unable to open channel to %x of %v: %v",
×
841
                        pub.SerializeCompressed(), directive.ChanAmt, err)
×
842

×
843
                // As the attempt failed, we'll clear the peer from the set of
×
844
                // pending opens and mark them as failed so we don't attempt to
×
845
                // open a channel to them again.
×
846
                a.pendingMtx.Lock()
×
847
                delete(a.pendingOpens, nodeID)
×
848
                a.failedNodes[nodeID] = struct{}{}
×
849
                a.pendingMtx.Unlock()
×
850

×
851
                // Trigger the agent to re-evaluate everything and possibly
×
852
                // retry with a different node.
×
853
                a.OnChannelOpenFailure()
×
854

×
855
                // Finally, we should also disconnect the peer if we weren't
×
856
                // already connected to them beforehand by an external
×
857
                // subsystem.
×
858
                if alreadyConnected {
×
859
                        return
×
860
                }
×
861

862
                err = a.cfg.DisconnectPeer(pub)
×
863
                if err != nil {
×
864
                        log.Warnf("Unable to disconnect peer %x: %v",
×
865
                                pub.SerializeCompressed(), err)
×
866
                }
×
867
        }
868

869
        // Since the channel open was successful and is currently pending,
870
        // we'll trigger the autopilot agent to query for more peers.
871
        // TODO(halseth): this triggers a new loop before all the new channels
872
        // are added to the pending channels map. Should add before executing
873
        // directive in goroutine?
UNCOV
874
        a.OnChannelPendingOpen()
×
875
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc