• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/autopilot/agent.go
1
package autopilot
2

3
import (
4
        "bytes"
5
        "fmt"
6
        "math/rand"
7
        "net"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcec/v2"
12
        "github.com/btcsuite/btcd/btcutil"
13
        "github.com/davecgh/go-spew/spew"
14
        "github.com/lightningnetwork/lnd/lnwire"
15
)
16

17
// Config couples all the items that an autopilot agent needs to function.
// All items within the struct MUST be populated for the Agent to be able to
// carry out its duties.
type Config struct {
	// Self is the identity public key of the Lightning Network node that
	// is being driven by the agent. This is used to ensure that we don't
	// accidentally attempt to open a channel with ourselves.
	Self *btcec.PublicKey

	// Heuristic is an attachment heuristic which will govern to whom we
	// open channels to, and also what those channels look like in terms of
	// desired capacity. The Heuristic will take into account the current
	// state of the graph, our set of open channels, and the amount of
	// available funds when determining how channels are to be opened.
	// Additionally, a heuristic may also factor in extra-graph
	// information in order to make more pertinent recommendations.
	Heuristic AttachmentHeuristic

	// ChanController is an interface that is able to directly manage the
	// creation, closing and update of channels within the network.
	ChanController ChannelController

	// ConnectToPeer attempts to connect to the peer using one of its
	// advertised addresses. The boolean returned signals whether the peer
	// was already connected.
	ConnectToPeer func(*btcec.PublicKey, []net.Addr) (bool, error)

	// DisconnectPeer attempts to disconnect the peer with the given public
	// key.
	DisconnectPeer func(*btcec.PublicKey) error

	// WalletBalance is a function closure that should return the current
	// available balance of the backing wallet.
	WalletBalance func() (btcutil.Amount, error)

	// Graph is an abstract channel graph that the Heuristic and the Agent
	// will use to make decisions w.r.t channel allocation and placement
	// within the graph.
	Graph ChannelGraph

	// Constraints is the set of constraints the autopilot must adhere to
	// when opening channels.
	Constraints AgentConstraints

	// TODO(roasbeef): add additional signals from fee rates and revenue of
	// currently opened channels
}
64

65
// channelState is a type that represents the set of active channels of the
// backing LN node that the Agent should be aware of. This type contains a few
// helper utility methods. Channels are keyed by their short channel ID.
type channelState map[lnwire.ShortChannelID]LocalChannel
69

70
// Channels returns a slice of all the active channels.
UNCOV
71
func (c channelState) Channels() []LocalChannel {
×
UNCOV
72
        chans := make([]LocalChannel, 0, len(c))
×
UNCOV
73
        for _, channel := range c {
×
UNCOV
74
                chans = append(chans, channel)
×
UNCOV
75
        }
×
UNCOV
76
        return chans
×
77
}
78

79
// ConnectedNodes returns the set of nodes we currently have a channel with.
80
// This information is needed as we want to avoid making repeated channels with
81
// any node.
UNCOV
82
func (c channelState) ConnectedNodes() map[NodeID]struct{} {
×
UNCOV
83
        nodes := make(map[NodeID]struct{})
×
UNCOV
84
        for _, channels := range c {
×
85
                nodes[channels.Node] = struct{}{}
×
86
        }
×
87

88
        // TODO(roasbeef): add outgoing, nodes, allow incoming and outgoing to
89
        // per node
90
        //  * only add node is chan as funding amt set
91

UNCOV
92
        return nodes
×
93
}
94

