lightningnetwork / lnd / 12986279612

27 Jan 2025 09:51AM UTC coverage: 57.652% (-1.1%) from 58.788%

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown", as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In the fee bumper we don't have the info to distinguish the above cases,
so we leave them to be handled by the sweeper, which has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper
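As a rough sketch of the distinction the description draws (hypothetical names, not the actual lnd API): the fee bumper can only report whether a tracked input was spent by its own sweeping tx or by some unknown transaction, and any finer classification is deferred to the sweeper.

package main

import "fmt"

// SpendOrigin is a hypothetical label for who spent a tracked input,
// as far as the fee bumper can tell.
type SpendOrigin int

const (
        // SpentBySweep: the input was spent by the sweeping tx we broadcast.
        SpentBySweep SpendOrigin = iota

        // SpentByUnknown: the spending tx is not the one we broadcast. It may
        // be an older sweeping tx, a third party (anchor), or a remote party
        // (pin); the fee bumper cannot tell these apart, so it reports
        // "unknown" and defers to the sweeper, which has more context.
        SpentByUnknown
)

func (o SpendOrigin) String() string {
        if o == SpentBySweep {
                return "spent by our sweep"
        }
        return "spent by unknown tx"
}

func main() {
        fmt.Println(SpentByUnknown) // prints: spent by unknown tx
}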

83 of 87 new or added lines in 2 files covered. (95.4%)

19578 existing lines in 256 files now uncovered.

103448 of 179434 relevant lines covered (57.65%)

24884.58 hits per line
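For reference, the headline percentage is simply covered lines divided by relevant lines; a quick check of the figures above:

package main

import "fmt"

func main() {
        // Covered and relevant line counts from the report header.
        covered, relevant := 103448.0, 179434.0
        fmt.Printf("%.3f%%\n", covered/relevant*100) // prints 57.652%
}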

Source File

0.28
/server.go
1
package lnd
2

3
import (
4
        "bytes"
5
        "context"
6
        "crypto/rand"
7
        "encoding/hex"
8
        "fmt"
9
        "math/big"
10
        prand "math/rand"
11
        "net"
12
        "strconv"
13
        "strings"
14
        "sync"
15
        "sync/atomic"
16
        "time"
17

18
        "github.com/btcsuite/btcd/btcec/v2"
19
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
20
        "github.com/btcsuite/btcd/btcutil"
21
        "github.com/btcsuite/btcd/chaincfg"
22
        "github.com/btcsuite/btcd/chaincfg/chainhash"
23
        "github.com/btcsuite/btcd/connmgr"
24
        "github.com/btcsuite/btcd/txscript"
25
        "github.com/btcsuite/btcd/wire"
26
        "github.com/go-errors/errors"
27
        sphinx "github.com/lightningnetwork/lightning-onion"
28
        "github.com/lightningnetwork/lnd/aliasmgr"
29
        "github.com/lightningnetwork/lnd/autopilot"
30
        "github.com/lightningnetwork/lnd/brontide"
31
        "github.com/lightningnetwork/lnd/chainio"
32
        "github.com/lightningnetwork/lnd/chainreg"
33
        "github.com/lightningnetwork/lnd/chanacceptor"
34
        "github.com/lightningnetwork/lnd/chanbackup"
35
        "github.com/lightningnetwork/lnd/chanfitness"
36
        "github.com/lightningnetwork/lnd/channeldb"
37
        "github.com/lightningnetwork/lnd/channelnotifier"
38
        "github.com/lightningnetwork/lnd/clock"
39
        "github.com/lightningnetwork/lnd/cluster"
40
        "github.com/lightningnetwork/lnd/contractcourt"
41
        "github.com/lightningnetwork/lnd/discovery"
42
        "github.com/lightningnetwork/lnd/feature"
43
        "github.com/lightningnetwork/lnd/fn/v2"
44
        "github.com/lightningnetwork/lnd/funding"
45
        "github.com/lightningnetwork/lnd/graph"
46
        graphdb "github.com/lightningnetwork/lnd/graph/db"
47
        "github.com/lightningnetwork/lnd/graph/db/models"
48
        "github.com/lightningnetwork/lnd/graph/graphsession"
49
        "github.com/lightningnetwork/lnd/healthcheck"
50
        "github.com/lightningnetwork/lnd/htlcswitch"
51
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
52
        "github.com/lightningnetwork/lnd/input"
53
        "github.com/lightningnetwork/lnd/invoices"
54
        "github.com/lightningnetwork/lnd/keychain"
55
        "github.com/lightningnetwork/lnd/kvdb"
56
        "github.com/lightningnetwork/lnd/lncfg"
57
        "github.com/lightningnetwork/lnd/lnencrypt"
58
        "github.com/lightningnetwork/lnd/lnpeer"
59
        "github.com/lightningnetwork/lnd/lnrpc"
60
        "github.com/lightningnetwork/lnd/lnrpc/routerrpc"
61
        "github.com/lightningnetwork/lnd/lnwallet"
62
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
63
        "github.com/lightningnetwork/lnd/lnwallet/chanfunding"
64
        "github.com/lightningnetwork/lnd/lnwallet/rpcwallet"
65
        "github.com/lightningnetwork/lnd/lnwire"
66
        "github.com/lightningnetwork/lnd/nat"
67
        "github.com/lightningnetwork/lnd/netann"
68
        "github.com/lightningnetwork/lnd/peer"
69
        "github.com/lightningnetwork/lnd/peernotifier"
70
        "github.com/lightningnetwork/lnd/pool"
71
        "github.com/lightningnetwork/lnd/queue"
72
        "github.com/lightningnetwork/lnd/routing"
73
        "github.com/lightningnetwork/lnd/routing/localchans"
74
        "github.com/lightningnetwork/lnd/routing/route"
75
        "github.com/lightningnetwork/lnd/subscribe"
76
        "github.com/lightningnetwork/lnd/sweep"
77
        "github.com/lightningnetwork/lnd/ticker"
78
        "github.com/lightningnetwork/lnd/tor"
79
        "github.com/lightningnetwork/lnd/walletunlocker"
80
        "github.com/lightningnetwork/lnd/watchtower/blob"
81
        "github.com/lightningnetwork/lnd/watchtower/wtclient"
82
        "github.com/lightningnetwork/lnd/watchtower/wtpolicy"
83
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
84
)
85

86
const (
87
        // defaultMinPeers is the minimum number of peers a node should always be
88
        // connected to.
89
        defaultMinPeers = 3
90

91
        // defaultStableConnDuration is a floor under which all reconnection
92
        // attempts will apply exponential randomized backoff. Connection
93
        // durations exceeding this value will be eligible to have their
94
        // backoffs reduced.
95
        defaultStableConnDuration = 10 * time.Minute
96

97
        // numInstantInitReconnect specifies how many persistent peers we should
98
        // always attempt outbound connections to immediately. After this value
99
        // is surpassed, the remaining peers will be randomly delayed using
100
        // maxInitReconnectDelay.
101
        numInstantInitReconnect = 10
102

103
        // maxInitReconnectDelay specifies the maximum delay in seconds we will
104
        // apply in attempting to reconnect to persistent peers on startup. The
105
        // value used for a particular peer will be chosen between 0s and this
106
        // value.
107
        maxInitReconnectDelay = 30
108

109
        // multiAddrConnectionStagger is the number of seconds to wait between
110
        // attempting to connect to a peer with each of its advertised addresses.
111
        multiAddrConnectionStagger = 10 * time.Second
112
)
113

114
var (
115
        // ErrPeerNotConnected signals that the server has no connection to the
116
        // given peer.
117
        ErrPeerNotConnected = errors.New("peer is not connected")
118

119
        // ErrServerNotActive indicates that the server has started but hasn't
120
        // fully finished the startup process.
121
        ErrServerNotActive = errors.New("server is still in the process of " +
122
                "starting")
123

124
        // ErrServerShuttingDown indicates that the server is in the process of
125
        // gracefully exiting.
126
        ErrServerShuttingDown = errors.New("server is shutting down")
127

128
        // MaxFundingAmount is a soft-limit of the maximum channel size
129
        // currently accepted within the Lightning Protocol. This is
130
        // defined in BOLT-0002, and serves as an initial precautionary limit
131
        // while implementations are battle tested in the real world.
132
        //
133
        // At the moment, this value depends on which chain is active. It is set
134
        // to the value under the Bitcoin chain as default.
135
        //
136
        // TODO(roasbeef): add command line param to modify.
137
        MaxFundingAmount = funding.MaxBtcFundingAmount
138

139
        // EndorsementExperimentEnd is the time after which nodes should stop
140
        // propagating experimental endorsement signals.
141
        //
142
        // Per blip04: January 1, 2026 12:00:00 AM UTC in unix seconds.
143
        EndorsementExperimentEnd = time.Unix(1767225600, 0)
144
)
145

146
// errPeerAlreadyConnected is an error returned by the server when we're
147
// commanded to connect to a peer, but they're already connected.
148
type errPeerAlreadyConnected struct {
149
        peer *peer.Brontide
150
}
151

152
// Error returns the human readable version of this error type.
153
//
154
// NOTE: Part of the error interface.
UNCOV
155
func (e *errPeerAlreadyConnected) Error() string {
×
UNCOV
156
        return fmt.Sprintf("already connected to peer: %v", e.peer)
×
UNCOV
157
}
×
158

159
// server is the main server of the Lightning Network Daemon. The server houses
160
// global state pertaining to the wallet, database, and the rpcserver.
161
// Additionally, the server is also used as a central messaging bus to interact
162
// with any of its companion objects.
163
type server struct {
164
        active   int32 // atomic
165
        stopping int32 // atomic
166

167
        start sync.Once
168
        stop  sync.Once
169

170
        cfg *Config
171

172
        implCfg *ImplementationCfg
173

174
        // identityECDH is an ECDH capable wrapper for the private key used
175
        // to authenticate any incoming connections.
176
        identityECDH keychain.SingleKeyECDH
177

178
        // identityKeyLoc is the key locator for the above wrapped identity key.
179
        identityKeyLoc keychain.KeyLocator
180

181
        // nodeSigner is an implementation of the MessageSigner interface
182
        // that's backed by the identity private key of the running lnd node.
183
        nodeSigner *netann.NodeSigner
184

185
        chanStatusMgr *netann.ChanStatusManager
186

187
        // listenAddrs is the list of addresses the server is currently
188
        // listening on.
189
        listenAddrs []net.Addr
190

191
        // torController is a client that will communicate with a locally
192
        // running Tor server. This client will handle initiating and
193
        // authenticating the connection to the Tor server, automatically
194
        // creating and setting up onion services, etc.
195
        torController *tor.Controller
196

197
        // natTraversal is the specific NAT traversal technique used to
198
        // automatically set up port forwarding rules in order to advertise to
199
        // the network that the node is accepting inbound connections.
200
        natTraversal nat.Traversal
201

202
        // lastDetectedIP is the last IP detected by the NAT traversal technique
203
        // above. This IP will be watched periodically in a goroutine in order
204
        // to handle dynamic IP changes.
205
        lastDetectedIP net.IP
206

207
        mu sync.RWMutex
208

209
        // peersByPub is a map of the active peers.
210
        //
211
        // NOTE: The key used here is the raw bytes of the peer's public key to
212
        // string conversion, which means it cannot be printed using `%s` as it
213
        // will just print the binary.
214
        //
215
        // TODO(yy): Use the hex string instead.
216
        peersByPub map[string]*peer.Brontide
217

218
        inboundPeers  map[string]*peer.Brontide
219
        outboundPeers map[string]*peer.Brontide
220

221
        peerConnectedListeners    map[string][]chan<- lnpeer.Peer
222
        peerDisconnectedListeners map[string][]chan<- struct{}
223

224
        // TODO(yy): the Brontide.Start doesn't know this value, which means it
225
        // will continue to send messages even if there are no active channels
226
        // and the value below is false. Once it's pruned, all its connections
227
        // will be closed, thus the Brontide.Start will return an error.
228
        persistentPeers        map[string]bool
229
        persistentPeersBackoff map[string]time.Duration
230
        persistentPeerAddrs    map[string][]*lnwire.NetAddress
231
        persistentConnReqs     map[string][]*connmgr.ConnReq
232
        persistentRetryCancels map[string]chan struct{}
233

234
        // peerErrors keeps a set of peer error buffers for peers that have
235
        // disconnected from us. This allows us to track historic peer errors
236
        // over connections. The string of the peer's compressed pubkey is used
237
        // as a key for this map.
238
        peerErrors map[string]*queue.CircularBuffer
239

240
        // ignorePeerTermination tracks peers for which the server has initiated
241
        // a disconnect. Adding a peer to this map causes the peer termination
242
        // watcher to short circuit in the event that peers are purposefully
243
        // disconnected.
244
        ignorePeerTermination map[*peer.Brontide]struct{}
245

246
        // scheduledPeerConnection maps a pubkey string to a callback that
247
        // should be executed in the peerTerminationWatcher once the prior peer with
248
        // the same pubkey exits.  This allows the server to wait until the
249
        // prior peer has cleaned up successfully, before adding the new peer
250
        // intended to replace it.
251
        scheduledPeerConnection map[string]func()
252

253
        // pongBuf is a shared pong reply buffer we'll use across all active
254
        // peer goroutines. We know the max size of a pong message
255
        // (lnwire.MaxPongBytes), so we can allocate this ahead of time, and
256
        // avoid allocations each time we need to send a pong message.
257
        pongBuf []byte
258

259
        cc *chainreg.ChainControl
260

261
        fundingMgr *funding.Manager
262

263
        graphDB *graphdb.ChannelGraph
264

265
        chanStateDB *channeldb.ChannelStateDB
266

267
        addrSource channeldb.AddrSource
268

269
        // miscDB is the DB that contains all "other" databases within the main
270
        // channel DB that haven't been separated out yet.
271
        miscDB *channeldb.DB
272

273
        invoicesDB invoices.InvoiceDB
274

275
        aliasMgr *aliasmgr.Manager
276

277
        htlcSwitch *htlcswitch.Switch
278

279
        interceptableSwitch *htlcswitch.InterceptableSwitch
280

281
        invoices *invoices.InvoiceRegistry
282

283
        invoiceHtlcModifier *invoices.HtlcModificationInterceptor
284

285
        channelNotifier *channelnotifier.ChannelNotifier
286

287
        peerNotifier *peernotifier.PeerNotifier
288

289
        htlcNotifier *htlcswitch.HtlcNotifier
290

291
        witnessBeacon contractcourt.WitnessBeacon
292

293
        breachArbitrator *contractcourt.BreachArbitrator
294

295
        missionController *routing.MissionController
296
        defaultMC         *routing.MissionControl
297

298
        graphBuilder *graph.Builder
299

300
        chanRouter *routing.ChannelRouter
301

302
        controlTower routing.ControlTower
303

304
        authGossiper *discovery.AuthenticatedGossiper
305

306
        localChanMgr *localchans.Manager
307

308
        utxoNursery *contractcourt.UtxoNursery
309

310
        sweeper *sweep.UtxoSweeper
311

312
        chainArb *contractcourt.ChainArbitrator
313

314
        sphinx *hop.OnionProcessor
315

316
        towerClientMgr *wtclient.Manager
317

318
        connMgr *connmgr.ConnManager
319

320
        sigPool *lnwallet.SigPool
321

322
        writePool *pool.Write
323

324
        readPool *pool.Read
325

326
        tlsManager *TLSManager
327

328
        // featureMgr dispatches feature vectors for various contexts within the
329
        // daemon.
330
        featureMgr *feature.Manager
331

332
        // currentNodeAnn is the node announcement that has been broadcast to
333
        // the network upon startup, if the attributes of the node (us) have
334
        // changed since last start.
335
        currentNodeAnn *lnwire.NodeAnnouncement
336

337
        // chansToRestore is the set of channels that upon starting, the server
338
        // should attempt to restore/recover.
339
        chansToRestore walletunlocker.ChannelsToRecover
340

341
        // chanSubSwapper is a sub-system that will ensure our on-disk channel
342
        // backups are consistent at all times. It interacts with the
343
        // channelNotifier to be notified of newly opened and closed channels.
344
        chanSubSwapper *chanbackup.SubSwapper
345

346
        // chanEventStore tracks the behaviour of channels and their remote peers to
347
        // provide insights into their health and performance.
348
        chanEventStore *chanfitness.ChannelEventStore
349

350
        hostAnn *netann.HostAnnouncer
351

352
        // livenessMonitor monitors that lnd has access to critical resources.
353
        livenessMonitor *healthcheck.Monitor
354

355
        customMessageServer *subscribe.Server
356

357
        // txPublisher is a publisher with fee-bumping capability.
358
        txPublisher *sweep.TxPublisher
359

360
        // blockbeatDispatcher is a block dispatcher that notifies subscribers
361
        // of new blocks.
362
        blockbeatDispatcher *chainio.BlockbeatDispatcher
363

364
        quit chan struct{}
365

366
        wg sync.WaitGroup
367
}
368

369
// updatePersistentPeerAddrs subscribes to topology changes and stores
370
// advertised addresses for any NodeAnnouncements from our persisted peers.
UNCOV
371
func (s *server) updatePersistentPeerAddrs() error {
×
UNCOV
372
        graphSub, err := s.graphBuilder.SubscribeTopology()
×
UNCOV
373
        if err != nil {
×
374
                return err
×
375
        }
×
376

UNCOV
377
        s.wg.Add(1)
×
UNCOV
378
        go func() {
×
UNCOV
379
                defer func() {
×
UNCOV
380
                        graphSub.Cancel()
×
UNCOV
381
                        s.wg.Done()
×
UNCOV
382
                }()
×
383

UNCOV
384
                for {
×
UNCOV
385
                        select {
×
UNCOV
386
                        case <-s.quit:
×
UNCOV
387
                                return
×
388

UNCOV
389
                        case topChange, ok := <-graphSub.TopologyChanges:
×
UNCOV
390
                                // If the router is shutting down, then we will
×
UNCOV
391
                                // as well.
×
UNCOV
392
                                if !ok {
×
393
                                        return
×
394
                                }
×
395

UNCOV
396
                                for _, update := range topChange.NodeUpdates {
×
UNCOV
397
                                        pubKeyStr := string(
×
UNCOV
398
                                                update.IdentityKey.
×
UNCOV
399
                                                        SerializeCompressed(),
×
UNCOV
400
                                        )
×
UNCOV
401

×
UNCOV
402
                                        // We only care about updates from
×
UNCOV
403
                                        // our persistentPeers.
×
UNCOV
404
                                        s.mu.RLock()
×
UNCOV
405
                                        _, ok := s.persistentPeers[pubKeyStr]
×
UNCOV
406
                                        s.mu.RUnlock()
×
UNCOV
407
                                        if !ok {
×
UNCOV
408
                                                continue
×
409
                                        }
410

UNCOV
411
                                        addrs := make([]*lnwire.NetAddress, 0,
×
UNCOV
412
                                                len(update.Addresses))
×
UNCOV
413

×
UNCOV
414
                                        for _, addr := range update.Addresses {
×
UNCOV
415
                                                addrs = append(addrs,
×
UNCOV
416
                                                        &lnwire.NetAddress{
×
UNCOV
417
                                                                IdentityKey: update.IdentityKey,
×
UNCOV
418
                                                                Address:     addr,
×
UNCOV
419
                                                                ChainNet:    s.cfg.ActiveNetParams.Net,
×
UNCOV
420
                                                        },
×
UNCOV
421
                                                )
×
UNCOV
422
                                        }
×
423

UNCOV
424
                                        s.mu.Lock()
×
UNCOV
425

×
UNCOV
426
                                        // Update the stored addresses for this
×
UNCOV
427
                                        // peer to reflect the new set.
×
UNCOV
428
                                        s.persistentPeerAddrs[pubKeyStr] = addrs
×
UNCOV
429

×
UNCOV
430
                                        // If there are no outstanding
×
UNCOV
431
                                        // connection requests for this peer
×
UNCOV
432
                                        // then our work is done since we are
×
UNCOV
433
                                        // not currently trying to connect to
×
UNCOV
434
                                        // them.
×
UNCOV
435
                                        if len(s.persistentConnReqs[pubKeyStr]) == 0 {
×
UNCOV
436
                                                s.mu.Unlock()
×
UNCOV
437
                                                continue
×
438
                                        }
439

UNCOV
440
                                        s.mu.Unlock()
×
UNCOV
441

×
UNCOV
442
                                        s.connectToPersistentPeer(pubKeyStr)
×
443
                                }
444
                        }
445
                }
446
        }()
447

UNCOV
448
        return nil
×
449
}
450

451
// CustomMessage is a custom message that is received from a peer.
452
type CustomMessage struct {
453
        // Peer is the peer pubkey
454
        Peer [33]byte
455

456
        // Msg is the custom wire message.
457
        Msg *lnwire.Custom
458
}
459

460
// parseAddr parses an address from its string format to a net.Addr.
UNCOV
461
func parseAddr(address string, netCfg tor.Net) (net.Addr, error) {
×
UNCOV
462
        var (
×
UNCOV
463
                host string
×
UNCOV
464
                port int
×
UNCOV
465
        )
×
UNCOV
466

×
UNCOV
467
        // Split the address into its host and port components.
×
UNCOV
468
        h, p, err := net.SplitHostPort(address)
×
UNCOV
469
        if err != nil {
×
470
                // If a port wasn't specified, we'll assume the address only
×
471
                // contains the host so we'll use the default port.
×
472
                host = address
×
473
                port = defaultPeerPort
×
UNCOV
474
        } else {
×
UNCOV
475
                // Otherwise, we'll note both the host and ports.
×
UNCOV
476
                host = h
×
UNCOV
477
                portNum, err := strconv.Atoi(p)
×
UNCOV
478
                if err != nil {
×
479
                        return nil, err
×
480
                }
×
UNCOV
481
                port = portNum
×
482
        }
483

UNCOV
484
        if tor.IsOnionHost(host) {
×
485
                return &tor.OnionAddr{OnionService: host, Port: port}, nil
×
486
        }
×
487

488
        // If the host is part of a TCP address, we'll use the network
489
        // specific ResolveTCPAddr function in order to resolve these
490
        // addresses over Tor to prevent leaking your real IP
491
        // address.
UNCOV
492
        hostPort := net.JoinHostPort(host, strconv.Itoa(port))
×
UNCOV
493
        return netCfg.ResolveTCPAddr("tcp", hostPort)
×
494
}
495

496
// noiseDial is a factory function which creates a connmgr compliant dialing
497
// function by returning a closure which includes the server's identity key.
498
func noiseDial(idKey keychain.SingleKeyECDH,
UNCOV
499
        netCfg tor.Net, timeout time.Duration) func(net.Addr) (net.Conn, error) {
×
UNCOV
500

×
UNCOV
501
        return func(a net.Addr) (net.Conn, error) {
×
UNCOV
502
                lnAddr := a.(*lnwire.NetAddress)
×
UNCOV
503
                return brontide.Dial(idKey, lnAddr, timeout, netCfg.Dial)
×
UNCOV
504
        }
×
505
}
506

507
// newServer creates a new instance of the server which is to listen using the
508
// passed listener address.
509
func newServer(cfg *Config, listenAddrs []net.Addr,
510
        dbs *DatabaseInstances, cc *chainreg.ChainControl,
511
        nodeKeyDesc *keychain.KeyDescriptor,
512
        chansToRestore walletunlocker.ChannelsToRecover,
513
        chanPredicate chanacceptor.ChannelAcceptor,
514
        torController *tor.Controller, tlsManager *TLSManager,
515
        leaderElector cluster.LeaderElector,
UNCOV
516
        implCfg *ImplementationCfg) (*server, error) {
×
UNCOV
517

×
UNCOV
518
        var (
×
UNCOV
519
                err         error
×
UNCOV
520
                nodeKeyECDH = keychain.NewPubKeyECDH(*nodeKeyDesc, cc.KeyRing)
×
UNCOV
521

×
UNCOV
522
                // We just derived the full descriptor, so we know the public
×
UNCOV
523
                // key is set on it.
×
UNCOV
524
                nodeKeySigner = keychain.NewPubKeyMessageSigner(
×
UNCOV
525
                        nodeKeyDesc.PubKey, nodeKeyDesc.KeyLocator, cc.KeyRing,
×
UNCOV
526
                )
×
UNCOV
527
        )
×
UNCOV
528

×
UNCOV
529
        listeners := make([]net.Listener, len(listenAddrs))
×
UNCOV
530
        for i, listenAddr := range listenAddrs {
×
UNCOV
531
                // Note: though brontide.NewListener uses ResolveTCPAddr, it
×
UNCOV
532
                // doesn't need to call the general lndResolveTCP function
×
UNCOV
533
                // since we are resolving a local address.
×
UNCOV
534
                listeners[i], err = brontide.NewListener(
×
UNCOV
535
                        nodeKeyECDH, listenAddr.String(),
×
UNCOV
536
                )
×
UNCOV
537
                if err != nil {
×
538
                        return nil, err
×
539
                }
×
540
        }
541

UNCOV
542
        var serializedPubKey [33]byte
×
UNCOV
543
        copy(serializedPubKey[:], nodeKeyDesc.PubKey.SerializeCompressed())
×
UNCOV
544

×
UNCOV
545
        netParams := cfg.ActiveNetParams.Params
×
UNCOV
546

×
UNCOV
547
        // Initialize the sphinx router.
×
UNCOV
548
        replayLog := htlcswitch.NewDecayedLog(
×
UNCOV
549
                dbs.DecayedLogDB, cc.ChainNotifier,
×
UNCOV
550
        )
×
UNCOV
551
        sphinxRouter := sphinx.NewRouter(nodeKeyECDH, replayLog)
×
UNCOV
552

×
UNCOV
553
        writeBufferPool := pool.NewWriteBuffer(
×
UNCOV
554
                pool.DefaultWriteBufferGCInterval,
×
UNCOV
555
                pool.DefaultWriteBufferExpiryInterval,
×
UNCOV
556
        )
×
UNCOV
557

×
UNCOV
558
        writePool := pool.NewWrite(
×
UNCOV
559
                writeBufferPool, cfg.Workers.Write, pool.DefaultWorkerTimeout,
×
UNCOV
560
        )
×
UNCOV
561

×
UNCOV
562
        readBufferPool := pool.NewReadBuffer(
×
UNCOV
563
                pool.DefaultReadBufferGCInterval,
×
UNCOV
564
                pool.DefaultReadBufferExpiryInterval,
×
UNCOV
565
        )
×
UNCOV
566

×
UNCOV
567
        readPool := pool.NewRead(
×
UNCOV
568
                readBufferPool, cfg.Workers.Read, pool.DefaultWorkerTimeout,
×
UNCOV
569
        )
×
UNCOV
570

×
UNCOV
571
        // If the taproot overlay flag is set, but we don't have an aux funding
×
UNCOV
572
        // controller, then we'll exit as this is incompatible.
×
UNCOV
573
        if cfg.ProtocolOptions.TaprootOverlayChans &&
×
UNCOV
574
                implCfg.AuxFundingController.IsNone() {
×
575

×
576
                return nil, fmt.Errorf("taproot overlay flag set, but not " +
×
577
                        "aux controllers")
×
578
        }
×
579

580
        //nolint:ll
UNCOV
581
        featureMgr, err := feature.NewManager(feature.Config{
×
UNCOV
582
                NoTLVOnion:                cfg.ProtocolOptions.LegacyOnion(),
×
UNCOV
583
                NoStaticRemoteKey:         cfg.ProtocolOptions.NoStaticRemoteKey(),
×
UNCOV
584
                NoAnchors:                 cfg.ProtocolOptions.NoAnchorCommitments(),
×
UNCOV
585
                NoWumbo:                   !cfg.ProtocolOptions.Wumbo(),
×
UNCOV
586
                NoScriptEnforcementLease:  cfg.ProtocolOptions.NoScriptEnforcementLease(),
×
UNCOV
587
                NoKeysend:                 !cfg.AcceptKeySend,
×
UNCOV
588
                NoOptionScidAlias:         !cfg.ProtocolOptions.ScidAlias(),
×
UNCOV
589
                NoZeroConf:                !cfg.ProtocolOptions.ZeroConf(),
×
UNCOV
590
                NoAnySegwit:               cfg.ProtocolOptions.NoAnySegwit(),
×
UNCOV
591
                CustomFeatures:            cfg.ProtocolOptions.CustomFeatures(),
×
UNCOV
592
                NoTaprootChans:            !cfg.ProtocolOptions.TaprootChans,
×
UNCOV
593
                NoTaprootOverlay:          !cfg.ProtocolOptions.TaprootOverlayChans,
×
UNCOV
594
                NoRouteBlinding:           cfg.ProtocolOptions.NoRouteBlinding(),
×
UNCOV
595
                NoExperimentalEndorsement: cfg.ProtocolOptions.NoExperimentalEndorsement(),
×
UNCOV
596
                NoQuiescence:              cfg.ProtocolOptions.NoQuiescence(),
×
UNCOV
597
        })
×
UNCOV
598
        if err != nil {
×
599
                return nil, err
×
600
        }
×
601

UNCOV
602
        invoiceHtlcModifier := invoices.NewHtlcModificationInterceptor()
×
UNCOV
603
        registryConfig := invoices.RegistryConfig{
×
UNCOV
604
                FinalCltvRejectDelta:        lncfg.DefaultFinalCltvRejectDelta,
×
UNCOV
605
                HtlcHoldDuration:            invoices.DefaultHtlcHoldDuration,
×
UNCOV
606
                Clock:                       clock.NewDefaultClock(),
×
UNCOV
607
                AcceptKeySend:               cfg.AcceptKeySend,
×
UNCOV
608
                AcceptAMP:                   cfg.AcceptAMP,
×
UNCOV
609
                GcCanceledInvoicesOnStartup: cfg.GcCanceledInvoicesOnStartup,
×
UNCOV
610
                GcCanceledInvoicesOnTheFly:  cfg.GcCanceledInvoicesOnTheFly,
×
UNCOV
611
                KeysendHoldTime:             cfg.KeysendHoldTime,
×
UNCOV
612
                HtlcInterceptor:             invoiceHtlcModifier,
×
UNCOV
613
        }
×
UNCOV
614

×
UNCOV
615
        addrSource := channeldb.NewMultiAddrSource(dbs.ChanStateDB, dbs.GraphDB)
×
UNCOV
616

×
UNCOV
617
        s := &server{
×
UNCOV
618
                cfg:            cfg,
×
UNCOV
619
                implCfg:        implCfg,
×
UNCOV
620
                graphDB:        dbs.GraphDB,
×
UNCOV
621
                chanStateDB:    dbs.ChanStateDB.ChannelStateDB(),
×
UNCOV
622
                addrSource:     addrSource,
×
UNCOV
623
                miscDB:         dbs.ChanStateDB,
×
UNCOV
624
                invoicesDB:     dbs.InvoiceDB,
×
UNCOV
625
                cc:             cc,
×
UNCOV
626
                sigPool:        lnwallet.NewSigPool(cfg.Workers.Sig, cc.Signer),
×
UNCOV
627
                writePool:      writePool,
×
UNCOV
628
                readPool:       readPool,
×
UNCOV
629
                chansToRestore: chansToRestore,
×
UNCOV
630

×
UNCOV
631
                blockbeatDispatcher: chainio.NewBlockbeatDispatcher(
×
UNCOV
632
                        cc.ChainNotifier,
×
UNCOV
633
                ),
×
UNCOV
634
                channelNotifier: channelnotifier.New(
×
UNCOV
635
                        dbs.ChanStateDB.ChannelStateDB(),
×
UNCOV
636
                ),
×
UNCOV
637

×
UNCOV
638
                identityECDH:   nodeKeyECDH,
×
UNCOV
639
                identityKeyLoc: nodeKeyDesc.KeyLocator,
×
UNCOV
640
                nodeSigner:     netann.NewNodeSigner(nodeKeySigner),
×
UNCOV
641

×
UNCOV
642
                listenAddrs: listenAddrs,
×
UNCOV
643

×
UNCOV
644
                // TODO(roasbeef): derive proper onion key based on rotation
×
UNCOV
645
                // schedule
×
UNCOV
646
                sphinx: hop.NewOnionProcessor(sphinxRouter),
×
UNCOV
647

×
UNCOV
648
                torController: torController,
×
UNCOV
649

×
UNCOV
650
                persistentPeers:         make(map[string]bool),
×
UNCOV
651
                persistentPeersBackoff:  make(map[string]time.Duration),
×
UNCOV
652
                persistentConnReqs:      make(map[string][]*connmgr.ConnReq),
×
UNCOV
653
                persistentPeerAddrs:     make(map[string][]*lnwire.NetAddress),
×
UNCOV
654
                persistentRetryCancels:  make(map[string]chan struct{}),
×
UNCOV
655
                peerErrors:              make(map[string]*queue.CircularBuffer),
×
UNCOV
656
                ignorePeerTermination:   make(map[*peer.Brontide]struct{}),
×
UNCOV
657
                scheduledPeerConnection: make(map[string]func()),
×
UNCOV
658
                pongBuf:                 make([]byte, lnwire.MaxPongBytes),
×
UNCOV
659

×
UNCOV
660
                peersByPub:                make(map[string]*peer.Brontide),
×
UNCOV
661
                inboundPeers:              make(map[string]*peer.Brontide),
×
UNCOV
662
                outboundPeers:             make(map[string]*peer.Brontide),
×
UNCOV
663
                peerConnectedListeners:    make(map[string][]chan<- lnpeer.Peer),
×
UNCOV
664
                peerDisconnectedListeners: make(map[string][]chan<- struct{}),
×
UNCOV
665

×
UNCOV
666
                invoiceHtlcModifier: invoiceHtlcModifier,
×
UNCOV
667

×
UNCOV
668
                customMessageServer: subscribe.NewServer(),
×
UNCOV
669

×
UNCOV
670
                tlsManager: tlsManager,
×
UNCOV
671

×
UNCOV
672
                featureMgr: featureMgr,
×
UNCOV
673
                quit:       make(chan struct{}),
×
UNCOV
674
        }
×
UNCOV
675

×
UNCOV
676
        // Start the low-level services once they are initialized.
×
UNCOV
677
        //
×
UNCOV
678
        // TODO(yy): break the server startup into four steps,
×
UNCOV
679
        // 1. init the low-level services.
×
UNCOV
680
        // 2. start the low-level services.
×
UNCOV
681
        // 3. init the high-level services.
×
UNCOV
682
        // 4. start the high-level services.
×
UNCOV
683
        if err := s.startLowLevelServices(); err != nil {
×
684
                return nil, err
×
685
        }
×
686

UNCOV
687
        currentHash, currentHeight, err := s.cc.ChainIO.GetBestBlock()
×
UNCOV
688
        if err != nil {
×
689
                return nil, err
×
690
        }
×
691

UNCOV
692
        expiryWatcher := invoices.NewInvoiceExpiryWatcher(
×
UNCOV
693
                clock.NewDefaultClock(), cfg.Invoices.HoldExpiryDelta,
×
UNCOV
694
                uint32(currentHeight), currentHash, cc.ChainNotifier,
×
UNCOV
695
        )
×
UNCOV
696
        s.invoices = invoices.NewRegistry(
×
UNCOV
697
                dbs.InvoiceDB, expiryWatcher, &registryConfig,
×
UNCOV
698
        )
×
UNCOV
699

×
UNCOV
700
        s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now)
×
UNCOV
701

×
UNCOV
702
        thresholdSats := btcutil.Amount(cfg.MaxFeeExposure)
×
UNCOV
703
        thresholdMSats := lnwire.NewMSatFromSatoshis(thresholdSats)
×
UNCOV
704

×
UNCOV
705
        linkUpdater := func(shortID lnwire.ShortChannelID) error {
×
UNCOV
706
                link, err := s.htlcSwitch.GetLinkByShortID(shortID)
×
UNCOV
707
                if err != nil {
×
708
                        return err
×
709
                }
×
710

UNCOV
711
                s.htlcSwitch.UpdateLinkAliases(link)
×
UNCOV
712

×
UNCOV
713
                return nil
×
714
        }
715

UNCOV
716
        s.aliasMgr, err = aliasmgr.NewManager(dbs.ChanStateDB, linkUpdater)
×
UNCOV
717
        if err != nil {
×
718
                return nil, err
×
719
        }
×
720

UNCOV
721
        s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{
×
UNCOV
722
                DB:                   dbs.ChanStateDB,
×
UNCOV
723
                FetchAllOpenChannels: s.chanStateDB.FetchAllOpenChannels,
×
UNCOV
724
                FetchAllChannels:     s.chanStateDB.FetchAllChannels,
×
UNCOV
725
                FetchClosedChannels:  s.chanStateDB.FetchClosedChannels,
×
UNCOV
726
                LocalChannelClose: func(pubKey []byte,
×
UNCOV
727
                        request *htlcswitch.ChanClose) {
×
UNCOV
728

×
UNCOV
729
                        peer, err := s.FindPeerByPubStr(string(pubKey))
×
UNCOV
730
                        if err != nil {
×
731
                                srvrLog.Errorf("unable to close channel, peer"+
×
732
                                        " with %v id can't be found: %v",
×
733
                                        pubKey, err,
×
734
                                )
×
735
                                return
×
736
                        }
×
737

UNCOV
738
                        peer.HandleLocalCloseChanReqs(request)
×
739
                },
740
                FwdingLog:              dbs.ChanStateDB.ForwardingLog(),
741
                SwitchPackager:         channeldb.NewSwitchPackager(),
742
                ExtractErrorEncrypter:  s.sphinx.ExtractErrorEncrypter,
743
                FetchLastChannelUpdate: s.fetchLastChanUpdate(),
744
                Notifier:               s.cc.ChainNotifier,
745
                HtlcNotifier:           s.htlcNotifier,
746
                FwdEventTicker:         ticker.New(htlcswitch.DefaultFwdEventInterval),
747
                LogEventTicker:         ticker.New(htlcswitch.DefaultLogInterval),
748
                AckEventTicker:         ticker.New(htlcswitch.DefaultAckInterval),
749
                AllowCircularRoute:     cfg.AllowCircularRoute,
750
                RejectHTLC:             cfg.RejectHTLC,
751
                Clock:                  clock.NewDefaultClock(),
752
                MailboxDeliveryTimeout: cfg.Htlcswitch.MailboxDeliveryTimeout,
753
                MaxFeeExposure:         thresholdMSats,
754
                SignAliasUpdate:        s.signAliasUpdate,
755
                IsAlias:                aliasmgr.IsAlias,
756
        }, uint32(currentHeight))
UNCOV
757
        if err != nil {
×
758
                return nil, err
×
759
        }
×
UNCOV
760
        s.interceptableSwitch, err = htlcswitch.NewInterceptableSwitch(
×
UNCOV
761
                &htlcswitch.InterceptableSwitchConfig{
×
UNCOV
762
                        Switch:             s.htlcSwitch,
×
UNCOV
763
                        CltvRejectDelta:    lncfg.DefaultFinalCltvRejectDelta,
×
UNCOV
764
                        CltvInterceptDelta: lncfg.DefaultCltvInterceptDelta,
×
UNCOV
765
                        RequireInterceptor: s.cfg.RequireInterceptor,
×
UNCOV
766
                        Notifier:           s.cc.ChainNotifier,
×
UNCOV
767
                },
×
UNCOV
768
        )
×
UNCOV
769
        if err != nil {
×
770
                return nil, err
×
771
        }
×
772

UNCOV
773
        s.witnessBeacon = newPreimageBeacon(
×
UNCOV
774
                dbs.ChanStateDB.NewWitnessCache(),
×
UNCOV
775
                s.interceptableSwitch.ForwardPacket,
×
UNCOV
776
        )
×
UNCOV
777

×
UNCOV
778
        chanStatusMgrCfg := &netann.ChanStatusConfig{
×
UNCOV
779
                ChanStatusSampleInterval: cfg.ChanStatusSampleInterval,
×
UNCOV
780
                ChanEnableTimeout:        cfg.ChanEnableTimeout,
×
UNCOV
781
                ChanDisableTimeout:       cfg.ChanDisableTimeout,
×
UNCOV
782
                OurPubKey:                nodeKeyDesc.PubKey,
×
UNCOV
783
                OurKeyLoc:                nodeKeyDesc.KeyLocator,
×
UNCOV
784
                MessageSigner:            s.nodeSigner,
×
UNCOV
785
                IsChannelActive:          s.htlcSwitch.HasActiveLink,
×
UNCOV
786
                ApplyChannelUpdate:       s.applyChannelUpdate,
×
UNCOV
787
                DB:                       s.chanStateDB,
×
UNCOV
788
                Graph:                    dbs.GraphDB,
×
UNCOV
789
        }
×
UNCOV
790

×
UNCOV
791
        chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg)
×
UNCOV
792
        if err != nil {
×
793
                return nil, err
×
794
        }
×
UNCOV
795
        s.chanStatusMgr = chanStatusMgr
×
UNCOV
796

×
UNCOV
797
        // If enabled, use either UPnP or NAT-PMP to automatically configure
×
UNCOV
798
        // port forwarding for users behind a NAT.
×
UNCOV
799
        if cfg.NAT {
×
800
                srvrLog.Info("Scanning local network for a UPnP enabled device")
×
801

×
802
                discoveryTimeout := time.Duration(10 * time.Second)
×
803

×
804
                ctx, cancel := context.WithTimeout(
×
805
                        context.Background(), discoveryTimeout,
×
806
                )
×
807
                defer cancel()
×
808
                upnp, err := nat.DiscoverUPnP(ctx)
×
809
                if err == nil {
×
810
                        s.natTraversal = upnp
×
811
                } else {
×
812
                        // If we were not able to discover a UPnP enabled device
×
813
                        // on the local network, we'll fall back to attempting
×
814
                        // to discover a NAT-PMP enabled device.
×
815
                        srvrLog.Errorf("Unable to discover a UPnP enabled "+
×
816
                                "device on the local network: %v", err)
×
817

×
818
                        srvrLog.Info("Scanning local network for a NAT-PMP " +
×
819
                                "enabled device")
×
820

×
821
                        pmp, err := nat.DiscoverPMP(discoveryTimeout)
×
822
                        if err != nil {
×
823
                                err := fmt.Errorf("unable to discover a "+
×
824
                                        "NAT-PMP enabled device on the local "+
×
825
                                        "network: %v", err)
×
826
                                srvrLog.Error(err)
×
827
                                return nil, err
×
828
                        }
×
829

830
                        s.natTraversal = pmp
×
831
                }
832
        }
833

834
        // If we were requested to automatically configure port forwarding,
835
        // we'll use the ports that the server will be listening on.
UNCOV
836
        externalIPStrings := make([]string, len(cfg.ExternalIPs))
×
UNCOV
837
        for idx, ip := range cfg.ExternalIPs {
×
UNCOV
838
                externalIPStrings[idx] = ip.String()
×
UNCOV
839
        }
×
UNCOV
840
        if s.natTraversal != nil {
×
841
                listenPorts := make([]uint16, 0, len(listenAddrs))
×
842
                for _, listenAddr := range listenAddrs {
×
843
                        // At this point, the listen addresses should have
×
844
                        // already been normalized, so it's safe to ignore the
×
845
                        // errors.
×
846
                        _, portStr, _ := net.SplitHostPort(listenAddr.String())
×
847
                        port, _ := strconv.Atoi(portStr)
×
848

×
849
                        listenPorts = append(listenPorts, uint16(port))
×
850
                }
×
851

852
                ips, err := s.configurePortForwarding(listenPorts...)
×
853
                if err != nil {
×
854
                        srvrLog.Errorf("Unable to automatically set up port "+
×
855
                                "forwarding using %s: %v",
×
856
                                s.natTraversal.Name(), err)
×
857
                } else {
×
858
                        srvrLog.Infof("Automatically set up port forwarding "+
×
859
                                "using %s to advertise external IP",
×
860
                                s.natTraversal.Name())
×
861
                        externalIPStrings = append(externalIPStrings, ips...)
×
862
                }
×
863
        }
864

865
        // If external IP addresses have been specified, add those to the list
866
        // of this server's addresses.
UNCOV
867
        externalIPs, err := lncfg.NormalizeAddresses(
×
UNCOV
868
                externalIPStrings, strconv.Itoa(defaultPeerPort),
×
UNCOV
869
                cfg.net.ResolveTCPAddr,
×
UNCOV
870
        )
×
UNCOV
871
        if err != nil {
×
872
                return nil, err
×
873
        }
×
874

UNCOV
875
        selfAddrs := make([]net.Addr, 0, len(externalIPs))
×
UNCOV
876
        selfAddrs = append(selfAddrs, externalIPs...)
×
UNCOV
877

×
UNCOV
878
        // We'll now reconstruct a node announcement based on our current
×
UNCOV
879
        // configuration so we can send it out as a sort of heart beat within
×
UNCOV
880
        // the network.
×
UNCOV
881
        //
×
UNCOV
882
        // We'll start by parsing the node color from configuration.
×
UNCOV
883
        color, err := lncfg.ParseHexColor(cfg.Color)
×
UNCOV
884
        if err != nil {
×
885
                srvrLog.Errorf("unable to parse color: %v\n", err)
×
886
                return nil, err
×
887
        }
×
888

889
        // If no alias is provided, default to the hex encoding of the first
890
        // 10 bytes of the public key.
UNCOV
891
        alias := cfg.Alias
×
UNCOV
892
        if alias == "" {
×
UNCOV
893
                alias = hex.EncodeToString(serializedPubKey[:10])
×
UNCOV
894
        }
×
UNCOV
895
        nodeAlias, err := lnwire.NewNodeAlias(alias)
×
UNCOV
896
        if err != nil {
×
897
                return nil, err
×
898
        }
×
UNCOV
899
        selfNode := &models.LightningNode{
×
UNCOV
900
                HaveNodeAnnouncement: true,
×
UNCOV
901
                LastUpdate:           time.Now(),
×
UNCOV
902
                Addresses:            selfAddrs,
×
UNCOV
903
                Alias:                nodeAlias.String(),
×
UNCOV
904
                Features:             s.featureMgr.Get(feature.SetNodeAnn),
×
UNCOV
905
                Color:                color,
×
UNCOV
906
        }
×
UNCOV
907
        copy(selfNode.PubKeyBytes[:], nodeKeyDesc.PubKey.SerializeCompressed())
×
UNCOV
908

×
UNCOV
909
        // Based on the disk representation of the node announcement generated
×
UNCOV
910
        // above, we'll generate a node announcement that can go out on the
×
UNCOV
911
        // network so we can properly sign it.
×
UNCOV
912
        nodeAnn, err := selfNode.NodeAnnouncement(false)
×
UNCOV
913
        if err != nil {
×
914
                return nil, fmt.Errorf("unable to gen self node ann: %w", err)
×
915
        }
×
916

917
        // With the announcement generated, we'll sign it to properly
918
        // authenticate the message on the network.
UNCOV
919
        authSig, err := netann.SignAnnouncement(
×
UNCOV
920
                s.nodeSigner, nodeKeyDesc.KeyLocator, nodeAnn,
×
UNCOV
921
        )
×
UNCOV
922
        if err != nil {
×
923
                return nil, fmt.Errorf("unable to generate signature for "+
×
924
                        "self node announcement: %v", err)
×
925
        }
×
UNCOV
926
        selfNode.AuthSigBytes = authSig.Serialize()
×
UNCOV
927
        nodeAnn.Signature, err = lnwire.NewSigFromECDSARawSignature(
×
UNCOV
928
                selfNode.AuthSigBytes,
×
UNCOV
929
        )
×
UNCOV
930
        if err != nil {
×
931
                return nil, err
×
932
        }
×
933

934
        // Finally, we'll update the representation on disk, and update our
935
        // cached in-memory version as well.
UNCOV
936
        if err := dbs.GraphDB.SetSourceNode(selfNode); err != nil {
×
937
                return nil, fmt.Errorf("can't set self node: %w", err)
×
938
        }
×
UNCOV
939
        s.currentNodeAnn = nodeAnn
×
UNCOV
940

×
UNCOV
941
        // The router will get access to the payment ID sequencer, such that it
×
UNCOV
942
        // can generate unique payment IDs.
×
UNCOV
943
        sequencer, err := htlcswitch.NewPersistentSequencer(dbs.ChanStateDB)
×
UNCOV
944
        if err != nil {
×
945
                return nil, err
×
946
        }
×
947

948
        // Instantiate mission control with config from the sub server.
949
        //
950
        // TODO(joostjager): When we are further in the process of moving to sub
951
        // servers, the mission control instance itself can be moved there too.
UNCOV
952
        routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC)
×
UNCOV
953

×
UNCOV
954
        // We only initialize a probability estimator if there's no custom one.
×
UNCOV
955
        var estimator routing.Estimator
×
UNCOV
956
        if cfg.Estimator != nil {
×
957
                estimator = cfg.Estimator
×
UNCOV
958
        } else {
×
UNCOV
959
                switch routingConfig.ProbabilityEstimatorType {
×
UNCOV
960
                case routing.AprioriEstimatorName:
×
UNCOV
961
                        aCfg := routingConfig.AprioriConfig
×
UNCOV
962
                        aprioriConfig := routing.AprioriConfig{
×
UNCOV
963
                                AprioriHopProbability: aCfg.HopProbability,
×
UNCOV
964
                                PenaltyHalfLife:       aCfg.PenaltyHalfLife,
×
UNCOV
965
                                AprioriWeight:         aCfg.Weight,
×
UNCOV
966
                                CapacityFraction:      aCfg.CapacityFraction,
×
UNCOV
967
                        }
×
UNCOV
968

×
UNCOV
969
                        estimator, err = routing.NewAprioriEstimator(
×
UNCOV
970
                                aprioriConfig,
×
UNCOV
971
                        )
×
UNCOV
972
                        if err != nil {
×
973
                                return nil, err
×
974
                        }
×
975

976
                case routing.BimodalEstimatorName:
×
977
                        bCfg := routingConfig.BimodalConfig
×
978
                        bimodalConfig := routing.BimodalConfig{
×
979
                                BimodalNodeWeight: bCfg.NodeWeight,
×
980
                                BimodalScaleMsat: lnwire.MilliSatoshi(
×
981
                                        bCfg.Scale,
×
982
                                ),
×
983
                                BimodalDecayTime: bCfg.DecayTime,
×
984
                        }
×
985

×
986
                        estimator, err = routing.NewBimodalEstimator(
×
987
                                bimodalConfig,
×
988
                        )
×
989
                        if err != nil {
×
990
                                return nil, err
×
991
                        }
×
992

993
                default:
×
994
                        return nil, fmt.Errorf("unknown estimator type %v",
×
995
                                routingConfig.ProbabilityEstimatorType)
×
996
                }
997
        }
998

UNCOV
999
        mcCfg := &routing.MissionControlConfig{
×
UNCOV
1000
                OnConfigUpdate:          fn.Some(s.UpdateRoutingConfig),
×
UNCOV
1001
                Estimator:               estimator,
×
UNCOV
1002
                MaxMcHistory:            routingConfig.MaxMcHistory,
×
UNCOV
1003
                McFlushInterval:         routingConfig.McFlushInterval,
×
UNCOV
1004
                MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval,
×
UNCOV
1005
        }
×
UNCOV
1006

×
UNCOV
1007
        s.missionController, err = routing.NewMissionController(
×
UNCOV
1008
                dbs.ChanStateDB, selfNode.PubKeyBytes, mcCfg,
×
UNCOV
1009
        )
×
UNCOV
1010
        if err != nil {
×
1011
                return nil, fmt.Errorf("can't create mission control "+
×
1012
                        "manager: %w", err)
×
1013
        }
×
UNCOV
1014
        s.defaultMC, err = s.missionController.GetNamespacedStore(
×
UNCOV
1015
                routing.DefaultMissionControlNamespace,
×
UNCOV
1016
        )
×
UNCOV
1017
        if err != nil {
×
1018
                return nil, fmt.Errorf("can't create mission control in the "+
×
1019
                        "default namespace: %w", err)
×
1020
        }
×
1021

UNCOV
1022
        srvrLog.Debugf("Instantiating payment session source with config: "+
×
UNCOV
1023
                "AttemptCost=%v + %v%%, MinRouteProbability=%v",
×
UNCOV
1024
                int64(routingConfig.AttemptCost),
×
UNCOV
1025
                float64(routingConfig.AttemptCostPPM)/10000,
×
UNCOV
1026
                routingConfig.MinRouteProbability)
×
UNCOV
1027

×
UNCOV
1028
        pathFindingConfig := routing.PathFindingConfig{
×
UNCOV
1029
                AttemptCost: lnwire.NewMSatFromSatoshis(
×
UNCOV
1030
                        routingConfig.AttemptCost,
×
UNCOV
1031
                ),
×
UNCOV
1032
                AttemptCostPPM: routingConfig.AttemptCostPPM,
×
UNCOV
1033
                MinProbability: routingConfig.MinRouteProbability,
×
UNCOV
1034
        }
×
UNCOV
1035

×
UNCOV
1036
        sourceNode, err := dbs.GraphDB.SourceNode()
×
UNCOV
1037
        if err != nil {
×
1038
                return nil, fmt.Errorf("error getting source node: %w", err)
×
1039
        }
×
UNCOV
1040
        paymentSessionSource := &routing.SessionSource{
×
UNCOV
1041
                GraphSessionFactory: graphsession.NewGraphSessionFactory(
×
UNCOV
1042
                        dbs.GraphDB,
×
UNCOV
1043
                ),
×
UNCOV
1044
                SourceNode:        sourceNode,
×
UNCOV
1045
                MissionControl:    s.defaultMC,
×
UNCOV
1046
                GetLink:           s.htlcSwitch.GetLinkByShortID,
×
UNCOV
1047
                PathFindingConfig: pathFindingConfig,
×
UNCOV
1048
        }
×
UNCOV
1049

×
UNCOV
1050
        paymentControl := channeldb.NewPaymentControl(dbs.ChanStateDB)
×
UNCOV
1051

×
UNCOV
1052
        s.controlTower = routing.NewControlTower(paymentControl)
×
UNCOV
1053

×
UNCOV
1054
        strictPruning := cfg.Bitcoin.Node == "neutrino" ||
×
UNCOV
1055
                cfg.Routing.StrictZombiePruning
×
UNCOV
1056

×
UNCOV
1057
        s.graphBuilder, err = graph.NewBuilder(&graph.Config{
×
UNCOV
1058
                SelfNode:            selfNode.PubKeyBytes,
×
UNCOV
1059
                Graph:               dbs.GraphDB,
×
UNCOV
1060
                Chain:               cc.ChainIO,
×
UNCOV
1061
                ChainView:           cc.ChainView,
×
UNCOV
1062
                Notifier:            cc.ChainNotifier,
×
UNCOV
1063
                ChannelPruneExpiry:  graph.DefaultChannelPruneExpiry,
×
UNCOV
1064
                GraphPruneInterval:  time.Hour,
×
UNCOV
1065
                FirstTimePruneDelay: graph.DefaultFirstTimePruneDelay,
×
UNCOV
1066
                AssumeChannelValid:  cfg.Routing.AssumeChannelValid,
×
UNCOV
1067
                StrictZombiePruning: strictPruning,
×
UNCOV
1068
                IsAlias:             aliasmgr.IsAlias,
×
UNCOV
1069
        })
×
UNCOV
1070
        if err != nil {
×
1071
                return nil, fmt.Errorf("can't create graph builder: %w", err)
×
1072
        }
×
1073

UNCOV
1074
        s.chanRouter, err = routing.New(routing.Config{
×
UNCOV
1075
                SelfNode:           selfNode.PubKeyBytes,
×
UNCOV
1076
                RoutingGraph:       graphsession.NewRoutingGraph(dbs.GraphDB),
×
UNCOV
1077
                Chain:              cc.ChainIO,
×
UNCOV
1078
                Payer:              s.htlcSwitch,
×
UNCOV
1079
                Control:            s.controlTower,
×
UNCOV
1080
                MissionControl:     s.defaultMC,
×
UNCOV
1081
                SessionSource:      paymentSessionSource,
×
UNCOV
1082
                GetLink:            s.htlcSwitch.GetLinkByShortID,
×
UNCOV
1083
                NextPaymentID:      sequencer.NextID,
×
UNCOV
1084
                PathFindingConfig:  pathFindingConfig,
×
UNCOV
1085
                Clock:              clock.NewDefaultClock(),
×
UNCOV
1086
                ApplyChannelUpdate: s.graphBuilder.ApplyChannelUpdate,
×
UNCOV
1087
                ClosedSCIDs:        s.fetchClosedChannelSCIDs(),
×
UNCOV
1088
                TrafficShaper:      implCfg.TrafficShaper,
×
UNCOV
1089
        })
×
UNCOV
1090
        if err != nil {
×
1091
                return nil, fmt.Errorf("can't create router: %w", err)
×
1092
        }
×
1093

        chanSeries := discovery.NewChanSeries(s.graphDB)
        gossipMessageStore, err := discovery.NewMessageStore(dbs.ChanStateDB)
        if err != nil {
                return nil, err
        }
        waitingProofStore, err := channeldb.NewWaitingProofStore(dbs.ChanStateDB)
        if err != nil {
                return nil, err
        }

        scidCloserMan := discovery.NewScidCloserMan(s.graphDB, s.chanStateDB)

        s.authGossiper = discovery.New(discovery.Config{
                Graph:                 s.graphBuilder,
                ChainIO:               s.cc.ChainIO,
                Notifier:              s.cc.ChainNotifier,
                ChainHash:             *s.cfg.ActiveNetParams.GenesisHash,
                Broadcast:             s.BroadcastMessage,
                ChanSeries:            chanSeries,
                NotifyWhenOnline:      s.NotifyWhenOnline,
                NotifyWhenOffline:     s.NotifyWhenOffline,
                FetchSelfAnnouncement: s.getNodeAnnouncement,
                UpdateSelfAnnouncement: func() (lnwire.NodeAnnouncement,
                        error) {

                        return s.genNodeAnnouncement(nil)
                },
                ProofMatureDelta:        cfg.Gossip.AnnouncementConf,
                TrickleDelay:            time.Millisecond * time.Duration(cfg.TrickleDelay),
                RetransmitTicker:        ticker.New(time.Minute * 30),
                RebroadcastInterval:     time.Hour * 24,
                WaitingProofStore:       waitingProofStore,
                MessageStore:            gossipMessageStore,
                AnnSigner:               s.nodeSigner,
                RotateTicker:            ticker.New(discovery.DefaultSyncerRotationInterval),
                HistoricalSyncTicker:    ticker.New(cfg.HistoricalSyncInterval),
                NumActiveSyncers:        cfg.NumGraphSyncPeers,
                NoTimestampQueries:      cfg.ProtocolOptions.NoTimestampQueryOption, //nolint:ll
                MinimumBatchSize:        10,
                SubBatchDelay:           cfg.Gossip.SubBatchDelay,
                IgnoreHistoricalFilters: cfg.IgnoreHistoricalGossipFilters,
                PinnedSyncers:           cfg.Gossip.PinnedSyncers,
                MaxChannelUpdateBurst:   cfg.Gossip.MaxChannelUpdateBurst,
                ChannelUpdateInterval:   cfg.Gossip.ChannelUpdateInterval,
                IsAlias:                 aliasmgr.IsAlias,
                SignAliasUpdate:         s.signAliasUpdate,
                FindBaseByAlias:         s.aliasMgr.FindBaseSCID,
                GetAlias:                s.aliasMgr.GetPeerAlias,
                FindChannel:             s.findChannel,
                IsStillZombieChannel:    s.graphBuilder.IsZombieChannel,
                ScidCloser:              scidCloserMan,
        }, nodeKeyDesc)

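        // The local channel manager exposes helpers for updating the
        // forwarding policies of our own channels, both in the graph and in
        // the switch.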
        selfVertex := route.Vertex(nodeKeyDesc.PubKey.SerializeCompressed())
        //nolint:ll
        s.localChanMgr = &localchans.Manager{
                SelfPub:              nodeKeyDesc.PubKey,
                DefaultRoutingPolicy: cc.RoutingPolicy,
                ForAllOutgoingChannels: func(cb func(*models.ChannelEdgeInfo,
                        *models.ChannelEdgePolicy) error) error {

                        return s.graphDB.ForEachNodeChannel(selfVertex,
                                func(_ kvdb.RTx, c *models.ChannelEdgeInfo,
                                        e *models.ChannelEdgePolicy,
                                        _ *models.ChannelEdgePolicy) error {

                                        // NOTE: The invoked callback here may
                                        // receive a nil channel policy.
                                        return cb(c, e)
                                },
                        )
                },
                PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate,
                UpdateForwardingPolicies:  s.htlcSwitch.UpdateForwardingPolicies,
                FetchChannel:              s.chanStateDB.FetchChannel,
                AddEdge: func(edge *models.ChannelEdgeInfo) error {
                        return s.graphBuilder.AddEdge(edge)
                },
        }

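        // Create the persistent stores backing the utxo nursery and the
        // sweeper.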
        utxnStore, err := contractcourt.NewNurseryStore(
                s.cfg.ActiveNetParams.GenesisHash, dbs.ChanStateDB,
        )
        if err != nil {
                srvrLog.Errorf("unable to create nursery store: %v", err)
                return nil, err
        }

        sweeperStore, err := sweep.NewSweeperStore(
                dbs.ChanStateDB, s.cfg.ActiveNetParams.GenesisHash,
        )
        if err != nil {
                srvrLog.Errorf("unable to create sweeper store: %v", err)
                return nil, err
        }

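        // Assemble the sweeping pipeline: the budget aggregator batches
        // inputs into sweep transactions, the tx publisher broadcasts them
        // and handles fee bumping, the sweeper coordinates the process, and
        // the utxo nursery incubates time-locked outputs until they can be
        // handed off for sweeping.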
        aggregator := sweep.NewBudgetAggregator(
                cc.FeeEstimator, sweep.DefaultMaxInputsPerTx,
                s.implCfg.AuxSweeper,
        )

        s.txPublisher = sweep.NewTxPublisher(sweep.TxPublisherConfig{
                Signer:     cc.Wallet.Cfg.Signer,
                Wallet:     cc.Wallet,
                Estimator:  cc.FeeEstimator,
                Notifier:   cc.ChainNotifier,
                AuxSweeper: s.implCfg.AuxSweeper,
        })

        s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{
                FeeEstimator: cc.FeeEstimator,
                GenSweepScript: newSweepPkScriptGen(
                        cc.Wallet, s.cfg.ActiveNetParams.Params,
                ),
                Signer:               cc.Wallet.Cfg.Signer,
                Wallet:               newSweeperWallet(cc.Wallet),
                Mempool:              cc.MempoolNotifier,
                Notifier:             cc.ChainNotifier,
                Store:                sweeperStore,
                MaxInputsPerTx:       sweep.DefaultMaxInputsPerTx,
                MaxFeeRate:           cfg.Sweeper.MaxFeeRate,
                Aggregator:           aggregator,
                Publisher:            s.txPublisher,
                NoDeadlineConfTarget: cfg.Sweeper.NoDeadlineConfTarget,
        })

        s.utxoNursery = contractcourt.NewUtxoNursery(&contractcourt.NurseryConfig{
                ChainIO:             cc.ChainIO,
                ConfDepth:           1,
                FetchClosedChannels: s.chanStateDB.FetchClosedChannels,
                FetchClosedChannel:  s.chanStateDB.FetchClosedChannel,
                Notifier:            cc.ChainNotifier,
                PublishTransaction:  cc.Wallet.PublishTransaction,
                Store:               utxnStore,
                SweepInput:          s.sweeper.SweepInput,
                Budget:              s.cfg.Sweeper.Budget,
        })

        // Construct a closure that wraps the htlcswitch's CloseLink method.
        closeLink := func(chanPoint *wire.OutPoint,
                closureType contractcourt.ChannelCloseType) {
                // TODO(conner): Properly respect the update and error channels
                // returned by CloseLink.

                // Instruct the switch to close the channel.  Provide no close out
                // delivery script or target fee per kw because user input is not
                // available when the remote peer closes the channel.
                s.htlcSwitch.CloseLink(chanPoint, closureType, 0, 0, nil)
        }

        // We will use the following channel to reliably hand off contract
        // breach events from the ChannelArbitrator to the BreachArbitrator.
        contractBreaches := make(chan *contractcourt.ContractBreachEvent, 1)

        s.breachArbitrator = contractcourt.NewBreachArbitrator(
                &contractcourt.BreachConfig{
                        CloseLink: closeLink,
                        DB:        s.chanStateDB,
                        Estimator: s.cc.FeeEstimator,
                        GenSweepScript: newSweepPkScriptGen(
                                cc.Wallet, s.cfg.ActiveNetParams.Params,
                        ),
                        Notifier:           cc.ChainNotifier,
                        PublishTransaction: cc.Wallet.PublishTransaction,
                        ContractBreaches:   contractBreaches,
                        Signer:             cc.Wallet.Cfg.Signer,
                        Store: contractcourt.NewRetributionStore(
                                dbs.ChanStateDB,
                        ),
                        AuxSweeper: s.implCfg.AuxSweeper,
                },
        )

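        // The chain arbitrator watches on-chain activity for all channels and
        // drives contract resolution using the subsystems created above.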
        //nolint:ll
        s.chainArb = contractcourt.NewChainArbitrator(contractcourt.ChainArbitratorConfig{
                ChainHash:              *s.cfg.ActiveNetParams.GenesisHash,
                IncomingBroadcastDelta: lncfg.DefaultIncomingBroadcastDelta,
                OutgoingBroadcastDelta: lncfg.DefaultOutgoingBroadcastDelta,
                NewSweepAddr: func() ([]byte, error) {
                        addr, err := newSweepPkScriptGen(
                                cc.Wallet, netParams,
                        )().Unpack()
                        if err != nil {
                                return nil, err
                        }

                        return addr.DeliveryAddress, nil
                },
                PublishTx: cc.Wallet.PublishTransaction,
                DeliverResolutionMsg: func(msgs ...contractcourt.ResolutionMsg) error {
                        for _, msg := range msgs {
                                err := s.htlcSwitch.ProcessContractResolution(msg)
                                if err != nil {
                                        return err
                                }
                        }
                        return nil
                },
                IncubateOutputs: func(chanPoint wire.OutPoint,
                        outHtlcRes fn.Option[lnwallet.OutgoingHtlcResolution],
                        inHtlcRes fn.Option[lnwallet.IncomingHtlcResolution],
                        broadcastHeight uint32,
                        deadlineHeight fn.Option[int32]) error {

                        return s.utxoNursery.IncubateOutputs(
                                chanPoint, outHtlcRes, inHtlcRes,
                                broadcastHeight, deadlineHeight,
                        )
                },
                PreimageDB:   s.witnessBeacon,
                Notifier:     cc.ChainNotifier,
                Mempool:      cc.MempoolNotifier,
                Signer:       cc.Wallet.Cfg.Signer,
                FeeEstimator: cc.FeeEstimator,
                ChainIO:      cc.ChainIO,
                MarkLinkInactive: func(chanPoint wire.OutPoint) error {
                        chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
                        s.htlcSwitch.RemoveLink(chanID)
                        return nil
                },
                IsOurAddress: cc.Wallet.IsOurAddress,
                ContractBreach: func(chanPoint wire.OutPoint,
                        breachRet *lnwallet.BreachRetribution) error {

                        // processACK will handle the BreachArbitrator ACKing
                        // the event.
                        finalErr := make(chan error, 1)
                        processACK := func(brarErr error) {
                                if brarErr != nil {
                                        finalErr <- brarErr
                                        return
                                }

                                // If the BreachArbitrator successfully handled
                                // the event, we can signal that the handoff
                                // was successful.
                                finalErr <- nil
                        }

                        event := &contractcourt.ContractBreachEvent{
                                ChanPoint:         chanPoint,
                                ProcessACK:        processACK,
                                BreachRetribution: breachRet,
                        }

                        // Send the contract breach event to the
                        // BreachArbitrator.
                        select {
                        case contractBreaches <- event:
                        case <-s.quit:
                                return ErrServerShuttingDown
                        }

                        // We'll wait for a final error to be available from
                        // the BreachArbitrator.
                        select {
                        case err := <-finalErr:
                                return err
                        case <-s.quit:
                                return ErrServerShuttingDown
                        }
                },
                DisableChannel: func(chanPoint wire.OutPoint) error {
                        return s.chanStatusMgr.RequestDisable(chanPoint, false)
                },
                Sweeper:                       s.sweeper,
                Registry:                      s.invoices,
                NotifyClosedChannel:           s.channelNotifier.NotifyClosedChannelEvent,
                NotifyFullyResolvedChannel:    s.channelNotifier.NotifyFullyResolvedChannelEvent,
                OnionProcessor:                s.sphinx,
                PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod,
                IsForwardedHTLC:               s.htlcSwitch.IsForwardedHTLC,
                Clock:                         clock.NewDefaultClock(),
                SubscribeBreachComplete:       s.breachArbitrator.SubscribeBreachComplete,
                PutFinalHtlcOutcome:           s.chanStateDB.PutOnchainFinalHtlcOutcome,
                HtlcNotifier:                  s.htlcNotifier,
                Budget:                        *s.cfg.Sweeper.Budget,

                // TODO(yy): remove this hack once PaymentCircuit is interfaced.
                QueryIncomingCircuit: func(
                        circuit models.CircuitKey) *models.CircuitKey {

                        // Get the circuit map.
                        circuits := s.htlcSwitch.CircuitLookup()

                        // Lookup the outgoing circuit.
                        pc := circuits.LookupOpenCircuit(circuit)
                        if pc == nil {
                                return nil
                        }

                        return &pc.Incoming
                },
                AuxLeafStore: implCfg.AuxLeafStore,
                AuxSigner:    implCfg.AuxSigner,
                AuxResolver:  implCfg.AuxContractResolver,
        }, dbs.ChanStateDB)

        // Select the configuration and funding parameters for Bitcoin.
        chainCfg := cfg.Bitcoin
        minRemoteDelay := funding.MinBtcRemoteDelay
        maxRemoteDelay := funding.MaxBtcRemoteDelay

        var chanIDSeed [32]byte
        if _, err := rand.Read(chanIDSeed[:]); err != nil {
                return nil, err
        }

        // Wrap the DeleteChannelEdges method so that the funding manager can
        // use it without depending on several layers of indirection.
        deleteAliasEdge := func(scid lnwire.ShortChannelID) (
                *models.ChannelEdgePolicy, error) {

                info, e1, e2, err := s.graphDB.FetchChannelEdgesByID(
                        scid.ToUint64(),
                )
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
                        // This is unlikely but there is a slim chance of this
                        // being hit if lnd was killed via SIGKILL and the
                        // funding manager was stepping through the delete
                        // alias edge logic.
                        return nil, nil
                } else if err != nil {
                        return nil, err
                }

                // Grab our key to find our policy.
                var ourKey [33]byte
                copy(ourKey[:], nodeKeyDesc.PubKey.SerializeCompressed())

                var ourPolicy *models.ChannelEdgePolicy
                if info != nil && info.NodeKey1Bytes == ourKey {
                        ourPolicy = e1
                } else {
                        ourPolicy = e2
                }

                if ourPolicy == nil {
                        // Something is wrong, so return an error.
                        return nil, fmt.Errorf("we don't have an edge")
                }

                err = s.graphDB.DeleteChannelEdges(
                        false, false, scid.ToUint64(),
                )
                return ourPolicy, err
        }

        // For the reservationTimeout and the zombieSweeperInterval different
        // values are set in case we are in a dev environment to enhance test
        // capabilities.
        reservationTimeout := chanfunding.DefaultReservationTimeout
        zombieSweeperInterval := lncfg.DefaultZombieSweeperInterval

        // Get the development config for the funding manager. If we are not
        // in development mode, this will be nil.
        var devCfg *funding.DevConfig
        if lncfg.IsDevBuild() {
                devCfg = &funding.DevConfig{
                        ProcessChannelReadyWait: cfg.Dev.ChannelReadyWait(),
                }

                reservationTimeout = cfg.Dev.GetReservationTimeout()
                zombieSweeperInterval = cfg.Dev.GetZombieSweeperInterval()

                srvrLog.Debugf("Using the dev config for the fundingMgr: %v, "+
                        "reservationTimeout=%v, zombieSweeperInterval=%v",
                        devCfg, reservationTimeout, zombieSweeperInterval)
        }

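        // With its dependencies ready, create the funding manager, which
        // drives the channel funding workflow with our peers.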
        //nolint:ll
        s.fundingMgr, err = funding.NewFundingManager(funding.Config{
                Dev:                devCfg,
                NoWumboChans:       !cfg.ProtocolOptions.Wumbo(),
                IDKey:              nodeKeyDesc.PubKey,
                IDKeyLoc:           nodeKeyDesc.KeyLocator,
                Wallet:             cc.Wallet,
                PublishTransaction: cc.Wallet.PublishTransaction,
                UpdateLabel: func(hash chainhash.Hash, label string) error {
                        return cc.Wallet.LabelTransaction(hash, label, true)
                },
                Notifier:     cc.ChainNotifier,
                ChannelDB:    s.chanStateDB,
                FeeEstimator: cc.FeeEstimator,
                SignMessage:  cc.MsgSigner.SignMessage,
                CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement,
                        error) {

                        return s.genNodeAnnouncement(nil)
                },
                SendAnnouncement:     s.authGossiper.ProcessLocalAnnouncement,
                NotifyWhenOnline:     s.NotifyWhenOnline,
                TempChanIDSeed:       chanIDSeed,
                FindChannel:          s.findChannel,
                DefaultRoutingPolicy: cc.RoutingPolicy,
                DefaultMinHtlcIn:     cc.MinHtlcIn,
                NumRequiredConfs: func(chanAmt btcutil.Amount,
                        pushAmt lnwire.MilliSatoshi) uint16 {
                        // For large channels we increase the number
                        // of confirmations we require for the
                        // channel to be considered open. As it is
                        // always the responder that gets to choose
                        // this value, the pushAmt is the value being
                        // pushed to us. This means we have more to
                        // lose in the case this gets re-orged out,
                        // and we will require more confirmations
                        // before we consider it open.

                        // In case the user has explicitly specified
                        // a default value for the number of
                        // confirmations, we use it.
                        defaultConf := uint16(chainCfg.DefaultNumChanConfs)
                        if defaultConf != 0 {
                                return defaultConf
                        }

                        minConf := uint64(3)
                        maxConf := uint64(6)

                        // If this is a wumbo channel, then we'll require the
                        // max amount of confirmations.
                        if chanAmt > MaxFundingAmount {
                                return uint16(maxConf)
                        }

                        // If not, we return a value scaled linearly
                        // between 3 and 6, depending on channel size.
                        // TODO(halseth): Use 1 as minimum?
                        maxChannelSize := uint64(
                                lnwire.NewMSatFromSatoshis(MaxFundingAmount))
                        stake := lnwire.NewMSatFromSatoshis(chanAmt) + pushAmt
                        conf := maxConf * uint64(stake) / maxChannelSize
                        if conf < minConf {
                                conf = minConf
                        }
                        if conf > maxConf {
                                conf = maxConf
                        }
                        return uint16(conf)
                },
                RequiredRemoteDelay: func(chanAmt btcutil.Amount) uint16 {
                        // We scale the remote CSV delay (the time the
                        // remote party has to claim funds in case of
                        // a unilateral close) linearly from
                        // minRemoteDelay blocks for small channels,
                        // to maxRemoteDelay blocks for channels of
                        // size MaxFundingAmount.

                        // In case the user has explicitly specified
                        // a default value for the remote delay, we
                        // use it.
                        defaultDelay := uint16(chainCfg.DefaultRemoteDelay)
                        if defaultDelay > 0 {
                                return defaultDelay
                        }

                        // If this is a wumbo channel, then we'll require the
                        // max value.
                        if chanAmt > MaxFundingAmount {
                                return maxRemoteDelay
                        }

                        // If not, we scale according to channel size.
                        delay := uint16(btcutil.Amount(maxRemoteDelay) *
                                chanAmt / MaxFundingAmount)
                        if delay < minRemoteDelay {
                                delay = minRemoteDelay
                        }
                        if delay > maxRemoteDelay {
                                delay = maxRemoteDelay
                        }
                        return delay
                },
                WatchNewChannel: func(channel *channeldb.OpenChannel,
                        peerKey *btcec.PublicKey) error {

                        // First, we'll mark this new peer as a persistent peer
                        // for re-connection purposes. If the peer is not yet
                        // tracked or the user hasn't requested it to be perm,
                        // we'll set false to prevent the server from continuing
                        // to connect to this peer even if the number of
                        // channels with this peer is zero.
                        s.mu.Lock()
                        pubStr := string(peerKey.SerializeCompressed())
                        if _, ok := s.persistentPeers[pubStr]; !ok {
                                s.persistentPeers[pubStr] = false
                        }
                        s.mu.Unlock()

                        // With that taken care of, we'll send this channel to
                        // the chain arb so it can react to on-chain events.
                        return s.chainArb.WatchNewChannel(channel)
                },
                ReportShortChanID: func(chanPoint wire.OutPoint) error {
                        cid := lnwire.NewChanIDFromOutPoint(chanPoint)
                        return s.htlcSwitch.UpdateShortChanID(cid)
                },
                RequiredRemoteChanReserve: func(chanAmt,
                        dustLimit btcutil.Amount) btcutil.Amount {

                        // By default, we'll require the remote peer to maintain
                        // at least 1% of the total channel capacity at all
                        // times. If this value ends up dipping below the dust
                        // limit, then we'll use the dust limit itself as the
                        // reserve as required by BOLT #2.
                        reserve := chanAmt / 100
                        if reserve < dustLimit {
                                reserve = dustLimit
                        }

                        return reserve
                },
                RequiredRemoteMaxValue: func(chanAmt btcutil.Amount) lnwire.MilliSatoshi {
                        // By default, we'll allow the remote peer to utilize
                        // the full bandwidth of the channel, minus our
                        // required reserve.
                        reserve := lnwire.NewMSatFromSatoshis(chanAmt / 100)
                        return lnwire.NewMSatFromSatoshis(chanAmt) - reserve
                },
                RequiredRemoteMaxHTLCs: func(chanAmt btcutil.Amount) uint16 {
                        if cfg.DefaultRemoteMaxHtlcs > 0 {
                                return cfg.DefaultRemoteMaxHtlcs
                        }

                        // By default, we'll permit them to utilize the full
                        // channel bandwidth.
                        return uint16(input.MaxHTLCNumber / 2)
                },
                ZombieSweeperInterval:         zombieSweeperInterval,
                ReservationTimeout:            reservationTimeout,
                MinChanSize:                   btcutil.Amount(cfg.MinChanSize),
                MaxChanSize:                   btcutil.Amount(cfg.MaxChanSize),
                MaxPendingChannels:            cfg.MaxPendingChannels,
                RejectPush:                    cfg.RejectPush,
                MaxLocalCSVDelay:              chainCfg.MaxLocalDelay,
                NotifyOpenChannelEvent:        s.channelNotifier.NotifyOpenChannelEvent,
                OpenChannelPredicate:          chanPredicate,
                NotifyPendingOpenChannelEvent: s.channelNotifier.NotifyPendingOpenChannelEvent,
                EnableUpfrontShutdown:         cfg.EnableUpfrontShutdown,
                MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte(
                        s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
                DeleteAliasEdge:      deleteAliasEdge,
                AliasManager:         s.aliasMgr,
                IsSweeperOutpoint:    s.sweeper.IsSweeperOutpoint,
                AuxFundingController: implCfg.AuxFundingController,
                AuxSigner:            implCfg.AuxSigner,
                AuxResolver:          implCfg.AuxContractResolver,
        })
        if err != nil {
                return nil, err
        }

        // Next, we'll assemble the sub-system that will maintain an on-disk
        // static backup of the latest channel state.
        chanNotifier := &channelNotifier{
                chanNotifier: s.channelNotifier,
                addrs:        s.addrSource,
        }
        backupFile := chanbackup.NewMultiFile(
                cfg.BackupFilePath, cfg.NoBackupArchive,
        )
        startingChans, err := chanbackup.FetchStaticChanBackups(
                s.chanStateDB, s.addrSource,
        )
        if err != nil {
                return nil, err
        }
        s.chanSubSwapper, err = chanbackup.NewSubSwapper(
                startingChans, chanNotifier, s.cc.KeyRing, backupFile,
        )
        if err != nil {
                return nil, err
        }

        // Assemble a peer notifier which will provide clients with subscriptions
        // to peer online and offline events.
        s.peerNotifier = peernotifier.New()

        // Create a channel event store which monitors all open channels.
        s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{
                SubscribeChannelEvents: func() (subscribe.Subscription, error) {
                        return s.channelNotifier.SubscribeChannelEvents()
                },
                SubscribePeerEvents: func() (subscribe.Subscription, error) {
                        return s.peerNotifier.SubscribePeerEvents()
                },
                GetOpenChannels: s.chanStateDB.FetchAllOpenChannels,
                Clock:           clock.NewDefaultClock(),
                ReadFlapCount:   s.miscDB.ReadFlapCount,
                WriteFlapCount:  s.miscDB.WriteFlapCounts,
                FlapCountTicker: ticker.New(chanfitness.FlapCountFlushRate),
        })

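        // If the watchtower client is enabled, build its session policies and
        // create the tower client manager.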
        if cfg.WtClient.Active {
                policy := wtpolicy.DefaultPolicy()
                policy.MaxUpdates = cfg.WtClient.MaxUpdates

                // We expose the sweep fee rate in sat/vbyte, but the tower
                // protocol operates on sat/kw.
                sweepRateSatPerVByte := chainfee.SatPerKVByte(
                        1000 * cfg.WtClient.SweepFeeRate,
                )

                policy.SweepFeeRate = sweepRateSatPerVByte.FeePerKWeight()

                if err := policy.Validate(); err != nil {
                        return nil, err
                }

                // authDial is the wrapper around brontide.Dial for the
                // watchtower.
                authDial := func(localKey keychain.SingleKeyECDH,
                        netAddr *lnwire.NetAddress,
                        dialer tor.DialFunc) (wtserver.Peer, error) {

                        return brontide.Dial(
                                localKey, netAddr, cfg.ConnectionTimeout, dialer,
                        )
                }

                // buildBreachRetribution is a call-back that can be used to
                // query the BreachRetribution info and channel type given a
                // channel ID and commitment height.
                buildBreachRetribution := func(chanID lnwire.ChannelID,
                        commitHeight uint64) (*lnwallet.BreachRetribution,
                        channeldb.ChannelType, error) {

                        channel, err := s.chanStateDB.FetchChannelByID(
                                nil, chanID,
                        )
                        if err != nil {
                                return nil, 0, err
                        }

                        br, err := lnwallet.NewBreachRetribution(
                                channel, commitHeight, 0, nil,
                                implCfg.AuxLeafStore,
                                implCfg.AuxContractResolver,
                        )
                        if err != nil {
                                return nil, 0, err
                        }

                        return br, channel.ChanType, nil
                }

                fetchClosedChannel := s.chanStateDB.FetchClosedChannelForID

                // Copy the policy for legacy channels and set the blob flag
                // signalling support for anchor channels.
                anchorPolicy := policy
                anchorPolicy.BlobType |= blob.Type(blob.FlagAnchorChannel)

                // Copy the policy for legacy channels and set the blob flag
                // signalling support for taproot channels.
                taprootPolicy := policy
                taprootPolicy.TxPolicy.BlobType |= blob.Type(
                        blob.FlagTaprootChannel,
                )

                s.towerClientMgr, err = wtclient.NewManager(&wtclient.Config{
                        FetchClosedChannel:     fetchClosedChannel,
                        BuildBreachRetribution: buildBreachRetribution,
                        SessionCloseRange:      cfg.WtClient.SessionCloseRange,
                        ChainNotifier:          s.cc.ChainNotifier,
                        SubscribeChannelEvents: func() (subscribe.Subscription,
                                error) {

                                return s.channelNotifier.
                                        SubscribeChannelEvents()
                        },
                        Signer: cc.Wallet.Cfg.Signer,
                        NewAddress: func() ([]byte, error) {
                                addr, err := newSweepPkScriptGen(
                                        cc.Wallet, netParams,
                                )().Unpack()
                                if err != nil {
                                        return nil, err
                                }

                                return addr.DeliveryAddress, nil
                        },
                        SecretKeyRing:      s.cc.KeyRing,
                        Dial:               cfg.net.Dial,
                        AuthDial:           authDial,
                        DB:                 dbs.TowerClientDB,
                        ChainHash:          *s.cfg.ActiveNetParams.GenesisHash,
                        MinBackoff:         10 * time.Second,
                        MaxBackoff:         5 * time.Minute,
                        MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue,
                }, policy, anchorPolicy, taprootPolicy)
                if err != nil {
                        return nil, err
                }
        }

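        // If external hosts are configured, create a host announcer that
        // periodically resolves them and refreshes our node announcement when
        // their addresses change.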
        if len(cfg.ExternalHosts) != 0 {
                advertisedIPs := make(map[string]struct{})
                for _, addr := range s.currentNodeAnn.Addresses {
                        advertisedIPs[addr.String()] = struct{}{}
                }

                s.hostAnn = netann.NewHostAnnouncer(netann.HostAnnouncerConfig{
                        Hosts:         cfg.ExternalHosts,
                        RefreshTicker: ticker.New(defaultHostSampleInterval),
                        LookupHost: func(host string) (net.Addr, error) {
                                return lncfg.ParseAddressString(
                                        host, strconv.Itoa(defaultPeerPort),
                                        cfg.net.ResolveTCPAddr,
                                )
                        },
                        AdvertisedIPs: advertisedIPs,
                        AnnounceNewIPs: netann.IPAnnouncer(
                                func(modifier ...netann.NodeAnnModifier) (
                                        lnwire.NodeAnnouncement, error) {

                                        return s.genNodeAnnouncement(
                                                nil, modifier...,
                                        )
                                }),
                })
        }

        // Create liveness monitor.
        s.createLivenessMonitor(cfg, cc, leaderElector)

        // Create the connection manager which will be responsible for
        // maintaining persistent outbound connections and also accepting new
        // incoming connections.
        cmgr, err := connmgr.New(&connmgr.Config{
                Listeners:      listeners,
                OnAccept:       s.InboundPeerConnected,
                RetryDuration:  time.Second * 5,
                TargetOutbound: 100,
                Dial: noiseDial(
                        nodeKeyECDH, s.cfg.net, s.cfg.ConnectionTimeout,
                ),
                OnConnection: s.OutboundPeerConnected,
        })
        if err != nil {
                return nil, err
        }
        s.connMgr = cmgr

        // Finally, register the subsystems in blockbeat.
        s.registerBlockConsumers()

        return s, nil
}

// UpdateRoutingConfig is a callback function to update the routing config
// values in the main cfg.
func (s *server) UpdateRoutingConfig(cfg *routing.MissionControlConfig) {
        routerCfg := s.cfg.SubRPCServers.RouterRPC

        switch c := cfg.Estimator.Config().(type) {
        case routing.AprioriConfig:
                routerCfg.ProbabilityEstimatorType =
                        routing.AprioriEstimatorName

                targetCfg := routerCfg.AprioriConfig
                targetCfg.PenaltyHalfLife = c.PenaltyHalfLife
                targetCfg.Weight = c.AprioriWeight
                targetCfg.CapacityFraction = c.CapacityFraction
                targetCfg.HopProbability = c.AprioriHopProbability

        case routing.BimodalConfig:
                routerCfg.ProbabilityEstimatorType =
                        routing.BimodalEstimatorName

                targetCfg := routerCfg.BimodalConfig
                targetCfg.Scale = int64(c.BimodalScaleMsat)
                targetCfg.NodeWeight = c.BimodalNodeWeight
                targetCfg.DecayTime = c.BimodalDecayTime
        }

        routerCfg.MaxMcHistory = cfg.MaxMcHistory
}

// registerBlockConsumers registers the subsystems that consume block events.
// By calling `RegisterQueue`, a list of subsystems is registered in the
// blockbeat for block notifications. When a new block arrives, the subsystems
// in the same queue are notified sequentially, and different queues are
// notified concurrently.
//
// NOTE: To put a subsystem in a different queue, create a slice and pass it to
// a new `RegisterQueue` call.
func (s *server) registerBlockConsumers() {
        // In this queue, when a new block arrives, it will be received and
        // processed in this order: chainArb -> sweeper -> txPublisher.
        consumers := []chainio.Consumer{
                s.chainArb,
                s.sweeper,
                s.txPublisher,
        }
        s.blockbeatDispatcher.RegisterQueue(consumers)
}

// signAliasUpdate takes a ChannelUpdate and returns the signature. This is
// used for option_scid_alias channels where the ChannelUpdate to be sent back
// may differ from what is on disk.
func (s *server) signAliasUpdate(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
        error) {

        data, err := u.DataToSign()
        if err != nil {
                return nil, err
        }

        return s.cc.MsgSigner.SignMessage(s.identityKeyLoc, data, true)
}

// createLivenessMonitor creates a set of health checks using our configured
// values and uses these checks to create a liveness monitor. Available
// health checks:
//   - chainHealthCheck (will be disabled for --nochainbackend mode)
//   - diskCheck
//   - tlsHealthCheck
//   - torController, only created when tor is enabled.
//
// If a health check has been disabled by setting attempts to 0, our monitor
// will not run it.
func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl,
        leaderElector cluster.LeaderElector) {

        chainBackendAttempts := cfg.HealthChecks.ChainCheck.Attempts
        if cfg.Bitcoin.Node == "nochainbackend" {
                srvrLog.Info("Disabling chain backend checks for " +
                        "nochainbackend mode")

                chainBackendAttempts = 0
        }

        chainHealthCheck := healthcheck.NewObservation(
                "chain backend",
                cc.HealthCheck,
                cfg.HealthChecks.ChainCheck.Interval,
                cfg.HealthChecks.ChainCheck.Timeout,
                cfg.HealthChecks.ChainCheck.Backoff,
                chainBackendAttempts,
        )

        diskCheck := healthcheck.NewObservation(
                "disk space",
                func() error {
                        free, err := healthcheck.AvailableDiskSpaceRatio(
                                cfg.LndDir,
                        )
                        if err != nil {
                                return err
                        }

                        // If we have more free space than we require,
                        // we return a nil error.
                        if free > cfg.HealthChecks.DiskCheck.RequiredRemaining {
                                return nil
                        }

                        return fmt.Errorf("require: %v free space, got: %v",
                                cfg.HealthChecks.DiskCheck.RequiredRemaining,
                                free)
                },
                cfg.HealthChecks.DiskCheck.Interval,
                cfg.HealthChecks.DiskCheck.Timeout,
                cfg.HealthChecks.DiskCheck.Backoff,
                cfg.HealthChecks.DiskCheck.Attempts,
        )

        tlsHealthCheck := healthcheck.NewObservation(
                "tls",
                func() error {
                        expired, expTime, err := s.tlsManager.IsCertExpired(
                                s.cc.KeyRing,
                        )
                        if err != nil {
                                return err
                        }
                        if expired {
                                return fmt.Errorf("TLS certificate is "+
                                        "expired as of %v", expTime)
                        }

                        // If the certificate is not outdated, no error needs
                        // to be returned.
                        return nil
                },
                cfg.HealthChecks.TLSCheck.Interval,
                cfg.HealthChecks.TLSCheck.Timeout,
                cfg.HealthChecks.TLSCheck.Backoff,
                cfg.HealthChecks.TLSCheck.Attempts,
        )

        checks := []*healthcheck.Observation{
                chainHealthCheck, diskCheck, tlsHealthCheck,
        }

        // If Tor is enabled, add the healthcheck for tor connection.
        if s.torController != nil {
                torConnectionCheck := healthcheck.NewObservation(
                        "tor connection",
                        func() error {
                                return healthcheck.CheckTorServiceStatus(
                                        s.torController,
                                        s.createNewHiddenService,
                                )
                        },
                        cfg.HealthChecks.TorConnection.Interval,
                        cfg.HealthChecks.TorConnection.Timeout,
                        cfg.HealthChecks.TorConnection.Backoff,
                        cfg.HealthChecks.TorConnection.Attempts,
                )
                checks = append(checks, torConnectionCheck)
        }

        // If remote signing is enabled, add the healthcheck for the remote
        // signing RPC interface.
        if s.cfg.RemoteSigner != nil && s.cfg.RemoteSigner.Enable {
                // Because we have two cascading timeouts here, we need to add
                // some slack to the "outer" one of them in case the "inner"
                // returns exactly on time.
                overhead := time.Millisecond * 10

                remoteSignerConnectionCheck := healthcheck.NewObservation(
                        "remote signer connection",
                        rpcwallet.HealthCheck(
                                s.cfg.RemoteSigner,

                                // For the health check we might want to be
                                // even stricter than the initial/normal
                                // connect, so we use the health check timeout
                                // here.
                                cfg.HealthChecks.RemoteSigner.Timeout,
                        ),
                        cfg.HealthChecks.RemoteSigner.Interval,
                        cfg.HealthChecks.RemoteSigner.Timeout+overhead,
                        cfg.HealthChecks.RemoteSigner.Backoff,
                        cfg.HealthChecks.RemoteSigner.Attempts,
                )
                checks = append(checks, remoteSignerConnectionCheck)
        }

        // If we have a leader elector, we add a health check to ensure we are
        // still the leader. During normal operation, we should always be the
        // leader, but there are circumstances where this may change, such as
        // when we lose network connectivity for long enough that our lease
        // expires.
        if leaderElector != nil {
                leaderCheck := healthcheck.NewObservation(
                        "leader status",
                        func() error {
                                // Check if we are still the leader. Note that
                                // we don't need to use a timeout context here
                                // as the healthcheck observer will handle the
                                // timeout case for us.
                                timeoutCtx, cancel := context.WithTimeout(
                                        context.Background(),
                                        cfg.HealthChecks.LeaderCheck.Timeout,
                                )
                                defer cancel()

                                leader, err := leaderElector.IsLeader(
                                        timeoutCtx,
                                )
                                if err != nil {
                                        return fmt.Errorf("unable to check if "+
                                                "still leader: %v", err)
                                }

                                if !leader {
                                        srvrLog.Debug("Not the current leader")
                                        return fmt.Errorf("not the current " +
                                                "leader")
                                }

                                return nil
                        },
                        cfg.HealthChecks.LeaderCheck.Interval,
                        cfg.HealthChecks.LeaderCheck.Timeout,
                        cfg.HealthChecks.LeaderCheck.Backoff,
                        cfg.HealthChecks.LeaderCheck.Attempts,
                )

                checks = append(checks, leaderCheck)
        }

        // If we have not disabled all of our health checks, we create a
        // liveness monitor with our configured checks.
        s.livenessMonitor = healthcheck.NewMonitor(
                &healthcheck.Config{
                        Checks:   checks,
                        Shutdown: srvrLog.Criticalf,
                },
        )
}

// Started returns true if the server has been started, and false otherwise.
// NOTE: This function is safe for concurrent access.
func (s *server) Started() bool {
        return atomic.LoadInt32(&s.active) != 0
}

// cleaner is used to aggregate "cleanup" functions during an operation that
// starts several subsystems. In case one of the subsystems fails to start
// and a proper resource cleanup is required, the "run" method achieves this
// by running all these added "cleanup" functions.
type cleaner []func() error

// add is used to add a cleanup function to be called when
// the run function is executed.
func (c cleaner) add(cleanup func() error) cleaner {
        return append(c, cleanup)
}

// run is used to run all the previously added cleanup functions.
func (c cleaner) run() {
        for i := len(c) - 1; i >= 0; i-- {
                if err := c[i](); err != nil {
                        srvrLog.Infof("Cleanup failed: %v", err)
                }
        }
}

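// A minimal usage sketch, mirroring how Start below uses the cleaner: each
// subsystem's Stop is registered before its Start is attempted, and run is
// only called if a later Start fails ("subsystem" here is just a placeholder
// for any component with Start/Stop methods):
//
//	cleanup := cleaner{}
//	cleanup = cleanup.add(subsystem.Stop)
//	if err := subsystem.Start(); err != nil {
//		cleanup.run()
//	}
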
// startLowLevelServices starts the low-level services of the server. These
// services must be started successfully before running the main server. The
// services are:
// 1. the chain notifier.
//
// TODO(yy): identify and add more low-level services here.
func (s *server) startLowLevelServices() error {
        var startErr error

        cleanup := cleaner{}

        cleanup = cleanup.add(s.cc.ChainNotifier.Stop)
        if err := s.cc.ChainNotifier.Start(); err != nil {
                startErr = err
        }

        if startErr != nil {
                cleanup.run()
        }

        return startErr
}

// Start starts the main daemon server, all requested listeners, and any helper
// goroutines.
// NOTE: This function is safe for concurrent access.
//
//nolint:funlen
func (s *server) Start() error {
        // Get the current blockbeat.
        beat, err := s.getStartingBeat()
        if err != nil {
                return err
        }

        var startErr error

        // If one subsystem fails to start, the following code ensures that the
        // previously started ones are stopped. It also ensures a proper wallet
        // shutdown, which is important for releasing its resources (boltdb, etc...)
        cleanup := cleaner{}

        s.start.Do(func() {
                cleanup = cleanup.add(s.customMessageServer.Stop)
                if err := s.customMessageServer.Start(); err != nil {
                        startErr = err
                        return
                }

                if s.hostAnn != nil {
                        cleanup = cleanup.add(s.hostAnn.Stop)
                        if err := s.hostAnn.Start(); err != nil {
                                startErr = err
                                return
                        }
                }

                if s.livenessMonitor != nil {
                        cleanup = cleanup.add(s.livenessMonitor.Stop)
                        if err := s.livenessMonitor.Start(); err != nil {
                                startErr = err
                                return
                        }
                }

                // Start the notification server. This is used so channel
                // management goroutines can be notified when a funding
                // transaction reaches a sufficient number of confirmations, or
                // when the input for the funding transaction is spent in an
                // attempt at an uncooperative close by the counterparty.
                cleanup = cleanup.add(s.sigPool.Stop)
                if err := s.sigPool.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.writePool.Stop)
                if err := s.writePool.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.readPool.Stop)
                if err := s.readPool.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.cc.BestBlockTracker.Stop)
                if err := s.cc.BestBlockTracker.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.channelNotifier.Stop)
                if err := s.channelNotifier.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(func() error {
                        return s.peerNotifier.Stop()
                })
                if err := s.peerNotifier.Start(); err != nil {
                        startErr = err
                        return
2219
                }
×
2220

UNCOV
2221
                cleanup = cleanup.add(s.htlcNotifier.Stop)
×
UNCOV
2222
                if err := s.htlcNotifier.Start(); err != nil {
×
2223
                        startErr = err
×
2224
                        return
×
2225
                }
×
2226

UNCOV
2227
                if s.towerClientMgr != nil {
×
UNCOV
2228
                        cleanup = cleanup.add(s.towerClientMgr.Stop)
×
UNCOV
2229
                        if err := s.towerClientMgr.Start(); err != nil {
×
2230
                                startErr = err
×
2231
                                return
×
2232
                        }
×
2233
                }
2234

UNCOV
2235
                cleanup = cleanup.add(s.txPublisher.Stop)
×
UNCOV
2236
                if err := s.txPublisher.Start(beat); err != nil {
×
2237
                        startErr = err
×
2238
                        return
×
2239
                }
×
2240

UNCOV
2241
                cleanup = cleanup.add(s.sweeper.Stop)
×
UNCOV
2242
                if err := s.sweeper.Start(beat); err != nil {
×
2243
                        startErr = err
×
2244
                        return
×
2245
                }
×
2246

UNCOV
2247
                cleanup = cleanup.add(s.utxoNursery.Stop)
×
UNCOV
2248
                if err := s.utxoNursery.Start(); err != nil {
×
2249
                        startErr = err
×
2250
                        return
×
2251
                }
×
2252

UNCOV
2253
                cleanup = cleanup.add(s.breachArbitrator.Stop)
×
UNCOV
2254
                if err := s.breachArbitrator.Start(); err != nil {
×
2255
                        startErr = err
×
2256
                        return
×
2257
                }
×
2258

UNCOV
2259
                cleanup = cleanup.add(s.fundingMgr.Stop)
×
UNCOV
2260
                if err := s.fundingMgr.Start(); err != nil {
×
2261
                        startErr = err
×
2262
                        return
×
2263
                }
×
2264

2265
                // htlcSwitch must be started before chainArb since the latter
2266
                // relies on htlcSwitch to deliver resolution message upon
2267
                // start.
UNCOV
2268
                cleanup = cleanup.add(s.htlcSwitch.Stop)
×
UNCOV
2269
                if err := s.htlcSwitch.Start(); err != nil {
×
2270
                        startErr = err
×
2271
                        return
×
2272
                }
×
2273

UNCOV
2274
                cleanup = cleanup.add(s.interceptableSwitch.Stop)
×
UNCOV
2275
                if err := s.interceptableSwitch.Start(); err != nil {
×
2276
                        startErr = err
×
2277
                        return
×
2278
                }
×
2279

UNCOV
2280
                cleanup = cleanup.add(s.invoiceHtlcModifier.Stop)
×
UNCOV
2281
                if err := s.invoiceHtlcModifier.Start(); err != nil {
×
2282
                        startErr = err
×
2283
                        return
×
2284
                }
×
2285

UNCOV
2286
                cleanup = cleanup.add(s.chainArb.Stop)
×
UNCOV
2287
                if err := s.chainArb.Start(beat); err != nil {
×
2288
                        startErr = err
×
2289
                        return
×
2290
                }
×
2291

UNCOV
2292
                cleanup = cleanup.add(s.graphBuilder.Stop)
×
UNCOV
2293
                if err := s.graphBuilder.Start(); err != nil {
×
2294
                        startErr = err
×
2295
                        return
×
2296
                }
×
2297

UNCOV
2298
                cleanup = cleanup.add(s.chanRouter.Stop)
×
UNCOV
2299
                if err := s.chanRouter.Start(); err != nil {
×
2300
                        startErr = err
×
2301
                        return
×
2302
                }
×
2303
                // The authGossiper depends on the chanRouter and therefore
2304
                // should be started after it.
UNCOV
2305
                cleanup = cleanup.add(s.authGossiper.Stop)
×
UNCOV
2306
                if err := s.authGossiper.Start(); err != nil {
×
2307
                        startErr = err
×
2308
                        return
×
2309
                }
×
2310

                cleanup = cleanup.add(s.invoices.Stop)
                if err := s.invoices.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.sphinx.Stop)
                if err := s.sphinx.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.chanStatusMgr.Stop)
                if err := s.chanStatusMgr.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.chanEventStore.Stop)
                if err := s.chanEventStore.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup.add(func() error {
                        s.missionController.StopStoreTickers()
                        return nil
                })
                s.missionController.RunStoreTickers()

                // Before we start the connMgr, we'll check to see if we have
                // any backups to recover. We do this now as we want to ensure
                // that we have all the information we need to handle channel
                // recovery _before_ we even accept connections from any peers.
                chanRestorer := &chanDBRestorer{
                        db:         s.chanStateDB,
                        secretKeys: s.cc.KeyRing,
                        chainArb:   s.chainArb,
                }
                if len(s.chansToRestore.PackedSingleChanBackups) != 0 {
                        _, err := chanbackup.UnpackAndRecoverSingles(
                                s.chansToRestore.PackedSingleChanBackups,
                                s.cc.KeyRing, chanRestorer, s,
                        )
                        if err != nil {
                                startErr = fmt.Errorf("unable to unpack single "+
                                        "backups: %v", err)
                                return
                        }
                }
                if len(s.chansToRestore.PackedMultiChanBackup) != 0 {
                        _, err := chanbackup.UnpackAndRecoverMulti(
                                s.chansToRestore.PackedMultiChanBackup,
                                s.cc.KeyRing, chanRestorer, s,
                        )
                        if err != nil {
                                startErr = fmt.Errorf("unable to unpack chan "+
                                        "backup: %v", err)
                                return
                        }
                }

                // chanSubSwapper must be started after the `channelNotifier`
                // because it depends on channel events as a synchronization
                // point.
                cleanup = cleanup.add(s.chanSubSwapper.Stop)
                if err := s.chanSubSwapper.Start(); err != nil {
                        startErr = err
                        return
                }

                if s.torController != nil {
                        cleanup = cleanup.add(s.torController.Stop)
                        if err := s.createNewHiddenService(); err != nil {
                                startErr = err
                                return
                        }
                }

                if s.natTraversal != nil {
                        s.wg.Add(1)
                        go s.watchExternalIP()
                }

                // Start connmgr last to prevent connections before init.
                cleanup = cleanup.add(func() error {
                        s.connMgr.Stop()
                        return nil
                })
                s.connMgr.Start()

                // If peers are specified as a config option, we'll add those
                // peers first.
                for _, peerAddrCfg := range s.cfg.AddPeers {
                        parsedPubkey, parsedHost, err := lncfg.ParseLNAddressPubkey(
                                peerAddrCfg,
                        )
                        if err != nil {
                                startErr = fmt.Errorf("unable to parse peer "+
                                        "pubkey from config: %v", err)
                                return
                        }
                        addr, err := parseAddr(parsedHost, s.cfg.net)
                        if err != nil {
                                startErr = fmt.Errorf("unable to parse peer "+
                                        "address provided as a config option: "+
                                        "%v", err)
                                return
                        }

                        peerAddr := &lnwire.NetAddress{
                                IdentityKey: parsedPubkey,
                                Address:     addr,
                                ChainNet:    s.cfg.ActiveNetParams.Net,
                        }

                        err = s.ConnectToPeer(
                                peerAddr, true,
                                s.cfg.ConnectionTimeout,
                        )
                        if err != nil {
                                startErr = fmt.Errorf("unable to connect to "+
                                        "peer address provided as a config "+
                                        "option: %v", err)
                                return
                        }
                }
2438

2439
                // Subscribe to NodeAnnouncements that advertise new addresses
2440
                // our persistent peers.
UNCOV
2441
                if err := s.updatePersistentPeerAddrs(); err != nil {
×
2442
                        startErr = err
×
2443
                        return
×
2444
                }
×
2445

2446
                // With all the relevant sub-systems started, we'll now attempt
2447
                // to establish persistent connections to our direct channel
2448
                // collaborators within the network. Before doing so however,
2449
                // we'll prune our set of link nodes found within the database
2450
                // to ensure we don't reconnect to any nodes we no longer have
2451
                // open channels with.
UNCOV
2452
                if err := s.chanStateDB.PruneLinkNodes(); err != nil {
×
2453
                        startErr = err
×
2454
                        return
×
2455
                }
×
UNCOV
2456
                if err := s.establishPersistentConnections(); err != nil {
×
2457
                        startErr = err
×
2458
                        return
×
2459
                }
×
2460

2461
                // setSeedList is a helper function that turns multiple DNS seed
2462
                // server tuples from the command line or config file into the
2463
                // data structure we need and does a basic formal sanity check
2464
                // in the process.
UNCOV
2465
                setSeedList := func(tuples []string, genesisHash chainhash.Hash) {
×
2466
                        if len(tuples) == 0 {
×
2467
                                return
×
2468
                        }
×
2469

2470
                        result := make([][2]string, len(tuples))
×
2471
                        for idx, tuple := range tuples {
×
2472
                                tuple = strings.TrimSpace(tuple)
×
2473
                                if len(tuple) == 0 {
×
2474
                                        return
×
2475
                                }
×
2476

2477
                                servers := strings.Split(tuple, ",")
×
2478
                                if len(servers) > 2 || len(servers) == 0 {
×
2479
                                        srvrLog.Warnf("Ignoring invalid DNS "+
×
2480
                                                "seed tuple: %v", servers)
×
2481
                                        return
×
2482
                                }
×
2483

2484
                                copy(result[idx][:], servers)
×
2485
                        }
2486

2487
                        chainreg.ChainDNSSeeds[genesisHash] = result
×
2488
                }
2489

2490
                // Let users overwrite the DNS seed nodes. We only allow them
2491
                // for bitcoin mainnet/testnet/signet.
UNCOV
2492
                if s.cfg.Bitcoin.MainNet {
×
2493
                        setSeedList(
×
2494
                                s.cfg.Bitcoin.DNSSeeds,
×
2495
                                chainreg.BitcoinMainnetGenesis,
×
2496
                        )
×
2497
                }
×
UNCOV
2498
                if s.cfg.Bitcoin.TestNet3 {
×
2499
                        setSeedList(
×
2500
                                s.cfg.Bitcoin.DNSSeeds,
×
2501
                                chainreg.BitcoinTestnetGenesis,
×
2502
                        )
×
2503
                }
×
UNCOV
2504
                if s.cfg.Bitcoin.SigNet {
×
2505
                        setSeedList(
×
2506
                                s.cfg.Bitcoin.DNSSeeds,
×
2507
                                chainreg.BitcoinSignetGenesis,
×
2508
                        )
×
2509
                }
×
2510

2511
                // If network bootstrapping hasn't been disabled, then we'll
2512
                // configure the set of active bootstrappers, and launch a
2513
                // dedicated goroutine to maintain a set of persistent
2514
                // connections.
UNCOV
2515
                if shouldPeerBootstrap(s.cfg) {
×
2516
                        bootstrappers, err := initNetworkBootstrappers(s)
×
2517
                        if err != nil {
×
2518
                                startErr = err
×
2519
                                return
×
2520
                        }
×
2521

2522
                        s.wg.Add(1)
×
2523
                        go s.peerBootstrapper(defaultMinPeers, bootstrappers)
×
UNCOV
2524
                } else {
×
UNCOV
2525
                        srvrLog.Infof("Auto peer bootstrapping is disabled")
×
UNCOV
2526
                }
×
2527

2528
                // Start the blockbeat after all other subsystems have been
2529
                // started so they are ready to receive new blocks.
UNCOV
2530
                cleanup = cleanup.add(func() error {
×
2531
                        s.blockbeatDispatcher.Stop()
×
2532
                        return nil
×
2533
                })
×
UNCOV
2534
                if err := s.blockbeatDispatcher.Start(); err != nil {
×
2535
                        startErr = err
×
2536
                        return
×
2537
                }
×
2538

2539
                // Set the active flag now that we've completed the full
2540
                // startup.
UNCOV
2541
                atomic.StoreInt32(&s.active, 1)
×
2542
        })
2543

UNCOV
2544
        if startErr != nil {
×
2545
                cleanup.run()
×
2546
        }
×
UNCOV
2547
        return startErr
×
2548
}

// Stop gracefully shuts down the main daemon server. This function will signal
// any active goroutines, or helper objects to exit, then blocks until they've
// all successfully exited. Additionally, any/all listeners are closed.
// NOTE: This function is safe for concurrent access.
func (s *server) Stop() error {
        s.stop.Do(func() {
                atomic.StoreInt32(&s.stopping, 1)

                close(s.quit)

                // Shutdown connMgr first to prevent conns during shutdown.
                s.connMgr.Stop()

                // Stop dispatching blocks to other systems immediately.
                s.blockbeatDispatcher.Stop()

                // Shutdown the wallet, funding manager, and the rpc server.
                if err := s.chanStatusMgr.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop chanStatusMgr: %v", err)
                }
                if err := s.htlcSwitch.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop htlcSwitch: %v", err)
                }
                if err := s.sphinx.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop sphinx: %v", err)
                }
                if err := s.invoices.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop invoices: %v", err)
                }
                if err := s.interceptableSwitch.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop interceptable "+
                                "switch: %v", err)
                }
                if err := s.invoiceHtlcModifier.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop htlc invoices "+
                                "modifier: %v", err)
                }
                if err := s.chanRouter.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop chanRouter: %v", err)
                }
                if err := s.graphBuilder.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop graphBuilder %v", err)
                }
                if err := s.chainArb.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop chainArb: %v", err)
                }
                if err := s.fundingMgr.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop fundingMgr: %v", err)
                }
                if err := s.breachArbitrator.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop breachArbitrator: %v",
                                err)
                }
                if err := s.utxoNursery.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop utxoNursery: %v", err)
                }
                if err := s.authGossiper.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop authGossiper: %v", err)
                }
                if err := s.sweeper.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop sweeper: %v", err)
                }
                if err := s.txPublisher.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop txPublisher: %v", err)
                }
                if err := s.channelNotifier.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop channelNotifier: %v", err)
                }
                if err := s.peerNotifier.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop peerNotifier: %v", err)
                }
                if err := s.htlcNotifier.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop htlcNotifier: %v", err)
                }

                // Update channel.backup file. Make sure to do it before
                // stopping chanSubSwapper.
                singles, err := chanbackup.FetchStaticChanBackups(
                        s.chanStateDB, s.addrSource,
                )
                if err != nil {
                        srvrLog.Warnf("failed to fetch channel states: %v",
                                err)
                } else {
                        err := s.chanSubSwapper.ManualUpdate(singles)
                        if err != nil {
                                srvrLog.Warnf("Manual update of channel "+
                                        "backup failed: %v", err)
                        }
                }

                if err := s.chanSubSwapper.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop chanSubSwapper: %v", err)
                }
                if err := s.cc.ChainNotifier.Stop(); err != nil {
                        srvrLog.Warnf("Unable to stop ChainNotifier: %v", err)
                }
                if err := s.cc.BestBlockTracker.Stop(); err != nil {
                        srvrLog.Warnf("Unable to stop BestBlockTracker: %v",
                                err)
                }
                if err := s.chanEventStore.Stop(); err != nil {
                        srvrLog.Warnf("Unable to stop ChannelEventStore: %v",
                                err)
                }
                s.missionController.StopStoreTickers()

                // Disconnect from each active peer to ensure that
                // peerTerminationWatchers signal completion to each peer.
                for _, peer := range s.Peers() {
                        err := s.DisconnectPeer(peer.IdentityKey())
                        if err != nil {
                                srvrLog.Warnf("could not disconnect peer: %v, "+
                                        "received error: %v", peer.IdentityKey(),
                                        err,
                                )
                        }
                }

                // Now that all connections have been torn down, stop the tower
                // client which will reliably flush all queued states to the
                // tower. If this is halted for any reason, the force quit timer
                // will kick in and abort to allow this method to return.
                if s.towerClientMgr != nil {
                        if err := s.towerClientMgr.Stop(); err != nil {
                                srvrLog.Warnf("Unable to shut down tower "+
                                        "client manager: %v", err)
                        }
                }

                if s.hostAnn != nil {
                        if err := s.hostAnn.Stop(); err != nil {
                                srvrLog.Warnf("unable to shut down host "+
                                        "announcer: %v", err)
                        }
                }

                if s.livenessMonitor != nil {
                        if err := s.livenessMonitor.Stop(); err != nil {
                                srvrLog.Warnf("unable to shutdown liveness "+
                                        "monitor: %v", err)
                        }
                }

                // Wait for all lingering goroutines to quit.
                srvrLog.Debug("Waiting for server to shutdown...")
                s.wg.Wait()

                srvrLog.Debug("Stopping buffer pools...")
                s.sigPool.Stop()
                s.writePool.Stop()
                s.readPool.Stop()
        })

        return nil
}

// Stopped returns true if the server has been instructed to shut down.
// NOTE: This function is safe for concurrent access.
func (s *server) Stopped() bool {
        return atomic.LoadInt32(&s.stopping) != 0
}
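
Taken together with Started above, the two atomic flags give callers a cheap, concurrency-safe view of the server lifecycle. A trivial, hypothetical sketch of how a caller might use them:

        // The server accepts new work only after Start() has completed and
        // before Stop() has begun.
        if s.Started() && !s.Stopped() {
                // Safe to hand new work to the server's subsystems.
        }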

// configurePortForwarding attempts to set up port forwarding for the different
// ports that the server will be listening on.
//
// NOTE: This should only be used when using some kind of NAT traversal to
// automatically set up forwarding rules.
func (s *server) configurePortForwarding(ports ...uint16) ([]string, error) {
        ip, err := s.natTraversal.ExternalIP()
        if err != nil {
                return nil, err
        }
        s.lastDetectedIP = ip

        externalIPs := make([]string, 0, len(ports))
        for _, port := range ports {
                if err := s.natTraversal.AddPortMapping(port); err != nil {
                        srvrLog.Debugf("Unable to forward port %d: %v", port, err)
                        continue
                }

                hostIP := fmt.Sprintf("%v:%d", ip, port)
                externalIPs = append(externalIPs, hostIP)
        }

        return externalIPs, nil
}

// removePortForwarding attempts to clear the forwarding rules for the different
// ports the server is currently listening on.
//
// NOTE: This should only be used when using some kind of NAT traversal to
// automatically set up forwarding rules.
func (s *server) removePortForwarding() {
        forwardedPorts := s.natTraversal.ForwardedPorts()
        for _, port := range forwardedPorts {
                if err := s.natTraversal.DeletePortMapping(port); err != nil {
                        srvrLog.Errorf("Unable to remove forwarding rules for "+
                                "port %d: %v", port, err)
                }
        }
}

// watchExternalIP continuously checks for an updated external IP address every
// 15 minutes. Once a new IP address has been detected, it will automatically
// handle port forwarding rules and send updated node announcements to the
// currently connected peers.
//
// NOTE: This MUST be run as a goroutine.
func (s *server) watchExternalIP() {
        defer s.wg.Done()

        // Before exiting, we'll make sure to remove the forwarding rules set
        // up by the server.
        defer s.removePortForwarding()

        // Keep track of the external IPs set by the user to avoid replacing
        // them when detecting a new IP.
        ipsSetByUser := make(map[string]struct{})
        for _, ip := range s.cfg.ExternalIPs {
                ipsSetByUser[ip.String()] = struct{}{}
        }

        forwardedPorts := s.natTraversal.ForwardedPorts()

        ticker := time.NewTicker(15 * time.Minute)
        defer ticker.Stop()
out:
        for {
                select {
                case <-ticker.C:
                        // We'll start off by making sure a new IP address has
                        // been detected.
                        ip, err := s.natTraversal.ExternalIP()
                        if err != nil {
                                srvrLog.Debugf("Unable to retrieve the "+
                                        "external IP address: %v", err)
                                continue
                        }

                        // Periodically renew the NAT port forwarding.
                        for _, port := range forwardedPorts {
                                err := s.natTraversal.AddPortMapping(port)
                                if err != nil {
                                        srvrLog.Warnf("Unable to automatically "+
                                                "re-create port forwarding using %s: %v",
                                                s.natTraversal.Name(), err)
                                } else {
                                        srvrLog.Debugf("Automatically re-created "+
                                                "forwarding for port %d using %s to "+
                                                "advertise external IP",
                                                port, s.natTraversal.Name())
                                }
                        }

                        if ip.Equal(s.lastDetectedIP) {
                                continue
                        }

                        srvrLog.Infof("Detected new external IP address %s", ip)

                        // Next, we'll craft the new addresses that will be
                        // included in the new node announcement and advertised
                        // to the network. Each address will consist of the new
                        // IP detected and one of the currently advertised
                        // ports.
                        var newAddrs []net.Addr
                        for _, port := range forwardedPorts {
                                hostIP := fmt.Sprintf("%v:%d", ip, port)
                                addr, err := net.ResolveTCPAddr("tcp", hostIP)
                                if err != nil {
                                        srvrLog.Debugf("Unable to resolve "+
                                                "host %v: %v", addr, err)
                                        continue
                                }

                                newAddrs = append(newAddrs, addr)
                        }

                        // Skip the update if we weren't able to resolve any of
                        // the new addresses.
                        if len(newAddrs) == 0 {
                                srvrLog.Debug("Skipping node announcement " +
                                        "update due to not being able to " +
                                        "resolve any new addresses")
                                continue
                        }

                        // Now, we'll need to update the addresses in our node's
                        // announcement in order to propagate the update
                        // throughout the network. We'll only include addresses
                        // that have a different IP from the previous one, as
                        // the previous IP is no longer valid.
                        currentNodeAnn := s.getNodeAnnouncement()

                        for _, addr := range currentNodeAnn.Addresses {
                                host, _, err := net.SplitHostPort(addr.String())
                                if err != nil {
                                        srvrLog.Debugf("Unable to determine "+
                                                "host from address %v: %v",
                                                addr, err)
                                        continue
                                }

                                // We'll also make sure to include external IPs
                                // set manually by the user.
                                _, setByUser := ipsSetByUser[addr.String()]
                                if setByUser || host != s.lastDetectedIP.String() {
                                        newAddrs = append(newAddrs, addr)
                                }
                        }

                        // Then, we'll generate a new timestamped node
                        // announcement with the updated addresses and broadcast
                        // it to our peers.
                        newNodeAnn, err := s.genNodeAnnouncement(
                                nil, netann.NodeAnnSetAddrs(newAddrs),
                        )
                        if err != nil {
                                srvrLog.Debugf("Unable to generate new node "+
                                        "announcement: %v", err)
                                continue
                        }

                        err = s.BroadcastMessage(nil, &newNodeAnn)
                        if err != nil {
                                srvrLog.Debugf("Unable to broadcast new node "+
                                        "announcement to peers: %v", err)
                                continue
                        }

                        // Finally, update the last IP seen to the current one.
                        s.lastDetectedIP = ip
                case <-s.quit:
                        break out
                }
        }
}

// initNetworkBootstrappers initializes a set of network peer bootstrappers
// based on the server, and currently active bootstrap mechanisms as defined
// within the current configuration.
func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, error) {
        srvrLog.Infof("Initializing peer network bootstrappers!")

        var bootStrappers []discovery.NetworkPeerBootstrapper

        // First, we'll create an instance of the ChannelGraphBootstrapper as
        // this can be used by default if we've already partially seeded the
        // network.
        chanGraph := autopilot.ChannelGraphFromDatabase(s.graphDB)
        graphBootstrapper, err := discovery.NewGraphBootstrapper(chanGraph)
        if err != nil {
                return nil, err
        }
        bootStrappers = append(bootStrappers, graphBootstrapper)

        // If this isn't simnet mode, then one of our additional bootstrapping
        // sources will be the set of running DNS seeds.
        if !s.cfg.Bitcoin.SimNet {
                dnsSeeds, ok := chainreg.ChainDNSSeeds[*s.cfg.ActiveNetParams.GenesisHash]

                // If we have a set of DNS seeds for this chain, then we'll add
                // it as an additional bootstrapping source.
                if ok {
                        srvrLog.Infof("Creating DNS peer bootstrapper with "+
                                "seeds: %v", dnsSeeds)

                        dnsBootStrapper := discovery.NewDNSSeedBootstrapper(
                                dnsSeeds, s.cfg.net, s.cfg.ConnectionTimeout,
                        )
                        bootStrappers = append(bootStrappers, dnsBootStrapper)
                }
        }

        return bootStrappers, nil
}

// createBootstrapIgnorePeers creates a map of peers that the bootstrap process
// needs to ignore, which is made of three parts,
//   - the node itself needs to be skipped as it doesn't make sense to connect
//     to itself.
//   - the peers that we already have connections with, as in s.peersByPub.
//   - the peers that we are attempting to connect to, as in s.persistentPeers.
func (s *server) createBootstrapIgnorePeers() map[autopilot.NodeID]struct{} {
        s.mu.RLock()
        defer s.mu.RUnlock()

        ignore := make(map[autopilot.NodeID]struct{})

        // We should ignore ourselves from bootstrapping.
        selfKey := autopilot.NewNodeID(s.identityECDH.PubKey())
        ignore[selfKey] = struct{}{}

        // Ignore all connected peers.
        for _, peer := range s.peersByPub {
                nID := autopilot.NewNodeID(peer.IdentityKey())
                ignore[nID] = struct{}{}
        }

        // Ignore all persistent peers as they have a dedicated reconnecting
        // process.
        for pubKeyStr := range s.persistentPeers {
                var nID autopilot.NodeID
                copy(nID[:], []byte(pubKeyStr))
                ignore[nID] = struct{}{}
        }

        return ignore
}

// peerBootstrapper is a goroutine which is tasked with attempting to establish
// and maintain a target minimum number of outbound connections. With this
// invariant, we ensure that our node is connected to a diverse set of peers
// and that nodes newly joining the network receive an up to date network view
// as soon as possible.
func (s *server) peerBootstrapper(numTargetPeers uint32,
        bootstrappers []discovery.NetworkPeerBootstrapper) {

        defer s.wg.Done()

        // Before we continue, init the ignore peers map.
        ignoreList := s.createBootstrapIgnorePeers()

        // We'll start off by aggressively attempting connections to peers in
        // order to be a part of the network as soon as possible.
        s.initialPeerBootstrap(ignoreList, numTargetPeers, bootstrappers)

        // Once done, we'll attempt to maintain our target minimum number of
        // peers.
        //
        // We'll use a 15 second backoff, and double the time every time an
        // epoch fails up to a ceiling.
        backOff := time.Second * 15

        // We'll create a new ticker to wake us up every 15 seconds so we can
        // see if we've reached our minimum number of peers.
        sampleTicker := time.NewTicker(backOff)
        defer sampleTicker.Stop()

        // We'll use the number of attempts and errors to determine if we need
        // to increase the time between discovery epochs.
        var epochErrors uint32 // To be used atomically.
        var epochAttempts uint32

        for {
                select {
                // The ticker has just woken us up, so we'll need to check if
                // we need to attempt to connect out to any more peers.
                case <-sampleTicker.C:
                        // Obtain the current number of peers, so we can gauge
                        // if we need to sample more peers or not.
                        s.mu.RLock()
                        numActivePeers := uint32(len(s.peersByPub))
                        s.mu.RUnlock()

                        // If we have enough peers, then we can loop back
                        // around to the next round as we're done here.
                        if numActivePeers >= numTargetPeers {
                                continue
                        }

                        // If all of our attempts failed during this last back
                        // off period, then we'll increase our backoff to the 5
                        // minute ceiling to avoid an excessive number of
                        // queries.
                        //
                        // TODO(roasbeef): add reverse policy too?

                        if epochAttempts > 0 &&
                                atomic.LoadUint32(&epochErrors) >= epochAttempts {

                                sampleTicker.Stop()

                                backOff *= 2
                                if backOff > bootstrapBackOffCeiling {
                                        backOff = bootstrapBackOffCeiling
                                }

                                srvrLog.Debugf("Backing off peer bootstrapper to "+
                                        "%v", backOff)
                                sampleTicker = time.NewTicker(backOff)
                                continue
                        }

                        atomic.StoreUint32(&epochErrors, 0)
                        epochAttempts = 0

                        // Since we now need more peers, we'll compute the
                        // exact number we need to reach our threshold.
                        numNeeded := numTargetPeers - numActivePeers

                        srvrLog.Debugf("Attempting to obtain %v more network "+
                                "peers", numNeeded)

                        // With the number of peers we need calculated, we'll
                        // query the network bootstrappers to sample a set of
                        // random addrs for us.
                        //
                        // Before we continue, get a copy of the ignore peers
                        // map.
                        ignoreList = s.createBootstrapIgnorePeers()

                        peerAddrs, err := discovery.MultiSourceBootstrap(
                                ignoreList, numNeeded*2, bootstrappers...,
                        )
                        if err != nil {
                                srvrLog.Errorf("Unable to retrieve bootstrap "+
                                        "peers: %v", err)
                                continue
                        }

                        // Finally, we'll launch a new goroutine for each
                        // prospective peer candidate.
                        for _, addr := range peerAddrs {
                                epochAttempts++

                                go func(a *lnwire.NetAddress) {
                                        // TODO(roasbeef): can do AS, subnet,
                                        // country diversity, etc
                                        errChan := make(chan error, 1)
                                        s.connectToPeer(
                                                a, errChan,
                                                s.cfg.ConnectionTimeout,
                                        )
                                        select {
                                        case err := <-errChan:
                                                if err == nil {
                                                        return
                                                }

                                                srvrLog.Errorf("Unable to "+
                                                        "connect to %v: %v",
                                                        a, err)
                                                atomic.AddUint32(&epochErrors, 1)
                                        case <-s.quit:
                                        }
                                }(addr)
                        }
                case <-s.quit:
                        return
                }
        }
}

// bootstrapBackOffCeiling is the maximum amount of time we'll wait between
// failed attempts to locate a set of bootstrap peers. We'll slowly double our
// query back off each time we encounter a failure.
const bootstrapBackOffCeiling = time.Minute * 5
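
As a worked illustration of the capped doubling this ceiling imposes on peerBootstrapper above: starting from 15 seconds, consecutive failed epochs back off to 30s, 1m, 2m, 4m, and then stay at the 5 minute ceiling. A standalone sketch, for illustration only:

        // Prints 15s, 30s, 1m0s, 2m0s, 4m0s, 5m0s, 5m0s.
        backOff := 15 * time.Second
        for i := 0; i < 7; i++ {
                fmt.Println(backOff)
                backOff *= 2
                if backOff > bootstrapBackOffCeiling {
                        backOff = bootstrapBackOffCeiling
                }
        }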

// initialPeerBootstrap attempts to continuously connect to peers on startup
// until the target number of peers has been reached. This ensures that nodes
// receive an up to date network view as soon as possible.
func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
        numTargetPeers uint32,
        bootstrappers []discovery.NetworkPeerBootstrapper) {

        srvrLog.Debugf("Init bootstrap with targetPeers=%v, bootstrappers=%v, "+
                "ignore=%v", numTargetPeers, len(bootstrappers), len(ignore))

        // We'll start off by waiting 2 seconds between failed attempts, then
        // double each time we fail until we hit the bootstrapBackOffCeiling.
        var delaySignal <-chan time.Time
        delayTime := time.Second * 2

        // As we want to be more aggressive, we'll use a lower back off ceiling
        // than the main peer bootstrap logic.
        backOffCeiling := bootstrapBackOffCeiling / 5

        for attempts := 0; ; attempts++ {
                // Check if the server has been requested to shut down in order
                // to prevent blocking.
                if s.Stopped() {
                        return
                }

                // We can exit our aggressive initial peer bootstrapping stage
                // if we've reached our target number of peers.
                s.mu.RLock()
                numActivePeers := uint32(len(s.peersByPub))
                s.mu.RUnlock()

                if numActivePeers >= numTargetPeers {
                        return
                }

                if attempts > 0 {
                        srvrLog.Debugf("Waiting %v before trying to locate "+
                                "bootstrap peers (attempt #%v)", delayTime,
                                attempts)

                        // We've completed at least one iteration and haven't
                        // finished, so we'll start to insert a delay period
                        // between each attempt.
                        delaySignal = time.After(delayTime)
                        select {
                        case <-delaySignal:
                        case <-s.quit:
                                return
                        }

                        // After our delay, we'll double the time we wait up to
                        // the max back off period.
                        delayTime *= 2
                        if delayTime > backOffCeiling {
                                delayTime = backOffCeiling
                        }
                }

                // Otherwise, we'll request the remaining number of peers
                // in order to reach our target.
                peersNeeded := numTargetPeers - numActivePeers
                bootstrapAddrs, err := discovery.MultiSourceBootstrap(
                        ignore, peersNeeded, bootstrappers...,
                )
                if err != nil {
                        srvrLog.Errorf("Unable to retrieve initial bootstrap "+
                                "peers: %v", err)
                        continue
                }

                // Then, we'll attempt to establish a connection to the
                // different peer addresses retrieved by our bootstrappers.
                var wg sync.WaitGroup
                for _, bootstrapAddr := range bootstrapAddrs {
                        wg.Add(1)
                        go func(addr *lnwire.NetAddress) {
                                defer wg.Done()

                                errChan := make(chan error, 1)
                                go s.connectToPeer(
                                        addr, errChan, s.cfg.ConnectionTimeout,
                                )

                                // We'll only allow this connection attempt to
                                // take up to 3 seconds. This allows us to move
                                // quickly by discarding peers that are slowing
                                // us down.
                                select {
                                case err := <-errChan:
                                        if err == nil {
                                                return
                                        }
                                        srvrLog.Errorf("Unable to connect to "+
                                                "%v: %v", addr, err)
                                // TODO: tune timeout? 3 seconds might be *too*
                                // aggressive but works well.
                                case <-time.After(3 * time.Second):
                                        srvrLog.Tracef("Skipping peer %v due "+
                                                "to not establishing a "+
                                                "connection within 3 seconds",
                                                addr)
                                case <-s.quit:
                                }
                        }(bootstrapAddr)
                }

                wg.Wait()
        }
}

// createNewHiddenService automatically sets up a v2 or v3 onion service in
// order to listen for inbound connections over Tor.
func (s *server) createNewHiddenService() error {
        // Determine the different ports the server is listening on. The onion
        // service's virtual port will map to these ports and one will be picked
        // at random when the onion service is being accessed.
        listenPorts := make([]int, 0, len(s.listenAddrs))
        for _, listenAddr := range s.listenAddrs {
                port := listenAddr.(*net.TCPAddr).Port
                listenPorts = append(listenPorts, port)
        }

        encrypter, err := lnencrypt.KeyRingEncrypter(s.cc.KeyRing)
        if err != nil {
                return err
        }

        // Once the port mapping has been set, we can go ahead and automatically
        // create our onion service. The service's private key will be saved to
        // disk in order to regain access to this service when restarting `lnd`.
        onionCfg := tor.AddOnionConfig{
                VirtualPort: defaultPeerPort,
                TargetPorts: listenPorts,
                Store: tor.NewOnionFile(
                        s.cfg.Tor.PrivateKeyPath, 0600, s.cfg.Tor.EncryptKey,
                        encrypter,
                ),
        }

        switch {
        case s.cfg.Tor.V2:
                onionCfg.Type = tor.V2
        case s.cfg.Tor.V3:
                onionCfg.Type = tor.V3
        }

        addr, err := s.torController.AddOnion(onionCfg)
        if err != nil {
                return err
        }

        // Now that the onion service has been created, we'll add the onion
        // address it can be reached at to our list of advertised addresses.
        newNodeAnn, err := s.genNodeAnnouncement(
                nil, func(currentAnn *lnwire.NodeAnnouncement) {
                        currentAnn.Addresses = append(currentAnn.Addresses, addr)
                },
        )
        if err != nil {
                return fmt.Errorf("unable to generate new node "+
                        "announcement: %v", err)
        }

        // Finally, we'll update the on-disk version of our announcement so it
        // will eventually propagate to nodes in the network.
        selfNode := &models.LightningNode{
                HaveNodeAnnouncement: true,
                LastUpdate:           time.Unix(int64(newNodeAnn.Timestamp), 0),
                Addresses:            newNodeAnn.Addresses,
                Alias:                newNodeAnn.Alias.String(),
                Features: lnwire.NewFeatureVector(
                        newNodeAnn.Features, lnwire.Features,
                ),
                Color:        newNodeAnn.RGBColor,
                AuthSigBytes: newNodeAnn.Signature.ToSignatureBytes(),
        }
        copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed())
        if err := s.graphDB.SetSourceNode(selfNode); err != nil {
                return fmt.Errorf("can't set self node: %w", err)
        }

        return nil
}
3285
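As a side note, the first step of createNewHiddenService, collecting the TCP ports we listen on so they can all be mapped behind a single onion virtual port, can be sketched with the standard library alone. The type assertion below uses the ok form, which is slightly more defensive than the direct assertion used above; everything else is illustrative.

package main

import (
        "fmt"
        "net"
)

// listenPorts extracts the TCP port from each listen address, skipping any
// address that is not a *net.TCPAddr.
func listenPorts(addrs []net.Addr) []int {
        ports := make([]int, 0, len(addrs))
        for _, a := range addrs {
                if tcp, ok := a.(*net.TCPAddr); ok {
                        ports = append(ports, tcp.Port)
                }
        }
        return ports
}

func main() {
        addrs := []net.Addr{
                &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 9735},
                &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 9736},
        }
        fmt.Println(listenPorts(addrs)) // [9735 9736]
}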

3286
// findChannel finds a channel given a public key and ChannelID. It is an
3287
// optimization that is quicker than seeking for a channel given only the
3288
// ChannelID.
3289
func (s *server) findChannel(node *btcec.PublicKey, chanID lnwire.ChannelID) (
UNCOV
3290
        *channeldb.OpenChannel, error) {
×
UNCOV
3291

×
UNCOV
3292
        nodeChans, err := s.chanStateDB.FetchOpenChannels(node)
×
UNCOV
3293
        if err != nil {
×
3294
                return nil, err
×
3295
        }
×
3296

UNCOV
3297
        for _, channel := range nodeChans {
×
UNCOV
3298
                if chanID.IsChanPoint(&channel.FundingOutpoint) {
×
UNCOV
3299
                        return channel, nil
×
UNCOV
3300
                }
×
3301
        }
3302

UNCOV
3303
        return nil, fmt.Errorf("unable to find channel")
×
3304
}
3305

3306
// getNodeAnnouncement fetches the current, fully signed node announcement.
UNCOV
3307
func (s *server) getNodeAnnouncement() lnwire.NodeAnnouncement {
×
UNCOV
3308
        s.mu.Lock()
×
UNCOV
3309
        defer s.mu.Unlock()
×
UNCOV
3310

×
UNCOV
3311
        return *s.currentNodeAnn
×
UNCOV
3312
}
×
3313

3314
// genNodeAnnouncement generates and returns the current fully signed node
3315
// announcement. The time stamp of the announcement will be updated in order
3316
// to ensure it propagates through the network.
3317
func (s *server) genNodeAnnouncement(features *lnwire.RawFeatureVector,
UNCOV
3318
        modifiers ...netann.NodeAnnModifier) (lnwire.NodeAnnouncement, error) {
×
UNCOV
3319

×
UNCOV
3320
        s.mu.Lock()
×
UNCOV
3321
        defer s.mu.Unlock()
×
UNCOV
3322

×
UNCOV
3323
        // First, try to update our feature manager with the updated set of
×
UNCOV
3324
        // features.
×
UNCOV
3325
        if features != nil {
×
UNCOV
3326
                proposedFeatures := map[feature.Set]*lnwire.RawFeatureVector{
×
UNCOV
3327
                        feature.SetNodeAnn: features,
×
UNCOV
3328
                }
×
UNCOV
3329
                err := s.featureMgr.UpdateFeatureSets(proposedFeatures)
×
UNCOV
3330
                if err != nil {
×
UNCOV
3331
                        return lnwire.NodeAnnouncement{}, err
×
UNCOV
3332
                }
×
3333

3334
                // If we could successfully update our feature manager, add
3335
                // an update modifier to include these new features to our
3336
                // set.
UNCOV
3337
                modifiers = append(
×
UNCOV
3338
                        modifiers, netann.NodeAnnSetFeatures(features),
×
UNCOV
3339
                )
×
3340
        }
3341

3342
        // Always update the timestamp when refreshing to ensure the update
3343
        // propagates.
UNCOV
3344
        modifiers = append(modifiers, netann.NodeAnnSetTimestamp)
×
UNCOV
3345

×
UNCOV
3346
        // Apply the requested changes to the node announcement.
×
UNCOV
3347
        for _, modifier := range modifiers {
×
UNCOV
3348
                modifier(s.currentNodeAnn)
×
UNCOV
3349
        }
×
3350

3351
        // Sign a new update after applying all of the passed modifiers.
UNCOV
3352
        err := netann.SignNodeAnnouncement(
×
UNCOV
3353
                s.nodeSigner, s.identityKeyLoc, s.currentNodeAnn,
×
UNCOV
3354
        )
×
UNCOV
3355
        if err != nil {
×
3356
                return lnwire.NodeAnnouncement{}, err
×
3357
        }
×
3358

UNCOV
3359
        return *s.currentNodeAnn, nil
×
3360
}
3361
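genNodeAnnouncement relies on the functional modifier pattern: callers hand in closures that mutate the shared announcement before it is re-signed, and a timestamp modifier is always appended so the refreshed announcement propagates. A toy version of the pattern, using stand-in types rather than the lnwire/netann originals, looks like this:

package main

import (
        "fmt"
        "time"
)

type nodeAnn struct {
        Alias     string
        Timestamp uint32
}

type annModifier func(*nodeAnn)

// setAlias returns a modifier that updates the announcement's alias.
func setAlias(alias string) annModifier {
        return func(a *nodeAnn) { a.Alias = alias }
}

// setTimestamp always bumps the timestamp so the refreshed announcement
// propagates through the network.
func setTimestamp(a *nodeAnn) {
        a.Timestamp = uint32(time.Now().Unix())
}

func applyModifiers(a *nodeAnn, mods ...annModifier) {
        mods = append(mods, setTimestamp)
        for _, mod := range mods {
                mod(a)
        }
}

func main() {
        ann := &nodeAnn{Alias: "old-alias"}
        applyModifiers(ann, setAlias("new-alias"))
        fmt.Printf("%+v\n", ann)
}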

3362
// updateAndBroadcastSelfNode generates a new node announcement
3363
// applying the given modifiers and updating the timestamp
3364
// to ensure it propagates through the network. Then it broadcasts
3365
// it to the network.
3366
func (s *server) updateAndBroadcastSelfNode(features *lnwire.RawFeatureVector,
UNCOV
3367
        modifiers ...netann.NodeAnnModifier) error {
×
UNCOV
3368

×
UNCOV
3369
        newNodeAnn, err := s.genNodeAnnouncement(features, modifiers...)
×
UNCOV
3370
        if err != nil {
×
UNCOV
3371
                return fmt.Errorf("unable to generate new node "+
×
UNCOV
3372
                        "announcement: %v", err)
×
UNCOV
3373
        }
×
3374

3375
        // Update the on-disk version of our announcement.
3376
        // Load and modify the self node instead of creating a new instance so we
3377
        // don't risk overwriting any existing values.
UNCOV
3378
        selfNode, err := s.graphDB.SourceNode()
×
UNCOV
3379
        if err != nil {
×
3380
                return fmt.Errorf("unable to get current source node: %w", err)
×
3381
        }
×
3382

UNCOV
3383
        selfNode.HaveNodeAnnouncement = true
×
UNCOV
3384
        selfNode.LastUpdate = time.Unix(int64(newNodeAnn.Timestamp), 0)
×
UNCOV
3385
        selfNode.Addresses = newNodeAnn.Addresses
×
UNCOV
3386
        selfNode.Alias = newNodeAnn.Alias.String()
×
UNCOV
3387
        selfNode.Features = s.featureMgr.Get(feature.SetNodeAnn)
×
UNCOV
3388
        selfNode.Color = newNodeAnn.RGBColor
×
UNCOV
3389
        selfNode.AuthSigBytes = newNodeAnn.Signature.ToSignatureBytes()
×
UNCOV
3390

×
UNCOV
3391
        copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed())
×
UNCOV
3392

×
UNCOV
3393
        if err := s.graphDB.SetSourceNode(selfNode); err != nil {
×
3394
                return fmt.Errorf("can't set self node: %w", err)
×
3395
        }
×
3396

3397
        // Finally, propagate it to the nodes in the network.
UNCOV
3398
        err = s.BroadcastMessage(nil, &newNodeAnn)
×
UNCOV
3399
        if err != nil {
×
3400
                rpcsLog.Debugf("Unable to broadcast new node "+
×
3401
                        "announcement to peers: %v", err)
×
3402
                return err
×
3403
        }
×
3404

UNCOV
3405
        return nil
×
3406
}
3407

3408
type nodeAddresses struct {
3409
        pubKey    *btcec.PublicKey
3410
        addresses []net.Addr
3411
}
3412

3413
// establishPersistentConnections attempts to establish persistent connections
3414
// to all our direct channel collaborators. In order to promote liveness of our
3415
// active channels, we instruct the connection manager to attempt to establish
3416
// and maintain persistent connections to all our direct channel counterparties.
UNCOV
3417
func (s *server) establishPersistentConnections() error {
×
UNCOV
3418
        // nodeAddrsMap stores the combination of node public keys and addresses
×
UNCOV
3419
        // that we'll attempt to reconnect to. PubKey strings are used as keys
×
UNCOV
3420
        // since other PubKey forms can't be compared.
×
UNCOV
3421
        nodeAddrsMap := map[string]*nodeAddresses{}
×
UNCOV
3422

×
UNCOV
3423
        // Iterate through the list of LinkNodes to find addresses we should
×
UNCOV
3424
        // attempt to connect to based on our set of previous connections. Set
×
UNCOV
3425
        // the reconnection port to the default peer port.
×
UNCOV
3426
        linkNodes, err := s.chanStateDB.LinkNodeDB().FetchAllLinkNodes()
×
UNCOV
3427
        if err != nil && err != channeldb.ErrLinkNodesNotFound {
×
3428
                return err
×
3429
        }
×
UNCOV
3430
        for _, node := range linkNodes {
×
UNCOV
3431
                pubStr := string(node.IdentityPub.SerializeCompressed())
×
UNCOV
3432
                nodeAddrs := &nodeAddresses{
×
UNCOV
3433
                        pubKey:    node.IdentityPub,
×
UNCOV
3434
                        addresses: node.Addresses,
×
UNCOV
3435
                }
×
UNCOV
3436
                nodeAddrsMap[pubStr] = nodeAddrs
×
UNCOV
3437
        }
×
3438

3439
        // After checking our previous connections for addresses to connect to,
3440
        // iterate through the nodes in our channel graph to find addresses
3441
        // that have been added via NodeAnnouncement messages.
UNCOV
3442
        sourceNode, err := s.graphDB.SourceNode()
×
UNCOV
3443
        if err != nil {
×
3444
                return err
×
3445
        }
×
3446

3447
        // TODO(roasbeef): instead iterate over link nodes and query graph for
3448
        // each of the nodes.
UNCOV
3449
        selfPub := s.identityECDH.PubKey().SerializeCompressed()
×
UNCOV
3450
        err = s.graphDB.ForEachNodeChannel(sourceNode.PubKeyBytes, func(
×
UNCOV
3451
                tx kvdb.RTx,
×
UNCOV
3452
                chanInfo *models.ChannelEdgeInfo,
×
UNCOV
3453
                policy, _ *models.ChannelEdgePolicy) error {
×
UNCOV
3454

×
UNCOV
3455
                // If the remote party has announced the channel to us, but we
×
UNCOV
3456
                // haven't yet, then we won't have a policy. However, we don't
×
UNCOV
3457
                // need this to connect to the peer, so we'll log it and move on.
×
UNCOV
3458
                if policy == nil {
×
3459
                        srvrLog.Warnf("No channel policy found for "+
×
3460
                                "ChannelPoint(%v): ", chanInfo.ChannelPoint)
×
3461
                }
×
3462

3463
                // We'll now fetch the peer opposite from us within this
3464
                // channel so we can queue up a direct connection to them.
UNCOV
3465
                channelPeer, err := s.graphDB.FetchOtherNode(
×
UNCOV
3466
                        tx, chanInfo, selfPub,
×
UNCOV
3467
                )
×
UNCOV
3468
                if err != nil {
×
3469
                        return fmt.Errorf("unable to fetch channel peer for "+
×
3470
                                "ChannelPoint(%v): %v", chanInfo.ChannelPoint,
×
3471
                                err)
×
3472
                }
×
3473

UNCOV
3474
                pubStr := string(channelPeer.PubKeyBytes[:])
×
UNCOV
3475

×
UNCOV
3476
                // Add all unique addresses from channel
×
UNCOV
3477
                // graph/NodeAnnouncements to the list of addresses we'll
×
UNCOV
3478
                // connect to for this peer.
×
UNCOV
3479
                addrSet := make(map[string]net.Addr)
×
UNCOV
3480
                for _, addr := range channelPeer.Addresses {
×
UNCOV
3481
                        switch addr.(type) {
×
UNCOV
3482
                        case *net.TCPAddr:
×
UNCOV
3483
                                addrSet[addr.String()] = addr
×
3484

3485
                        // We'll only attempt to connect to Tor addresses if Tor
3486
                        // outbound support is enabled.
3487
                        case *tor.OnionAddr:
×
3488
                                if s.cfg.Tor.Active {
×
3489
                                        addrSet[addr.String()] = addr
×
3490
                                }
×
3491
                        }
3492
                }
3493

3494
                // If this peer is also recorded as a link node, we'll add any
3495
                // additional addresses that have not already been selected.
UNCOV
3496
                linkNodeAddrs, ok := nodeAddrsMap[pubStr]
×
UNCOV
3497
                if ok {
×
UNCOV
3498
                        for _, lnAddress := range linkNodeAddrs.addresses {
×
UNCOV
3499
                                switch lnAddress.(type) {
×
UNCOV
3500
                                case *net.TCPAddr:
×
UNCOV
3501
                                        addrSet[lnAddress.String()] = lnAddress
×
3502

3503
                                // We'll only attempt to connect to Tor
3504
                                // addresses if Tor outbound support is enabled.
3505
                                case *tor.OnionAddr:
×
3506
                                        if s.cfg.Tor.Active {
×
3507
                                                addrSet[lnAddress.String()] = lnAddress
×
3508
                                        }
×
3509
                                }
3510
                        }
3511
                }
3512

3513
                // Construct a slice of the deduped addresses.
UNCOV
3514
                var addrs []net.Addr
×
UNCOV
3515
                for _, addr := range addrSet {
×
UNCOV
3516
                        addrs = append(addrs, addr)
×
UNCOV
3517
                }
×
3518

UNCOV
3519
                n := &nodeAddresses{
×
UNCOV
3520
                        addresses: addrs,
×
UNCOV
3521
                }
×
UNCOV
3522
                n.pubKey, err = channelPeer.PubKey()
×
UNCOV
3523
                if err != nil {
×
3524
                        return err
×
3525
                }
×
3526

UNCOV
3527
                nodeAddrsMap[pubStr] = n
×
UNCOV
3528
                return nil
×
3529
        })
UNCOV
3530
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
×
3531
                return err
×
3532
        }
×
3533

UNCOV
3534
        srvrLog.Debugf("Establishing %v persistent connections on start",
×
UNCOV
3535
                len(nodeAddrsMap))
×
UNCOV
3536

×
UNCOV
3537
        // Acquire and hold server lock until all persistent connection requests
×
UNCOV
3538
        // have been recorded and sent to the connection manager.
×
UNCOV
3539
        s.mu.Lock()
×
UNCOV
3540
        defer s.mu.Unlock()
×
UNCOV
3541

×
UNCOV
3542
        // Iterate through the combined list of addresses from prior links and
×
UNCOV
3543
        // node announcements and attempt to reconnect to each node.
×
UNCOV
3544
        var numOutboundConns int
×
UNCOV
3545
        for pubStr, nodeAddr := range nodeAddrsMap {
×
UNCOV
3546
                // Add this peer to the set of peers we should maintain a
×
UNCOV
3547
                // persistent connection with. We set the value to false to
×
UNCOV
3548
                // indicate that we should not continue to reconnect if the
×
UNCOV
3549
                // number of channels returns to zero, since this peer has not
×
UNCOV
3550
                // been requested as perm by the user.
×
UNCOV
3551
                s.persistentPeers[pubStr] = false
×
UNCOV
3552
                if _, ok := s.persistentPeersBackoff[pubStr]; !ok {
×
UNCOV
3553
                        s.persistentPeersBackoff[pubStr] = s.cfg.MinBackoff
×
UNCOV
3554
                }
×
3555

UNCOV
3556
                for _, address := range nodeAddr.addresses {
×
UNCOV
3557
                        // Create a wrapper address which couples the IP and
×
UNCOV
3558
                        // the pubkey so the brontide authenticated connection
×
UNCOV
3559
                        // can be established.
×
UNCOV
3560
                        lnAddr := &lnwire.NetAddress{
×
UNCOV
3561
                                IdentityKey: nodeAddr.pubKey,
×
UNCOV
3562
                                Address:     address,
×
UNCOV
3563
                        }
×
UNCOV
3564

×
UNCOV
3565
                        s.persistentPeerAddrs[pubStr] = append(
×
UNCOV
3566
                                s.persistentPeerAddrs[pubStr], lnAddr)
×
UNCOV
3567
                }
×
3568

3569
                // We'll connect to the first 10 peers immediately, then
3570
                // randomly stagger any remaining connections if the
3571
                // stagger initial reconnect flag is set. This ensures
3572
                // that mobile nodes or nodes with a small number of
3573
                // channels obtain connectivity quickly, but larger
3574
                // nodes are able to disperse the costs of connecting to
3575
                // all peers at once.
UNCOV
3576
                if numOutboundConns < numInstantInitReconnect ||
×
UNCOV
3577
                        !s.cfg.StaggerInitialReconnect {
×
UNCOV
3578

×
UNCOV
3579
                        go s.connectToPersistentPeer(pubStr)
×
UNCOV
3580
                } else {
×
3581
                        go s.delayInitialReconnect(pubStr)
×
3582
                }
×
3583

UNCOV
3584
                numOutboundConns++
×
3585
        }
3586

UNCOV
3587
        return nil
×
3588
}
3589
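The address handling inside establishPersistentConnections boils down to merging addresses from link nodes and graph announcements while dropping duplicates, keyed by each address's string form. A compact, standard-library-only sketch of that dedup step:

package main

import (
        "fmt"
        "net"
)

// dedupAddrs merges several address lists into one, keeping each distinct
// address exactly once, keyed by its string form.
func dedupAddrs(groups ...[]net.Addr) []net.Addr {
        set := make(map[string]net.Addr)
        for _, group := range groups {
                for _, addr := range group {
                        set[addr.String()] = addr
                }
        }
        out := make([]net.Addr, 0, len(set))
        for _, addr := range set {
                out = append(out, addr)
        }
        return out
}

func main() {
        graphAddrs := []net.Addr{
                &net.TCPAddr{IP: net.ParseIP("10.0.0.1"), Port: 9735},
        }
        linkAddrs := []net.Addr{
                &net.TCPAddr{IP: net.ParseIP("10.0.0.1"), Port: 9735}, // duplicate
                &net.TCPAddr{IP: net.ParseIP("10.0.0.2"), Port: 9735},
        }
        fmt.Println(len(dedupAddrs(graphAddrs, linkAddrs))) // 2
}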

3590
// delayInitialReconnect will attempt a reconnection to the given peer after
3591
// sampling a value for the delay between 0s and the maxInitReconnectDelay.
3592
//
3593
// NOTE: This method MUST be run as a goroutine.
3594
func (s *server) delayInitialReconnect(pubStr string) {
×
3595
        delay := time.Duration(prand.Intn(maxInitReconnectDelay)) * time.Second
×
3596
        select {
×
3597
        case <-time.After(delay):
×
3598
                s.connectToPersistentPeer(pubStr)
×
3599
        case <-s.quit:
×
3600
        }
3601
}
3602
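Taken together with the stagger logic in establishPersistentConnections, the reconnect scheduling can be summarized as: connect the first few peers immediately, delay the rest by a random amount, and abort on shutdown. The sketch below captures that shape; numInstantReconnect and maxInitReconnectDelay here are illustrative constants, not necessarily lnd's values.

package main

import (
        "fmt"
        "math/rand"
        "time"
)

const (
        numInstantReconnect   = 10
        maxInitReconnectDelay = 30 // seconds
)

// scheduleReconnect connects the first numInstantReconnect peers right away
// and staggers the rest by a random delay, unless shutdown is signalled.
func scheduleReconnect(idx int, quit <-chan struct{}, connect func()) {
        if idx < numInstantReconnect {
                connect()
                return
        }
        delay := time.Duration(rand.Intn(maxInitReconnectDelay)) * time.Second
        select {
        case <-time.After(delay):
                connect()
        case <-quit:
        }
}

func main() {
        quit := make(chan struct{})
        for i := 0; i < 3; i++ {
                i := i
                go scheduleReconnect(i, quit, func() { fmt.Println("connect peer", i) })
        }
        time.Sleep(100 * time.Millisecond)
}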

3603
// prunePersistentPeerConnection removes all internal state related to
3604
// persistent connections to a peer within the server. This is used to avoid
3605
// persistent connection retries to peers we do not have any open channels with.
UNCOV
3606
func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) {
×
UNCOV
3607
        pubKeyStr := string(compressedPubKey[:])
×
UNCOV
3608

×
UNCOV
3609
        s.mu.Lock()
×
UNCOV
3610
        if perm, ok := s.persistentPeers[pubKeyStr]; ok && !perm {
×
UNCOV
3611
                delete(s.persistentPeers, pubKeyStr)
×
UNCOV
3612
                delete(s.persistentPeersBackoff, pubKeyStr)
×
UNCOV
3613
                delete(s.persistentPeerAddrs, pubKeyStr)
×
UNCOV
3614
                s.cancelConnReqs(pubKeyStr, nil)
×
UNCOV
3615
                s.mu.Unlock()
×
UNCOV
3616

×
UNCOV
3617
                srvrLog.Infof("Pruned peer %x from persistent connections, "+
×
UNCOV
3618
                        "peer has no open channels", compressedPubKey)
×
UNCOV
3619

×
UNCOV
3620
                return
×
UNCOV
3621
        }
×
UNCOV
3622
        s.mu.Unlock()
×
3623
}
3624

3625
// BroadcastMessage sends a request to the server to broadcast a set of
3626
// messages to all peers other than those specified by the `skips` parameter.
3627
// All messages sent via BroadcastMessage will be queued for lazy delivery to
3628
// the target peers.
3629
//
3630
// NOTE: This function is safe for concurrent access.
3631
func (s *server) BroadcastMessage(skips map[route.Vertex]struct{},
UNCOV
3632
        msgs ...lnwire.Message) error {
×
UNCOV
3633

×
UNCOV
3634
        // Filter out peers found in the skips map. We synchronize access to
×
UNCOV
3635
        // peersByPub throughout this process to ensure we deliver messages to
×
UNCOV
3636
        // the exact set of peers present at the time of invocation.
×
UNCOV
3637
        s.mu.RLock()
×
UNCOV
3638
        peers := make([]*peer.Brontide, 0, len(s.peersByPub))
×
UNCOV
3639
        for pubStr, sPeer := range s.peersByPub {
×
UNCOV
3640
                if skips != nil {
×
UNCOV
3641
                        if _, ok := skips[sPeer.PubKey()]; ok {
×
UNCOV
3642
                                srvrLog.Tracef("Skipping %x in broadcast with "+
×
UNCOV
3643
                                        "pubStr=%x", sPeer.PubKey(), pubStr)
×
UNCOV
3644
                                continue
×
3645
                        }
3646
                }
3647

UNCOV
3648
                peers = append(peers, sPeer)
×
3649
        }
UNCOV
3650
        s.mu.RUnlock()
×
UNCOV
3651

×
UNCOV
3652
        // Iterate over all known peers, dispatching a go routine to enqueue
×
UNCOV
3653
        // all messages to each of the peers.
×
UNCOV
3654
        var wg sync.WaitGroup
×
UNCOV
3655
        for _, sPeer := range peers {
×
UNCOV
3656
                srvrLog.Debugf("Sending %v messages to peer %x", len(msgs),
×
UNCOV
3657
                        sPeer.PubKey())
×
UNCOV
3658

×
UNCOV
3659
                // Dispatch a go routine to enqueue all messages to this peer.
×
UNCOV
3660
                wg.Add(1)
×
UNCOV
3661
                s.wg.Add(1)
×
UNCOV
3662
                go func(p lnpeer.Peer) {
×
UNCOV
3663
                        defer s.wg.Done()
×
UNCOV
3664
                        defer wg.Done()
×
UNCOV
3665

×
UNCOV
3666
                        p.SendMessageLazy(false, msgs...)
×
UNCOV
3667
                }(sPeer)
×
3668
        }
3669

3670
        // Wait for all messages to have been dispatched before returning to
3671
        // caller.
UNCOV
3672
        wg.Wait()
×
UNCOV
3673

×
UNCOV
3674
        return nil
×
3675
}
3676
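The structure of BroadcastMessage is worth calling out: the peer set is snapshotted under a read lock, excluded keys are filtered out, and each enqueue runs in its own goroutine behind a WaitGroup. A self-contained sketch with stand-in peer types:

package main

import (
        "fmt"
        "sync"
)

type fakePeer struct{ id string }

func (p *fakePeer) sendLazy(msg string) { fmt.Println(p.id, "queued:", msg) }

// broadcast fans a message out to every peer not listed in skips, and waits
// until every enqueue goroutine has finished.
func broadcast(peers map[string]*fakePeer, skips map[string]struct{}, msg string) {
        var targets []*fakePeer
        for id, p := range peers {
                if _, ok := skips[id]; ok {
                        continue
                }
                targets = append(targets, p)
        }

        var wg sync.WaitGroup
        for _, p := range targets {
                wg.Add(1)
                go func(p *fakePeer) {
                        defer wg.Done()
                        p.sendLazy(msg)
                }(p)
        }
        wg.Wait()
}

func main() {
        peers := map[string]*fakePeer{"a": {"a"}, "b": {"b"}}
        broadcast(peers, map[string]struct{}{"b": {}}, "node_announcement")
}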

3677
// NotifyWhenOnline can be called by other subsystems to get notified when a
3678
// particular peer comes online. The peer itself is sent across the peerChan.
3679
//
3680
// NOTE: This function is safe for concurrent access.
3681
func (s *server) NotifyWhenOnline(peerKey [33]byte,
UNCOV
3682
        peerChan chan<- lnpeer.Peer) {
×
UNCOV
3683

×
UNCOV
3684
        s.mu.Lock()
×
UNCOV
3685

×
UNCOV
3686
        // Compute the target peer's identifier.
×
UNCOV
3687
        pubStr := string(peerKey[:])
×
UNCOV
3688

×
UNCOV
3689
        // Check if peer is connected.
×
UNCOV
3690
        peer, ok := s.peersByPub[pubStr]
×
UNCOV
3691
        if ok {
×
UNCOV
3692
                // Unlock here so that the mutex isn't held while we are
×
UNCOV
3693
                // waiting for the peer to become active.
×
UNCOV
3694
                s.mu.Unlock()
×
UNCOV
3695

×
UNCOV
3696
                // Wait until the peer signals that it is actually active
×
UNCOV
3697
                // rather than only in the server's maps.
×
UNCOV
3698
                select {
×
UNCOV
3699
                case <-peer.ActiveSignal():
×
3700
                case <-peer.QuitSignal():
×
3701
                        // The peer quit, so we'll add the channel to the slice
×
3702
                        // and return.
×
3703
                        s.mu.Lock()
×
3704
                        s.peerConnectedListeners[pubStr] = append(
×
3705
                                s.peerConnectedListeners[pubStr], peerChan,
×
3706
                        )
×
3707
                        s.mu.Unlock()
×
3708
                        return
×
3709
                }
3710

3711
                // Connected, can return early.
UNCOV
3712
                srvrLog.Debugf("Notifying that peer %x is online", peerKey)
×
UNCOV
3713

×
UNCOV
3714
                select {
×
UNCOV
3715
                case peerChan <- peer:
×
3716
                case <-s.quit:
×
3717
                }
3718

UNCOV
3719
                return
×
3720
        }
3721

3722
        // Not connected, store this listener such that it can be notified when
3723
        // the peer comes online.
UNCOV
3724
        s.peerConnectedListeners[pubStr] = append(
×
UNCOV
3725
                s.peerConnectedListeners[pubStr], peerChan,
×
UNCOV
3726
        )
×
UNCOV
3727
        s.mu.Unlock()
×
3728
}
3729

3730
// NotifyWhenOffline delivers a notification to the caller of when the peer with
3731
// the given public key has been disconnected. The notification is signaled by
3732
// closing the channel returned.
UNCOV
3733
func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} {
×
UNCOV
3734
        s.mu.Lock()
×
UNCOV
3735
        defer s.mu.Unlock()
×
UNCOV
3736

×
UNCOV
3737
        c := make(chan struct{})
×
UNCOV
3738

×
UNCOV
3739
        // If the peer is already offline, we can immediately trigger the
×
UNCOV
3740
        // notification.
×
UNCOV
3741
        peerPubKeyStr := string(peerPubKey[:])
×
UNCOV
3742
        if _, ok := s.peersByPub[peerPubKeyStr]; !ok {
×
3743
                srvrLog.Debugf("Notifying that peer %x is offline", peerPubKey)
×
3744
                close(c)
×
3745
                return c
×
3746
        }
×
3747

3748
        // Otherwise, the peer is online, so we'll keep track of the channel to
3749
        // trigger the notification once the server detects the peer
3750
        // disconnects.
UNCOV
3751
        s.peerDisconnectedListeners[peerPubKeyStr] = append(
×
UNCOV
3752
                s.peerDisconnectedListeners[peerPubKeyStr], c,
×
UNCOV
3753
        )
×
UNCOV
3754

×
UNCOV
3755
        return c
×
3756
}
3757
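NotifyWhenOffline uses the close-a-channel-as-a-broadcast idiom: the returned struct{} channel is closed exactly once when the peer disconnects (or immediately if it is already gone), so any number of readers unblock. An isolated, illustrative version of that mechanism:

package main

import (
        "fmt"
        "sync"
)

type offlineNotifier struct {
        mu        sync.Mutex
        listeners map[string][]chan struct{}
        online    map[string]bool
}

// notifyWhenOffline hands back a channel that is closed once the peer goes
// offline, or immediately if it is already offline.
func (n *offlineNotifier) notifyWhenOffline(peer string) <-chan struct{} {
        n.mu.Lock()
        defer n.mu.Unlock()

        c := make(chan struct{})
        if !n.online[peer] {
                close(c)
                return c
        }
        n.listeners[peer] = append(n.listeners[peer], c)
        return c
}

// markOffline fires every registered listener for the peer exactly once.
func (n *offlineNotifier) markOffline(peer string) {
        n.mu.Lock()
        defer n.mu.Unlock()

        n.online[peer] = false
        for _, c := range n.listeners[peer] {
                close(c)
        }
        delete(n.listeners, peer)
}

func main() {
        n := &offlineNotifier{
                listeners: make(map[string][]chan struct{}),
                online:    map[string]bool{"alice": true},
        }
        done := n.notifyWhenOffline("alice")
        n.markOffline("alice")
        <-done
        fmt.Println("alice is offline")
}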

3758
// FindPeer will return the peer that corresponds to the passed in public key.
3759
// This function is used by the funding manager, allowing it to update the
3760
// daemon's local representation of the remote peer.
3761
//
3762
// NOTE: This function is safe for concurrent access.
UNCOV
3763
func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer.Brontide, error) {
×
UNCOV
3764
        s.mu.RLock()
×
UNCOV
3765
        defer s.mu.RUnlock()
×
UNCOV
3766

×
UNCOV
3767
        pubStr := string(peerKey.SerializeCompressed())
×
UNCOV
3768

×
UNCOV
3769
        return s.findPeerByPubStr(pubStr)
×
UNCOV
3770
}
×
3771

3772
// FindPeerByPubStr will return the peer that corresponds to the passed peerID,
3773
// which should be a string representation of the peer's serialized, compressed
3774
// public key.
3775
//
3776
// NOTE: This function is safe for concurrent access.
UNCOV
3777
func (s *server) FindPeerByPubStr(pubStr string) (*peer.Brontide, error) {
×
UNCOV
3778
        s.mu.RLock()
×
UNCOV
3779
        defer s.mu.RUnlock()
×
UNCOV
3780

×
UNCOV
3781
        return s.findPeerByPubStr(pubStr)
×
UNCOV
3782
}
×
3783

3784
// findPeerByPubStr is an internal method that retrieves the specified peer from
3785
// the server's internal state using its public key string.
UNCOV
3786
func (s *server) findPeerByPubStr(pubStr string) (*peer.Brontide, error) {
×
UNCOV
3787
        peer, ok := s.peersByPub[pubStr]
×
UNCOV
3788
        if !ok {
×
UNCOV
3789
                return nil, ErrPeerNotConnected
×
UNCOV
3790
        }
×
3791

UNCOV
3792
        return peer, nil
×
3793
}
3794

3795
// nextPeerBackoff computes the next backoff duration for a peer's pubkey using
3796
// exponential backoff. If no previous backoff was known, the default is
3797
// returned.
3798
func (s *server) nextPeerBackoff(pubStr string,
UNCOV
3799
        startTime time.Time) time.Duration {
×
UNCOV
3800

×
UNCOV
3801
        // Now, determine the appropriate backoff to use for the retry.
×
UNCOV
3802
        backoff, ok := s.persistentPeersBackoff[pubStr]
×
UNCOV
3803
        if !ok {
×
UNCOV
3804
                // If an existing backoff was unknown, use the default.
×
UNCOV
3805
                return s.cfg.MinBackoff
×
UNCOV
3806
        }
×
3807

3808
        // If the peer failed to start properly, we'll just use the previous
3809
        // backoff to compute the subsequent randomized exponential backoff
3810
        // duration. This will roughly double on average.
UNCOV
3811
        if startTime.IsZero() {
×
3812
                return computeNextBackoff(backoff, s.cfg.MaxBackoff)
×
3813
        }
×
3814

3815
        // The peer succeeded in starting. If the connection didn't last long
3816
        // enough to be considered stable, we'll continue to back off retries
3817
        // with this peer.
UNCOV
3818
        connDuration := time.Since(startTime)
×
UNCOV
3819
        if connDuration < defaultStableConnDuration {
×
UNCOV
3820
                return computeNextBackoff(backoff, s.cfg.MaxBackoff)
×
UNCOV
3821
        }
×
3822

3823
        // The peer succeeded in starting and this was a stable peer, so we'll
3824
        // reduce the timeout duration by the length of the connection after
3825
        // applying randomized exponential backoff. We'll only apply this in the
3826
        // case that:
3827
        //   reb(curBackoff) - connDuration > cfg.MinBackoff
3828
        relaxedBackoff := computeNextBackoff(backoff, s.cfg.MaxBackoff) - connDuration
×
3829
        if relaxedBackoff > s.cfg.MinBackoff {
×
3830
                return relaxedBackoff
×
3831
        }
×
3832

3833
        // Lastly, if reb(currBackoff) - connDuration <= cfg.MinBackoff, then
3834
        // the stable connection lasted much longer than our previous backoff.
3835
        // To reward such good behavior, we'll reconnect after the default
3836
        // timeout.
3837
        return s.cfg.MinBackoff
×
3838
}
3839
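The backoff policy in nextPeerBackoff can be read as: roughly double the previous backoff with jitter, cap it at the maximum, and subtract however long the last connection stayed up, never going below the minimum. The sketch below mirrors that intent; it is not lnd's computeNextBackoff, and the constants are made up.

package main

import (
        "fmt"
        "math/rand"
        "time"
)

const (
        minBackoff = 5 * time.Second
        maxBackoff = 1 * time.Hour
)

// nextBackoff doubles the previous backoff with +/-25% jitter, caps it, and
// relaxes it by the duration of the last connection.
func nextBackoff(prev, connDuration time.Duration) time.Duration {
        if prev <= 0 {
                prev = minBackoff
        }

        next := 2 * prev
        jitter := time.Duration(rand.Int63n(int64(next)/2)) - next/4
        next += jitter
        if next > maxBackoff {
                next = maxBackoff
        }

        // Reward long-lived connections by subtracting their duration.
        relaxed := next - connDuration
        if relaxed > minBackoff {
                return relaxed
        }
        return minBackoff
}

func main() {
        fmt.Println(nextBackoff(10*time.Second, 2*time.Second))
        fmt.Println(nextBackoff(10*time.Second, 45*time.Second)) // long-lived: back to min
}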

3840
// shouldDropLocalConnection determines if our local connection to a remote peer
3841
// should be dropped in the case of concurrent connection establishment. In
3842
// order to deterministically decide which connection should be dropped, we'll
3843
// utilize the ordering of the local and remote public key. If we didn't use
3844
// such a tie breaker, then we risk _both_ connections erroneously being
3845
// dropped.
3846
func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool {
×
3847
        localPubBytes := local.SerializeCompressed()
×
3848
        remotePubBytes := remote.SerializeCompressed()
×
3849

×
3850
        // The connection that comes from the node with a "smaller" pubkey
×
3851
        // should be kept. Therefore, if our pubkey is "greater" than theirs, we
×
3852
        // should drop our established connection.
×
3853
        return bytes.Compare(localPubBytes, remotePubBytes) > 0
×
3854
}
×
3855
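Because both nodes run the same tie-breaker on the same pair of keys, exactly one side drops its locally initiated connection. A small demonstration with two made-up 33-byte compressed keys:

package main

import (
        "bytes"
        "fmt"
)

// shouldDropLocal keeps the connection dialed by the "smaller" pubkey; if our
// key is greater, we drop the connection we initiated.
func shouldDropLocal(localPub, remotePub []byte) bool {
        return bytes.Compare(localPub, remotePub) > 0
}

func main() {
        alice := append([]byte{0x02}, bytes.Repeat([]byte{0x11}, 32)...)
        bob := append([]byte{0x03}, bytes.Repeat([]byte{0x22}, 32)...)

        fmt.Println("alice drops her dial:", shouldDropLocal(alice, bob)) // false
        fmt.Println("bob drops his dial:  ", shouldDropLocal(bob, alice)) // true
}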

3856
// InboundPeerConnected initializes a new peer in response to a new inbound
3857
// connection.
3858
//
3859
// NOTE: This function is safe for concurrent access.
UNCOV
3860
func (s *server) InboundPeerConnected(conn net.Conn) {
×
UNCOV
3861
        // Exit early if we have already been instructed to shutdown, this
×
UNCOV
3862
        // prevents any delayed callbacks from accidentally registering peers.
×
UNCOV
3863
        if s.Stopped() {
×
3864
                return
×
3865
        }
×
3866

UNCOV
3867
        nodePub := conn.(*brontide.Conn).RemotePub()
×
UNCOV
3868
        pubSer := nodePub.SerializeCompressed()
×
UNCOV
3869
        pubStr := string(pubSer)
×
UNCOV
3870

×
UNCOV
3871
        var pubBytes [33]byte
×
UNCOV
3872
        copy(pubBytes[:], pubSer)
×
UNCOV
3873

×
UNCOV
3874
        s.mu.Lock()
×
UNCOV
3875
        defer s.mu.Unlock()
×
UNCOV
3876

×
UNCOV
3877
        // If the remote node's public key is banned, drop the connection.
×
UNCOV
3878
        shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub)
×
UNCOV
3879
        if dcErr != nil {
×
3880
                srvrLog.Errorf("Unable to check if we should disconnect "+
×
3881
                        "peer: %v", dcErr)
×
3882
                conn.Close()
×
3883

×
3884
                return
×
3885
        }
×
3886

UNCOV
3887
        if shouldDc {
×
3888
                srvrLog.Debugf("Dropping connection for %v since they are "+
×
3889
                        "banned.", pubSer)
×
3890

×
3891
                conn.Close()
×
3892

×
3893
                return
×
3894
        }
×
3895

3896
        // If we already have an outbound connection to this peer, then ignore
3897
        // this new connection.
UNCOV
3898
        if p, ok := s.outboundPeers[pubStr]; ok {
×
UNCOV
3899
                srvrLog.Debugf("Already have outbound connection for %v, "+
×
UNCOV
3900
                        "ignoring inbound connection from local=%v, remote=%v",
×
UNCOV
3901
                        p, conn.LocalAddr(), conn.RemoteAddr())
×
UNCOV
3902

×
UNCOV
3903
                conn.Close()
×
UNCOV
3904
                return
×
UNCOV
3905
        }
×
3906

3907
        // If we already have a valid connection that is scheduled to take
3908
        // precedence once the prior peer has finished disconnecting, we'll
3909
        // ignore this connection.
UNCOV
3910
        if p, ok := s.scheduledPeerConnection[pubStr]; ok {
×
3911
                srvrLog.Debugf("Ignoring connection from %v, peer %v already "+
×
3912
                        "scheduled", conn.RemoteAddr(), p)
×
3913
                conn.Close()
×
3914
                return
×
3915
        }
×
3916

UNCOV
3917
        srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())
×
UNCOV
3918

×
UNCOV
3919
        // Check to see if we already have a connection with this peer. If so,
×
UNCOV
3920
        // we may need to drop our existing connection. This prevents us from
×
UNCOV
3921
        // having duplicate connections to the same peer. We forgo adding a
×
UNCOV
3922
        // default case as we expect these to be the only error values returned
×
UNCOV
3923
        // from findPeerByPubStr.
×
UNCOV
3924
        connectedPeer, err := s.findPeerByPubStr(pubStr)
×
UNCOV
3925
        switch err {
×
UNCOV
3926
        case ErrPeerNotConnected:
×
UNCOV
3927
                // We were unable to locate an existing connection with the
×
UNCOV
3928
                // target peer, proceed to connect.
×
UNCOV
3929
                s.cancelConnReqs(pubStr, nil)
×
UNCOV
3930
                s.peerConnected(conn, nil, true)
×
3931

3932
        case nil:
×
3933
                // We already have a connection with the incoming peer. If the
×
3934
                // connection we've already established should be kept and is
×
3935
                // not of the same type of the new connection (inbound), then
×
3936
                // we'll close out the new connection s.t there's only a single
×
3937
                // connection between us.
×
3938
                localPub := s.identityECDH.PubKey()
×
3939
                if !connectedPeer.Inbound() &&
×
3940
                        !shouldDropLocalConnection(localPub, nodePub) {
×
3941

×
3942
                        srvrLog.Warnf("Received inbound connection from "+
×
3943
                                "peer %v, but already have outbound "+
×
3944
                                "connection, dropping conn", connectedPeer)
×
3945
                        conn.Close()
×
3946
                        return
×
3947
                }
×
3948

3949
                // Otherwise, if we should drop the connection, then we'll
3950
                // disconnect our already connected peer.
3951
                srvrLog.Debugf("Disconnecting stale connection to %v",
×
3952
                        connectedPeer)
×
3953

×
3954
                s.cancelConnReqs(pubStr, nil)
×
3955

×
3956
                // Remove the current peer from the server's internal state and
×
3957
                // signal that the peer termination watcher does not need to
×
3958
                // execute for this peer.
×
3959
                s.removePeer(connectedPeer)
×
3960
                s.ignorePeerTermination[connectedPeer] = struct{}{}
×
3961
                s.scheduledPeerConnection[pubStr] = func() {
×
3962
                        s.peerConnected(conn, nil, true)
×
3963
                }
×
3964
        }
3965
}
3966

3967
// OutboundPeerConnected initializes a new peer in response to a new outbound
3968
// connection.
3969
// NOTE: This function is safe for concurrent access.
UNCOV
3970
func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) {
×
UNCOV
3971
        // Exit early if we have already been instructed to shutdown, this
×
UNCOV
3972
        // prevents any delayed callbacks from accidentally registering peers.
×
UNCOV
3973
        if s.Stopped() {
×
3974
                return
×
3975
        }
×
3976

UNCOV
3977
        nodePub := conn.(*brontide.Conn).RemotePub()
×
UNCOV
3978
        pubSer := nodePub.SerializeCompressed()
×
UNCOV
3979
        pubStr := string(pubSer)
×
UNCOV
3980

×
UNCOV
3981
        var pubBytes [33]byte
×
UNCOV
3982
        copy(pubBytes[:], pubSer)
×
UNCOV
3983

×
UNCOV
3984
        s.mu.Lock()
×
UNCOV
3985
        defer s.mu.Unlock()
×
UNCOV
3986

×
UNCOV
3987
        // If the remote node's public key is banned, drop the connection.
×
UNCOV
3988
        shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub)
×
UNCOV
3989
        if dcErr != nil {
×
3990
                srvrLog.Errorf("Unable to check if we should disconnect "+
×
3991
                        "peer: %v", dcErr)
×
3992
                conn.Close()
×
3993

×
3994
                return
×
3995
        }
×
3996

UNCOV
3997
        if shouldDc {
×
3998
                srvrLog.Debugf("Dropping connection for %v since they are "+
×
3999
                        "banned.", pubSer)
×
4000

×
4001
                if connReq != nil {
×
4002
                        s.connMgr.Remove(connReq.ID())
×
4003
                }
×
4004

4005
                conn.Close()
×
4006

×
4007
                return
×
4008
        }
4009

4010
        // If we already have an inbound connection to this peer, then ignore
4011
        // this new connection.
UNCOV
4012
        if p, ok := s.inboundPeers[pubStr]; ok {
×
UNCOV
4013
                srvrLog.Debugf("Already have inbound connection for %v, "+
×
UNCOV
4014
                        "ignoring outbound connection from local=%v, remote=%v",
×
UNCOV
4015
                        p, conn.LocalAddr(), conn.RemoteAddr())
×
UNCOV
4016

×
UNCOV
4017
                if connReq != nil {
×
UNCOV
4018
                        s.connMgr.Remove(connReq.ID())
×
UNCOV
4019
                }
×
UNCOV
4020
                conn.Close()
×
UNCOV
4021
                return
×
4022
        }
UNCOV
4023
        if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil {
×
4024
                srvrLog.Debugf("Ignoring canceled outbound connection")
×
4025
                s.connMgr.Remove(connReq.ID())
×
4026
                conn.Close()
×
4027
                return
×
4028
        }
×
4029

4030
        // If we already have a valid connection that is scheduled to take
4031
        // precedence once the prior peer has finished disconnecting, we'll
4032
        // ignore this connection.
UNCOV
4033
        if _, ok := s.scheduledPeerConnection[pubStr]; ok {
×
4034
                srvrLog.Debugf("Ignoring connection, peer already scheduled")
×
4035

×
4036
                if connReq != nil {
×
4037
                        s.connMgr.Remove(connReq.ID())
×
4038
                }
×
4039

4040
                conn.Close()
×
4041
                return
×
4042
        }
4043

UNCOV
4044
        srvrLog.Infof("Established connection to: %x@%v", pubStr,
×
UNCOV
4045
                conn.RemoteAddr())
×
UNCOV
4046

×
UNCOV
4047
        if connReq != nil {
×
UNCOV
4048
                // A successful connection was returned by the connmgr.
×
UNCOV
4049
                // Immediately cancel all pending requests, excluding the
×
UNCOV
4050
                // outbound connection we just established.
×
UNCOV
4051
                ignore := connReq.ID()
×
UNCOV
4052
                s.cancelConnReqs(pubStr, &ignore)
×
UNCOV
4053
        } else {
×
UNCOV
4054
                // This was a successful connection made by some other
×
UNCOV
4055
                // subsystem. Remove all requests being managed by the connmgr.
×
UNCOV
4056
                s.cancelConnReqs(pubStr, nil)
×
UNCOV
4057
        }
×
4058

4059
        // If we already have a connection with this peer, decide whether or not
4060
        // we need to drop the stale connection. We forgo adding a default case
4061
        // as we expect these to be the only error values returned from
4062
        // findPeerByPubStr.
UNCOV
4063
        connectedPeer, err := s.findPeerByPubStr(pubStr)
×
UNCOV
4064
        switch err {
×
UNCOV
4065
        case ErrPeerNotConnected:
×
UNCOV
4066
                // We were unable to locate an existing connection with the
×
UNCOV
4067
                // target peer, proceed to connect.
×
UNCOV
4068
                s.peerConnected(conn, connReq, false)
×
4069

4070
        case nil:
×
4071
                // We already have a connection with the incoming peer. If the
×
4072
                // connection we've already established should be kept and is
×
4073
                // not of the same type of the new connection (outbound), then
×
4074
                // we'll close out the new connection s.t there's only a single
×
4075
                // connection between us.
×
4076
                localPub := s.identityECDH.PubKey()
×
4077
                if connectedPeer.Inbound() &&
×
4078
                        shouldDropLocalConnection(localPub, nodePub) {
×
4079

×
4080
                        srvrLog.Warnf("Established outbound connection to "+
×
4081
                                "peer %v, but already have inbound "+
×
4082
                                "connection, dropping conn", connectedPeer)
×
4083
                        if connReq != nil {
×
4084
                                s.connMgr.Remove(connReq.ID())
×
4085
                        }
×
4086
                        conn.Close()
×
4087
                        return
×
4088
                }
4089

4090
                // Otherwise, _their_ connection should be dropped. So we'll
4091
                // disconnect the peer and send the now obsolete peer to the
4092
                // server for garbage collection.
4093
                srvrLog.Debugf("Disconnecting stale connection to %v",
×
4094
                        connectedPeer)
×
4095

×
4096
                // Remove the current peer from the server's internal state and
×
4097
                // signal that the peer termination watcher does not need to
×
4098
                // execute for this peer.
×
4099
                s.removePeer(connectedPeer)
×
4100
                s.ignorePeerTermination[connectedPeer] = struct{}{}
×
4101
                s.scheduledPeerConnection[pubStr] = func() {
×
4102
                        s.peerConnected(conn, connReq, false)
×
4103
                }
×
4104
        }
4105
}
4106

4107
// UnassignedConnID is the default connection ID that a request can have before
4108
// it actually is submitted to the connmgr.
4109
// TODO(conner): move into connmgr package, or better, add connmgr method for
4110
// generating atomic IDs
4111
const UnassignedConnID uint64 = 0
4112

4113
// cancelConnReqs stops all persistent connection requests for a given pubkey.
4114
// Any attempts initiated by the peerTerminationWatcher are canceled first.
4115
// Afterwards, each connection request is removed from the connmgr. The caller can
4116
// optionally specify a connection ID to ignore, which prevents us from
4117
// canceling a successful request. All persistent connreqs for the provided
4118
// pubkey are discarded after the operation.
UNCOV
4119
func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
×
UNCOV
4120
        // First, cancel any lingering persistent retry attempts, which will
×
UNCOV
4121
        // prevent retries for any with backoffs that are still maturing.
×
UNCOV
4122
        if cancelChan, ok := s.persistentRetryCancels[pubStr]; ok {
×
UNCOV
4123
                close(cancelChan)
×
UNCOV
4124
                delete(s.persistentRetryCancels, pubStr)
×
UNCOV
4125
        }
×
4126

4127
        // Next, check to see if we have any outstanding persistent connection
4128
        // requests to this peer. If so, then we'll remove all of these
4129
        // connection requests, and also delete the entry from the map.
UNCOV
4130
        connReqs, ok := s.persistentConnReqs[pubStr]
×
UNCOV
4131
        if !ok {
×
UNCOV
4132
                return
×
UNCOV
4133
        }
×
4134

UNCOV
4135
        for _, connReq := range connReqs {
×
UNCOV
4136
                srvrLog.Tracef("Canceling %s:", connReqs)
×
UNCOV
4137

×
UNCOV
4138
                // Atomically capture the current request identifier.
×
UNCOV
4139
                connID := connReq.ID()
×
UNCOV
4140

×
UNCOV
4141
                // Skip any zero IDs, this indicates the request has not
×
UNCOV
4142
        // yet been scheduled.
×
UNCOV
4143
                if connID == UnassignedConnID {
×
4144
                        continue
×
4145
                }
4146

4147
                // Skip a particular connection ID if instructed.
UNCOV
4148
                if skip != nil && connID == *skip {
×
UNCOV
4149
                        continue
×
4150
                }
4151

UNCOV
4152
                s.connMgr.Remove(connID)
×
4153
        }
4154

UNCOV
4155
        delete(s.persistentConnReqs, pubStr)
×
4156
}
4157
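The core of cancelConnReqs is the skip-one cancellation loop: every pending request for the peer is removed except an unscheduled (zero-ID) request or the single ID the caller asks to keep. A simplified, stand-in version of that loop:

package main

import "fmt"

type connReq struct{ id uint64 }

// cancelExcept removes every scheduled request except the optional skip ID.
func cancelExcept(reqs []connReq, skip *uint64, remove func(uint64)) {
        for _, req := range reqs {
                // Zero means the request was never scheduled.
                if req.id == 0 {
                        continue
                }
                if skip != nil && req.id == *skip {
                        continue
                }
                remove(req.id)
        }
}

func main() {
        keep := uint64(2)
        cancelExcept(
                []connReq{{1}, {2}, {3}, {0}},
                &keep,
                func(id uint64) { fmt.Println("removed request", id) },
        )
}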

4158
// handleCustomMessage dispatches an incoming custom peers message to
4159
// subscribers.
UNCOV
4160
func (s *server) handleCustomMessage(peer [33]byte, msg *lnwire.Custom) error {
×
UNCOV
4161
        srvrLog.Debugf("Custom message received: peer=%x, type=%d",
×
UNCOV
4162
                peer, msg.Type)
×
UNCOV
4163

×
UNCOV
4164
        return s.customMessageServer.SendUpdate(&CustomMessage{
×
UNCOV
4165
                Peer: peer,
×
UNCOV
4166
                Msg:  msg,
×
UNCOV
4167
        })
×
UNCOV
4168
}
×
4169

4170
// SubscribeCustomMessages subscribes to a stream of incoming custom peer
4171
// messages.
UNCOV
4172
func (s *server) SubscribeCustomMessages() (*subscribe.Client, error) {
×
UNCOV
4173
        return s.customMessageServer.Subscribe()
×
UNCOV
4174
}
×
4175

4176
// peerConnected is a function that handles the initialization of a newly connected
4177
// peer by adding it to the server's global list of all active peers, and
4178
// starting all the goroutines the peer needs to function properly. The inbound
4179
// boolean should be true if the peer initiated the connection to us.
4180
func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
UNCOV
4181
        inbound bool) {
×
UNCOV
4182

×
UNCOV
4183
        brontideConn := conn.(*brontide.Conn)
×
UNCOV
4184
        addr := conn.RemoteAddr()
×
UNCOV
4185
        pubKey := brontideConn.RemotePub()
×
UNCOV
4186

×
UNCOV
4187
        srvrLog.Infof("Finalizing connection to %x@%s, inbound=%v",
×
UNCOV
4188
                pubKey.SerializeCompressed(), addr, inbound)
×
UNCOV
4189

×
UNCOV
4190
        peerAddr := &lnwire.NetAddress{
×
UNCOV
4191
                IdentityKey: pubKey,
×
UNCOV
4192
                Address:     addr,
×
UNCOV
4193
                ChainNet:    s.cfg.ActiveNetParams.Net,
×
UNCOV
4194
        }
×
UNCOV
4195

×
UNCOV
4196
        // With the brontide connection established, we'll now craft the feature
×
UNCOV
4197
        // vectors to advertise to the remote node.
×
UNCOV
4198
        initFeatures := s.featureMgr.Get(feature.SetInit)
×
UNCOV
4199
        legacyFeatures := s.featureMgr.Get(feature.SetLegacyGlobal)
×
UNCOV
4200

×
UNCOV
4201
        // Lookup past error caches for the peer in the server. If no buffer is
×
UNCOV
4202
        // found, create a fresh buffer.
×
UNCOV
4203
        pkStr := string(peerAddr.IdentityKey.SerializeCompressed())
×
UNCOV
4204
        errBuffer, ok := s.peerErrors[pkStr]
×
UNCOV
4205
        if !ok {
×
UNCOV
4206
                var err error
×
UNCOV
4207
                errBuffer, err = queue.NewCircularBuffer(peer.ErrorBufferSize)
×
UNCOV
4208
                if err != nil {
×
4209
                        srvrLog.Errorf("unable to create peer %v", err)
×
4210
                        return
×
4211
                }
×
4212
        }
4213

4214
        // If we directly set the peer.Config TowerClient member to the
4215
        // s.towerClientMgr then in the case that the s.towerClientMgr is nil,
4216
        // the peer.Config's TowerClient member will not evaluate to nil even
4217
        // though the underlying value is nil. To avoid this gotcha which can
4218
        // cause a panic, we need to explicitly pass nil to the peer.Config's
4219
        // TowerClient if needed.
UNCOV
4220
        var towerClient wtclient.ClientManager
×
UNCOV
4221
        if s.towerClientMgr != nil {
×
UNCOV
4222
                towerClient = s.towerClientMgr
×
UNCOV
4223
        }
×
4224

UNCOV
4225
        thresholdSats := btcutil.Amount(s.cfg.MaxFeeExposure)
×
UNCOV
4226
        thresholdMSats := lnwire.NewMSatFromSatoshis(thresholdSats)
×
UNCOV
4227

×
UNCOV
4228
        // Now that we've established a connection, create a peer, and add it to the
×
UNCOV
4229
        // set of currently active peers. Configure the peer with the incoming
×
UNCOV
4230
        // and outgoing broadcast deltas to prevent htlcs from being accepted or
×
UNCOV
4231
        // offered that would trigger channel closure. In case of outgoing
×
UNCOV
4232
        // htlcs, an extra block is added to prevent the channel from being
×
UNCOV
4233
        // closed when the htlc is outstanding and a new block comes in.
×
UNCOV
4234
        pCfg := peer.Config{
×
UNCOV
4235
                Conn:                    brontideConn,
×
UNCOV
4236
                ConnReq:                 connReq,
×
UNCOV
4237
                Addr:                    peerAddr,
×
UNCOV
4238
                Inbound:                 inbound,
×
UNCOV
4239
                Features:                initFeatures,
×
UNCOV
4240
                LegacyFeatures:          legacyFeatures,
×
UNCOV
4241
                OutgoingCltvRejectDelta: lncfg.DefaultOutgoingCltvRejectDelta,
×
UNCOV
4242
                ChanActiveTimeout:       s.cfg.ChanEnableTimeout,
×
UNCOV
4243
                ErrorBuffer:             errBuffer,
×
UNCOV
4244
                WritePool:               s.writePool,
×
UNCOV
4245
                ReadPool:                s.readPool,
×
UNCOV
4246
                Switch:                  s.htlcSwitch,
×
UNCOV
4247
                InterceptSwitch:         s.interceptableSwitch,
×
UNCOV
4248
                ChannelDB:               s.chanStateDB,
×
UNCOV
4249
                ChannelGraph:            s.graphDB,
×
UNCOV
4250
                ChainArb:                s.chainArb,
×
UNCOV
4251
                AuthGossiper:            s.authGossiper,
×
UNCOV
4252
                ChanStatusMgr:           s.chanStatusMgr,
×
UNCOV
4253
                ChainIO:                 s.cc.ChainIO,
×
UNCOV
4254
                FeeEstimator:            s.cc.FeeEstimator,
×
UNCOV
4255
                Signer:                  s.cc.Wallet.Cfg.Signer,
×
UNCOV
4256
                SigPool:                 s.sigPool,
×
UNCOV
4257
                Wallet:                  s.cc.Wallet,
×
UNCOV
4258
                ChainNotifier:           s.cc.ChainNotifier,
×
UNCOV
4259
                BestBlockView:           s.cc.BestBlockTracker,
×
UNCOV
4260
                RoutingPolicy:           s.cc.RoutingPolicy,
×
UNCOV
4261
                Sphinx:                  s.sphinx,
×
UNCOV
4262
                WitnessBeacon:           s.witnessBeacon,
×
UNCOV
4263
                Invoices:                s.invoices,
×
UNCOV
4264
                ChannelNotifier:         s.channelNotifier,
×
UNCOV
4265
                HtlcNotifier:            s.htlcNotifier,
×
UNCOV
4266
                TowerClient:             towerClient,
×
UNCOV
4267
                DisconnectPeer:          s.DisconnectPeer,
×
UNCOV
4268
                GenNodeAnnouncement: func(...netann.NodeAnnModifier) (
×
UNCOV
4269
                        lnwire.NodeAnnouncement, error) {
×
UNCOV
4270

×
UNCOV
4271
                        return s.genNodeAnnouncement(nil)
×
UNCOV
4272
                },
×
4273

4274
                PongBuf: s.pongBuf,
4275

4276
                PrunePersistentPeerConnection: s.prunePersistentPeerConnection,
4277

4278
                FetchLastChanUpdate: s.fetchLastChanUpdate(),
4279

4280
                FundingManager: s.fundingMgr,
4281

4282
                Hodl:                    s.cfg.Hodl,
4283
                UnsafeReplay:            s.cfg.UnsafeReplay,
4284
                MaxOutgoingCltvExpiry:   s.cfg.MaxOutgoingCltvExpiry,
4285
                MaxChannelFeeAllocation: s.cfg.MaxChannelFeeAllocation,
4286
                CoopCloseTargetConfs:    s.cfg.CoopCloseTargetConfs,
4287
                MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte(
4288
                        s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
4289
                ChannelCommitInterval:  s.cfg.ChannelCommitInterval,
4290
                PendingCommitInterval:  s.cfg.PendingCommitInterval,
4291
                ChannelCommitBatchSize: s.cfg.ChannelCommitBatchSize,
4292
                HandleCustomMessage:    s.handleCustomMessage,
4293
                GetAliases:             s.aliasMgr.GetAliases,
4294
                RequestAlias:           s.aliasMgr.RequestAlias,
4295
                AddLocalAlias:          s.aliasMgr.AddLocalAlias,
4296
                DisallowRouteBlinding:  s.cfg.ProtocolOptions.NoRouteBlinding(),
4297
                DisallowQuiescence:     s.cfg.ProtocolOptions.NoQuiescence(),
4298
                MaxFeeExposure:         thresholdMSats,
4299
                Quit:                   s.quit,
4300
                AuxLeafStore:           s.implCfg.AuxLeafStore,
4301
                AuxSigner:              s.implCfg.AuxSigner,
4302
                MsgRouter:              s.implCfg.MsgRouter,
4303
                AuxChanCloser:          s.implCfg.AuxChanCloser,
4304
                AuxResolver:            s.implCfg.AuxContractResolver,
4305
                AuxTrafficShaper:       s.implCfg.TrafficShaper,
UNCOV
4306
                ShouldFwdExpEndorsement: func() bool {
×
UNCOV
4307
                        if s.cfg.ProtocolOptions.NoExperimentalEndorsement() {
×
UNCOV
4308
                                return false
×
UNCOV
4309
                        }
×
4310

UNCOV
4311
                        return clock.NewDefaultClock().Now().Before(
×
UNCOV
4312
                                EndorsementExperimentEnd,
×
UNCOV
4313
                        )
×
4314
                },
4315
        }
4316

UNCOV
4317
        copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed())
×
UNCOV
4318
        copy(pCfg.ServerPubKey[:], s.identityECDH.PubKey().SerializeCompressed())
×
UNCOV
4319

×
UNCOV
4320
        p := peer.NewBrontide(pCfg)
×
UNCOV
4321

×
UNCOV
4322
        // TODO(roasbeef): update IP address for link-node
×
UNCOV
4323
        //  * also mark last-seen, do it one single transaction?
×
UNCOV
4324

×
UNCOV
4325
        s.addPeer(p)
×
UNCOV
4326

×
UNCOV
4327
        // Once we have successfully added the peer to the server, we can
×
UNCOV
4328
        // delete the previous error buffer from the server's map of error
×
UNCOV
4329
        // buffers.
×
UNCOV
4330
        delete(s.peerErrors, pkStr)
×
UNCOV
4331

×
UNCOV
4332
        // Dispatch a goroutine to asynchronously start the peer. This process
×
UNCOV
4333
        // includes sending and receiving Init messages, which would be a DOS
×
UNCOV
4334
        // vector if we held the server's mutex throughout the procedure.
×
UNCOV
4335
        s.wg.Add(1)
×
UNCOV
4336
        go s.peerInitializer(p)
×
4337
}
4338
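The TowerClient guard inside peerConnected exists because of Go's typed-nil interface behaviour: storing a nil pointer in an interface yields an interface value that does not compare equal to nil. The illustrative snippet below reproduces the gotcha and the guard; the types are stand-ins.

package main

import "fmt"

type clientManager interface{ policy() string }

type towerClientMgr struct{}

func (m *towerClientMgr) policy() string { return "default" }

func main() {
        var mgr *towerClientMgr // typed nil

        var direct clientManager = mgr // interface now holds (*towerClientMgr)(nil)
        fmt.Println(direct == nil)     // false: non-nil interface wrapping a nil pointer

        // The explicit guard used in peerConnected avoids that trap.
        var guarded clientManager
        if mgr != nil {
                guarded = mgr
        }
        fmt.Println(guarded == nil) // true
}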

// addPeer adds the passed peer to the server's global state of all active
// peers.
func (s *server) addPeer(p *peer.Brontide) {
        if p == nil {
                return
        }

        pubBytes := p.IdentityKey().SerializeCompressed()

        // Ignore new peers if we're shutting down.
        if s.Stopped() {
                srvrLog.Infof("Server stopped, skipped adding peer=%x",
                        pubBytes)
                p.Disconnect(ErrServerShuttingDown)

                return
        }

        // Track the new peer in our indexes so we can quickly look it up either
        // according to its public key, or its peer ID.
        // TODO(roasbeef): pipe all requests through to the
        // queryHandler/peerManager

        // NOTE: This pubStr is a raw bytes to string conversion and will NOT
        // be human-readable.
        pubStr := string(pubBytes)

        s.peersByPub[pubStr] = p

        if p.Inbound() {
                s.inboundPeers[pubStr] = p
        } else {
                s.outboundPeers[pubStr] = p
        }

        // Inform the peer notifier of a peer online event so that it can be reported
        // to clients listening for peer events.
        var pubKey [33]byte
        copy(pubKey[:], pubBytes)

        s.peerNotifier.NotifyPeerOnline(pubKey)
}

// peerInitializer asynchronously starts a newly connected peer after it has
// been added to the server's peer map. This method sets up a
// peerTerminationWatcher for the given peer, and ensures that it executes even
// if the peer failed to start. In the event of a successful connection, this
// method reads the negotiated, local feature-bits and spawns the appropriate
// graph synchronization method. Any registered clients of NotifyWhenOnline will
// be signaled of the new peer once the method returns.
//
// NOTE: This MUST be launched as a goroutine.
func (s *server) peerInitializer(p *peer.Brontide) {
        defer s.wg.Done()

        pubBytes := p.IdentityKey().SerializeCompressed()

        // Avoid initializing peers while the server is exiting.
        if s.Stopped() {
                srvrLog.Infof("Server stopped, skipped initializing peer=%x",
                        pubBytes)
                return
        }

        // Create a channel that will be used to signal a successful start of
        // the link. This prevents the peer termination watcher from beginning
        // its duty too early.
        ready := make(chan struct{})

        // Before starting the peer, launch a goroutine to watch for the
        // unexpected termination of this peer, which will ensure all resources
        // are properly cleaned up, and re-establish persistent connections when
        // necessary. The peer termination watcher will be short circuited if
        // the peer is ever added to the ignorePeerTermination map, indicating
        // that the server has already handled the removal of this peer.
        s.wg.Add(1)
        go s.peerTerminationWatcher(p, ready)

        // Start the peer! If an error occurs, we Disconnect the peer, which
        // will unblock the peerTerminationWatcher.
        if err := p.Start(); err != nil {
                srvrLog.Warnf("Starting peer=%x got error: %v", pubBytes, err)

                p.Disconnect(fmt.Errorf("unable to start peer: %w", err))
                return
        }

        // Otherwise, signal to the peerTerminationWatcher that the peer startup
        // was successful, and to begin watching the peer's wait group.
        close(ready)

        s.mu.Lock()
        defer s.mu.Unlock()

        // Check if there are listeners waiting for this peer to come online.
        srvrLog.Debugf("Notifying that peer %v is online", p)

        // TODO(guggero): Do a proper conversion to a string everywhere, or use
        // route.Vertex as the key type of peerConnectedListeners.
        pubStr := string(pubBytes)
        for _, peerChan := range s.peerConnectedListeners[pubStr] {
                select {
                case peerChan <- p:
                case <-s.quit:
                        return
                }
        }
        delete(s.peerConnectedListeners, pubStr)
}

// peerTerminationWatcher waits until a peer has been disconnected unexpectedly,
// and then cleans up all resources allocated to the peer, notifies relevant
// sub-systems of its demise, and finally handles re-connecting to the peer if
// it's persistent. If the server intentionally disconnects a peer, it should
// have a corresponding entry in the ignorePeerTermination map which will cause
// the cleanup routine to exit early. The passed `ready` chan is used to
// synchronize when WaitForDisconnect should begin watching on the peer's
// waitgroup. The ready chan should only be signaled if the peer starts
// successfully, otherwise the peer should be disconnected instead.
//
// NOTE: This MUST be launched as a goroutine.
func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
        defer s.wg.Done()

        p.WaitForDisconnect(ready)

        srvrLog.Debugf("Peer %v has been disconnected", p)

        // If the server is exiting then we can bail out early ourselves as all
        // the other sub-systems will already be shutting down.
        if s.Stopped() {
                srvrLog.Debugf("Server quitting, exit early for peer %v", p)
                return
        }

        // Next, we'll cancel all pending funding reservations with this node.
        // If we tried to initiate any funding flows that haven't yet finished,
        // then we need to unlock those committed outputs so they're still
        // available for use.
        s.fundingMgr.CancelPeerReservations(p.PubKey())

        pubKey := p.IdentityKey()

        // We'll also inform the gossiper that this peer is no longer active,
        // so we don't need to maintain sync state for it any longer.
        s.authGossiper.PruneSyncState(p.PubKey())

        // Tell the switch to remove all links associated with this peer.
        // Passing nil as the target link indicates that all links associated
        // with this interface should be closed.
        //
        // TODO(roasbeef): instead add a PurgeInterfaceLinks function?
        links, err := s.htlcSwitch.GetLinksByInterface(p.PubKey())
        if err != nil && err != htlcswitch.ErrNoLinksFound {
                srvrLog.Errorf("Unable to get channel links for %v: %v", p, err)
        }

        for _, link := range links {
                s.htlcSwitch.RemoveLink(link.ChanID())
        }

        s.mu.Lock()
        defer s.mu.Unlock()

        // If there were any notification requests for when this peer
        // disconnected, we can trigger them now.
        srvrLog.Debugf("Notifying that peer %v is offline", p)
        pubStr := string(pubKey.SerializeCompressed())
        for _, offlineChan := range s.peerDisconnectedListeners[pubStr] {
                close(offlineChan)
        }
        delete(s.peerDisconnectedListeners, pubStr)

        // If the server has already removed this peer, we can short circuit the
        // peer termination watcher and skip cleanup.
        if _, ok := s.ignorePeerTermination[p]; ok {
                delete(s.ignorePeerTermination, p)

                pubKey := p.PubKey()
                pubStr := string(pubKey[:])

                // If a connection callback is present, we'll go ahead and
                // execute it now that previous peer has fully disconnected. If
                // the callback is not present, this likely implies the peer was
                // purposefully disconnected via RPC, and that no reconnect
                // should be attempted.
                connCallback, ok := s.scheduledPeerConnection[pubStr]
                if ok {
                        delete(s.scheduledPeerConnection, pubStr)
                        connCallback()
                }
                return
        }

        // First, cleanup any remaining state the server has regarding the peer
        // in question.
        s.removePeer(p)

        // Next, check to see if this is a persistent peer or not.
        if _, ok := s.persistentPeers[pubStr]; !ok {
                return
        }

        // Get the last address that we used to connect to the peer.
        addrs := []net.Addr{
                p.NetAddress().Address,
        }

        // We'll ensure that we locate all the peer's advertised addresses for
        // reconnection purposes.
        advertisedAddrs, err := s.fetchNodeAdvertisedAddrs(pubKey)
        switch {
        // We found advertised addresses, so use them.
        case err == nil:
                addrs = advertisedAddrs

        // The peer doesn't have an advertised address.
        case err == errNoAdvertisedAddr:
                // If it is an outbound peer then we fall back to the existing
                // peer address.
                if !p.Inbound() {
                        break
                }

                // Fall back to the existing peer address if
                // we're not accepting connections over Tor.
                if s.torController == nil {
                        break
                }

                // If we are, the peer's address won't be known
                // to us (we'll see a private address, which is
                // the address used by our onion service to dial
                // to lnd), so we don't have enough information
                // to attempt a reconnect.
                srvrLog.Debugf("Ignoring reconnection attempt "+
                        "to inbound peer %v without "+
                        "advertised address", p)
                return

        // We came across an error retrieving an advertised
        // address, log it, and fall back to the existing peer
        // address.
        default:
                srvrLog.Errorf("Unable to retrieve advertised "+
                        "address for node %x: %v", p.PubKey(),
                        err)
        }

        // Make an easy lookup map so that we can check if an address
        // is already in the address list that we have stored for this peer.
        existingAddrs := make(map[string]bool)
        for _, addr := range s.persistentPeerAddrs[pubStr] {
                existingAddrs[addr.String()] = true
        }

        // Add any missing addresses for this peer to persistentPeerAddr.
        for _, addr := range addrs {
                if existingAddrs[addr.String()] {
                        continue
                }

                s.persistentPeerAddrs[pubStr] = append(
                        s.persistentPeerAddrs[pubStr],
                        &lnwire.NetAddress{
                                IdentityKey: p.IdentityKey(),
                                Address:     addr,
                                ChainNet:    p.NetAddress().ChainNet,
                        },
                )
        }

        // Record the computed backoff in the backoff map.
        backoff := s.nextPeerBackoff(pubStr, p.StartTime())
        s.persistentPeersBackoff[pubStr] = backoff

        // Initialize a retry canceller for this peer if one does not
        // exist.
        cancelChan, ok := s.persistentRetryCancels[pubStr]
        if !ok {
                cancelChan = make(chan struct{})
                s.persistentRetryCancels[pubStr] = cancelChan
        }

        // We choose not to wait group this go routine since the Connect
        // call can stall for arbitrarily long if we shutdown while an
        // outbound connection attempt is being made.
        go func() {
                srvrLog.Debugf("Scheduling connection re-establishment to "+
                        "persistent peer %x in %s",
                        p.IdentityKey().SerializeCompressed(), backoff)

                select {
                case <-time.After(backoff):
                case <-cancelChan:
                        return
                case <-s.quit:
                        return
                }

                srvrLog.Debugf("Attempting to re-establish persistent "+
                        "connection to peer %x",
                        p.IdentityKey().SerializeCompressed())

                s.connectToPersistentPeer(pubStr)
        }()
}

// connectToPersistentPeer uses all the stored addresses for a peer to attempt
// to connect to the peer. It creates connection requests if there are
// currently none for a given address and it removes old connection requests
// if the associated address is no longer in the latest address list for the
// peer.
func (s *server) connectToPersistentPeer(pubKeyStr string) {
        s.mu.Lock()
        defer s.mu.Unlock()

        // Create an easy lookup map of the addresses we have stored for the
        // peer. We will remove entries from this map if we have existing
        // connection requests for the associated address and then any leftover
        // entries will indicate which addresses we should create new
        // connection requests for.
        addrMap := make(map[string]*lnwire.NetAddress)
        for _, addr := range s.persistentPeerAddrs[pubKeyStr] {
                addrMap[addr.String()] = addr
        }

        // Go through each of the existing connection requests and
        // check if they correspond to the latest set of addresses. If
        // there is a connection request that does not use one of the latest
        // advertised addresses then remove that connection request.
        var updatedConnReqs []*connmgr.ConnReq
        for _, connReq := range s.persistentConnReqs[pubKeyStr] {
                lnAddr := connReq.Addr.(*lnwire.NetAddress).Address.String()

                switch _, ok := addrMap[lnAddr]; ok {
                // If the existing connection request is using one of the
                // latest advertised addresses for the peer then we add it to
                // updatedConnReqs and remove the associated address from
                // addrMap so that we don't recreate this connReq later on.
                case true:
                        updatedConnReqs = append(
                                updatedConnReqs, connReq,
                        )
                        delete(addrMap, lnAddr)

                // If the existing connection request is using an address that
                // is not one of the latest advertised addresses for the peer
                // then we remove the connection request from the connection
                // manager.
                case false:
                        srvrLog.Info(
                                "Removing conn req:", connReq.Addr.String(),
                        )
                        s.connMgr.Remove(connReq.ID())
                }
        }

        s.persistentConnReqs[pubKeyStr] = updatedConnReqs

        cancelChan, ok := s.persistentRetryCancels[pubKeyStr]
        if !ok {
                cancelChan = make(chan struct{})
                s.persistentRetryCancels[pubKeyStr] = cancelChan
        }

        // Any addresses left in addrMap are new ones that we have not made
        // connection requests for. So create new connection requests for those.
        // If there is more than one address in the address map, stagger the
        // creation of the connection requests for those.
        go func() {
                ticker := time.NewTicker(multiAddrConnectionStagger)
                defer ticker.Stop()

                for _, addr := range addrMap {
                        // Send the persistent connection request to the
                        // connection manager, saving the request itself so we
                        // can cancel/restart the process as needed.
                        connReq := &connmgr.ConnReq{
                                Addr:      addr,
                                Permanent: true,
                        }

                        s.mu.Lock()
                        s.persistentConnReqs[pubKeyStr] = append(
                                s.persistentConnReqs[pubKeyStr], connReq,
                        )
                        s.mu.Unlock()

                        srvrLog.Debugf("Attempting persistent connection to "+
                                "channel peer %v", addr)

                        go s.connMgr.Connect(connReq)

                        select {
                        case <-s.quit:
                                return
                        case <-cancelChan:
                                return
                        case <-ticker.C:
                        }
                }
        }()
}

// removePeer removes the passed peer from the server's state of all active
// peers.
func (s *server) removePeer(p *peer.Brontide) {
        if p == nil {
                return
        }

        srvrLog.Debugf("removing peer %v", p)

        // As the peer is now finished, ensure that the TCP connection is
        // closed and all of its related goroutines have exited.
        p.Disconnect(fmt.Errorf("server: disconnecting peer %v", p))

        // If this peer had an active persistent connection request, remove it.
        if p.ConnReq() != nil {
                s.connMgr.Remove(p.ConnReq().ID())
        }

        // Ignore deleting peers if we're shutting down.
        if s.Stopped() {
                return
        }

        pKey := p.PubKey()
        pubSer := pKey[:]
        pubStr := string(pubSer)

        delete(s.peersByPub, pubStr)

        if p.Inbound() {
                delete(s.inboundPeers, pubStr)
        } else {
                delete(s.outboundPeers, pubStr)
        }

        // Copy the peer's error buffer across to the server if it has any items
        // in it so that we can restore peer errors across connections.
        if p.ErrorBuffer().Total() > 0 {
                s.peerErrors[pubStr] = p.ErrorBuffer()
        }

        // Inform the peer notifier of a peer offline event so that it can be
        // reported to clients listening for peer events.
        var pubKey [33]byte
        copy(pubKey[:], pubSer)

        s.peerNotifier.NotifyPeerOffline(pubKey)
}

// ConnectToPeer requests that the server connect to a Lightning Network peer
// at the specified address. This function will *block* until either a
// connection is established, or the initial handshake process fails.
//
// NOTE: This function is safe for concurrent access.
func (s *server) ConnectToPeer(addr *lnwire.NetAddress,
        perm bool, timeout time.Duration) error {

        targetPub := string(addr.IdentityKey.SerializeCompressed())

        // Acquire mutex, but use explicit unlocking instead of defer for
        // better granularity. In certain conditions, this method requires
        // making an outbound connection to a remote peer, which requires the
        // lock to be released, and subsequently reacquired.
        s.mu.Lock()

        // Ensure we're not already connected to this peer.
        peer, err := s.findPeerByPubStr(targetPub)
        if err == nil {
                s.mu.Unlock()
                return &errPeerAlreadyConnected{peer: peer}
        }

        // Peer was not found, continue to pursue connection with peer.

        // If there's already a pending connection request for this pubkey,
        // then we ignore this request to ensure we don't create a redundant
        // connection.
        if reqs, ok := s.persistentConnReqs[targetPub]; ok {
                srvrLog.Warnf("Already have %d persistent connection "+
                        "requests for %v, connecting anyway.", len(reqs), addr)
        }

        // If there's not already a pending or active connection to this node,
        // then instruct the connection manager to attempt to establish a
        // persistent connection to the peer.
        srvrLog.Debugf("Connecting to %v", addr)
        if perm {
                connReq := &connmgr.ConnReq{
                        Addr:      addr,
                        Permanent: true,
                }

                // Since the user requested a permanent connection, we'll set
                // the entry to true which will tell the server to continue
                // reconnecting even if the number of channels with this peer is
                // zero.
                s.persistentPeers[targetPub] = true
                if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
                        s.persistentPeersBackoff[targetPub] = s.cfg.MinBackoff
                }
                s.persistentConnReqs[targetPub] = append(
                        s.persistentConnReqs[targetPub], connReq,
                )
                s.mu.Unlock()

                go s.connMgr.Connect(connReq)

                return nil
        }
        s.mu.Unlock()

        // If we're not making a persistent connection, then we'll attempt to
        // connect to the target peer. If we can't make the connection, or
        // the crypto negotiation breaks down, then return an error to the
        // caller.
        errChan := make(chan error, 1)
        s.connectToPeer(addr, errChan, timeout)

        select {
        case err := <-errChan:
                return err
        case <-s.quit:
                return ErrServerShuttingDown
        }
}
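
// Illustrative usage sketch (remotePub and tcpAddr are hypothetical
// placeholders, not identifiers from this file): a caller such as an RPC
// handler builds an lnwire.NetAddress for the target node and then blocks on
// ConnectToPeer until the handshake succeeds or fails.
//
//      addr := &lnwire.NetAddress{
//              IdentityKey: remotePub, // *btcec.PublicKey of the peer.
//              Address:     tcpAddr,   // net.Addr, e.g. a *net.TCPAddr.
//              ChainNet:    wire.MainNet,
//      }
//      if err := s.ConnectToPeer(addr, true, 30*time.Second); err != nil {
//              srvrLog.Errorf("Unable to connect to peer: %v", err)
//      }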

// connectToPeer establishes a connection to a remote peer. errChan is used to
// notify the caller if the connection attempt has failed. Otherwise, it will be
// closed.
func (s *server) connectToPeer(addr *lnwire.NetAddress,
        errChan chan<- error, timeout time.Duration) {

        conn, err := brontide.Dial(
                s.identityECDH, addr, timeout, s.cfg.net.Dial,
        )
        if err != nil {
                srvrLog.Errorf("Unable to connect to %v: %v", addr, err)
                select {
                case errChan <- err:
                case <-s.quit:
                }
                return
        }

        close(errChan)

        srvrLog.Tracef("Brontide dialer made local=%v, remote=%v",
                conn.LocalAddr(), conn.RemoteAddr())

        s.OutboundPeerConnected(nil, conn)
}

// DisconnectPeer sends the request to the server to close the connection with
// the peer identified by public key.
//
// NOTE: This function is safe for concurrent access.
func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
        pubBytes := pubKey.SerializeCompressed()
        pubStr := string(pubBytes)

        s.mu.Lock()
        defer s.mu.Unlock()

        // Check that we're actually connected to this peer. If not, then we'll
        // exit with an error as we can't disconnect from a peer that we're not
        // currently connected to.
        peer, err := s.findPeerByPubStr(pubStr)
        if err == ErrPeerNotConnected {
                return fmt.Errorf("peer %x is not connected", pubBytes)
        }

        srvrLog.Infof("Disconnecting from %v", peer)

        s.cancelConnReqs(pubStr, nil)

        // If this peer was formerly a persistent connection, then we'll remove
        // them from this map so we don't attempt to re-connect after we
        // disconnect.
        delete(s.persistentPeers, pubStr)
        delete(s.persistentPeersBackoff, pubStr)

        // Remove the peer by calling Disconnect. Previously this was done with
        // removePeer, which bypassed the peerTerminationWatcher.
        peer.Disconnect(fmt.Errorf("server: DisconnectPeer called"))

        return nil
}
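
// Illustrative usage sketch (pubKeyHex is a hypothetical placeholder): an RPC
// caller parses the peer's compressed public key before requesting the
// disconnect.
//
//      pubKeyBytes, err := hex.DecodeString(pubKeyHex)
//      if err != nil {
//              return err
//      }
//      pubKey, err := btcec.ParsePubKey(pubKeyBytes)
//      if err != nil {
//              return err
//      }
//      return s.DisconnectPeer(pubKey)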

// OpenChannel sends a request to the server to open a channel to the specified
// peer identified by nodeKey with the passed channel funding parameters.
//
// NOTE: This function is safe for concurrent access.
func (s *server) OpenChannel(
        req *funding.InitFundingMsg) (chan *lnrpc.OpenStatusUpdate, chan error) {

        // The updateChan will have a buffer of 2, since we expect a ChanPending
        // + a ChanOpen update, and we want to make sure the funding process is
        // not blocked if the caller is not reading the updates.
        req.Updates = make(chan *lnrpc.OpenStatusUpdate, 2)
        req.Err = make(chan error, 1)

        // First attempt to locate the target peer to open a channel with; if
        // we're unable to locate the peer then this request will fail.
        pubKeyBytes := req.TargetPubkey.SerializeCompressed()
        s.mu.RLock()
        peer, ok := s.peersByPub[string(pubKeyBytes)]
        if !ok {
                s.mu.RUnlock()

                req.Err <- fmt.Errorf("peer %x is not online", pubKeyBytes)
                return req.Updates, req.Err
        }
        req.Peer = peer
        s.mu.RUnlock()

        // We'll wait until the peer is active before beginning the channel
        // opening process.
        select {
        case <-peer.ActiveSignal():
        case <-peer.QuitSignal():
                req.Err <- fmt.Errorf("peer %x disconnected", pubKeyBytes)
                return req.Updates, req.Err
        case <-s.quit:
                req.Err <- ErrServerShuttingDown
                return req.Updates, req.Err
        }

        // If the fee rate wasn't specified at this point we fail the funding
        // because of the missing fee rate information. The caller of the
        // `OpenChannel` method needs to make sure that default values for the
        // fee rate are set beforehand.
        if req.FundingFeePerKw == 0 {
                req.Err <- fmt.Errorf("no FundingFeePerKw specified for " +
                        "the channel opening transaction")

                return req.Updates, req.Err
        }

        // Spawn a goroutine to send the funding workflow request to the funding
        // manager. This allows the server to continue handling queries instead
        // of blocking on this request which is exported as a synchronous
        // request to the outside world.
        go s.fundingMgr.InitFundingWorkflow(req)

        return req.Updates, req.Err
}
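
// Illustrative usage sketch (remotePub and feeRate are hypothetical
// placeholders, and the InitFundingMsg fields shown are only an assumed,
// minimal subset): the caller consumes both returned channels, since funding
// progress arrives on one and failures on the other.
//
//      updates, errs := s.OpenChannel(&funding.InitFundingMsg{
//              TargetPubkey:    remotePub,
//              LocalFundingAmt: btcutil.Amount(100_000),
//              FundingFeePerKw: feeRate,
//      })
//      select {
//      case upd := <-updates:
//              // e.g. a ChanPending update, followed later by ChanOpen.
//              _ = upd
//      case err := <-errs:
//              srvrLog.Errorf("Channel open failed: %v", err)
//      }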

// Peers returns a slice of all active peers.
//
// NOTE: This function is safe for concurrent access.
func (s *server) Peers() []*peer.Brontide {
        s.mu.RLock()
        defer s.mu.RUnlock()

        peers := make([]*peer.Brontide, 0, len(s.peersByPub))
        for _, peer := range s.peersByPub {
                peers = append(peers, peer)
        }

        return peers
}

// computeNextBackoff uses a truncated exponential backoff to compute the next
// backoff using the value of the existing backoff. The returned duration is
// randomized in either direction by 1/20 to prevent tight loops from
// stabilizing.
func computeNextBackoff(currBackoff, maxBackoff time.Duration) time.Duration {
        // Double the current backoff, truncating if it exceeds our maximum.
        nextBackoff := 2 * currBackoff
        if nextBackoff > maxBackoff {
                nextBackoff = maxBackoff
        }

        // Using 1/10 of our duration as a margin, compute a random offset to
        // avoid the nodes entering connection cycles.
        margin := nextBackoff / 10

        var wiggle big.Int
        wiggle.SetUint64(uint64(margin))
        if _, err := rand.Int(rand.Reader, &wiggle); err != nil {
                // Randomizing is not mission critical, so we'll just return the
                // current backoff.
                return nextBackoff
        }

        // Otherwise add in our wiggle, but subtract out half of the margin so
        // that the backoff can be tweaked by 1/20 in either direction.
        return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2)
}
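
// Worked example of the backoff arithmetic above: with currBackoff = 1s and
// maxBackoff = 1h, the doubled value is 2s and the margin is 200ms, so the
// returned duration falls in the range [1.9s, 2.1s). Once the doubled value
// exceeds maxBackoff it is clamped to maxBackoff before the jitter is applied.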

// errNoAdvertisedAddr is an error returned when we attempt to retrieve the
// advertised address of a node, but they don't have one.
var errNoAdvertisedAddr = errors.New("no advertised address found")

// fetchNodeAdvertisedAddrs attempts to fetch the advertised addresses of a node.
func (s *server) fetchNodeAdvertisedAddrs(pub *btcec.PublicKey) ([]net.Addr, error) {
        vertex, err := route.NewVertexFromBytes(pub.SerializeCompressed())
        if err != nil {
                return nil, err
        }

        node, err := s.graphDB.FetchLightningNode(vertex)
        if err != nil {
                return nil, err
        }

        if len(node.Addresses) == 0 {
                return nil, errNoAdvertisedAddr
        }

        return node.Addresses, nil
}

// fetchLastChanUpdate returns a function which is able to retrieve our latest
// channel update for a target channel.
func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) (
        *lnwire.ChannelUpdate1, error) {

        ourPubKey := s.identityECDH.PubKey().SerializeCompressed()
        return func(cid lnwire.ShortChannelID) (*lnwire.ChannelUpdate1, error) {
                info, edge1, edge2, err := s.graphBuilder.GetChannelByID(cid)
                if err != nil {
                        return nil, err
                }

                return netann.ExtractChannelUpdate(
                        ourPubKey[:], info, edge1, edge2,
                )
        }
}
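
// Illustrative usage sketch (scid is a hypothetical placeholder): the returned
// closure is handed to sub-systems that need to look up our most recent update
// for a channel on demand.
//
//      lookup := s.fetchLastChanUpdate()
//      update, err := lookup(scid)
//      if err == nil {
//              srvrLog.Debugf("Latest update timestamp: %v", update.Timestamp)
//      }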

// applyChannelUpdate applies the channel update to the different sub-systems of
// the server. The useAlias boolean denotes whether or not to send an alias in
// place of the real SCID.
func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate1,
        op *wire.OutPoint, useAlias bool) error {

        var (
                peerAlias    *lnwire.ShortChannelID
                defaultAlias lnwire.ShortChannelID
        )

        chanID := lnwire.NewChanIDFromOutPoint(*op)

        // Fetch the peer's alias from the lnwire.ChannelID so it can be used
        // in the ChannelUpdate if it hasn't been announced yet.
        if useAlias {
                foundAlias, _ := s.aliasMgr.GetPeerAlias(chanID)
                if foundAlias != defaultAlias {
                        peerAlias = &foundAlias
                }
        }

        errChan := s.authGossiper.ProcessLocalAnnouncement(
                update, discovery.RemoteAlias(peerAlias),
        )
        select {
        case err := <-errChan:
                return err
        case <-s.quit:
                return ErrServerShuttingDown
        }
}

// SendCustomMessage sends a custom message to the peer with the specified
// pubkey.
func (s *server) SendCustomMessage(peerPub [33]byte, msgType lnwire.MessageType,
        data []byte) error {

        peer, err := s.FindPeerByPubStr(string(peerPub[:]))
        if err != nil {
                return err
        }

        // We'll wait until the peer is active.
        select {
        case <-peer.ActiveSignal():
        case <-peer.QuitSignal():
                return fmt.Errorf("peer %x disconnected", peerPub)
        case <-s.quit:
                return ErrServerShuttingDown
        }

        msg, err := lnwire.NewCustom(msgType, data)
        if err != nil {
                return err
        }

        // Send the message as low-priority. For now we assume that all
        // application-defined messages are low priority.
        return peer.SendMessageLazy(true, msg)
}
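
// Illustrative usage sketch (remotePub and the message type/payload are
// hypothetical placeholders): custom messages are expected to use types from
// the custom range (>= 32768) defined in BOLT #1, which lnwire.NewCustom
// checks when the message is constructed above.
//
//      var peerPub [33]byte
//      copy(peerPub[:], remotePub.SerializeCompressed())
//
//      err := s.SendCustomMessage(
//              peerPub, lnwire.MessageType(32768+42), []byte("ping"),
//      )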

// newSweepPkScriptGen creates a closure that generates a new public key script
// which should be used to sweep any funds into the on-chain wallet.
// Specifically, the script generated is a pay-to-taproot (P2TR) output paying
// to a fresh address from the wallet's default account.
func newSweepPkScriptGen(
        wallet lnwallet.WalletController,
        netParams *chaincfg.Params) func() fn.Result[lnwallet.AddrWithKey] {

        return func() fn.Result[lnwallet.AddrWithKey] {
                sweepAddr, err := wallet.NewAddress(
                        lnwallet.TaprootPubkey, false,
                        lnwallet.DefaultAccountName,
                )
                if err != nil {
                        return fn.Err[lnwallet.AddrWithKey](err)
                }

                addr, err := txscript.PayToAddrScript(sweepAddr)
                if err != nil {
                        return fn.Err[lnwallet.AddrWithKey](err)
                }

                internalKeyDesc, err := lnwallet.InternalKeyForAddr(
                        wallet, netParams, addr,
                )
                if err != nil {
                        return fn.Err[lnwallet.AddrWithKey](err)
                }

                return fn.Ok(lnwallet.AddrWithKey{
                        DeliveryAddress: addr,
                        InternalKey:     internalKeyDesc,
                })
        }
}

// shouldPeerBootstrap returns true if we should attempt to perform peer
// bootstrapping to actively seek our peers using the set of active network
// bootstrappers.
func shouldPeerBootstrap(cfg *Config) bool {
        isSimnet := cfg.Bitcoin.SimNet
        isSignet := cfg.Bitcoin.SigNet
        isRegtest := cfg.Bitcoin.RegTest
        isDevNetwork := isSimnet || isSignet || isRegtest

        // TODO(yy): remove the check on simnet/regtest such that the itest is
        // covering the bootstrapping process.
        return !cfg.NoNetBootstrap && !isDevNetwork
}
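
// For example, a mainnet node with the default configuration bootstraps
// (NoNetBootstrap is false and none of the dev-network flags are set), while
// a simnet, signet, or regtest node never does, regardless of the
// NoNetBootstrap setting.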

// fetchClosedChannelSCIDs returns a set of SCIDs whose force closes have
// finished.
func (s *server) fetchClosedChannelSCIDs() map[lnwire.ShortChannelID]struct{} {
        // Get a list of closed channels.
        channels, err := s.chanStateDB.FetchClosedChannels(false)
        if err != nil {
                srvrLog.Errorf("Failed to fetch closed channels: %v", err)
                return nil
        }

        // Save the SCIDs in a map.
        closedSCIDs := make(map[lnwire.ShortChannelID]struct{}, len(channels))
        for _, c := range channels {
                // If the channel is not pending, its FC has been finalized.
                if !c.IsPending {
                        closedSCIDs[c.ShortChanID] = struct{}{}
                }
        }

        // Double check whether the reported closed channel has indeed finished
        // closing.
        //
        // NOTE: There are misalignments regarding when a channel's FC is
        // marked as finalized. We double check the pending channels to make
        // sure the returned SCIDs are indeed terminated.
        //
        // TODO(yy): fix the misalignments in `FetchClosedChannels`.
        pendings, err := s.chanStateDB.FetchPendingChannels()
        if err != nil {
                srvrLog.Errorf("Failed to fetch pending channels: %v", err)
                return nil
        }

        for _, c := range pendings {
                if _, ok := closedSCIDs[c.ShortChannelID]; !ok {
                        continue
                }

                // If the channel is still reported as pending, remove it from
                // the map.
                delete(closedSCIDs, c.ShortChannelID)

                srvrLog.Warnf("Channel=%v is prematurely marked as finalized",
                        c.ShortChannelID)
        }

        return closedSCIDs
}

// getStartingBeat returns the current beat. This is used during startup to
// initialize blockbeat consumers.
func (s *server) getStartingBeat() (*chainio.Beat, error) {
        // beat is the current blockbeat.
        var beat *chainio.Beat

        // We should get a notification with the current best block immediately
        // by passing a nil block.
        blockEpochs, err := s.cc.ChainNotifier.RegisterBlockEpochNtfn(nil)
        if err != nil {
                return beat, fmt.Errorf("register block epoch ntfn: %w", err)
        }
        defer blockEpochs.Cancel()

        // We registered for the block epochs with a nil request. The notifier
        // should send us the current best block immediately. So we need to
        // wait for it here because we need to know the current best height.
        select {
        case bestBlock := <-blockEpochs.Epochs:
                srvrLog.Infof("Received initial block %v at height %d",
                        bestBlock.Hash, bestBlock.Height)

                // Update the current blockbeat.
                beat = chainio.NewBeat(*bestBlock)

        case <-s.quit:
                srvrLog.Debug("LND shutting down")
        }

        return beat, nil
}