95
// Agent implements a closed-loop control system which seeks to autonomously
// optimize the allocation of satoshis within channels throughout the network's
// channel graph. An agent is configurable by swapping out different
// AttachmentHeuristic strategies. The agent uses external signals such as the
// wallet balance changing, or new channels being opened/closed for the local
// node as an indicator to re-examine its internal state, and the amount of
// available funds in order to make updated decisions w.r.t the channel graph.
// The Agent will automatically open, close, and splice in/out channels as
// necessary for it to step closer to its optimal state.
//
// TODO(roasbeef): prob re-word
type Agent struct {
	// started and stopped guard start/stop so each runs at most once.
	started sync.Once
	stopped sync.Once

	// cfg houses the configuration state of the Agent.
	cfg Config

	// chanState tracks the current set of open channels.
	chanState    channelState
	chanStateMtx sync.Mutex

	// stateUpdates is a channel that any external state updates that may
	// affect the heuristics of the agent will be sent over.
	stateUpdates chan interface{}

	// balanceUpdates is a channel where notifications about updates to the
	// wallet's balance will be sent. This channel will be buffered to
	// ensure we have at most one pending update of this type to handle at
	// a given time.
	balanceUpdates chan *balanceUpdate

	// nodeUpdates is a channel that changes to the graph node landscape
	// will be sent over. This channel will be buffered to ensure we have
	// at most one pending update of this type to handle at a given time.
	nodeUpdates chan *nodeUpdates

	// pendingOpenUpdates is a channel where updates about channel pending
	// opening will be sent. This channel will be buffered to ensure we
	// have at most one pending update of this type to handle at a given
	// time.
	pendingOpenUpdates chan *chanPendingOpenUpdate

	// chanOpenFailures is a channel where updates about channel open
	// failures will be sent. This channel will be buffered to ensure we
	// have at most one pending update of this type to handle at a given
	// time.
	chanOpenFailures chan *chanOpenFailureUpdate

	// heuristicUpdates is a channel where updates from active heuristics
	// will be sent.
	heuristicUpdates chan *heuristicUpdate

	// totalBalance is the total number of satoshis the backing wallet is
	// known to control at any given instance. This value will be updated
	// when the agent receives external balance update signals.
	totalBalance btcutil.Amount

	// failedNodes lists nodes that we've previously attempted to initiate
	// channels with, but didn't succeed.
	failedNodes map[NodeID]struct{}

	// pendingConns tracks the nodes that we are attempting to make
	// connections to. This prevents us from making duplicate connection
	// requests to the same node.
	pendingConns map[NodeID]struct{}

	// pendingOpens tracks the channels that we've requested to be
	// initiated, but haven't yet been confirmed as being fully opened.
	// This state is required as otherwise, we may go over our allotted
	// channel limit, or open multiple channels to the same node.
	pendingOpens map[NodeID]LocalChannel
	pendingMtx   sync.Mutex

	// quit is closed to signal every agent goroutine to exit.
	quit chan struct{}

	// wg tracks outstanding agent goroutines so stop can wait for them.
	wg sync.WaitGroup
}
172

173
// New creates a new instance of the Agent instantiated using the passed
174
// configuration and initial channel state. The initial channel state slice
175
// should be populated with the set of Channels that are currently opened by
176
// the backing Lightning Node.
UNCOV
177
func New(cfg Config, initialState []LocalChannel) (*Agent, error) {
×
UNCOV
178
        a := &Agent{
×
UNCOV
179
                cfg:                cfg,
×
UNCOV
180
                chanState:          make(map[lnwire.ShortChannelID]LocalChannel),
×
UNCOV
181
                quit:               make(chan struct{}),
×
UNCOV
182
                stateUpdates:       make(chan interface{}),
×
UNCOV
183
                balanceUpdates:     make(chan *balanceUpdate, 1),
×
UNCOV
184
                nodeUpdates:        make(chan *nodeUpdates, 1),
×
UNCOV
185
                chanOpenFailures:   make(chan *chanOpenFailureUpdate, 1),
×
UNCOV
186
                heuristicUpdates:   make(chan *heuristicUpdate, 1),
×
UNCOV
187
                pendingOpenUpdates: make(chan *chanPendingOpenUpdate, 1),
×
UNCOV
188
                failedNodes:        make(map[NodeID]struct{}),
×
UNCOV
189
                pendingConns:       make(map[NodeID]struct{}),
×
UNCOV
190
                pendingOpens:       make(map[NodeID]LocalChannel),
×
UNCOV
191
        }
×
UNCOV
192

×
UNCOV
193
        for _, c := range initialState {
×
UNCOV
194
                a.chanState[c.ChanID] = c
×
UNCOV
195
        }
×
196

UNCOV
197
        return a, nil
×
198
}
199

200
// Start starts the agent along with any goroutines it needs to perform its
201
// normal duties.
UNCOV
202
func (a *Agent) Start() error {
×
UNCOV
203
        var err error
×
UNCOV
204
        a.started.Do(func() {
×
UNCOV
205
                err = a.start()
×
UNCOV
206
        })
×
UNCOV
207
        return err
×
208
}
209

UNCOV
210
// start seeds the shared math/rand source, then launches the agent's main
// controller goroutine. Only ever invoked once, via Start's sync.Once.
func (a *Agent) start() error {
	// NOTE(review): rand.Seed is deprecated as of Go 1.20, where the
	// global source is auto-seeded; consider removing this call once the
	// minimum supported Go version allows — TODO confirm build target.
	rand.Seed(time.Now().Unix())
	log.Infof("Autopilot Agent starting")

	// The controller goroutine is tracked by wg so stop can wait for it.
	a.wg.Add(1)
	go a.controller()

	return nil
}
×
219

220
// Stop signals the Agent to gracefully shutdown. This function will block
221
// until all goroutines have exited.
UNCOV
222
func (a *Agent) Stop() error {
×
UNCOV
223
        var err error
×
UNCOV
224
        a.stopped.Do(func() {
×
UNCOV
225
                err = a.stop()
×
UNCOV
226
        })
×
UNCOV
227
        return err
×
228
}
229

UNCOV
230
// stop closes the quit channel to signal all agent goroutines to exit, then
// blocks until every tracked goroutine has finished. Only ever invoked once,
// via Stop's sync.Once.
func (a *Agent) stop() error {
	log.Infof("Autopilot Agent stopping")

	// Closing quit wakes every goroutine selecting on it; wg.Wait then
	// blocks until each of them has called Done.
	close(a.quit)
	a.wg.Wait()

	return nil
}
×
238

239
// balanceUpdate is a type of external state update that reflects an
240
// increase/decrease in the funds currently available to the wallet.
241
type balanceUpdate struct {
242
}
243

244
// nodeUpdates is a type of external state update that reflects an addition or
245
// modification in channel graph node membership.
246
type nodeUpdates struct{}
247

248
// chanOpenUpdate is a type of external state update that indicates a new
249
// channel has been opened, either by the Agent itself (within the main
250
// controller loop), or by an external user to the system.
251
type chanOpenUpdate struct {
252
        newChan LocalChannel
253
}
254

255
// chanPendingOpenUpdate is a type of external state update that indicates a new
256
// channel has been opened, either by the agent itself or an external subsystem,
257
// but is still pending.
258
type chanPendingOpenUpdate struct{}
259

260
// chanOpenFailureUpdate is a type of external state update that indicates
261
// a previous channel open failed, and that it might be possible to try again.
262
type chanOpenFailureUpdate struct{}
263

264
// heuristicUpdate is an update sent when one of the autopilot heuristics has
265
// changed, and prompts the agent to make a new attempt at opening more
266
// channels.
267
type heuristicUpdate struct {
268
        heuristic AttachmentHeuristic
269
}
270

271
// chanCloseUpdate is a type of external state update that indicates that the
272
// backing Lightning Node has closed a previously open channel.
273
type chanCloseUpdate struct {
274
        closedChans []lnwire.ShortChannelID
275
}
276

277
// OnBalanceChange is a callback that should be executed each time the balance
278
// of the backing wallet changes.
UNCOV
279
func (a *Agent) OnBalanceChange() {
×
UNCOV
280
        select {
×
UNCOV
281
        case a.balanceUpdates <- &balanceUpdate{}:
×
282
        default:
×
283
        }
284
}
285

286
// OnNodeUpdates is a callback that should be executed each time our channel
287
// graph has new nodes or their node announcements are updated.
UNCOV
288
func (a *Agent) OnNodeUpdates() {
×
UNCOV
289
        select {
×
UNCOV
290
        case a.nodeUpdates <- &nodeUpdates{}:
×
291
        default:
×
292
        }
293
}
294

295
// OnChannelOpen is a callback that should be executed each time a new channel
296
// is manually opened by the user or any system outside the autopilot agent.
UNCOV
297
func (a *Agent) OnChannelOpen(c LocalChannel) {
×
UNCOV
298
        a.wg.Add(1)
×
UNCOV
299
        go func() {
×
UNCOV
300
                defer a.wg.Done()
×
UNCOV
301

×
UNCOV
302
                select {
×
UNCOV
303
                case a.stateUpdates <- &chanOpenUpdate{newChan: c}:
×
304
                case <-a.quit:
×
305
                }
306
        }()
307
}
308

309
// OnChannelPendingOpen is a callback that should be executed each time a new
310
// channel is opened, either by the agent or an external subsystems, but is
311
// still pending.
UNCOV
312
func (a *Agent) OnChannelPendingOpen() {
×
UNCOV
313
        select {
×
UNCOV
314
        case a.pendingOpenUpdates <- &chanPendingOpenUpdate{}:
×
UNCOV
315
        default:
×
316
        }
317
}
318

319
// OnChannelOpenFailure is a callback that should be executed when the
320
// autopilot has attempted to open a channel, but failed. In this case we can
321
// retry channel creation with a different node.
UNCOV
322
func (a *Agent) OnChannelOpenFailure() {
×
UNCOV
323
        select {
×
UNCOV
324
        case a.chanOpenFailures <- &chanOpenFailureUpdate{}:
×
325
        default:
×
326
        }
327
}
328

329
// OnChannelClose is a callback that should be executed each time a prior
330
// channel has been closed for any reason. This includes regular
331
// closes, force closes, and channel breaches.
UNCOV
332
func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) {
×
UNCOV
333
        a.wg.Add(1)
×
UNCOV
334
        go func() {
×
UNCOV
335
                defer a.wg.Done()
×
UNCOV
336

×
UNCOV
337
                select {
×
UNCOV
338
                case a.stateUpdates <- &chanCloseUpdate{closedChans: closedChans}:
×
339
                case <-a.quit:
×
340
                }
341
        }()
342
}
343

344
// OnHeuristicUpdate is a method called when a heuristic has been updated, to
345
// trigger the agent to do a new state assessment.
UNCOV
346
func (a *Agent) OnHeuristicUpdate(h AttachmentHeuristic) {
×
UNCOV
347
        select {
×
348
        case a.heuristicUpdates <- &heuristicUpdate{
349
                heuristic: h,
UNCOV
350
        }:
×
351
        default:
×
352
        }
353
}
354

355
// mergeNodeMaps merges the Agent's set of nodes that it already has active
356
// channels open to, with the other sets of nodes that should be removed from
357
// consideration during heuristic selection. This ensures that the Agent doesn't
358
// attempt to open any "duplicate" channels to the same node.
359
func mergeNodeMaps(c map[NodeID]LocalChannel,
UNCOV
360
        skips ...map[NodeID]struct{}) map[NodeID]struct{} {
×
UNCOV
361

×
UNCOV
362
        numNodes := len(c)
×
UNCOV
363
        for _, skip := range skips {
×
UNCOV
364
                numNodes += len(skip)
×
UNCOV
365
        }
×
366

UNCOV
367
        res := make(map[NodeID]struct{}, numNodes)
×
UNCOV
368
        for nodeID := range c {
×
UNCOV
369
                res[nodeID] = struct{}{}
×
UNCOV
370
        }
×
UNCOV
371
        for _, skip := range skips {
×
UNCOV
372
                for nodeID := range skip {
×
UNCOV
373
                        res[nodeID] = struct{}{}
×
UNCOV
374
                }
×
375
        }
376

UNCOV
377
        return res
×
378
}
379

380
// mergeChanState merges the Agent's set of active channels, with the set of
381
// channels awaiting confirmation. This ensures that the agent doesn't go over
382
// the prescribed channel limit or fund allocation limit.
383
func mergeChanState(pendingChans map[NodeID]LocalChannel,
UNCOV
384
        activeChans channelState) []LocalChannel {
×
UNCOV
385

×
UNCOV
386
        numChans := len(pendingChans) + len(activeChans)
×
UNCOV
387
        totalChans := make([]LocalChannel, 0, numChans)
×
UNCOV
388

×
UNCOV
389
        totalChans = append(totalChans, activeChans.Channels()...)
×
UNCOV
390

×
UNCOV
391
        for _, pendingChan := range pendingChans {
×
UNCOV
392
                totalChans = append(totalChans, pendingChan)
×
UNCOV
393
        }
×
394

UNCOV
395
        return totalChans
×
396
}
397

398
// controller implements the closed-loop control system of the Agent. The
// controller will make a decision w.r.t channel placement within the graph
// based on: its current internal state of the set of active channels open,
// and external state changes as a result of decisions it makes w.r.t channel
// allocation, or attributes affecting its control loop being updated by the
// backing Lightning Node.
//
// NOTE: MUST be run as a goroutine; it exits when a.quit is closed.
func (a *Agent) controller() {
	defer a.wg.Done()

	// We'll start off by assigning our starting balance, and injecting
	// that amount as an initial wake up to the main controller goroutine.
	a.OnBalanceChange()

	// TODO(roasbeef): do we in fact need to maintain order?
	//  * use sync.Cond if so
	//
	// updateBalance refreshes totalBalance from the wallet; on error it
	// logs and leaves the previous value in place (best-effort).
	updateBalance := func() {
		newBalance, err := a.cfg.WalletBalance()
		if err != nil {
			log.Warnf("unable to update wallet balance: %v", err)
			return
		}

		a.totalBalance = newBalance
	}

	// TODO(roasbeef): add 10-minute wake up timer
	for {
		select {
		// A new external signal has arrived. We'll use this to update
		// our internal state, then determine if we should trigger a
		// channel state modification (open/close, splice in/out).
		case signal := <-a.stateUpdates:
			log.Infof("Processing new external signal")

			switch update := signal.(type) {
			// A new channel has been opened successfully. This was
			// either opened by the Agent, or an external system
			// that is able to drive the Lightning Node.
			case *chanOpenUpdate:
				log.Debugf("New channel successfully opened, "+
					"updating state with: %v",
					spew.Sdump(update.newChan))

				// Record the confirmed channel and clear any
				// pending-open entry for the same node.
				newChan := update.newChan
				a.chanStateMtx.Lock()
				a.chanState[newChan.ChanID] = newChan
				a.chanStateMtx.Unlock()

				a.pendingMtx.Lock()
				delete(a.pendingOpens, newChan.Node)
				a.pendingMtx.Unlock()

				updateBalance()
			// A channel has been closed, this may free up an
			// available slot, triggering a new channel update.
			case *chanCloseUpdate:
				log.Debugf("Applying closed channel "+
					"updates: %v",
					spew.Sdump(update.closedChans))

				a.chanStateMtx.Lock()
				for _, closedChan := range update.closedChans {
					delete(a.chanState, closedChan)
				}
				a.chanStateMtx.Unlock()

				updateBalance()
			}

		// A new channel has been opened by the agent or an external
		// subsystem, but is still pending confirmation.
		case <-a.pendingOpenUpdates:
			updateBalance()

		// The balance of the backing wallet has changed, if more funds
		// are now available, we may attempt to open up an additional
		// channel, or splice in funds to an existing one.
		case <-a.balanceUpdates:
			log.Debug("Applying external balance state update")

			updateBalance()

		// The channel we tried to open previously failed for whatever
		// reason.
		case <-a.chanOpenFailures:
			log.Debug("Retrying after previous channel open " +
				"failure.")

			updateBalance()

		// New nodes have been added to the graph or their node
		// announcements have been updated. We will consider opening
		// channels to these nodes if we haven't stabilized.
		case <-a.nodeUpdates:
			log.Debugf("Node updates received, assessing " +
				"need for more channels")

		// Any of the deployed heuristics has been updated, check
		// whether we have new channel candidates available.
		case upd := <-a.heuristicUpdates:
			log.Debugf("Heuristic %v updated, assessing need for "+
				"more channels", upd.heuristic.Name())

		// The agent has been signalled to exit, so we'll bail out
		// immediately.
		case <-a.quit:
			return
		}

		a.pendingMtx.Lock()
		log.Debugf("Pending channels: %v", spew.Sdump(a.pendingOpens))
		a.pendingMtx.Unlock()

		// With all the updates applied, we'll obtain a set of the
		// current active channels (confirmed channels), and also
		// factor in our set of unconfirmed channels.
		//
		// Lock order here is chanStateMtx then pendingMtx; keep any
		// future call sites consistent with this ordering to avoid
		// deadlock.
		a.chanStateMtx.Lock()
		a.pendingMtx.Lock()
		totalChans := mergeChanState(a.pendingOpens, a.chanState)
		a.pendingMtx.Unlock()
		a.chanStateMtx.Unlock()

		// Now that we've updated our internal state, we'll consult our
		// channel attachment heuristic to determine if we can open
		// up any additional channels while staying within our
		// constraints.
		availableFunds, numChans := a.cfg.Constraints.ChannelBudget(
			totalChans, a.totalBalance,
		)
		switch {
		case numChans == 0:
			continue

		// If the amount is too small, we don't want to attempt opening
		// another channel.
		case availableFunds == 0:
			continue
		case availableFunds < a.cfg.Constraints.MinChanSize():
			continue
		}

		log.Infof("Triggering attachment directive dispatch, "+
			"total_funds=%v", a.totalBalance)

		err := a.openChans(availableFunds, numChans, totalChans)
		if err != nil {
			log.Errorf("Unable to open channels: %v", err)
		}
	}
}
548

549
// openChans queries the agent's heuristic for a set of channel candidates, and
// attempts to open channels to them. availableFunds caps the total amount to
// commit, numChans caps how many candidates are chosen, and totalChans is the
// combined set of active plus pending channels used as heuristic context. It
// returns a non-nil error only when scoring/selection fails or funds cannot
// cover even one minimum-size channel; per-candidate dispatch is asynchronous.
func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
	totalChans []LocalChannel) error {

	// As channel size we'll use the maximum channel size available.
	chanSize := a.cfg.Constraints.MaxChanSize()
	if availableFunds < chanSize {
		chanSize = availableFunds
	}

	if chanSize < a.cfg.Constraints.MinChanSize() {
		return fmt.Errorf("not enough funds available to open a " +
			"single channel")
	}

	// We're to attempt an attachment so we'll obtain the set of
	// nodes that we currently have channels with so we avoid
	// duplicate edges.
	a.chanStateMtx.Lock()
	connectedNodes := a.chanState.ConnectedNodes()
	a.chanStateMtx.Unlock()

	for nID := range connectedNodes {
		log.Tracef("Skipping node %x with open channel", nID[:])
	}

	a.pendingMtx.Lock()

	for nID := range a.pendingOpens {
		log.Tracef("Skipping node %x with pending channel open", nID[:])
	}

	for nID := range a.pendingConns {
		log.Tracef("Skipping node %x with pending connection", nID[:])
	}

	// NOTE(review): this log uses %v while its siblings use %x for the
	// node ID bytes; consider unifying on %x.
	for nID := range a.failedNodes {
		log.Tracef("Skipping failed node %v", nID[:])
	}

	// Blacklist = pending opens + pending conns + connected + failed.
	nodesToSkip := mergeNodeMaps(a.pendingOpens,
		a.pendingConns, connectedNodes, a.failedNodes,
	)

	a.pendingMtx.Unlock()

	// Gather the set of all nodes in the graph, except those we
	// want to skip.
	selfPubBytes := a.cfg.Self.SerializeCompressed()
	nodes := make(map[NodeID]struct{})
	addresses := make(map[NodeID][]net.Addr)
	if err := a.cfg.Graph.ForEachNode(func(node Node) error {
		nID := NodeID(node.PubKey())

		// If we come across ourselves, then we'll continue in
		// order to avoid attempting to make a channel with
		// ourselves.
		if bytes.Equal(nID[:], selfPubBytes) {
			log.Tracef("Skipping self node %x", nID[:])
			return nil
		}

		// If the node has no known addresses, we cannot connect to it,
		// so we'll skip it.
		addrs := node.Addrs()
		if len(addrs) == 0 {
			log.Tracef("Skipping node %x since no addresses known",
				nID[:])
			return nil
		}
		addresses[nID] = addrs

		// Additionally, if this node is in the blacklist, then
		// we'll skip it.
		if _, ok := nodesToSkip[nID]; ok {
			log.Tracef("Skipping blacklisted node %x", nID[:])
			return nil
		}

		nodes[nID] = struct{}{}
		return nil
	}); err != nil {
		return fmt.Errorf("unable to get graph nodes: %w", err)
	}

	// Use the heuristic to calculate a score for each node in the
	// graph.
	log.Debugf("Scoring %d nodes for chan_size=%v", len(nodes), chanSize)
	scores, err := a.cfg.Heuristic.NodeScores(
		a.cfg.Graph, totalChans, chanSize, nodes,
	)
	if err != nil {
		return fmt.Errorf("unable to calculate node scores : %w", err)
	}

	log.Debugf("Got scores for %d nodes", len(scores))

	// Now use the score to make a weighted choice which nodes to attempt
	// to open channels to.
	scores, err = chooseN(numChans, scores)
	if err != nil {
		return fmt.Errorf("unable to make weighted choice: %w",
			err)
	}

	chanCandidates := make(map[NodeID]*AttachmentDirective)
	for nID := range scores {
		log.Tracef("Creating attachment directive for chosen node %x",
			nID[:])

		// Track the available funds we have left.
		if availableFunds < chanSize {
			chanSize = availableFunds
		}
		availableFunds -= chanSize

		// If we run out of funds, we can break early.
		if chanSize < a.cfg.Constraints.MinChanSize() {
			log.Tracef("Chan size %v too small to satisfy min "+
				"channel size %v, breaking", chanSize,
				a.cfg.Constraints.MinChanSize())
			break
		}

		chanCandidates[nID] = &AttachmentDirective{
			NodeID:  nID,
			ChanAmt: chanSize,
			Addrs:   addresses[nID],
		}
	}

	if len(chanCandidates) == 0 {
		log.Infof("No eligible candidates to connect to")
		return nil
	}

	log.Infof("Attempting to execute channel attachment "+
		"directives: %v", spew.Sdump(chanCandidates))

	// Before proceeding, check to see if we have any slots
	// available to open channels. If there are any, we will attempt
	// to dispatch the retrieved directives since we can't be
	// certain which ones may actually succeed. If too many
	// connections succeed, they will be ignored and made
	// available to future heuristic selections.
	a.pendingMtx.Lock()
	defer a.pendingMtx.Unlock()
	if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens() {
		log.Debugf("Reached cap of %v pending "+
			"channel opens, will retry "+
			"after success/failure",
			a.cfg.Constraints.MaxPendingOpens())
		return nil
	}

	// For each recommended attachment directive, we'll launch a
	// new goroutine to attempt to carry out the directive. If any
	// of these succeed, then we'll receive a new state update,
	// taking us back to the top of our controller loop.
	for _, chanCandidate := range chanCandidates {
		// Skip candidates which we are already trying
		// to establish a connection with.
		nodeID := chanCandidate.NodeID
		if _, ok := a.pendingConns[nodeID]; ok {
			continue
		}
		a.pendingConns[nodeID] = struct{}{}

		// Each dispatch is tracked by wg so stop can wait for it.
		a.wg.Add(1)
		go a.executeDirective(*chanCandidate)
	}
	return nil
}
723

724
// executeDirective attempts to connect to the channel candidate specified by
725
// the given attachment directive, and open a channel of the given size.
726
//
727
// NOTE: MUST be run as a goroutine.
UNCOV
728
func (a *Agent) executeDirective(directive AttachmentDirective) {
×
UNCOV
729
        defer a.wg.Done()
×
UNCOV
730

×
UNCOV
731
        // We'll start out by attempting to connect to the peer in order to
×
UNCOV
732
        // begin the funding workflow.
×
UNCOV
733
        nodeID := directive.NodeID
×
UNCOV
734
        pub, err := btcec.ParsePubKey(nodeID[:])
×
UNCOV
735
        if err != nil {
×
736
                log.Errorf("Unable to parse pubkey %x: %v", nodeID, err)
×
737
                return
×
738
        }
×
739

UNCOV
740
        connected := make(chan bool)
×
UNCOV
741
        errChan := make(chan error)
×
UNCOV
742

×
UNCOV
743
        // To ensure a call to ConnectToPeer doesn't block the agent from
×
UNCOV
744
        // shutting down, we'll launch it in a non-waitgrouped goroutine, that
×
UNCOV
745
        // will signal when a result is returned.
×
UNCOV
746
        // TODO(halseth): use DialContext to cancel on transport level.
×
UNCOV
747
        go func() {
×
UNCOV
748
                alreadyConnected, err := a.cfg.ConnectToPeer(
×
UNCOV
749
                        pub, directive.Addrs,
×
UNCOV
750
                )
×
UNCOV
751
                if err != nil {
×
UNCOV
752
                        select {
×
UNCOV
753
                        case errChan <- err:
×
UNCOV
754
                        case <-a.quit:
×
755
                        }
UNCOV
756
                        return
×
757
                }
758

UNCOV
759
                select {
×
UNCOV
760
                case connected <- alreadyConnected:
×
761
                case <-a.quit:
×
762
                        return
×
763
                }
764
        }()
765

UNCOV
766
        var alreadyConnected bool
×
UNCOV
767
        select {
×
UNCOV
768
        case alreadyConnected = <-connected:
×
UNCOV
769
        case err = <-errChan:
×
UNCOV
770
        case <-a.quit:
×
UNCOV
771
                return
×
772
        }
773

UNCOV
774
        if err != nil {
×
UNCOV
775
                log.Warnf("Unable to connect to %x: %v",
×
UNCOV
776
                        pub.SerializeCompressed(), err)
×
UNCOV
777

×
UNCOV
778
                // Since we failed to connect to them, we'll mark them as
×
UNCOV
779
                // failed so that we don't attempt to connect to them again.
×
UNCOV
780
                a.pendingMtx.Lock()
×
UNCOV
781
                delete(a.pendingConns, nodeID)
×
UNCOV
782
                a.failedNodes[nodeID] = struct{}{}
×
UNCOV
783
                a.pendingMtx.Unlock()
×
UNCOV
784

×
UNCOV
785
                // Finally, we'll trigger the agent to select new peers to
×
UNCOV
786
                // connect to.
×
UNCOV
787
                a.OnChannelOpenFailure()
×
UNCOV
788

×
UNCOV
789
                return
×
UNCOV
790
        }
×
791

792
        // The connection was successful, though before progressing we must
793
        // check that we have not already met our quota for max pending open
794
        // channels. This can happen if multiple directives were spawned but
795
        // fewer slots were available, and other successful attempts finished
796
        // first.
UNCOV
797
        a.pendingMtx.Lock()
×
UNCOV
798
        if uint16(len(a.pendingOpens)) >= a.cfg.Constraints.MaxPendingOpens() {
×
799
                // Since we've reached our max number of pending opens, we'll
×
800
                // disconnect this peer and exit. However, if we were
×
801
                // previously connected to them, then we'll make sure to
×
802
                // maintain the connection alive.
×
803
                if alreadyConnected {
×
804
                        // Since we succeeded in connecting, we won't add this
×
805
                        // peer to the failed nodes map, but we will remove it
×
806
                        // from a.pendingConns so that it can be retried in the
×
807
                        // future.
×
808
                        delete(a.pendingConns, nodeID)
×
809
                        a.pendingMtx.Unlock()
×
810
                        return
×
811
                }
×
812

813
                err = a.cfg.DisconnectPeer(pub)
×
814
                if err != nil {
×
815
                        log.Warnf("Unable to disconnect peer %x: %v",
×
816
                                pub.SerializeCompressed(), err)
×
817
                }
×
818

819
                // Now that we have disconnected, we can remove this node from
820
                // our pending conns map, permitting subsequent connection
821
                // attempts.
822
                delete(a.pendingConns, nodeID)
×
823
                a.pendingMtx.Unlock()
×
824
                return
×
825
        }
826

827
        // If we were successful, we'll track this peer in our set of pending
828
        // opens. We do this here to ensure we don't stall on selecting new
829
        // peers if the connection attempt happens to take too long.
UNCOV
830
        delete(a.pendingConns, nodeID)
×
UNCOV
831
        a.pendingOpens[nodeID] = LocalChannel{
×
UNCOV
832
                Balance: directive.ChanAmt,
×
UNCOV
833
                Node:    nodeID,
×
UNCOV
834
        }
×
UNCOV
835
        a.pendingMtx.Unlock()
×
UNCOV
836

×
UNCOV
837
        // We can then begin the funding workflow with this peer.
×
UNCOV
838
        err = a.cfg.ChanController.OpenChannel(pub, directive.ChanAmt)
×
UNCOV
839
        if err != nil {
×
840
                log.Warnf("Unable to open channel to %x of %v: %v",
×
841
                        pub.SerializeCompressed(), directive.ChanAmt, err)
×
842

×
843
                // As the attempt failed, we'll clear the peer from the set of
×
844
                // pending opens and mark them as failed so we don't attempt to
×
845
                // open a channel to them again.
×
846
                a.pendingMtx.Lock()
×
847
                delete(a.pendingOpens, nodeID)
×
848
                a.failedNodes[nodeID] = struct{}{}
×
849
                a.pendingMtx.Unlock()
×
850

×
851
                // Trigger the agent to re-evaluate everything and possibly
×
852
                // retry with a different node.
×
853
                a.OnChannelOpenFailure()
×
854

×
855
                // Finally, we should also disconnect the peer if we weren't
×
856
                // already connected to them beforehand by an external
×
857
                // subsystem.
×
858
                if alreadyConnected {
×
859
                        return
×
860
                }
×
861

862
                err = a.cfg.DisconnectPeer(pub)
×
863
                if err != nil {
×
864
                        log.Warnf("Unable to disconnect peer %x: %v",
×
865
                                pub.SerializeCompressed(), err)
×
866
                }
×
867
        }
868

869
        // Since the channel open was successful and is currently pending,
870
        // we'll trigger the autopilot agent to query for more peers.
871
        // TODO(halseth): this triggers a new loop before all the new channels
872
        // are added to the pending channels map. Should add before executing
873
        // directive in goroutine?
UNCOV
874
        a.OnChannelPendingOpen()
×
875
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc