lightningnetwork / lnd / 12343072627

15 Dec 2024 11:09PM UTC coverage: 57.504% (-1.1%) from 58.636%

Pull #9315 (github) by yyforyongyu:
contractcourt: offer outgoing htlc one block earlier before its expiry

We need to offer the outgoing htlc one block earlier to make sure that, when
the expiry height hits, the sweeper will not miss sweeping it in the
same block. This also means the outgoing contest resolver now only does
one thing: watch for the preimage spend until height expiry-1, which can
easily be moved into the timeout resolver in the future.
Pull Request #9315: Implement `blockbeat`
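The change described above boils down to a height comparison. The following is an illustrative sketch only, not code from this PR or from lnd: it assumes a hypothetical offerToSweeper callback and shows the HTLC being handed off at height expiry-1 rather than at expiry, so the sweep is already queued in the block where the timeout path matures.

package main

import "fmt"

// maybeOfferOutgoingHTLC is a hypothetical helper (not lnd code): it offers
// the outgoing HTLC to the sweeper one block before its expiry, per the
// reasoning in the commit message above.
func maybeOfferOutgoingHTLC(bestHeight, expiry uint32, offerToSweeper func()) {
	// Offering at expiry-1 means the sweeper already holds the input when
	// the expiry height is reached, so the sweep is not missed in that block.
	if bestHeight+1 >= expiry {
		offerToSweeper()
	}
}

func main() {
	maybeOfferOutgoingHTLC(99, 100, func() { fmt.Println("offered at expiry-1") })
}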

1445 of 2007 new or added lines in 26 files covered. (72.0%)

19246 existing lines in 249 files now uncovered.

102342 of 177975 relevant lines covered (57.5%)

24772.24 hits per line

Source File
/server.go
package lnd

import (
	"bytes"
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"math/big"
	prand "math/rand"
	"net"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/btcec/v2"
	"github.com/btcsuite/btcd/btcec/v2/ecdsa"
	"github.com/btcsuite/btcd/btcutil"
	"github.com/btcsuite/btcd/chaincfg"
	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcd/connmgr"
	"github.com/btcsuite/btcd/txscript"
	"github.com/btcsuite/btcd/wire"
	"github.com/go-errors/errors"
	sphinx "github.com/lightningnetwork/lightning-onion"
	"github.com/lightningnetwork/lnd/aliasmgr"
	"github.com/lightningnetwork/lnd/autopilot"
	"github.com/lightningnetwork/lnd/brontide"
	"github.com/lightningnetwork/lnd/chainio"
	"github.com/lightningnetwork/lnd/chainreg"
	"github.com/lightningnetwork/lnd/chanacceptor"
	"github.com/lightningnetwork/lnd/chanbackup"
	"github.com/lightningnetwork/lnd/chanfitness"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/channelnotifier"
	"github.com/lightningnetwork/lnd/clock"
	"github.com/lightningnetwork/lnd/cluster"
	"github.com/lightningnetwork/lnd/contractcourt"
	"github.com/lightningnetwork/lnd/discovery"
	"github.com/lightningnetwork/lnd/feature"
	"github.com/lightningnetwork/lnd/fn/v2"
	"github.com/lightningnetwork/lnd/funding"
	"github.com/lightningnetwork/lnd/graph"
	graphdb "github.com/lightningnetwork/lnd/graph/db"
	"github.com/lightningnetwork/lnd/graph/db/models"
	"github.com/lightningnetwork/lnd/graph/graphsession"
	"github.com/lightningnetwork/lnd/healthcheck"
	"github.com/lightningnetwork/lnd/htlcswitch"
	"github.com/lightningnetwork/lnd/htlcswitch/hop"
	"github.com/lightningnetwork/lnd/input"
	"github.com/lightningnetwork/lnd/invoices"
	"github.com/lightningnetwork/lnd/keychain"
	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/lncfg"
	"github.com/lightningnetwork/lnd/lnencrypt"
	"github.com/lightningnetwork/lnd/lnpeer"
	"github.com/lightningnetwork/lnd/lnrpc"
	"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
	"github.com/lightningnetwork/lnd/lnwallet"
	"github.com/lightningnetwork/lnd/lnwallet/chainfee"
	"github.com/lightningnetwork/lnd/lnwallet/chanfunding"
	"github.com/lightningnetwork/lnd/lnwallet/rpcwallet"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/nat"
	"github.com/lightningnetwork/lnd/netann"
	"github.com/lightningnetwork/lnd/peer"
	"github.com/lightningnetwork/lnd/peernotifier"
	"github.com/lightningnetwork/lnd/pool"
	"github.com/lightningnetwork/lnd/queue"
	"github.com/lightningnetwork/lnd/routing"
	"github.com/lightningnetwork/lnd/routing/localchans"
	"github.com/lightningnetwork/lnd/routing/route"
	"github.com/lightningnetwork/lnd/subscribe"
	"github.com/lightningnetwork/lnd/sweep"
	"github.com/lightningnetwork/lnd/ticker"
	"github.com/lightningnetwork/lnd/tor"
	"github.com/lightningnetwork/lnd/walletunlocker"
	"github.com/lightningnetwork/lnd/watchtower/blob"
	"github.com/lightningnetwork/lnd/watchtower/wtclient"
	"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
	"github.com/lightningnetwork/lnd/watchtower/wtserver"
)

const (
	// defaultMinPeers is the minimum number of peers nodes should always be
	// connected to.
	defaultMinPeers = 3

	// defaultStableConnDuration is a floor under which all reconnection
	// attempts will apply exponential randomized backoff. Connections
	// durations exceeding this value will be eligible to have their
	// backoffs reduced.
	defaultStableConnDuration = 10 * time.Minute

	// numInstantInitReconnect specifies how many persistent peers we should
	// always attempt outbound connections to immediately. After this value
	// is surpassed, the remaining peers will be randomly delayed using
	// maxInitReconnectDelay.
	numInstantInitReconnect = 10

	// maxInitReconnectDelay specifies the maximum delay in seconds we will
	// apply in attempting to reconnect to persistent peers on startup. The
	// value used or a particular peer will be chosen between 0s and this
	// value.
	maxInitReconnectDelay = 30

	// multiAddrConnectionStagger is the number of seconds to wait between
	// attempting to a peer with each of its advertised addresses.
	multiAddrConnectionStagger = 10 * time.Second
)

var (
	// ErrPeerNotConnected signals that the server has no connection to the
	// given peer.
	ErrPeerNotConnected = errors.New("peer is not connected")

	// ErrServerNotActive indicates that the server has started but hasn't
	// fully finished the startup process.
	ErrServerNotActive = errors.New("server is still in the process of " +
		"starting")

	// ErrServerShuttingDown indicates that the server is in the process of
	// gracefully exiting.
	ErrServerShuttingDown = errors.New("server is shutting down")

	// MaxFundingAmount is a soft-limit of the maximum channel size
	// currently accepted within the Lightning Protocol. This is
	// defined in BOLT-0002, and serves as an initial precautionary limit
	// while implementations are battle tested in the real world.
	//
	// At the moment, this value depends on which chain is active. It is set
	// to the value under the Bitcoin chain as default.
	//
	// TODO(roasbeef): add command line param to modify.
	MaxFundingAmount = funding.MaxBtcFundingAmount

	// EndorsementExperimentEnd is the time after which nodes should stop
	// propagating experimental endorsement signals.
	//
	// Per blip04: January 1, 2026 12:00:00 AM UTC in unix seconds.
	EndorsementExperimentEnd = time.Unix(1767225600, 0)
)

// errPeerAlreadyConnected is an error returned by the server when we're
// commanded to connect to a peer, but they're already connected.
type errPeerAlreadyConnected struct {
	peer *peer.Brontide
}

// Error returns the human readable version of this error type.
//
// NOTE: Part of the error interface.
func (e *errPeerAlreadyConnected) Error() string {
	return fmt.Sprintf("already connected to peer: %v", e.peer)
}

// server is the main server of the Lightning Network Daemon. The server houses
// global state pertaining to the wallet, database, and the rpcserver.
// Additionally, the server is also used as a central messaging bus to interact
// with any of its companion objects.
type server struct {
	active   int32 // atomic
	stopping int32 // atomic

	start sync.Once
	stop  sync.Once

	cfg *Config

	implCfg *ImplementationCfg

	// identityECDH is an ECDH capable wrapper for the private key used
	// to authenticate any incoming connections.
	identityECDH keychain.SingleKeyECDH

	// identityKeyLoc is the key locator for the above wrapped identity key.
	identityKeyLoc keychain.KeyLocator

	// nodeSigner is an implementation of the MessageSigner implementation
	// that's backed by the identity private key of the running lnd node.
	nodeSigner *netann.NodeSigner

	chanStatusMgr *netann.ChanStatusManager

	// listenAddrs is the list of addresses the server is currently
	// listening on.
	listenAddrs []net.Addr

	// torController is a client that will communicate with a locally
	// running Tor server. This client will handle initiating and
	// authenticating the connection to the Tor server, automatically
	// creating and setting up onion services, etc.
	torController *tor.Controller

	// natTraversal is the specific NAT traversal technique used to
	// automatically set up port forwarding rules in order to advertise to
	// the network that the node is accepting inbound connections.
	natTraversal nat.Traversal

	// lastDetectedIP is the last IP detected by the NAT traversal technique
	// above. This IP will be watched periodically in a goroutine in order
	// to handle dynamic IP changes.
	lastDetectedIP net.IP

	mu sync.RWMutex

	// peersByPub is a map of the active peers.
	//
	// NOTE: The key used here is the raw bytes of the peer's public key to
	// string conversion, which means it cannot be printed using `%s` as it
	// will just print the binary.
	//
	// TODO(yy): Use the hex string instead.
	peersByPub map[string]*peer.Brontide

	inboundPeers  map[string]*peer.Brontide
	outboundPeers map[string]*peer.Brontide

	peerConnectedListeners    map[string][]chan<- lnpeer.Peer
	peerDisconnectedListeners map[string][]chan<- struct{}

	// TODO(yy): the Brontide.Start doesn't know this value, which means it
	// will continue to send messages even if there are no active channels
	// and the value below is false. Once it's pruned, all its connections
	// will be closed, thus the Brontide.Start will return an error.
	persistentPeers        map[string]bool
	persistentPeersBackoff map[string]time.Duration
	persistentPeerAddrs    map[string][]*lnwire.NetAddress
	persistentConnReqs     map[string][]*connmgr.ConnReq
	persistentRetryCancels map[string]chan struct{}

	// peerErrors keeps a set of peer error buffers for peers that have
	// disconnected from us. This allows us to track historic peer errors
	// over connections. The string of the peer's compressed pubkey is used
	// as a key for this map.
	peerErrors map[string]*queue.CircularBuffer

	// ignorePeerTermination tracks peers for which the server has initiated
	// a disconnect. Adding a peer to this map causes the peer termination
	// watcher to short circuit in the event that peers are purposefully
	// disconnected.
	ignorePeerTermination map[*peer.Brontide]struct{}

	// scheduledPeerConnection maps a pubkey string to a callback that
	// should be executed in the peerTerminationWatcher the prior peer with
	// the same pubkey exits.  This allows the server to wait until the
	// prior peer has cleaned up successfully, before adding the new peer
	// intended to replace it.
	scheduledPeerConnection map[string]func()

	// pongBuf is a shared pong reply buffer we'll use across all active
	// peer goroutines. We know the max size of a pong message
	// (lnwire.MaxPongBytes), so we can allocate this ahead of time, and
	// avoid allocations each time we need to send a pong message.
	pongBuf []byte

	cc *chainreg.ChainControl

	fundingMgr *funding.Manager

	graphDB *graphdb.ChannelGraph

	chanStateDB *channeldb.ChannelStateDB

	addrSource channeldb.AddrSource

	// miscDB is the DB that contains all "other" databases within the main
	// channel DB that haven't been separated out yet.
	miscDB *channeldb.DB

	invoicesDB invoices.InvoiceDB

	aliasMgr *aliasmgr.Manager

	htlcSwitch *htlcswitch.Switch

	interceptableSwitch *htlcswitch.InterceptableSwitch

	invoices *invoices.InvoiceRegistry

	invoiceHtlcModifier *invoices.HtlcModificationInterceptor

	channelNotifier *channelnotifier.ChannelNotifier

	peerNotifier *peernotifier.PeerNotifier

	htlcNotifier *htlcswitch.HtlcNotifier

	witnessBeacon contractcourt.WitnessBeacon

	breachArbitrator *contractcourt.BreachArbitrator

	missionController *routing.MissionController
	defaultMC         *routing.MissionControl

	graphBuilder *graph.Builder

	chanRouter *routing.ChannelRouter

	controlTower routing.ControlTower

	authGossiper *discovery.AuthenticatedGossiper

	localChanMgr *localchans.Manager

	utxoNursery *contractcourt.UtxoNursery

	sweeper *sweep.UtxoSweeper

	chainArb *contractcourt.ChainArbitrator

	sphinx *hop.OnionProcessor

	towerClientMgr *wtclient.Manager

	connMgr *connmgr.ConnManager

	sigPool *lnwallet.SigPool

	writePool *pool.Write

	readPool *pool.Read

	tlsManager *TLSManager

	// featureMgr dispatches feature vectors for various contexts within the
	// daemon.
	featureMgr *feature.Manager

	// currentNodeAnn is the node announcement that has been broadcast to
	// the network upon startup, if the attributes of the node (us) has
	// changed since last start.
	currentNodeAnn *lnwire.NodeAnnouncement

	// chansToRestore is the set of channels that upon starting, the server
	// should attempt to restore/recover.
	chansToRestore walletunlocker.ChannelsToRecover

	// chanSubSwapper is a sub-system that will ensure our on-disk channel
	// backups are consistent at all times. It interacts with the
	// channelNotifier to be notified of newly opened and closed channels.
	chanSubSwapper *chanbackup.SubSwapper

	// chanEventStore tracks the behaviour of channels and their remote peers to
	// provide insights into their health and performance.
	chanEventStore *chanfitness.ChannelEventStore

	hostAnn *netann.HostAnnouncer

	// livenessMonitor monitors that lnd has access to critical resources.
	livenessMonitor *healthcheck.Monitor

	customMessageServer *subscribe.Server

	// txPublisher is a publisher with fee-bumping capability.
	txPublisher *sweep.TxPublisher

	// blockbeatDispatcher is a block dispatcher that notifies subscribers
	// of new blocks.
	blockbeatDispatcher *chainio.BlockbeatDispatcher

	quit chan struct{}

	wg sync.WaitGroup
}

// updatePersistentPeerAddrs subscribes to topology changes and stores
// advertised addresses for any NodeAnnouncements from our persisted peers.
func (s *server) updatePersistentPeerAddrs() error {
	graphSub, err := s.graphBuilder.SubscribeTopology()
	if err != nil {
		return err
	}

	s.wg.Add(1)
	go func() {
		defer func() {
			graphSub.Cancel()
			s.wg.Done()
		}()

		for {
			select {
			case <-s.quit:
				return

			case topChange, ok := <-graphSub.TopologyChanges:
				// If the router is shutting down, then we will
				// as well.
				if !ok {
					return
				}

				for _, update := range topChange.NodeUpdates {
					pubKeyStr := string(
						update.IdentityKey.
							SerializeCompressed(),
					)

					// We only care about updates from
					// our persistentPeers.
					s.mu.RLock()
					_, ok := s.persistentPeers[pubKeyStr]
					s.mu.RUnlock()
					if !ok {
						continue
					}

					addrs := make([]*lnwire.NetAddress, 0,
						len(update.Addresses))

					for _, addr := range update.Addresses {
						addrs = append(addrs,
							&lnwire.NetAddress{
								IdentityKey: update.IdentityKey,
								Address:     addr,
								ChainNet:    s.cfg.ActiveNetParams.Net,
							},
						)
					}

					s.mu.Lock()

					// Update the stored addresses for this
					// to peer to reflect the new set.
					s.persistentPeerAddrs[pubKeyStr] = addrs

					// If there are no outstanding
					// connection requests for this peer
					// then our work is done since we are
					// not currently trying to connect to
					// them.
					if len(s.persistentConnReqs[pubKeyStr]) == 0 {
						s.mu.Unlock()
						continue
					}

					s.mu.Unlock()

					s.connectToPersistentPeer(pubKeyStr)
				}
			}
		}
	}()

	return nil
}

// CustomMessage is a custom message that is received from a peer.
type CustomMessage struct {
	// Peer is the peer pubkey
	Peer [33]byte

	// Msg is the custom wire message.
	Msg *lnwire.Custom
}

// parseAddr parses an address from its string format to a net.Addr.
func parseAddr(address string, netCfg tor.Net) (net.Addr, error) {
	var (
		host string
		port int
	)

	// Split the address into its host and port components.
	h, p, err := net.SplitHostPort(address)
	if err != nil {
		// If a port wasn't specified, we'll assume the address only
		// contains the host so we'll use the default port.
		host = address
		port = defaultPeerPort
	} else {
		// Otherwise, we'll note both the host and ports.
		host = h
		portNum, err := strconv.Atoi(p)
		if err != nil {
			return nil, err
		}
		port = portNum
	}

	if tor.IsOnionHost(host) {
		return &tor.OnionAddr{OnionService: host, Port: port}, nil
	}

	// If the host is part of a TCP address, we'll use the network
	// specific ResolveTCPAddr function in order to resolve these
	// addresses over Tor in order to prevent leaking your real IP
	// address.
	hostPort := net.JoinHostPort(host, strconv.Itoa(port))
	return netCfg.ResolveTCPAddr("tcp", hostPort)
}

// noiseDial is a factory function which creates a connmgr compliant dialing
// function by returning a closure which includes the server's identity key.
func noiseDial(idKey keychain.SingleKeyECDH,
	netCfg tor.Net, timeout time.Duration) func(net.Addr) (net.Conn, error) {

	return func(a net.Addr) (net.Conn, error) {
		lnAddr := a.(*lnwire.NetAddress)
		return brontide.Dial(idKey, lnAddr, timeout, netCfg.Dial)
	}
}

// newServer creates a new instance of the server which is to listen using the
// passed listener address.
func newServer(cfg *Config, listenAddrs []net.Addr,
	dbs *DatabaseInstances, cc *chainreg.ChainControl,
	nodeKeyDesc *keychain.KeyDescriptor,
	chansToRestore walletunlocker.ChannelsToRecover,
	chanPredicate chanacceptor.ChannelAcceptor,
	torController *tor.Controller, tlsManager *TLSManager,
	leaderElector cluster.LeaderElector,
	implCfg *ImplementationCfg) (*server, error) {

	var (
		err         error
		nodeKeyECDH = keychain.NewPubKeyECDH(*nodeKeyDesc, cc.KeyRing)

		// We just derived the full descriptor, so we know the public
		// key is set on it.
		nodeKeySigner = keychain.NewPubKeyMessageSigner(
			nodeKeyDesc.PubKey, nodeKeyDesc.KeyLocator, cc.KeyRing,
		)
	)

	listeners := make([]net.Listener, len(listenAddrs))
	for i, listenAddr := range listenAddrs {
		// Note: though brontide.NewListener uses ResolveTCPAddr, it
		// doesn't need to call the general lndResolveTCP function
		// since we are resolving a local address.
		listeners[i], err = brontide.NewListener(
			nodeKeyECDH, listenAddr.String(),
		)
		if err != nil {
			return nil, err
		}
	}

	var serializedPubKey [33]byte
	copy(serializedPubKey[:], nodeKeyDesc.PubKey.SerializeCompressed())

	netParams := cfg.ActiveNetParams.Params

	// Initialize the sphinx router.
	replayLog := htlcswitch.NewDecayedLog(
		dbs.DecayedLogDB, cc.ChainNotifier,
	)
	sphinxRouter := sphinx.NewRouter(nodeKeyECDH, replayLog)

	writeBufferPool := pool.NewWriteBuffer(
		pool.DefaultWriteBufferGCInterval,
		pool.DefaultWriteBufferExpiryInterval,
	)

	writePool := pool.NewWrite(
		writeBufferPool, cfg.Workers.Write, pool.DefaultWorkerTimeout,
	)

	readBufferPool := pool.NewReadBuffer(
		pool.DefaultReadBufferGCInterval,
		pool.DefaultReadBufferExpiryInterval,
	)

	readPool := pool.NewRead(
		readBufferPool, cfg.Workers.Read, pool.DefaultWorkerTimeout,
	)

	// If the taproot overlay flag is set, but we don't have an aux funding
	// controller, then we'll exit as this is incompatible.
	if cfg.ProtocolOptions.TaprootOverlayChans &&
		implCfg.AuxFundingController.IsNone() {

		return nil, fmt.Errorf("taproot overlay flag set, but not " +
			"aux controllers")
	}

	//nolint:ll
	featureMgr, err := feature.NewManager(feature.Config{
		NoTLVOnion:                cfg.ProtocolOptions.LegacyOnion(),
		NoStaticRemoteKey:         cfg.ProtocolOptions.NoStaticRemoteKey(),
		NoAnchors:                 cfg.ProtocolOptions.NoAnchorCommitments(),
		NoWumbo:                   !cfg.ProtocolOptions.Wumbo(),
		NoScriptEnforcementLease:  cfg.ProtocolOptions.NoScriptEnforcementLease(),
		NoKeysend:                 !cfg.AcceptKeySend,
		NoOptionScidAlias:         !cfg.ProtocolOptions.ScidAlias(),
		NoZeroConf:                !cfg.ProtocolOptions.ZeroConf(),
		NoAnySegwit:               cfg.ProtocolOptions.NoAnySegwit(),
		CustomFeatures:            cfg.ProtocolOptions.CustomFeatures(),
		NoTaprootChans:            !cfg.ProtocolOptions.TaprootChans,
		NoTaprootOverlay:          !cfg.ProtocolOptions.TaprootOverlayChans,
		NoRouteBlinding:           cfg.ProtocolOptions.NoRouteBlinding(),
		NoExperimentalEndorsement: cfg.ProtocolOptions.NoExperimentalEndorsement(),
		NoQuiescence:              cfg.ProtocolOptions.NoQuiescence(),
	})
	if err != nil {
		return nil, err
	}

	invoiceHtlcModifier := invoices.NewHtlcModificationInterceptor()
	registryConfig := invoices.RegistryConfig{
		FinalCltvRejectDelta:        lncfg.DefaultFinalCltvRejectDelta,
		HtlcHoldDuration:            invoices.DefaultHtlcHoldDuration,
		Clock:                       clock.NewDefaultClock(),
		AcceptKeySend:               cfg.AcceptKeySend,
		AcceptAMP:                   cfg.AcceptAMP,
		GcCanceledInvoicesOnStartup: cfg.GcCanceledInvoicesOnStartup,
		GcCanceledInvoicesOnTheFly:  cfg.GcCanceledInvoicesOnTheFly,
		KeysendHoldTime:             cfg.KeysendHoldTime,
		HtlcInterceptor:             invoiceHtlcModifier,
	}

	addrSource := channeldb.NewMultiAddrSource(dbs.ChanStateDB, dbs.GraphDB)

	s := &server{
		cfg:            cfg,
		implCfg:        implCfg,
		graphDB:        dbs.GraphDB,
		chanStateDB:    dbs.ChanStateDB.ChannelStateDB(),
		addrSource:     addrSource,
		miscDB:         dbs.ChanStateDB,
		invoicesDB:     dbs.InvoiceDB,
		cc:             cc,
		sigPool:        lnwallet.NewSigPool(cfg.Workers.Sig, cc.Signer),
		writePool:      writePool,
		readPool:       readPool,
		chansToRestore: chansToRestore,

		blockbeatDispatcher: chainio.NewBlockbeatDispatcher(
			cc.ChainNotifier,
		),
		channelNotifier: channelnotifier.New(
			dbs.ChanStateDB.ChannelStateDB(),
		),

		identityECDH:   nodeKeyECDH,
		identityKeyLoc: nodeKeyDesc.KeyLocator,
		nodeSigner:     netann.NewNodeSigner(nodeKeySigner),

		listenAddrs: listenAddrs,

		// TODO(roasbeef): derive proper onion key based on rotation
		// schedule
		sphinx: hop.NewOnionProcessor(sphinxRouter),

		torController: torController,

		persistentPeers:         make(map[string]bool),
		persistentPeersBackoff:  make(map[string]time.Duration),
		persistentConnReqs:      make(map[string][]*connmgr.ConnReq),
		persistentPeerAddrs:     make(map[string][]*lnwire.NetAddress),
		persistentRetryCancels:  make(map[string]chan struct{}),
		peerErrors:              make(map[string]*queue.CircularBuffer),
		ignorePeerTermination:   make(map[*peer.Brontide]struct{}),
		scheduledPeerConnection: make(map[string]func()),
		pongBuf:                 make([]byte, lnwire.MaxPongBytes),

		peersByPub:                make(map[string]*peer.Brontide),
		inboundPeers:              make(map[string]*peer.Brontide),
		outboundPeers:             make(map[string]*peer.Brontide),
		peerConnectedListeners:    make(map[string][]chan<- lnpeer.Peer),
		peerDisconnectedListeners: make(map[string][]chan<- struct{}),

		invoiceHtlcModifier: invoiceHtlcModifier,

		customMessageServer: subscribe.NewServer(),

		tlsManager: tlsManager,

		featureMgr: featureMgr,
		quit:       make(chan struct{}),
	}

	// Start the low-level services once they are initialized.
	//
	// TODO(yy): break the server startup into four steps,
	// 1. init the low-level services.
	// 2. start the low-level services.
	// 3. init the high-level services.
	// 4. start the high-level services.
	if err := s.startLowLevelServices(); err != nil {
		return nil, err
	}

	currentHash, currentHeight, err := s.cc.ChainIO.GetBestBlock()
	if err != nil {
		return nil, err
	}

	expiryWatcher := invoices.NewInvoiceExpiryWatcher(
		clock.NewDefaultClock(), cfg.Invoices.HoldExpiryDelta,
		uint32(currentHeight), currentHash, cc.ChainNotifier,
	)
	s.invoices = invoices.NewRegistry(
		dbs.InvoiceDB, expiryWatcher, &registryConfig,
	)

	s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now)

	thresholdSats := btcutil.Amount(cfg.MaxFeeExposure)
	thresholdMSats := lnwire.NewMSatFromSatoshis(thresholdSats)

	linkUpdater := func(shortID lnwire.ShortChannelID) error {
		link, err := s.htlcSwitch.GetLinkByShortID(shortID)
		if err != nil {
			return err
		}

		s.htlcSwitch.UpdateLinkAliases(link)

		return nil
	}

	s.aliasMgr, err = aliasmgr.NewManager(dbs.ChanStateDB, linkUpdater)
	if err != nil {
		return nil, err
	}

	s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{
		DB:                   dbs.ChanStateDB,
		FetchAllOpenChannels: s.chanStateDB.FetchAllOpenChannels,
		FetchAllChannels:     s.chanStateDB.FetchAllChannels,
		FetchClosedChannels:  s.chanStateDB.FetchClosedChannels,
		LocalChannelClose: func(pubKey []byte,
			request *htlcswitch.ChanClose) {

			peer, err := s.FindPeerByPubStr(string(pubKey))
			if err != nil {
				srvrLog.Errorf("unable to close channel, peer"+
					" with %v id can't be found: %v",
					pubKey, err,
				)
				return
			}

			peer.HandleLocalCloseChanReqs(request)
		},
		FwdingLog:              dbs.ChanStateDB.ForwardingLog(),
		SwitchPackager:         channeldb.NewSwitchPackager(),
		ExtractErrorEncrypter:  s.sphinx.ExtractErrorEncrypter,
		FetchLastChannelUpdate: s.fetchLastChanUpdate(),
		Notifier:               s.cc.ChainNotifier,
		HtlcNotifier:           s.htlcNotifier,
		FwdEventTicker:         ticker.New(htlcswitch.DefaultFwdEventInterval),
		LogEventTicker:         ticker.New(htlcswitch.DefaultLogInterval),
		AckEventTicker:         ticker.New(htlcswitch.DefaultAckInterval),
		AllowCircularRoute:     cfg.AllowCircularRoute,
		RejectHTLC:             cfg.RejectHTLC,
		Clock:                  clock.NewDefaultClock(),
		MailboxDeliveryTimeout: cfg.Htlcswitch.MailboxDeliveryTimeout,
		MaxFeeExposure:         thresholdMSats,
		SignAliasUpdate:        s.signAliasUpdate,
		IsAlias:                aliasmgr.IsAlias,
	}, uint32(currentHeight))
	if err != nil {
		return nil, err
	}
	s.interceptableSwitch, err = htlcswitch.NewInterceptableSwitch(
		&htlcswitch.InterceptableSwitchConfig{
			Switch:             s.htlcSwitch,
			CltvRejectDelta:    lncfg.DefaultFinalCltvRejectDelta,
			CltvInterceptDelta: lncfg.DefaultCltvInterceptDelta,
			RequireInterceptor: s.cfg.RequireInterceptor,
			Notifier:           s.cc.ChainNotifier,
		},
	)
	if err != nil {
		return nil, err
	}

	s.witnessBeacon = newPreimageBeacon(
		dbs.ChanStateDB.NewWitnessCache(),
		s.interceptableSwitch.ForwardPacket,
	)

	chanStatusMgrCfg := &netann.ChanStatusConfig{
		ChanStatusSampleInterval: cfg.ChanStatusSampleInterval,
		ChanEnableTimeout:        cfg.ChanEnableTimeout,
		ChanDisableTimeout:       cfg.ChanDisableTimeout,
		OurPubKey:                nodeKeyDesc.PubKey,
		OurKeyLoc:                nodeKeyDesc.KeyLocator,
		MessageSigner:            s.nodeSigner,
		IsChannelActive:          s.htlcSwitch.HasActiveLink,
		ApplyChannelUpdate:       s.applyChannelUpdate,
		DB:                       s.chanStateDB,
		Graph:                    dbs.GraphDB,
	}

	chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg)
	if err != nil {
		return nil, err
	}
	s.chanStatusMgr = chanStatusMgr

	// If enabled, use either UPnP or NAT-PMP to automatically configure
	// port forwarding for users behind a NAT.
	if cfg.NAT {
		srvrLog.Info("Scanning local network for a UPnP enabled device")

		discoveryTimeout := time.Duration(10 * time.Second)

		ctx, cancel := context.WithTimeout(
			context.Background(), discoveryTimeout,
		)
		defer cancel()
		upnp, err := nat.DiscoverUPnP(ctx)
		if err == nil {
			s.natTraversal = upnp
		} else {
			// If we were not able to discover a UPnP enabled device
			// on the local network, we'll fall back to attempting
			// to discover a NAT-PMP enabled device.
			srvrLog.Errorf("Unable to discover a UPnP enabled "+
				"device on the local network: %v", err)

			srvrLog.Info("Scanning local network for a NAT-PMP " +
				"enabled device")

			pmp, err := nat.DiscoverPMP(discoveryTimeout)
			if err != nil {
				err := fmt.Errorf("unable to discover a "+
					"NAT-PMP enabled device on the local "+
					"network: %v", err)
				srvrLog.Error(err)
				return nil, err
			}

			s.natTraversal = pmp
		}
	}

	// If we were requested to automatically configure port forwarding,
	// we'll use the ports that the server will be listening on.
	externalIPStrings := make([]string, len(cfg.ExternalIPs))
	for idx, ip := range cfg.ExternalIPs {
		externalIPStrings[idx] = ip.String()
	}
	if s.natTraversal != nil {
		listenPorts := make([]uint16, 0, len(listenAddrs))
		for _, listenAddr := range listenAddrs {
			// At this point, the listen addresses should have
			// already been normalized, so it's safe to ignore the
			// errors.
			_, portStr, _ := net.SplitHostPort(listenAddr.String())
			port, _ := strconv.Atoi(portStr)

			listenPorts = append(listenPorts, uint16(port))
		}

		ips, err := s.configurePortForwarding(listenPorts...)
		if err != nil {
			srvrLog.Errorf("Unable to automatically set up port "+
				"forwarding using %s: %v",
				s.natTraversal.Name(), err)
		} else {
			srvrLog.Infof("Automatically set up port forwarding "+
				"using %s to advertise external IP",
				s.natTraversal.Name())
			externalIPStrings = append(externalIPStrings, ips...)
		}
	}

	// If external IP addresses have been specified, add those to the list
	// of this server's addresses.
	externalIPs, err := lncfg.NormalizeAddresses(
		externalIPStrings, strconv.Itoa(defaultPeerPort),
		cfg.net.ResolveTCPAddr,
	)
	if err != nil {
		return nil, err
	}

	selfAddrs := make([]net.Addr, 0, len(externalIPs))
	selfAddrs = append(selfAddrs, externalIPs...)

	// We'll now reconstruct a node announcement based on our current
	// configuration so we can send it out as a sort of heart beat within
	// the network.
	//
	// We'll start by parsing the node color from configuration.
	color, err := lncfg.ParseHexColor(cfg.Color)
	if err != nil {
		srvrLog.Errorf("unable to parse color: %v\n", err)
		return nil, err
	}

	// If no alias is provided, default to first 10 characters of public
	// key.
	alias := cfg.Alias
	if alias == "" {
		alias = hex.EncodeToString(serializedPubKey[:10])
	}
	nodeAlias, err := lnwire.NewNodeAlias(alias)
	if err != nil {
		return nil, err
	}
	selfNode := &models.LightningNode{
		HaveNodeAnnouncement: true,
		LastUpdate:           time.Now(),
		Addresses:            selfAddrs,
		Alias:                nodeAlias.String(),
		Features:             s.featureMgr.Get(feature.SetNodeAnn),
		Color:                color,
	}
	copy(selfNode.PubKeyBytes[:], nodeKeyDesc.PubKey.SerializeCompressed())

	// Based on the disk representation of the node announcement generated
	// above, we'll generate a node announcement that can go out on the
	// network so we can properly sign it.
	nodeAnn, err := selfNode.NodeAnnouncement(false)
	if err != nil {
		return nil, fmt.Errorf("unable to gen self node ann: %w", err)
	}

	// With the announcement generated, we'll sign it to properly
	// authenticate the message on the network.
	authSig, err := netann.SignAnnouncement(
		s.nodeSigner, nodeKeyDesc.KeyLocator, nodeAnn,
	)
	if err != nil {
		return nil, fmt.Errorf("unable to generate signature for "+
			"self node announcement: %v", err)
	}
	selfNode.AuthSigBytes = authSig.Serialize()
	nodeAnn.Signature, err = lnwire.NewSigFromECDSARawSignature(
		selfNode.AuthSigBytes,
	)
	if err != nil {
		return nil, err
	}

	// Finally, we'll update the representation on disk, and update our
	// cached in-memory version as well.
	if err := dbs.GraphDB.SetSourceNode(selfNode); err != nil {
		return nil, fmt.Errorf("can't set self node: %w", err)
	}
	s.currentNodeAnn = nodeAnn

	// The router will get access to the payment ID sequencer, such that it
	// can generate unique payment IDs.
	sequencer, err := htlcswitch.NewPersistentSequencer(dbs.ChanStateDB)
	if err != nil {
		return nil, err
	}

	// Instantiate mission control with config from the sub server.
	//
	// TODO(joostjager): When we are further in the process of moving to sub
	// servers, the mission control instance itself can be moved there too.
	routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC)

	// We only initialize a probability estimator if there's no custom one.
	var estimator routing.Estimator
	if cfg.Estimator != nil {
		estimator = cfg.Estimator
	} else {
		switch routingConfig.ProbabilityEstimatorType {
		case routing.AprioriEstimatorName:
			aCfg := routingConfig.AprioriConfig
			aprioriConfig := routing.AprioriConfig{
				AprioriHopProbability: aCfg.HopProbability,
				PenaltyHalfLife:       aCfg.PenaltyHalfLife,
				AprioriWeight:         aCfg.Weight,
				CapacityFraction:      aCfg.CapacityFraction,
			}

			estimator, err = routing.NewAprioriEstimator(
				aprioriConfig,
			)
			if err != nil {
				return nil, err
			}

		case routing.BimodalEstimatorName:
			bCfg := routingConfig.BimodalConfig
			bimodalConfig := routing.BimodalConfig{
				BimodalNodeWeight: bCfg.NodeWeight,
				BimodalScaleMsat: lnwire.MilliSatoshi(
					bCfg.Scale,
				),
				BimodalDecayTime: bCfg.DecayTime,
			}

			estimator, err = routing.NewBimodalEstimator(
				bimodalConfig,
			)
			if err != nil {
				return nil, err
			}

		default:
			return nil, fmt.Errorf("unknown estimator type %v",
				routingConfig.ProbabilityEstimatorType)
		}
	}

	mcCfg := &routing.MissionControlConfig{
		OnConfigUpdate:          fn.Some(s.UpdateRoutingConfig),
		Estimator:               estimator,
		MaxMcHistory:            routingConfig.MaxMcHistory,
		McFlushInterval:         routingConfig.McFlushInterval,
		MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval,
	}

	s.missionController, err = routing.NewMissionController(
		dbs.ChanStateDB, selfNode.PubKeyBytes, mcCfg,
	)
	if err != nil {
		return nil, fmt.Errorf("can't create mission control "+
			"manager: %w", err)
	}
	s.defaultMC, err = s.missionController.GetNamespacedStore(
		routing.DefaultMissionControlNamespace,
	)
	if err != nil {
		return nil, fmt.Errorf("can't create mission control in the "+
			"default namespace: %w", err)
	}

	srvrLog.Debugf("Instantiating payment session source with config: "+
		"AttemptCost=%v + %v%%, MinRouteProbability=%v",
		int64(routingConfig.AttemptCost),
		float64(routingConfig.AttemptCostPPM)/10000,
		routingConfig.MinRouteProbability)

	pathFindingConfig := routing.PathFindingConfig{
		AttemptCost: lnwire.NewMSatFromSatoshis(
			routingConfig.AttemptCost,
		),
		AttemptCostPPM: routingConfig.AttemptCostPPM,
		MinProbability: routingConfig.MinRouteProbability,
	}

	sourceNode, err := dbs.GraphDB.SourceNode()
	if err != nil {
		return nil, fmt.Errorf("error getting source node: %w", err)
	}
	paymentSessionSource := &routing.SessionSource{
		GraphSessionFactory: graphsession.NewGraphSessionFactory(
			dbs.GraphDB,
		),
		SourceNode:        sourceNode,
		MissionControl:    s.defaultMC,
		GetLink:           s.htlcSwitch.GetLinkByShortID,
		PathFindingConfig: pathFindingConfig,
	}

	paymentControl := channeldb.NewPaymentControl(dbs.ChanStateDB)

	s.controlTower = routing.NewControlTower(paymentControl)

	strictPruning := cfg.Bitcoin.Node == "neutrino" ||
		cfg.Routing.StrictZombiePruning

	s.graphBuilder, err = graph.NewBuilder(&graph.Config{
		SelfNode:            selfNode.PubKeyBytes,
		Graph:               dbs.GraphDB,
		Chain:               cc.ChainIO,
		ChainView:           cc.ChainView,
		Notifier:            cc.ChainNotifier,
		ChannelPruneExpiry:  graph.DefaultChannelPruneExpiry,
		GraphPruneInterval:  time.Hour,
		FirstTimePruneDelay: graph.DefaultFirstTimePruneDelay,
		AssumeChannelValid:  cfg.Routing.AssumeChannelValid,
		StrictZombiePruning: strictPruning,
		IsAlias:             aliasmgr.IsAlias,
	})
	if err != nil {
		return nil, fmt.Errorf("can't create graph builder: %w", err)
	}

	s.chanRouter, err = routing.New(routing.Config{
		SelfNode:           selfNode.PubKeyBytes,
		RoutingGraph:       graphsession.NewRoutingGraph(dbs.GraphDB),
		Chain:              cc.ChainIO,
		Payer:              s.htlcSwitch,
		Control:            s.controlTower,
		MissionControl:     s.defaultMC,
		SessionSource:      paymentSessionSource,
		GetLink:            s.htlcSwitch.GetLinkByShortID,
		NextPaymentID:      sequencer.NextID,
		PathFindingConfig:  pathFindingConfig,
		Clock:              clock.NewDefaultClock(),
		ApplyChannelUpdate: s.graphBuilder.ApplyChannelUpdate,
		ClosedSCIDs:        s.fetchClosedChannelSCIDs(),
		TrafficShaper:      implCfg.TrafficShaper,
	})
	if err != nil {
		return nil, fmt.Errorf("can't create router: %w", err)
	}

	chanSeries := discovery.NewChanSeries(s.graphDB)
	gossipMessageStore, err := discovery.NewMessageStore(dbs.ChanStateDB)
	if err != nil {
		return nil, err
	}
	waitingProofStore, err := channeldb.NewWaitingProofStore(dbs.ChanStateDB)
	if err != nil {
		return nil, err
	}

	scidCloserMan := discovery.NewScidCloserMan(s.graphDB, s.chanStateDB)

	s.authGossiper = discovery.New(discovery.Config{
		Graph:                 s.graphBuilder,
		ChainIO:               s.cc.ChainIO,
		Notifier:              s.cc.ChainNotifier,
		ChainHash:             *s.cfg.ActiveNetParams.GenesisHash,
		Broadcast:             s.BroadcastMessage,
		ChanSeries:            chanSeries,
		NotifyWhenOnline:      s.NotifyWhenOnline,
		NotifyWhenOffline:     s.NotifyWhenOffline,
		FetchSelfAnnouncement: s.getNodeAnnouncement,
		UpdateSelfAnnouncement: func() (lnwire.NodeAnnouncement,
			error) {

			return s.genNodeAnnouncement(nil)
		},
		ProofMatureDelta:        0,
		TrickleDelay:            time.Millisecond * time.Duration(cfg.TrickleDelay),
		RetransmitTicker:        ticker.New(time.Minute * 30),
		RebroadcastInterval:     time.Hour * 24,
		WaitingProofStore:       waitingProofStore,
		MessageStore:            gossipMessageStore,
		AnnSigner:               s.nodeSigner,
		RotateTicker:            ticker.New(discovery.DefaultSyncerRotationInterval),
		HistoricalSyncTicker:    ticker.New(cfg.HistoricalSyncInterval),
		NumActiveSyncers:        cfg.NumGraphSyncPeers,
		NoTimestampQueries:      cfg.ProtocolOptions.NoTimestampQueryOption, //nolint:ll
		MinimumBatchSize:        10,
		SubBatchDelay:           cfg.Gossip.SubBatchDelay,
		IgnoreHistoricalFilters: cfg.IgnoreHistoricalGossipFilters,
		PinnedSyncers:           cfg.Gossip.PinnedSyncers,
		MaxChannelUpdateBurst:   cfg.Gossip.MaxChannelUpdateBurst,
		ChannelUpdateInterval:   cfg.Gossip.ChannelUpdateInterval,
		IsAlias:                 aliasmgr.IsAlias,
		SignAliasUpdate:         s.signAliasUpdate,
		FindBaseByAlias:         s.aliasMgr.FindBaseSCID,
		GetAlias:                s.aliasMgr.GetPeerAlias,
		FindChannel:             s.findChannel,
		IsStillZombieChannel:    s.graphBuilder.IsZombieChannel,
		ScidCloser:              scidCloserMan,
	}, nodeKeyDesc)

	selfVertex := route.Vertex(nodeKeyDesc.PubKey.SerializeCompressed())
	//nolint:ll
	s.localChanMgr = &localchans.Manager{
		SelfPub:              nodeKeyDesc.PubKey,
		DefaultRoutingPolicy: cc.RoutingPolicy,
		ForAllOutgoingChannels: func(cb func(*models.ChannelEdgeInfo,
			*models.ChannelEdgePolicy) error) error {

			return s.graphDB.ForEachNodeChannel(selfVertex,
				func(_ kvdb.RTx, c *models.ChannelEdgeInfo,
					e *models.ChannelEdgePolicy,
					_ *models.ChannelEdgePolicy) error {

					// NOTE: The invoked callback here may
					// receive a nil channel policy.
					return cb(c, e)
				},
			)
		},
		PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate,
		UpdateForwardingPolicies:  s.htlcSwitch.UpdateForwardingPolicies,
		FetchChannel:              s.chanStateDB.FetchChannel,
		AddEdge: func(edge *models.ChannelEdgeInfo) error {
			return s.graphBuilder.AddEdge(edge)
		},
×
1172
        }
1173

UNCOV
1174
        utxnStore, err := contractcourt.NewNurseryStore(
×
UNCOV
1175
                s.cfg.ActiveNetParams.GenesisHash, dbs.ChanStateDB,
×
UNCOV
1176
        )
×
UNCOV
1177
        if err != nil {
×
1178
                srvrLog.Errorf("unable to create nursery store: %v", err)
×
1179
                return nil, err
×
1180
        }
×
1181

UNCOV
1182
        sweeperStore, err := sweep.NewSweeperStore(
×
UNCOV
1183
                dbs.ChanStateDB, s.cfg.ActiveNetParams.GenesisHash,
×
UNCOV
1184
        )
×
UNCOV
1185
        if err != nil {
×
1186
                srvrLog.Errorf("unable to create sweeper store: %v", err)
×
1187
                return nil, err
×
1188
        }
×
1189

UNCOV
1190
        aggregator := sweep.NewBudgetAggregator(
×
UNCOV
1191
                cc.FeeEstimator, sweep.DefaultMaxInputsPerTx,
×
UNCOV
1192
                s.implCfg.AuxSweeper,
×
UNCOV
1193
        )
×
UNCOV
1194

×
UNCOV
1195
        s.txPublisher = sweep.NewTxPublisher(sweep.TxPublisherConfig{
×
UNCOV
1196
                Signer:     cc.Wallet.Cfg.Signer,
×
UNCOV
1197
                Wallet:     cc.Wallet,
×
UNCOV
1198
                Estimator:  cc.FeeEstimator,
×
UNCOV
1199
                Notifier:   cc.ChainNotifier,
×
UNCOV
1200
                AuxSweeper: s.implCfg.AuxSweeper,
×
UNCOV
1201
        })
×
UNCOV
1202

×
UNCOV
1203
        s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{
×
UNCOV
1204
                FeeEstimator: cc.FeeEstimator,
×
UNCOV
1205
                GenSweepScript: newSweepPkScriptGen(
×
UNCOV
1206
                        cc.Wallet, s.cfg.ActiveNetParams.Params,
×
UNCOV
1207
                ),
×
UNCOV
1208
                Signer:               cc.Wallet.Cfg.Signer,
×
UNCOV
1209
                Wallet:               newSweeperWallet(cc.Wallet),
×
UNCOV
1210
                Mempool:              cc.MempoolNotifier,
×
UNCOV
1211
                Notifier:             cc.ChainNotifier,
×
UNCOV
1212
                Store:                sweeperStore,
×
UNCOV
1213
                MaxInputsPerTx:       sweep.DefaultMaxInputsPerTx,
×
UNCOV
1214
                MaxFeeRate:           cfg.Sweeper.MaxFeeRate,
×
UNCOV
1215
                Aggregator:           aggregator,
×
UNCOV
1216
                Publisher:            s.txPublisher,
×
UNCOV
1217
                NoDeadlineConfTarget: cfg.Sweeper.NoDeadlineConfTarget,
×
UNCOV
1218
        })
×
UNCOV
1219

×
UNCOV
1220
        s.utxoNursery = contractcourt.NewUtxoNursery(&contractcourt.NurseryConfig{
×
UNCOV
1221
                ChainIO:             cc.ChainIO,
×
UNCOV
1222
                ConfDepth:           1,
×
UNCOV
1223
                FetchClosedChannels: s.chanStateDB.FetchClosedChannels,
×
UNCOV
1224
                FetchClosedChannel:  s.chanStateDB.FetchClosedChannel,
×
UNCOV
1225
                Notifier:            cc.ChainNotifier,
×
UNCOV
1226
                PublishTransaction:  cc.Wallet.PublishTransaction,
×
UNCOV
1227
                Store:               utxnStore,
×
UNCOV
1228
                SweepInput:          s.sweeper.SweepInput,
×
UNCOV
1229
                Budget:              s.cfg.Sweeper.Budget,
×
UNCOV
1230
        })
×
UNCOV
1231

×
UNCOV
1232
        // Construct a closure that wraps the htlcswitch's CloseLink method.
×
UNCOV
1233
        closeLink := func(chanPoint *wire.OutPoint,
×
UNCOV
1234
                closureType contractcourt.ChannelCloseType) {
×
UNCOV
1235
                // TODO(conner): Properly respect the update and error channels
×
UNCOV
1236
                // returned by CloseLink.
×
UNCOV
1237

×
UNCOV
1238
                // Instruct the switch to close the channel.  Provide no close out
×
UNCOV
1239
                // delivery script or target fee per kw because user input is not
×
UNCOV
1240
                // available when the remote peer closes the channel.
×
UNCOV
1241
                s.htlcSwitch.CloseLink(chanPoint, closureType, 0, 0, nil)
×
UNCOV
1242
        }
×
1243

1244
        // We will use the following channel to reliably hand off contract
1245
        // breach events from the ChannelArbitrator to the BreachArbitrator.
UNCOV
1246
        contractBreaches := make(chan *contractcourt.ContractBreachEvent, 1)
×
UNCOV
1247

×
UNCOV
1248
        s.breachArbitrator = contractcourt.NewBreachArbitrator(
×
UNCOV
1249
                &contractcourt.BreachConfig{
×
UNCOV
1250
                        CloseLink: closeLink,
×
UNCOV
1251
                        DB:        s.chanStateDB,
×
UNCOV
1252
                        Estimator: s.cc.FeeEstimator,
×
UNCOV
1253
                        GenSweepScript: newSweepPkScriptGen(
×
UNCOV
1254
                                cc.Wallet, s.cfg.ActiveNetParams.Params,
×
UNCOV
1255
                        ),
×
UNCOV
1256
                        Notifier:           cc.ChainNotifier,
×
UNCOV
1257
                        PublishTransaction: cc.Wallet.PublishTransaction,
×
UNCOV
1258
                        ContractBreaches:   contractBreaches,
×
UNCOV
1259
                        Signer:             cc.Wallet.Cfg.Signer,
×
UNCOV
1260
                        Store: contractcourt.NewRetributionStore(
×
UNCOV
1261
                                dbs.ChanStateDB,
×
UNCOV
1262
                        ),
×
UNCOV
1263
                        AuxSweeper: s.implCfg.AuxSweeper,
×
UNCOV
1264
                },
×
UNCOV
1265
        )
×
UNCOV
1266

×
UNCOV
1267
        //nolint:ll
×
UNCOV
1268
        s.chainArb = contractcourt.NewChainArbitrator(contractcourt.ChainArbitratorConfig{
×
UNCOV
1269
                ChainHash:              *s.cfg.ActiveNetParams.GenesisHash,
×
UNCOV
1270
                IncomingBroadcastDelta: lncfg.DefaultIncomingBroadcastDelta,
×
UNCOV
1271
                OutgoingBroadcastDelta: lncfg.DefaultOutgoingBroadcastDelta,
×
UNCOV
1272
                NewSweepAddr: func() ([]byte, error) {
×
1273
                        addr, err := newSweepPkScriptGen(
×
1274
                                cc.Wallet, netParams,
×
1275
                        )().Unpack()
×
1276
                        if err != nil {
×
1277
                                return nil, err
×
1278
                        }
×
1279

1280
                        return addr.DeliveryAddress, nil
×
1281
                },
1282
                PublishTx: cc.Wallet.PublishTransaction,
UNCOV
1283
                DeliverResolutionMsg: func(msgs ...contractcourt.ResolutionMsg) error {
×
UNCOV
1284
                        for _, msg := range msgs {
×
UNCOV
1285
                                err := s.htlcSwitch.ProcessContractResolution(msg)
×
UNCOV
1286
                                if err != nil {
×
UNCOV
1287
                                        return err
×
UNCOV
1288
                                }
×
1289
                        }
UNCOV
1290
                        return nil
×
1291
                },
1292
                IncubateOutputs: func(chanPoint wire.OutPoint,
1293
                        outHtlcRes fn.Option[lnwallet.OutgoingHtlcResolution],
1294
                        inHtlcRes fn.Option[lnwallet.IncomingHtlcResolution],
1295
                        broadcastHeight uint32,
UNCOV
1296
                        deadlineHeight fn.Option[int32]) error {
×
UNCOV
1297

×
UNCOV
1298
                        return s.utxoNursery.IncubateOutputs(
×
UNCOV
1299
                                chanPoint, outHtlcRes, inHtlcRes,
×
UNCOV
1300
                                broadcastHeight, deadlineHeight,
×
UNCOV
1301
                        )
×
UNCOV
1302
                },
×
1303
                PreimageDB:   s.witnessBeacon,
1304
                Notifier:     cc.ChainNotifier,
1305
                Mempool:      cc.MempoolNotifier,
1306
                Signer:       cc.Wallet.Cfg.Signer,
1307
                FeeEstimator: cc.FeeEstimator,
1308
                ChainIO:      cc.ChainIO,
UNCOV
1309
                MarkLinkInactive: func(chanPoint wire.OutPoint) error {
×
UNCOV
1310
                        chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
×
UNCOV
1311
                        s.htlcSwitch.RemoveLink(chanID)
×
UNCOV
1312
                        return nil
×
UNCOV
1313
                },
×
1314
                IsOurAddress: cc.Wallet.IsOurAddress,
1315
                ContractBreach: func(chanPoint wire.OutPoint,
UNCOV
1316
                        breachRet *lnwallet.BreachRetribution) error {
×
UNCOV
1317

×
UNCOV
1318
                        // processACK will handle the BreachArbitrator ACKing
×
UNCOV
1319
                        // the event.
×
UNCOV
1320
                        finalErr := make(chan error, 1)
×
UNCOV
1321
                        processACK := func(brarErr error) {
×
UNCOV
1322
                                if brarErr != nil {
×
1323
                                        finalErr <- brarErr
×
1324
                                        return
×
1325
                                }
×
1326

1327
                                // If the BreachArbitrator successfully handled
1328
                                // the event, we can signal that the handoff
1329
                                // was successful.
UNCOV
1330
                                finalErr <- nil
×
1331
                        }
1332

UNCOV
1333
                        event := &contractcourt.ContractBreachEvent{
×
UNCOV
1334
                                ChanPoint:         chanPoint,
×
UNCOV
1335
                                ProcessACK:        processACK,
×
UNCOV
1336
                                BreachRetribution: breachRet,
×
UNCOV
1337
                        }
×
UNCOV
1338

×
UNCOV
1339
                        // Send the contract breach event to the
×
UNCOV
1340
                        // BreachArbitrator.
×
UNCOV
1341
                        select {
×
UNCOV
1342
                        case contractBreaches <- event:
×
1343
                        case <-s.quit:
×
1344
                                return ErrServerShuttingDown
×
1345
                        }
1346

1347
                        // We'll wait for a final error to be available from
1348
                        // the BreachArbitrator.
UNCOV
1349
                        select {
×
UNCOV
1350
                        case err := <-finalErr:
×
UNCOV
1351
                                return err
×
1352
                        case <-s.quit:
×
1353
                                return ErrServerShuttingDown
×
1354
                        }
1355
                },
UNCOV
1356
                DisableChannel: func(chanPoint wire.OutPoint) error {
×
UNCOV
1357
                        return s.chanStatusMgr.RequestDisable(chanPoint, false)
×
UNCOV
1358
                },
×
1359
                Sweeper:                       s.sweeper,
1360
                Registry:                      s.invoices,
1361
                NotifyClosedChannel:           s.channelNotifier.NotifyClosedChannelEvent,
1362
                NotifyFullyResolvedChannel:    s.channelNotifier.NotifyFullyResolvedChannelEvent,
1363
                OnionProcessor:                s.sphinx,
1364
                PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod,
1365
                IsForwardedHTLC:               s.htlcSwitch.IsForwardedHTLC,
1366
                Clock:                         clock.NewDefaultClock(),
1367
                SubscribeBreachComplete:       s.breachArbitrator.SubscribeBreachComplete,
1368
                PutFinalHtlcOutcome:           s.chanStateDB.PutOnchainFinalHtlcOutcome,
1369
                HtlcNotifier:                  s.htlcNotifier,
1370
                Budget:                        *s.cfg.Sweeper.Budget,
1371

1372
                // TODO(yy): remove this hack once PaymentCircuit is interfaced.
1373
                QueryIncomingCircuit: func(
UNCOV
1374
                        circuit models.CircuitKey) *models.CircuitKey {
×
UNCOV
1375

×
UNCOV
1376
                        // Get the circuit map.
×
UNCOV
1377
                        circuits := s.htlcSwitch.CircuitLookup()
×
UNCOV
1378

×
UNCOV
1379
                        // Lookup the outgoing circuit.
×
UNCOV
1380
                        pc := circuits.LookupOpenCircuit(circuit)
×
UNCOV
1381
                        if pc == nil {
×
UNCOV
1382
                                return nil
×
UNCOV
1383
                        }
×
1384

UNCOV
1385
                        return &pc.Incoming
×
1386
                },
1387
                AuxLeafStore: implCfg.AuxLeafStore,
1388
                AuxSigner:    implCfg.AuxSigner,
1389
                AuxResolver:  implCfg.AuxContractResolver,
1390
        }, dbs.ChanStateDB)
1391

1392
        // Select the configuration and funding parameters for Bitcoin.
UNCOV
1393
        chainCfg := cfg.Bitcoin
×
UNCOV
1394
        minRemoteDelay := funding.MinBtcRemoteDelay
×
UNCOV
1395
        maxRemoteDelay := funding.MaxBtcRemoteDelay
×
UNCOV
1396

×
UNCOV
1397
        var chanIDSeed [32]byte
×
UNCOV
1398
        if _, err := rand.Read(chanIDSeed[:]); err != nil {
×
1399
                return nil, err
×
1400
        }
×
1401

1402
        // Wrap the DeleteChannelEdges method so that the funding manager can
1403
        // use it without depending on several layers of indirection.
UNCOV
1404
        deleteAliasEdge := func(scid lnwire.ShortChannelID) (
×
UNCOV
1405
                *models.ChannelEdgePolicy, error) {
×
UNCOV
1406

×
UNCOV
1407
                info, e1, e2, err := s.graphDB.FetchChannelEdgesByID(
×
UNCOV
1408
                        scid.ToUint64(),
×
UNCOV
1409
                )
×
UNCOV
1410
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
×
1411
                        // This is unlikely but there is a slim chance of this
×
1412
                        // being hit if lnd was killed via SIGKILL and the
×
1413
                        // funding manager was stepping through the delete
×
1414
                        // alias edge logic.
×
1415
                        return nil, nil
×
UNCOV
1416
                } else if err != nil {
×
1417
                        return nil, err
×
1418
                }
×
1419

1420
                // Grab our key to find our policy.
UNCOV
1421
                var ourKey [33]byte
×
UNCOV
1422
                copy(ourKey[:], nodeKeyDesc.PubKey.SerializeCompressed())
×
UNCOV
1423

×
UNCOV
1424
                var ourPolicy *models.ChannelEdgePolicy
×
UNCOV
1425
                if info != nil && info.NodeKey1Bytes == ourKey {
×
UNCOV
1426
                        ourPolicy = e1
×
UNCOV
1427
                } else {
×
UNCOV
1428
                        ourPolicy = e2
×
UNCOV
1429
                }
×
1430

UNCOV
1431
                if ourPolicy == nil {
×
1432
                        // Something is wrong, so return an error.
×
1433
                        return nil, fmt.Errorf("we don't have an edge")
×
1434
                }
×
1435

UNCOV
1436
                err = s.graphDB.DeleteChannelEdges(
×
UNCOV
1437
                        false, false, scid.ToUint64(),
×
UNCOV
1438
                )
×
UNCOV
1439
                return ourPolicy, err
×
1440
        }
1441

1442
        // For the reservationTimeout and the zombieSweeperInterval different
1443
        // values are set in case we are in a dev environment to enhance test
1444
        // capabilities.
UNCOV
1445
        reservationTimeout := chanfunding.DefaultReservationTimeout
×
UNCOV
1446
        zombieSweeperInterval := lncfg.DefaultZombieSweeperInterval
×
UNCOV
1447

×
UNCOV
1448
        // Get the development config for funding manager. If we are not in
×
UNCOV
1449
        // development mode, this would be nil.
×
UNCOV
1450
        var devCfg *funding.DevConfig
×
UNCOV
1451
        if lncfg.IsDevBuild() {
×
UNCOV
1452
                devCfg = &funding.DevConfig{
×
UNCOV
1453
                        ProcessChannelReadyWait: cfg.Dev.ChannelReadyWait(),
×
UNCOV
1454
                }
×
UNCOV
1455

×
UNCOV
1456
                reservationTimeout = cfg.Dev.GetReservationTimeout()
×
UNCOV
1457
                zombieSweeperInterval = cfg.Dev.GetZombieSweeperInterval()
×
UNCOV
1458

×
UNCOV
1459
                srvrLog.Debugf("Using the dev config for the fundingMgr: %v, "+
×
UNCOV
1460
                        "reservationTimeout=%v, zombieSweeperInterval=%v",
×
UNCOV
1461
                        devCfg, reservationTimeout, zombieSweeperInterval)
×
UNCOV
1462
        }
×
1463

1464
        //nolint:ll
UNCOV
1465
        s.fundingMgr, err = funding.NewFundingManager(funding.Config{
×
UNCOV
1466
                Dev:                devCfg,
×
UNCOV
1467
                NoWumboChans:       !cfg.ProtocolOptions.Wumbo(),
×
UNCOV
1468
                IDKey:              nodeKeyDesc.PubKey,
×
UNCOV
1469
                IDKeyLoc:           nodeKeyDesc.KeyLocator,
×
UNCOV
1470
                Wallet:             cc.Wallet,
×
UNCOV
1471
                PublishTransaction: cc.Wallet.PublishTransaction,
×
UNCOV
1472
                UpdateLabel: func(hash chainhash.Hash, label string) error {
×
UNCOV
1473
                        return cc.Wallet.LabelTransaction(hash, label, true)
×
UNCOV
1474
                },
×
1475
                Notifier:     cc.ChainNotifier,
1476
                ChannelDB:    s.chanStateDB,
1477
                FeeEstimator: cc.FeeEstimator,
1478
                SignMessage:  cc.MsgSigner.SignMessage,
1479
                CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement,
UNCOV
1480
                        error) {
×
UNCOV
1481

×
UNCOV
1482
                        return s.genNodeAnnouncement(nil)
×
UNCOV
1483
                },
×
1484
                SendAnnouncement:     s.authGossiper.ProcessLocalAnnouncement,
1485
                NotifyWhenOnline:     s.NotifyWhenOnline,
1486
                TempChanIDSeed:       chanIDSeed,
1487
                FindChannel:          s.findChannel,
1488
                DefaultRoutingPolicy: cc.RoutingPolicy,
1489
                DefaultMinHtlcIn:     cc.MinHtlcIn,
1490
                NumRequiredConfs: func(chanAmt btcutil.Amount,
UNCOV
1491
                        pushAmt lnwire.MilliSatoshi) uint16 {
×
UNCOV
1492
                        // For large channels we increase the number
×
UNCOV
1493
                        // of confirmations we require for the
×
UNCOV
1494
                        // channel to be considered open. As it is
×
UNCOV
1495
                        // always the responder that gets to choose
×
UNCOV
1496
                        // value, the pushAmt is value being pushed
×
UNCOV
1497
                        // to us. This means we have more to lose
×
UNCOV
1498
                        // in the case this gets re-orged out, and
×
UNCOV
1499
                        // we will require more confirmations before
×
UNCOV
1500
                        // we consider it open.
×
UNCOV
1501

×
UNCOV
1502
                        // In case the user has explicitly specified
×
UNCOV
1503
                        // a default value for the number of
×
UNCOV
1504
                        // confirmations, we use it.
×
UNCOV
1505
                        defaultConf := uint16(chainCfg.DefaultNumChanConfs)
×
UNCOV
1506
                        if defaultConf != 0 {
×
UNCOV
1507
                                return defaultConf
×
UNCOV
1508
                        }
×
1509

1510
                        minConf := uint64(3)
×
1511
                        maxConf := uint64(6)
×
1512

×
1513
                        // If this is a wumbo channel, then we'll require the
×
1514
                        // max amount of confirmations.
×
1515
                        if chanAmt > MaxFundingAmount {
×
1516
                                return uint16(maxConf)
×
1517
                        }
×
1518

1519
                        // If not, we return a value scaled linearly
1520
                        // between 3 and 6, depending on channel size.
1521
                        // TODO(halseth): Use 1 as minimum?
1522
                        maxChannelSize := uint64(
×
1523
                                lnwire.NewMSatFromSatoshis(MaxFundingAmount))
×
1524
                        stake := lnwire.NewMSatFromSatoshis(chanAmt) + pushAmt
×
1525
                        conf := maxConf * uint64(stake) / maxChannelSize
×
1526
                        if conf < minConf {
×
1527
                                conf = minConf
×
1528
                        }
×
1529
                        if conf > maxConf {
×
1530
                                conf = maxConf
×
1531
                        }
×
1532
                        return uint16(conf)
×
1533
                },
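
                // Editor's note: illustrative worked example for the
                // confirmation scaling above; not part of the original
                // source. Assuming the non-wumbo cap of MaxFundingAmount =
                // 16,777,215 sats, an 8,400,000 sat channel with no pushed
                // amount gives conf = 6 * 8,400,000 / 16,777,215 = 3 after
                // integer division, which already sits inside the [3, 6]
                // clamp, while a channel at the cap requires the full 6.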
UNCOV
1534
                RequiredRemoteDelay: func(chanAmt btcutil.Amount) uint16 {
×
UNCOV
1535
                        // We scale the remote CSV delay (the time the
×
UNCOV
1536
                        // remote has to claim funds in case of a unilateral
×
UNCOV
1537
                        // close) linearly from minRemoteDelay blocks
×
UNCOV
1538
                        // for small channels, to maxRemoteDelay blocks
×
UNCOV
1539
                        // for channels of size MaxFundingAmount.
×
UNCOV
1540

×
UNCOV
1541
                        // In case the user has explicitly specified
×
UNCOV
1542
                        // a default value for the remote delay, we
×
UNCOV
1543
                        // use it.
×
UNCOV
1544
                        defaultDelay := uint16(chainCfg.DefaultRemoteDelay)
×
UNCOV
1545
                        if defaultDelay > 0 {
×
UNCOV
1546
                                return defaultDelay
×
UNCOV
1547
                        }
×
1548

1549
                        // If this is a wumbo channel, then we'll require the
1550
                        // max value.
1551
                        if chanAmt > MaxFundingAmount {
×
1552
                                return maxRemoteDelay
×
1553
                        }
×
1554

1555
                        // If not, we scale according to channel size.
1556
                        delay := uint16(btcutil.Amount(maxRemoteDelay) *
×
1557
                                chanAmt / MaxFundingAmount)
×
1558
                        if delay < minRemoteDelay {
×
1559
                                delay = minRemoteDelay
×
1560
                        }
×
1561
                        if delay > maxRemoteDelay {
×
1562
                                delay = maxRemoteDelay
×
1563
                        }
×
1564
                        return delay
×
1565
                },
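
                // Editor's note: illustrative worked example for the CSV
                // delay scaling above; not part of the original source.
                // Assuming the funding package defaults of minRemoteDelay =
                // 144 and maxRemoteDelay = 2016 blocks and the 16,777,215
                // sat cap, a 4,194,304 sat channel yields a delay of
                // 2016 * 4,194,304 / 16,777,215 = 504 blocks (roughly 3.5
                // days), which needs no clamping.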
1566
                WatchNewChannel: func(channel *channeldb.OpenChannel,
UNCOV
1567
                        peerKey *btcec.PublicKey) error {
×
UNCOV
1568

×
UNCOV
1569
                        // First, we'll mark this new peer as a persistent peer
×
UNCOV
1570
                        // for re-connection purposes. If the peer is not yet
×
UNCOV
1571
                        // tracked or the user hasn't requested it to be permanent,
×
UNCOV
1572
                        // we'll set false to prevent the server from continuing
×
UNCOV
1573
                        // to connect to this peer even if the number of
×
UNCOV
1574
                        // channels with this peer is zero.
×
UNCOV
1575
                        s.mu.Lock()
×
UNCOV
1576
                        pubStr := string(peerKey.SerializeCompressed())
×
UNCOV
1577
                        if _, ok := s.persistentPeers[pubStr]; !ok {
×
UNCOV
1578
                                s.persistentPeers[pubStr] = false
×
UNCOV
1579
                        }
×
UNCOV
1580
                        s.mu.Unlock()
×
UNCOV
1581

×
UNCOV
1582
                        // With that taken care of, we'll send this channel to
×
UNCOV
1583
                        // the chain arb so it can react to on-chain events.
×
UNCOV
1584
                        return s.chainArb.WatchNewChannel(channel)
×
1585
                },
UNCOV
1586
                ReportShortChanID: func(chanPoint wire.OutPoint) error {
×
UNCOV
1587
                        cid := lnwire.NewChanIDFromOutPoint(chanPoint)
×
UNCOV
1588
                        return s.htlcSwitch.UpdateShortChanID(cid)
×
UNCOV
1589
                },
×
1590
                RequiredRemoteChanReserve: func(chanAmt,
UNCOV
1591
                        dustLimit btcutil.Amount) btcutil.Amount {
×
UNCOV
1592

×
UNCOV
1593
                        // By default, we'll require the remote peer to maintain
×
UNCOV
1594
                        // at least 1% of the total channel capacity at all
×
UNCOV
1595
                        // times. If this value ends up dipping below the dust
×
UNCOV
1596
                        // limit, then we'll use the dust limit itself as the
×
UNCOV
1597
                        // reserve as required by BOLT #2.
×
UNCOV
1598
                        reserve := chanAmt / 100
×
UNCOV
1599
                        if reserve < dustLimit {
×
UNCOV
1600
                                reserve = dustLimit
×
UNCOV
1601
                        }
×
1602

UNCOV
1603
                        return reserve
×
1604
                },
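
                // Editor's note: illustrative worked example for the
                // reserve calculation above; not part of the original
                // source. A 1,000,000 sat channel yields a 10,000 sat
                // reserve, whereas a 20,000 sat channel would yield only
                // 200 sats, so the dust limit (for example 330 sats for a
                // common segwit output) is used instead.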
UNCOV
1605
                RequiredRemoteMaxValue: func(chanAmt btcutil.Amount) lnwire.MilliSatoshi {
×
UNCOV
1606
                        // By default, we'll allow the remote peer to
×
UNCOV
1607
                        // utilize the full bandwidth of the channel, minus our
×
UNCOV
1608
                        // required reserve.
×
UNCOV
1609
                        reserve := lnwire.NewMSatFromSatoshis(chanAmt / 100)
×
UNCOV
1610
                        return lnwire.NewMSatFromSatoshis(chanAmt) - reserve
×
UNCOV
1611
                },
×
UNCOV
1612
                RequiredRemoteMaxHTLCs: func(chanAmt btcutil.Amount) uint16 {
×
UNCOV
1613
                        if cfg.DefaultRemoteMaxHtlcs > 0 {
×
UNCOV
1614
                                return cfg.DefaultRemoteMaxHtlcs
×
UNCOV
1615
                        }
×
1616

1617
                        // By default, we'll permit them to utilize the full
1618
                        // channel bandwidth.
1619
                        return uint16(input.MaxHTLCNumber / 2)
×
1620
                },
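
                // Editor's note, not part of the original source: assuming
                // input.MaxHTLCNumber is 966 (twice the BOLT #2 per-party
                // limit of 483), the default above works out to 483
                // concurrent HTLCs offered by the remote peer.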
1621
                ZombieSweeperInterval:         zombieSweeperInterval,
1622
                ReservationTimeout:            reservationTimeout,
1623
                MinChanSize:                   btcutil.Amount(cfg.MinChanSize),
1624
                MaxChanSize:                   btcutil.Amount(cfg.MaxChanSize),
1625
                MaxPendingChannels:            cfg.MaxPendingChannels,
1626
                RejectPush:                    cfg.RejectPush,
1627
                MaxLocalCSVDelay:              chainCfg.MaxLocalDelay,
1628
                NotifyOpenChannelEvent:        s.channelNotifier.NotifyOpenChannelEvent,
1629
                OpenChannelPredicate:          chanPredicate,
1630
                NotifyPendingOpenChannelEvent: s.channelNotifier.NotifyPendingOpenChannelEvent,
1631
                EnableUpfrontShutdown:         cfg.EnableUpfrontShutdown,
1632
                MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte(
1633
                        s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
1634
                DeleteAliasEdge:      deleteAliasEdge,
1635
                AliasManager:         s.aliasMgr,
1636
                IsSweeperOutpoint:    s.sweeper.IsSweeperOutpoint,
1637
                AuxFundingController: implCfg.AuxFundingController,
1638
                AuxSigner:            implCfg.AuxSigner,
1639
                AuxResolver:          implCfg.AuxContractResolver,
1640
        })
UNCOV
1641
        if err != nil {
×
1642
                return nil, err
×
1643
        }
×
1644

1645
        // Next, we'll assemble the sub-system that will maintain an on-disk
1646
        // static backup of the latest channel state.
UNCOV
1647
        chanNotifier := &channelNotifier{
×
UNCOV
1648
                chanNotifier: s.channelNotifier,
×
UNCOV
1649
                addrs:        s.addrSource,
×
UNCOV
1650
        }
×
UNCOV
1651
        backupFile := chanbackup.NewMultiFile(cfg.BackupFilePath)
×
UNCOV
1652
        startingChans, err := chanbackup.FetchStaticChanBackups(
×
UNCOV
1653
                s.chanStateDB, s.addrSource,
×
UNCOV
1654
        )
×
UNCOV
1655
        if err != nil {
×
1656
                return nil, err
×
1657
        }
×
UNCOV
1658
        s.chanSubSwapper, err = chanbackup.NewSubSwapper(
×
UNCOV
1659
                startingChans, chanNotifier, s.cc.KeyRing, backupFile,
×
UNCOV
1660
        )
×
UNCOV
1661
        if err != nil {
×
1662
                return nil, err
×
1663
        }
×
1664

1665
        // Assemble a peer notifier which will provide clients with subscriptions
1666
        // to peer online and offline events.
UNCOV
1667
        s.peerNotifier = peernotifier.New()
×
UNCOV
1668

×
UNCOV
1669
        // Create a channel event store which monitors all open channels.
×
UNCOV
1670
        s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{
×
UNCOV
1671
                SubscribeChannelEvents: func() (subscribe.Subscription, error) {
×
UNCOV
1672
                        return s.channelNotifier.SubscribeChannelEvents()
×
UNCOV
1673
                },
×
UNCOV
1674
                SubscribePeerEvents: func() (subscribe.Subscription, error) {
×
UNCOV
1675
                        return s.peerNotifier.SubscribePeerEvents()
×
UNCOV
1676
                },
×
1677
                GetOpenChannels: s.chanStateDB.FetchAllOpenChannels,
1678
                Clock:           clock.NewDefaultClock(),
1679
                ReadFlapCount:   s.miscDB.ReadFlapCount,
1680
                WriteFlapCount:  s.miscDB.WriteFlapCounts,
1681
                FlapCountTicker: ticker.New(chanfitness.FlapCountFlushRate),
1682
        })
1683

UNCOV
1684
        if cfg.WtClient.Active {
×
UNCOV
1685
                policy := wtpolicy.DefaultPolicy()
×
UNCOV
1686
                policy.MaxUpdates = cfg.WtClient.MaxUpdates
×
UNCOV
1687

×
UNCOV
1688
                // We expose the sweep fee rate in sat/vbyte, but the tower
×
UNCOV
1689
                // protocol operates on sat/kw.
×
UNCOV
1690
                sweepRateSatPerVByte := chainfee.SatPerKVByte(
×
UNCOV
1691
                        1000 * cfg.WtClient.SweepFeeRate,
×
UNCOV
1692
                )
×
UNCOV
1693

×
UNCOV
1694
                policy.SweepFeeRate = sweepRateSatPerVByte.FeePerKWeight()
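
                // Editor's note: illustrative conversion for the fee-rate
                // handling above; not part of the original source. Assuming
                // FeePerKWeight divides by the witness scale factor of 4, a
                // configured rate of 10 sat/vbyte becomes 10,000 sat/kvB
                // and then 2,500 sat/kw in the tower policy.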
×
UNCOV
1695

×
UNCOV
1696
                if err := policy.Validate(); err != nil {
×
1697
                        return nil, err
×
1698
                }
×
1699

1700
                // authDial is the wrapper around the brontide.Dial for the
1701
                // watchtower.
UNCOV
1702
                authDial := func(localKey keychain.SingleKeyECDH,
×
UNCOV
1703
                        netAddr *lnwire.NetAddress,
×
UNCOV
1704
                        dialer tor.DialFunc) (wtserver.Peer, error) {
×
UNCOV
1705

×
UNCOV
1706
                        return brontide.Dial(
×
UNCOV
1707
                                localKey, netAddr, cfg.ConnectionTimeout, dialer,
×
UNCOV
1708
                        )
×
UNCOV
1709
                }
×
1710

1711
                // buildBreachRetribution is a call-back that can be used to
1712
                // query the BreachRetribution info and channel type given a
1713
                // channel ID and commitment height.
UNCOV
1714
                buildBreachRetribution := func(chanID lnwire.ChannelID,
×
UNCOV
1715
                        commitHeight uint64) (*lnwallet.BreachRetribution,
×
UNCOV
1716
                        channeldb.ChannelType, error) {
×
UNCOV
1717

×
UNCOV
1718
                        channel, err := s.chanStateDB.FetchChannelByID(
×
UNCOV
1719
                                nil, chanID,
×
UNCOV
1720
                        )
×
UNCOV
1721
                        if err != nil {
×
1722
                                return nil, 0, err
×
1723
                        }
×
1724

UNCOV
1725
                        br, err := lnwallet.NewBreachRetribution(
×
UNCOV
1726
                                channel, commitHeight, 0, nil,
×
UNCOV
1727
                                implCfg.AuxLeafStore,
×
UNCOV
1728
                                implCfg.AuxContractResolver,
×
UNCOV
1729
                        )
×
UNCOV
1730
                        if err != nil {
×
1731
                                return nil, 0, err
×
1732
                        }
×
1733

UNCOV
1734
                        return br, channel.ChanType, nil
×
1735
                }
1736

UNCOV
1737
                fetchClosedChannel := s.chanStateDB.FetchClosedChannelForID
×
UNCOV
1738

×
UNCOV
1739
                // Copy the policy for legacy channels and set the blob flag
×
UNCOV
1740
                // signalling support for anchor channels.
×
UNCOV
1741
                anchorPolicy := policy
×
UNCOV
1742
                anchorPolicy.BlobType |= blob.Type(blob.FlagAnchorChannel)
×
UNCOV
1743

×
UNCOV
1744
                // Copy the policy for legacy channels and set the blob flag
×
UNCOV
1745
                // signalling support for taproot channels.
×
UNCOV
1746
                taprootPolicy := policy
×
UNCOV
1747
                taprootPolicy.TxPolicy.BlobType |= blob.Type(
×
UNCOV
1748
                        blob.FlagTaprootChannel,
×
UNCOV
1749
                )
×
UNCOV
1750

×
UNCOV
1751
                s.towerClientMgr, err = wtclient.NewManager(&wtclient.Config{
×
UNCOV
1752
                        FetchClosedChannel:     fetchClosedChannel,
×
UNCOV
1753
                        BuildBreachRetribution: buildBreachRetribution,
×
UNCOV
1754
                        SessionCloseRange:      cfg.WtClient.SessionCloseRange,
×
UNCOV
1755
                        ChainNotifier:          s.cc.ChainNotifier,
×
UNCOV
1756
                        SubscribeChannelEvents: func() (subscribe.Subscription,
×
UNCOV
1757
                                error) {
×
UNCOV
1758

×
UNCOV
1759
                                return s.channelNotifier.
×
UNCOV
1760
                                        SubscribeChannelEvents()
×
UNCOV
1761
                        },
×
1762
                        Signer: cc.Wallet.Cfg.Signer,
UNCOV
1763
                        NewAddress: func() ([]byte, error) {
×
UNCOV
1764
                                addr, err := newSweepPkScriptGen(
×
UNCOV
1765
                                        cc.Wallet, netParams,
×
UNCOV
1766
                                )().Unpack()
×
UNCOV
1767
                                if err != nil {
×
1768
                                        return nil, err
×
1769
                                }
×
1770

UNCOV
1771
                                return addr.DeliveryAddress, nil
×
1772
                        },
1773
                        SecretKeyRing:      s.cc.KeyRing,
1774
                        Dial:               cfg.net.Dial,
1775
                        AuthDial:           authDial,
1776
                        DB:                 dbs.TowerClientDB,
1777
                        ChainHash:          *s.cfg.ActiveNetParams.GenesisHash,
1778
                        MinBackoff:         10 * time.Second,
1779
                        MaxBackoff:         5 * time.Minute,
1780
                        MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue,
1781
                }, policy, anchorPolicy, taprootPolicy)
UNCOV
1782
                if err != nil {
×
1783
                        return nil, err
×
1784
                }
×
1785
        }
1786

UNCOV
1787
        if len(cfg.ExternalHosts) != 0 {
×
1788
                advertisedIPs := make(map[string]struct{})
×
1789
                for _, addr := range s.currentNodeAnn.Addresses {
×
1790
                        advertisedIPs[addr.String()] = struct{}{}
×
1791
                }
×
1792

1793
                s.hostAnn = netann.NewHostAnnouncer(netann.HostAnnouncerConfig{
×
1794
                        Hosts:         cfg.ExternalHosts,
×
1795
                        RefreshTicker: ticker.New(defaultHostSampleInterval),
×
1796
                        LookupHost: func(host string) (net.Addr, error) {
×
1797
                                return lncfg.ParseAddressString(
×
1798
                                        host, strconv.Itoa(defaultPeerPort),
×
1799
                                        cfg.net.ResolveTCPAddr,
×
1800
                                )
×
1801
                        },
×
1802
                        AdvertisedIPs: advertisedIPs,
1803
                        AnnounceNewIPs: netann.IPAnnouncer(
1804
                                func(modifier ...netann.NodeAnnModifier) (
1805
                                        lnwire.NodeAnnouncement, error) {
×
1806

×
1807
                                        return s.genNodeAnnouncement(
×
1808
                                                nil, modifier...,
×
1809
                                        )
×
1810
                                }),
×
1811
                })
1812
        }
1813

1814
        // Create liveness monitor.
UNCOV
1815
        s.createLivenessMonitor(cfg, cc, leaderElector)
×
UNCOV
1816

×
UNCOV
1817
        // Create the connection manager which will be responsible for
×
UNCOV
1818
        // maintaining persistent outbound connections and also accepting new
×
UNCOV
1819
        // incoming connections.
×
UNCOV
1820
        cmgr, err := connmgr.New(&connmgr.Config{
×
UNCOV
1821
                Listeners:      listeners,
×
UNCOV
1822
                OnAccept:       s.InboundPeerConnected,
×
UNCOV
1823
                RetryDuration:  time.Second * 5,
×
UNCOV
1824
                TargetOutbound: 100,
×
UNCOV
1825
                Dial: noiseDial(
×
UNCOV
1826
                        nodeKeyECDH, s.cfg.net, s.cfg.ConnectionTimeout,
×
UNCOV
1827
                ),
×
UNCOV
1828
                OnConnection: s.OutboundPeerConnected,
×
UNCOV
1829
        })
×
UNCOV
1830
        if err != nil {
×
1831
                return nil, err
×
1832
        }
×
UNCOV
1833
        s.connMgr = cmgr
×
UNCOV
1834

×
NEW
1835
        // Finally, register the subsystems in blockbeat.
×
NEW
1836
        s.registerBlockConsumers()
×
NEW
1837

×
UNCOV
1838
        return s, nil
×
1839
}
1840

1841
// UpdateRoutingConfig is a callback function to update the routing config
1842
// values in the main cfg.
UNCOV
1843
func (s *server) UpdateRoutingConfig(cfg *routing.MissionControlConfig) {
×
UNCOV
1844
        routerCfg := s.cfg.SubRPCServers.RouterRPC
×
UNCOV
1845

×
UNCOV
1846
        switch c := cfg.Estimator.Config().(type) {
×
UNCOV
1847
        case routing.AprioriConfig:
×
UNCOV
1848
                routerCfg.ProbabilityEstimatorType =
×
UNCOV
1849
                        routing.AprioriEstimatorName
×
UNCOV
1850

×
UNCOV
1851
                targetCfg := routerCfg.AprioriConfig
×
UNCOV
1852
                targetCfg.PenaltyHalfLife = c.PenaltyHalfLife
×
UNCOV
1853
                targetCfg.Weight = c.AprioriWeight
×
UNCOV
1854
                targetCfg.CapacityFraction = c.CapacityFraction
×
UNCOV
1855
                targetCfg.HopProbability = c.AprioriHopProbability
×
1856

UNCOV
1857
        case routing.BimodalConfig:
×
UNCOV
1858
                routerCfg.ProbabilityEstimatorType =
×
UNCOV
1859
                        routing.BimodalEstimatorName
×
UNCOV
1860

×
UNCOV
1861
                targetCfg := routerCfg.BimodalConfig
×
UNCOV
1862
                targetCfg.Scale = int64(c.BimodalScaleMsat)
×
UNCOV
1863
                targetCfg.NodeWeight = c.BimodalNodeWeight
×
UNCOV
1864
                targetCfg.DecayTime = c.BimodalDecayTime
×
1865
        }
1866

UNCOV
1867
        routerCfg.MaxMcHistory = cfg.MaxMcHistory
×
1868
}
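
// Editor's note: illustrative usage sketch for UpdateRoutingConfig; not part
// of the original source. A caller holding an already-constructed
// routing.Estimator (the variable "estimator" below is hypothetical) could
// apply it together with a new history limit like so:
//
//      s.UpdateRoutingConfig(&routing.MissionControlConfig{
//              Estimator:    estimator,
//              MaxMcHistory: 1000,
//      })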
1869

1870
// registerBlockConsumers registers the subsystems that consume block events.
1871
// By calling `RegisterQueue`, a list of subsystems is registered in the
1872
// blockbeat for block notifications. When a new block arrives, the subsystems
1873
// in the same queue are notified sequentially, and different queues are
1874
// notified concurrently.
1875
//
1876
// NOTE: To put a subsystem in a different queue, create a slice and pass it to
1877
// a new `RegisterQueue` call.
NEW
1878
func (s *server) registerBlockConsumers() {
×
NEW
1879
        // In this queue, when a new block arrives, it will be received and
×
NEW
1880
        // processed in this order: chainArb -> sweeper -> txPublisher.
×
NEW
1881
        consumers := []chainio.Consumer{
×
NEW
1882
                s.chainArb,
×
NEW
1883
                s.sweeper,
×
NEW
1884
                s.txPublisher,
×
NEW
1885
        }
×
NEW
1886
        s.blockbeatDispatcher.RegisterQueue(consumers)
×
NEW
1887
}
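
// Editor's note: illustrative sketch, not part of the original source. A
// hypothetical subsystem that should receive blocks independently of the
// chainArb -> sweeper -> txPublisher queue above would be registered in its
// own queue, for example:
//
//      s.blockbeatDispatcher.RegisterQueue(
//              []chainio.Consumer{s.someOtherConsumer},
//      )
//
// Queues are notified concurrently with each other, while the consumers
// inside a single queue are notified sequentially in slice order.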
×
1888

1889
// signAliasUpdate takes a ChannelUpdate and returns the signature. This is
1890
// used for option_scid_alias channels where the ChannelUpdate to be sent back
1891
// may differ from what is on disk.
1892
func (s *server) signAliasUpdate(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
UNCOV
1893
        error) {
×
UNCOV
1894

×
UNCOV
1895
        data, err := u.DataToSign()
×
UNCOV
1896
        if err != nil {
×
1897
                return nil, err
×
1898
        }
×
1899

UNCOV
1900
        return s.cc.MsgSigner.SignMessage(s.identityKeyLoc, data, true)
×
1901
}
1902

1903
// createLivenessMonitor creates a set of health checks using our configured
1904
// values and uses these checks to create a liveness monitor. Available
1905
// health checks,
1906
//   - chainHealthCheck (will be disabled for --nochainbackend mode)
1907
//   - diskCheck
1908
//   - tlsHealthCheck
1909
//   - torController, only created when tor is enabled.
1910
//
1911
// If a health check has been disabled by setting attempts to 0, our monitor
1912
// will not run it.
1913
func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl,
UNCOV
1914
        leaderElector cluster.LeaderElector) {
×
UNCOV
1915

×
UNCOV
1916
        chainBackendAttempts := cfg.HealthChecks.ChainCheck.Attempts
×
UNCOV
1917
        if cfg.Bitcoin.Node == "nochainbackend" {
×
1918
                srvrLog.Info("Disabling chain backend checks for " +
×
1919
                        "nochainbackend mode")
×
1920

×
1921
                chainBackendAttempts = 0
×
1922
        }
×
1923

UNCOV
1924
        chainHealthCheck := healthcheck.NewObservation(
×
UNCOV
1925
                "chain backend",
×
UNCOV
1926
                cc.HealthCheck,
×
UNCOV
1927
                cfg.HealthChecks.ChainCheck.Interval,
×
UNCOV
1928
                cfg.HealthChecks.ChainCheck.Timeout,
×
UNCOV
1929
                cfg.HealthChecks.ChainCheck.Backoff,
×
UNCOV
1930
                chainBackendAttempts,
×
UNCOV
1931
        )
×
UNCOV
1932

×
UNCOV
1933
        diskCheck := healthcheck.NewObservation(
×
UNCOV
1934
                "disk space",
×
UNCOV
1935
                func() error {
×
1936
                        free, err := healthcheck.AvailableDiskSpaceRatio(
×
1937
                                cfg.LndDir,
×
1938
                        )
×
1939
                        if err != nil {
×
1940
                                return err
×
1941
                        }
×
1942

1943
                        // If we have more free space than we require,
1944
                        // we return a nil error.
1945
                        if free > cfg.HealthChecks.DiskCheck.RequiredRemaining {
×
1946
                                return nil
×
1947
                        }
×
1948

1949
                        return fmt.Errorf("require: %v free space, got: %v",
×
1950
                                cfg.HealthChecks.DiskCheck.RequiredRemaining,
×
1951
                                free)
×
1952
                },
1953
                cfg.HealthChecks.DiskCheck.Interval,
1954
                cfg.HealthChecks.DiskCheck.Timeout,
1955
                cfg.HealthChecks.DiskCheck.Backoff,
1956
                cfg.HealthChecks.DiskCheck.Attempts,
1957
        )
1958

UNCOV
1959
        tlsHealthCheck := healthcheck.NewObservation(
×
UNCOV
1960
                "tls",
×
UNCOV
1961
                func() error {
×
1962
                        expired, expTime, err := s.tlsManager.IsCertExpired(
×
1963
                                s.cc.KeyRing,
×
1964
                        )
×
1965
                        if err != nil {
×
1966
                                return err
×
1967
                        }
×
1968
                        if expired {
×
1969
                                return fmt.Errorf("TLS certificate is "+
×
1970
                                        "expired as of %v", expTime)
×
1971
                        }
×
1972

1973
                        // If the certificate is not outdated, no error needs
1974
                        // to be returned.
1975
                        return nil
×
1976
                },
1977
                cfg.HealthChecks.TLSCheck.Interval,
1978
                cfg.HealthChecks.TLSCheck.Timeout,
1979
                cfg.HealthChecks.TLSCheck.Backoff,
1980
                cfg.HealthChecks.TLSCheck.Attempts,
1981
        )
1982

UNCOV
1983
        checks := []*healthcheck.Observation{
×
UNCOV
1984
                chainHealthCheck, diskCheck, tlsHealthCheck,
×
UNCOV
1985
        }
×
UNCOV
1986

×
UNCOV
1987
        // If Tor is enabled, add the healthcheck for tor connection.
×
UNCOV
1988
        if s.torController != nil {
×
1989
                torConnectionCheck := healthcheck.NewObservation(
×
1990
                        "tor connection",
×
1991
                        func() error {
×
1992
                                return healthcheck.CheckTorServiceStatus(
×
1993
                                        s.torController,
×
1994
                                        s.createNewHiddenService,
×
1995
                                )
×
1996
                        },
×
1997
                        cfg.HealthChecks.TorConnection.Interval,
1998
                        cfg.HealthChecks.TorConnection.Timeout,
1999
                        cfg.HealthChecks.TorConnection.Backoff,
2000
                        cfg.HealthChecks.TorConnection.Attempts,
2001
                )
2002
                checks = append(checks, torConnectionCheck)
×
2003
        }
2004

2005
        // If remote signing is enabled, add the healthcheck for the remote
2006
        // signing RPC interface.
UNCOV
2007
        if s.cfg.RemoteSigner != nil && s.cfg.RemoteSigner.Enable {
×
UNCOV
2008
                // Because we have two cascading timeouts here, we need to add
×
UNCOV
2009
                // some slack to the "outer" one of them in case the "inner"
×
UNCOV
2010
                // returns exactly on time.
×
UNCOV
2011
                overhead := time.Millisecond * 10
×
UNCOV
2012

×
UNCOV
2013
                remoteSignerConnectionCheck := healthcheck.NewObservation(
×
UNCOV
2014
                        "remote signer connection",
×
UNCOV
2015
                        rpcwallet.HealthCheck(
×
UNCOV
2016
                                s.cfg.RemoteSigner,
×
UNCOV
2017

×
UNCOV
2018
                                // For the health check we might want to be even
×
UNCOV
2019
                                // stricter than the initial/normal connect, so
×
UNCOV
2020
                                // we use the health check timeout here.
×
UNCOV
2021
                                cfg.HealthChecks.RemoteSigner.Timeout,
×
UNCOV
2022
                        ),
×
UNCOV
2023
                        cfg.HealthChecks.RemoteSigner.Interval,
×
UNCOV
2024
                        cfg.HealthChecks.RemoteSigner.Timeout+overhead,
×
UNCOV
2025
                        cfg.HealthChecks.RemoteSigner.Backoff,
×
UNCOV
2026
                        cfg.HealthChecks.RemoteSigner.Attempts,
×
UNCOV
2027
                )
×
UNCOV
2028
                checks = append(checks, remoteSignerConnectionCheck)
×
UNCOV
2029
        }
×
2030

2031
        // If we have a leader elector, we add a health check to ensure we are
2032
        // still the leader. During normal operation, we should always be the
2033
        // leader, but there are circumstances where this may change, such as
2034
        // when we lose network connectivity for long enough that our lease expires.
UNCOV
2035
        if leaderElector != nil {
×
2036
                leaderCheck := healthcheck.NewObservation(
×
2037
                        "leader status",
×
2038
                        func() error {
×
2039
                                // Check if we are still the leader. We wrap
×
2040
                                // the call in a timeout context so the check
×
2041
                                // cannot block past the configured leader
×
2042
                                // check timeout.
×
2043
                                timeoutCtx, cancel := context.WithTimeout(
×
2044
                                        context.Background(),
×
2045
                                        cfg.HealthChecks.LeaderCheck.Timeout,
×
2046
                                )
×
2047
                                defer cancel()
×
2048

×
2049
                                leader, err := leaderElector.IsLeader(
×
2050
                                        timeoutCtx,
×
2051
                                )
×
2052
                                if err != nil {
×
2053
                                        return fmt.Errorf("unable to check if "+
×
2054
                                                "still leader: %v", err)
×
2055
                                }
×
2056

2057
                                if !leader {
×
2058
                                        srvrLog.Debug("Not the current leader")
×
2059
                                        return fmt.Errorf("not the current " +
×
2060
                                                "leader")
×
2061
                                }
×
2062

2063
                                return nil
×
2064
                        },
2065
                        cfg.HealthChecks.LeaderCheck.Interval,
2066
                        cfg.HealthChecks.LeaderCheck.Timeout,
2067
                        cfg.HealthChecks.LeaderCheck.Backoff,
2068
                        cfg.HealthChecks.LeaderCheck.Attempts,
2069
                )
2070

2071
                checks = append(checks, leaderCheck)
×
2072
        }
2073

2074
        // If we have not disabled all of our health checks, we create a
2075
        // liveness monitor with our configured checks.
UNCOV
2076
        s.livenessMonitor = healthcheck.NewMonitor(
×
UNCOV
2077
                &healthcheck.Config{
×
UNCOV
2078
                        Checks:   checks,
×
UNCOV
2079
                        Shutdown: srvrLog.Criticalf,
×
UNCOV
2080
                },
×
UNCOV
2081
        )
×
2082
}
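
// Editor's note, not part of the original source: as the function comment
// above states, any of these observations can be disabled by forcing its
// attempt count to zero, the same mechanism used for the nochainbackend
// case, e.g. a hypothetical override applied before createLivenessMonitor
// runs:
//
//      cfg.HealthChecks.DiskCheck.Attempts = 0 // monitor skips diskCheck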
2083

2084
// Started returns true if the server has been started, and false otherwise.
2085
// NOTE: This function is safe for concurrent access.
UNCOV
2086
func (s *server) Started() bool {
×
UNCOV
2087
        return atomic.LoadInt32(&s.active) != 0
×
UNCOV
2088
}
×
2089

2090
// cleaner is used to aggregate "cleanup" functions during an operation that
2091
// starts several subsystems. In case one of the subsystems fails to start
2092
// and a proper resource cleanup is required, the "run" method achieves this
2093
// by running all these added "cleanup" functions.
2094
type cleaner []func() error
2095

2096
// add is used to add a cleanup function to be called when
2097
// the run function is executed.
UNCOV
2098
func (c cleaner) add(cleanup func() error) cleaner {
×
UNCOV
2099
        return append(c, cleanup)
×
UNCOV
2100
}
×
2101

2102
// run is used to run all the previously added cleanup functions.
2103
func (c cleaner) run() {
×
2104
        for i := len(c) - 1; i >= 0; i-- {
×
2105
                if err := c[i](); err != nil {
×
2106
                        srvrLog.Infof("Cleanup failed: %v", err)
×
2107
                }
×
2108
        }
2109
}
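
// Editor's note: illustrative usage sketch, not part of the original source,
// mirroring how Start and startLowLevelServices use the cleaner type
// (subsystemA is hypothetical):
//
//      c := cleaner{}
//      c = c.add(subsystemA.Stop)
//      if err := subsystemA.Start(); err != nil {
//              c.run() // runs the added cleanups in reverse order
//              return err
//      }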
2110

2111
// startLowLevelServices starts the low-level services of the server. These
2112
// services must be started successfully before running the main server. The
2113
// services are,
2114
// 1. the chain notifier.
2115
//
2116
// TODO(yy): identify and add more low-level services here.
NEW
2117
func (s *server) startLowLevelServices() error {
×
NEW
2118
        var startErr error
×
NEW
2119

×
NEW
2120
        cleanup := cleaner{}
×
NEW
2121

×
NEW
2122
        cleanup = cleanup.add(s.cc.ChainNotifier.Stop)
×
NEW
2123
        if err := s.cc.ChainNotifier.Start(); err != nil {
×
NEW
2124
                startErr = err
×
NEW
2125
        }
×
2126

NEW
2127
        if startErr != nil {
×
NEW
2128
                cleanup.run()
×
NEW
2129
        }
×
2130

NEW
2131
        return startErr
×
2132
}
2133

2134
// Start starts the main daemon server, all requested listeners, and any helper
2135
// goroutines.
2136
// NOTE: This function is safe for concurrent access.
2137
//
2138
//nolint:funlen
UNCOV
2139
func (s *server) Start() error {
×
NEW
2140
        // Get the current blockbeat.
×
NEW
2141
        beat, err := s.getStartingBeat()
×
NEW
2142
        if err != nil {
×
NEW
2143
                return err
×
NEW
2144
        }
×
2145

UNCOV
2146
        var startErr error
×
UNCOV
2147

×
UNCOV
2148
        // If one subsystem fails to start, the following code ensures that the
×
UNCOV
2149
        // previously started ones are stopped. It also ensures a proper wallet
×
UNCOV
2150
        // shutdown which is important for releasing its resources (boltdb, etc...)
×
UNCOV
2151
        cleanup := cleaner{}
×
UNCOV
2152

×
UNCOV
2153
        s.start.Do(func() {
×
UNCOV
2154
                cleanup = cleanup.add(s.customMessageServer.Stop)
×
UNCOV
2155
                if err := s.customMessageServer.Start(); err != nil {
×
2156
                        startErr = err
×
2157
                        return
×
2158
                }
×
2159

UNCOV
2160
                if s.hostAnn != nil {
×
2161
                        cleanup = cleanup.add(s.hostAnn.Stop)
×
2162
                        if err := s.hostAnn.Start(); err != nil {
×
2163
                                startErr = err
×
2164
                                return
×
2165
                        }
×
2166
                }
2167

UNCOV
2168
                if s.livenessMonitor != nil {
×
UNCOV
2169
                        cleanup = cleanup.add(s.livenessMonitor.Stop)
×
UNCOV
2170
                        if err := s.livenessMonitor.Start(); err != nil {
×
2171
                                startErr = err
×
2172
                                return
×
2173
                        }
×
2174
                }
2175

2176
                // Start the notification server. This is used so channel
2177
                // management goroutines can be notified when a funding
2178
                // transaction reaches a sufficient number of confirmations, or
2179
                // when the input for the funding transaction is spent in an
2180
                // attempt at an uncooperative close by the counterparty.
UNCOV
2181
                cleanup = cleanup.add(s.sigPool.Stop)
×
UNCOV
2182
                if err := s.sigPool.Start(); err != nil {
×
2183
                        startErr = err
×
2184
                        return
×
2185
                }
×
2186

UNCOV
2187
                cleanup = cleanup.add(s.writePool.Stop)
×
UNCOV
2188
                if err := s.writePool.Start(); err != nil {
×
2189
                        startErr = err
×
2190
                        return
×
2191
                }
×
2192

UNCOV
2193
                cleanup = cleanup.add(s.readPool.Stop)
×
UNCOV
2194
                if err := s.readPool.Start(); err != nil {
×
2195
                        startErr = err
×
2196
                        return
×
2197
                }
×
2198

UNCOV
2199
                cleanup = cleanup.add(s.cc.BestBlockTracker.Stop)
×
UNCOV
2200
                if err := s.cc.BestBlockTracker.Start(); err != nil {
×
2201
                        startErr = err
×
2202
                        return
×
2203
                }
×
2204

UNCOV
2205
                cleanup = cleanup.add(s.channelNotifier.Stop)
×
UNCOV
2206
                if err := s.channelNotifier.Start(); err != nil {
×
2207
                        startErr = err
×
2208
                        return
×
2209
                }
×
2210

UNCOV
2211
                cleanup = cleanup.add(func() error {
×
2212
                        return s.peerNotifier.Stop()
×
2213
                })
×
UNCOV
2214
                if err := s.peerNotifier.Start(); err != nil {
×
2215
                        startErr = err
×
2216
                        return
×
2217
                }
×
2218

UNCOV
2219
                cleanup = cleanup.add(s.htlcNotifier.Stop)
×
UNCOV
2220
                if err := s.htlcNotifier.Start(); err != nil {
×
2221
                        startErr = err
×
2222
                        return
×
2223
                }
×
2224

UNCOV
2225
                if s.towerClientMgr != nil {
×
UNCOV
2226
                        cleanup = cleanup.add(s.towerClientMgr.Stop)
×
UNCOV
2227
                        if err := s.towerClientMgr.Start(); err != nil {
×
2228
                                startErr = err
×
2229
                                return
×
2230
                        }
×
2231
                }
2232

UNCOV
2233
                cleanup = cleanup.add(s.txPublisher.Stop)
×
NEW
2234
                if err := s.txPublisher.Start(beat); err != nil {
×
2235
                        startErr = err
×
2236
                        return
×
2237
                }
×
2238

UNCOV
2239
                cleanup = cleanup.add(s.sweeper.Stop)
×
NEW
2240
                if err := s.sweeper.Start(beat); err != nil {
×
2241
                        startErr = err
×
2242
                        return
×
2243
                }
×
2244

UNCOV
2245
                cleanup = cleanup.add(s.utxoNursery.Stop)
×
UNCOV
2246
                if err := s.utxoNursery.Start(); err != nil {
×
2247
                        startErr = err
×
2248
                        return
×
2249
                }
×
2250

UNCOV
2251
                cleanup = cleanup.add(s.breachArbitrator.Stop)
×
UNCOV
2252
                if err := s.breachArbitrator.Start(); err != nil {
×
2253
                        startErr = err
×
2254
                        return
×
2255
                }
×
2256

UNCOV
2257
                cleanup = cleanup.add(s.fundingMgr.Stop)
×
UNCOV
2258
                if err := s.fundingMgr.Start(); err != nil {
×
2259
                        startErr = err
×
2260
                        return
×
2261
                }
×
2262

2263
                // htlcSwitch must be started before chainArb since the latter
2264
                // relies on htlcSwitch to deliver resolution messages upon
2265
                // start.
UNCOV
2266
                cleanup = cleanup.add(s.htlcSwitch.Stop)
×
UNCOV
2267
                if err := s.htlcSwitch.Start(); err != nil {
×
2268
                        startErr = err
×
2269
                        return
×
2270
                }
×
2271

UNCOV
2272
                cleanup = cleanup.add(s.interceptableSwitch.Stop)
×
UNCOV
2273
                if err := s.interceptableSwitch.Start(); err != nil {
×
2274
                        startErr = err
×
2275
                        return
×
2276
                }
×
2277

UNCOV
2278
                cleanup = cleanup.add(s.invoiceHtlcModifier.Stop)
×
UNCOV
2279
                if err := s.invoiceHtlcModifier.Start(); err != nil {
×
2280
                        startErr = err
×
2281
                        return
×
2282
                }
×
2283

UNCOV
2284
                cleanup = cleanup.add(s.chainArb.Stop)
×
NEW
2285
                if err := s.chainArb.Start(beat); err != nil {
×
2286
                        startErr = err
×
2287
                        return
×
2288
                }
×
2289

UNCOV
2290
                cleanup = cleanup.add(s.graphBuilder.Stop)
×
UNCOV
2291
                if err := s.graphBuilder.Start(); err != nil {
×
2292
                        startErr = err
×
2293
                        return
×
2294
                }
×
2295

UNCOV
2296
                cleanup = cleanup.add(s.chanRouter.Stop)
×
UNCOV
2297
                if err := s.chanRouter.Start(); err != nil {
×
2298
                        startErr = err
×
2299
                        return
×
2300
                }
×
2301
                // The authGossiper depends on the chanRouter and therefore
2302
                // should be started after it.
UNCOV
2303
                cleanup = cleanup.add(s.authGossiper.Stop)
×
UNCOV
2304
                if err := s.authGossiper.Start(); err != nil {
×
2305
                        startErr = err
×
2306
                        return
×
2307
                }
×
2308

UNCOV
2309
                cleanup = cleanup.add(s.invoices.Stop)
×
UNCOV
2310
                if err := s.invoices.Start(); err != nil {
×
2311
                        startErr = err
×
2312
                        return
×
2313
                }
×
2314

UNCOV
2315
                cleanup = cleanup.add(s.sphinx.Stop)
×
UNCOV
2316
                if err := s.sphinx.Start(); err != nil {
×
2317
                        startErr = err
×
2318
                        return
×
2319
                }
×
2320

UNCOV
2321
                cleanup = cleanup.add(s.chanStatusMgr.Stop)
×
UNCOV
2322
                if err := s.chanStatusMgr.Start(); err != nil {
×
2323
                        startErr = err
×
2324
                        return
×
2325
                }
×
2326

UNCOV
2327
                cleanup = cleanup.add(s.chanEventStore.Stop)
×
UNCOV
2328
                if err := s.chanEventStore.Start(); err != nil {
×
2329
                        startErr = err
×
2330
                        return
×
2331
                }
×
2332

UNCOV
2333
                cleanup.add(func() error {
×
2334
                        s.missionController.StopStoreTickers()
×
2335
                        return nil
×
2336
                })
×
UNCOV
2337
                s.missionController.RunStoreTickers()
×
UNCOV
2338

×
UNCOV
2339
                // Before we start the connMgr, we'll check to see if we have
×
UNCOV
2340
                // any backups to recover. We do this now as we want to ensure
×
UNCOV
2341
                // that we have all the information we need to handle channel
×
UNCOV
2342
                // recovery _before_ we even accept connections from any peers.
×
UNCOV
2343
                chanRestorer := &chanDBRestorer{
×
UNCOV
2344
                        db:         s.chanStateDB,
×
UNCOV
2345
                        secretKeys: s.cc.KeyRing,
×
UNCOV
2346
                        chainArb:   s.chainArb,
×
UNCOV
2347
                }
×
UNCOV
2348
                if len(s.chansToRestore.PackedSingleChanBackups) != 0 {
×
2349
                        _, err := chanbackup.UnpackAndRecoverSingles(
×
2350
                                s.chansToRestore.PackedSingleChanBackups,
×
2351
                                s.cc.KeyRing, chanRestorer, s,
×
2352
                        )
×
2353
                        if err != nil {
×
2354
                                startErr = fmt.Errorf("unable to unpack single "+
×
2355
                                        "backups: %v", err)
×
2356
                                return
×
2357
                        }
×
2358
                }
UNCOV
2359
                if len(s.chansToRestore.PackedMultiChanBackup) != 0 {
×
UNCOV
2360
                        _, err := chanbackup.UnpackAndRecoverMulti(
×
UNCOV
2361
                                s.chansToRestore.PackedMultiChanBackup,
×
UNCOV
2362
                                s.cc.KeyRing, chanRestorer, s,
×
UNCOV
2363
                        )
×
UNCOV
2364
                        if err != nil {
×
2365
                                startErr = fmt.Errorf("unable to unpack chan "+
×
2366
                                        "backup: %v", err)
×
2367
                                return
×
2368
                        }
×
2369
                }
2370

2371
                // chanSubSwapper must be started after the `channelNotifier`
2372
                // because it depends on channel events as a synchronization
2373
                // point.
UNCOV
2374
                cleanup = cleanup.add(s.chanSubSwapper.Stop)
×
UNCOV
2375
                if err := s.chanSubSwapper.Start(); err != nil {
×
2376
                        startErr = err
×
2377
                        return
×
2378
                }
×
2379

UNCOV
2380
                if s.torController != nil {
×
2381
                        cleanup = cleanup.add(s.torController.Stop)
×
2382
                        if err := s.createNewHiddenService(); err != nil {
×
2383
                                startErr = err
×
2384
                                return
×
2385
                        }
×
2386
                }
2387

UNCOV
2388
                if s.natTraversal != nil {
×
2389
                        s.wg.Add(1)
×
2390
                        go s.watchExternalIP()
×
2391
                }
×
2392

2393
                // Start connmgr last to prevent connections before init.
UNCOV
2394
                cleanup = cleanup.add(func() error {
×
2395
                        s.connMgr.Stop()
×
2396
                        return nil
×
2397
                })
×
UNCOV
2398
                s.connMgr.Start()
×
UNCOV
2399

×
UNCOV
2400
                // If peers are specified as a config option, we'll add those
×
UNCOV
2401
                // peers first.
×
UNCOV
2402
                for _, peerAddrCfg := range s.cfg.AddPeers {
×
UNCOV
2403
                        parsedPubkey, parsedHost, err := lncfg.ParseLNAddressPubkey(
×
UNCOV
2404
                                peerAddrCfg,
×
UNCOV
2405
                        )
×
UNCOV
2406
                        if err != nil {
×
2407
                                startErr = fmt.Errorf("unable to parse peer "+
×
2408
                                        "pubkey from config: %v", err)
×
2409
                                return
×
2410
                        }
×
UNCOV
2411
                        addr, err := parseAddr(parsedHost, s.cfg.net)
×
UNCOV
2412
                        if err != nil {
×
2413
                                startErr = fmt.Errorf("unable to parse peer "+
×
2414
                                        "address provided as a config option: "+
×
2415
                                        "%v", err)
×
2416
                                return
×
2417
                        }
×
2418

UNCOV
2419
                        peerAddr := &lnwire.NetAddress{
×
UNCOV
2420
                                IdentityKey: parsedPubkey,
×
UNCOV
2421
                                Address:     addr,
×
UNCOV
2422
                                ChainNet:    s.cfg.ActiveNetParams.Net,
×
UNCOV
2423
                        }
×
UNCOV
2424

×
UNCOV
2425
                        err = s.ConnectToPeer(
×
UNCOV
2426
                                peerAddr, true,
×
UNCOV
2427
                                s.cfg.ConnectionTimeout,
×
UNCOV
2428
                        )
×
UNCOV
2429
                        if err != nil {
×
2430
                                startErr = fmt.Errorf("unable to connect to "+
×
2431
                                        "peer address provided as a config "+
×
2432
                                        "option: %v", err)
×
2433
                                return
×
2434
                        }
×
2435
                }
2436

2437
                // Subscribe to NodeAnnouncements that advertise new addresses
2438
                // for our persistent peers.
UNCOV
2439
                if err := s.updatePersistentPeerAddrs(); err != nil {
×
2440
                        startErr = err
×
2441
                        return
×
2442
                }
×
2443

2444
                // With all the relevant sub-systems started, we'll now attempt
2445
                // to establish persistent connections to our direct channel
2446
                // collaborators within the network. Before doing so however,
2447
                // we'll prune our set of link nodes found within the database
2448
                // to ensure we don't reconnect to any nodes we no longer have
2449
                // open channels with.
UNCOV
2450
                if err := s.chanStateDB.PruneLinkNodes(); err != nil {
×
2451
                        startErr = err
×
2452
                        return
×
2453
                }
×
UNCOV
2454
                if err := s.establishPersistentConnections(); err != nil {
×
2455
                        startErr = err
×
2456
                        return
×
2457
                }
×
2458

2459
                // setSeedList is a helper function that turns multiple DNS seed
2460
                // server tuples from the command line or config file into the
2461
                // data structure we need and does a basic formal sanity check
2462
                // in the process.
UNCOV
2463
                setSeedList := func(tuples []string, genesisHash chainhash.Hash) {
×
2464
                        if len(tuples) == 0 {
×
2465
                                return
×
2466
                        }
×
2467

2468
                        result := make([][2]string, len(tuples))
×
2469
                        for idx, tuple := range tuples {
×
2470
                                tuple = strings.TrimSpace(tuple)
×
2471
                                if len(tuple) == 0 {
×
2472
                                        return
×
2473
                                }
×
2474

2475
                                servers := strings.Split(tuple, ",")
×
2476
                                if len(servers) > 2 || len(servers) == 0 {
×
2477
                                        srvrLog.Warnf("Ignoring invalid DNS "+
×
2478
                                                "seed tuple: %v", servers)
×
2479
                                        return
×
2480
                                }
×
2481

2482
                                copy(result[idx][:], servers)
×
2483
                        }
2484

2485
                        chainreg.ChainDNSSeeds[genesisHash] = result
×
2486
                }
2487

2488
                // Let users overwrite the DNS seed nodes. We only allow them
2489
                // for bitcoin mainnet/testnet/signet.
UNCOV
2490
                if s.cfg.Bitcoin.MainNet {
×
2491
                        setSeedList(
×
2492
                                s.cfg.Bitcoin.DNSSeeds,
×
2493
                                chainreg.BitcoinMainnetGenesis,
×
2494
                        )
×
2495
                }
×
UNCOV
2496
                if s.cfg.Bitcoin.TestNet3 {
×
2497
                        setSeedList(
×
2498
                                s.cfg.Bitcoin.DNSSeeds,
×
2499
                                chainreg.BitcoinTestnetGenesis,
×
2500
                        )
×
2501
                }
×
UNCOV
2502
                if s.cfg.Bitcoin.SigNet {
×
2503
                        setSeedList(
×
2504
                                s.cfg.Bitcoin.DNSSeeds,
×
2505
                                chainreg.BitcoinSignetGenesis,
×
2506
                        )
×
2507
                }
×
2508

2509
                // If network bootstrapping hasn't been disabled, then we'll
2510
                // configure the set of active bootstrappers, and launch a
2511
                // dedicated goroutine to maintain a set of persistent
2512
                // connections.
UNCOV
2513
                if shouldPeerBootstrap(s.cfg) {
×
2514
                        bootstrappers, err := initNetworkBootstrappers(s)
×
2515
                        if err != nil {
×
2516
                                startErr = err
×
2517
                                return
×
2518
                        }
×
2519

2520
                        s.wg.Add(1)
×
2521
                        go s.peerBootstrapper(defaultMinPeers, bootstrappers)
×
UNCOV
2522
                } else {
×
UNCOV
2523
                        srvrLog.Infof("Auto peer bootstrapping is disabled")
×
UNCOV
2524
                }
×
2525

2526
                // Start the blockbeat after all other subsystems have been
2527
                // started so they are ready to receive new blocks.
NEW
2528
                cleanup = cleanup.add(func() error {
×
NEW
2529
                        s.blockbeatDispatcher.Stop()
×
NEW
2530
                        return nil
×
NEW
2531
                })
×
NEW
2532
                if err := s.blockbeatDispatcher.Start(); err != nil {
×
NEW
2533
                        startErr = err
×
NEW
2534
                        return
×
NEW
2535
                }
×
2536

2537
                // Set the active flag now that we've completed the full
2538
                // startup.
UNCOV
2539
                atomic.StoreInt32(&s.active, 1)
×
2540
        })
2541

UNCOV
2542
        if startErr != nil {
×
2543
                cleanup.run()
×
2544
        }
×
UNCOV
2545
        return startErr
×
2546
}
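// Illustrative sketch added for this review, not part of server.go: a hedged
// view of the caller-side lifecycle around Start and Stop. The runServer
// helper and the srv variable are hypothetical; only Start, Started and Stop
// come from the server above.
func runServer(srv *server) error {
        if err := srv.Start(); err != nil {
                return fmt.Errorf("unable to start server: %w", err)
        }

        // Started reports true once the startup sequence above has completed.
        srvrLog.Infof("Server active: %v", srv.Started())

        // ... serve until shutdown is requested elsewhere ...

        // Stop blocks until all subsystems and goroutines have exited; it is
        // guarded by s.stop.Do, so repeated calls are harmless.
        return srv.Stop()
}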
2547

2548
// Stop gracefully shuts down the main daemon server. This function will signal
2549
// any active goroutines, or helper objects to exit, then blocks until they've
2550
// all successfully exited. Additionally, any/all listeners are closed.
2551
// NOTE: This function is safe for concurrent access.
UNCOV
2552
func (s *server) Stop() error {
×
UNCOV
2553
        s.stop.Do(func() {
×
UNCOV
2554
                atomic.StoreInt32(&s.stopping, 1)
×
UNCOV
2555

×
UNCOV
2556
                close(s.quit)
×
UNCOV
2557

×
UNCOV
2558
                // Shutdown connMgr first to prevent conns during shutdown.
×
UNCOV
2559
                s.connMgr.Stop()
×
UNCOV
2560

×
NEW
2561
                // Stop dispatching blocks to other systems immediately.
×
NEW
2562
                s.blockbeatDispatcher.Stop()
×
NEW
2563

×
UNCOV
2564
                // Shutdown the wallet, funding manager, and the rpc server.
×
UNCOV
2565
                if err := s.chanStatusMgr.Stop(); err != nil {
×
2566
                        srvrLog.Warnf("failed to stop chanStatusMgr: %v", err)
×
2567
                }
×
UNCOV
2568
                if err := s.htlcSwitch.Stop(); err != nil {
×
2569
                        srvrLog.Warnf("failed to stop htlcSwitch: %v", err)
×
2570
                }
×
UNCOV
2571
                if err := s.sphinx.Stop(); err != nil {
×
2572
                        srvrLog.Warnf("failed to stop sphinx: %v", err)
×
2573
                }
×
UNCOV
2574
                if err := s.invoices.Stop(); err != nil {
×
2575
                        srvrLog.Warnf("failed to stop invoices: %v", err)
×
2576
                }
×
UNCOV
2577
                if err := s.interceptableSwitch.Stop(); err != nil {
×
2578
                        srvrLog.Warnf("failed to stop interceptable "+
×
2579
                                "switch: %v", err)
×
2580
                }
×
UNCOV
2581
                if err := s.invoiceHtlcModifier.Stop(); err != nil {
×
2582
                        srvrLog.Warnf("failed to stop htlc invoices "+
×
2583
                                "modifier: %v", err)
×
2584
                }
×
UNCOV
2585
                if err := s.chanRouter.Stop(); err != nil {
×
2586
                        srvrLog.Warnf("failed to stop chanRouter: %v", err)
×
2587
                }
×
UNCOV
2588
                if err := s.graphBuilder.Stop(); err != nil {
×
2589
                        srvrLog.Warnf("failed to stop graphBuilder %v", err)
×
2590
                }
×
UNCOV
2591
                if err := s.chainArb.Stop(); err != nil {
×
2592
                        srvrLog.Warnf("failed to stop chainArb: %v", err)
×
2593
                }
×
UNCOV
2594
                if err := s.fundingMgr.Stop(); err != nil {
×
2595
                        srvrLog.Warnf("failed to stop fundingMgr: %v", err)
×
2596
                }
×
UNCOV
2597
                if err := s.breachArbitrator.Stop(); err != nil {
×
2598
                        srvrLog.Warnf("failed to stop breachArbitrator: %v",
×
2599
                                err)
×
2600
                }
×
UNCOV
2601
                if err := s.utxoNursery.Stop(); err != nil {
×
2602
                        srvrLog.Warnf("failed to stop utxoNursery: %v", err)
×
2603
                }
×
UNCOV
2604
                if err := s.authGossiper.Stop(); err != nil {
×
2605
                        srvrLog.Warnf("failed to stop authGossiper: %v", err)
×
2606
                }
×
UNCOV
2607
                if err := s.sweeper.Stop(); err != nil {
×
2608
                        srvrLog.Warnf("failed to stop sweeper: %v", err)
×
2609
                }
×
UNCOV
2610
                if err := s.txPublisher.Stop(); err != nil {
×
2611
                        srvrLog.Warnf("failed to stop txPublisher: %v", err)
×
2612
                }
×
UNCOV
2613
                if err := s.channelNotifier.Stop(); err != nil {
×
2614
                        srvrLog.Warnf("failed to stop channelNotifier: %v", err)
×
2615
                }
×
UNCOV
2616
                if err := s.peerNotifier.Stop(); err != nil {
×
2617
                        srvrLog.Warnf("failed to stop peerNotifier: %v", err)
×
2618
                }
×
UNCOV
2619
                if err := s.htlcNotifier.Stop(); err != nil {
×
2620
                        srvrLog.Warnf("failed to stop htlcNotifier: %v", err)
×
2621
                }
×
2622

2623
                // Update channel.backup file. Make sure to do it before
2624
                // stopping chanSubSwapper.
UNCOV
2625
                singles, err := chanbackup.FetchStaticChanBackups(
×
UNCOV
2626
                        s.chanStateDB, s.addrSource,
×
UNCOV
2627
                )
×
UNCOV
2628
                if err != nil {
×
2629
                        srvrLog.Warnf("failed to fetch channel states: %v",
×
2630
                                err)
×
UNCOV
2631
                } else {
×
UNCOV
2632
                        err := s.chanSubSwapper.ManualUpdate(singles)
×
UNCOV
2633
                        if err != nil {
×
UNCOV
2634
                                srvrLog.Warnf("Manual update of channel "+
×
UNCOV
2635
                                        "backup failed: %v", err)
×
UNCOV
2636
                        }
×
2637
                }
2638

UNCOV
2639
                if err := s.chanSubSwapper.Stop(); err != nil {
×
2640
                        srvrLog.Warnf("failed to stop chanSubSwapper: %v", err)
×
2641
                }
×
UNCOV
2642
                if err := s.cc.ChainNotifier.Stop(); err != nil {
×
2643
                        srvrLog.Warnf("Unable to stop ChainNotifier: %v", err)
×
2644
                }
×
UNCOV
2645
                if err := s.cc.BestBlockTracker.Stop(); err != nil {
×
2646
                        srvrLog.Warnf("Unable to stop BestBlockTracker: %v",
×
2647
                                err)
×
2648
                }
×
UNCOV
2649
                if err := s.chanEventStore.Stop(); err != nil {
×
2650
                        srvrLog.Warnf("Unable to stop ChannelEventStore: %v",
×
2651
                                err)
×
2652
                }
×
UNCOV
2653
                s.missionController.StopStoreTickers()
×
UNCOV
2654

×
UNCOV
2655
                // Disconnect from each active peer to ensure that
×
UNCOV
2656
                // peerTerminationWatchers signal completion to each peer.
×
UNCOV
2657
                for _, peer := range s.Peers() {
×
UNCOV
2658
                        err := s.DisconnectPeer(peer.IdentityKey())
×
UNCOV
2659
                        if err != nil {
×
2660
                                srvrLog.Warnf("could not disconnect peer: %v"+
×
2661
                                        "received error: %v", peer.IdentityKey(),
×
2662
                                        err,
×
2663
                                )
×
2664
                        }
×
2665
                }
2666

2667
                // Now that all connections have been torn down, stop the tower
2668
                // client which will reliably flush all queued states to the
2669
                // tower. If this is halted for any reason, the force quit timer
2670
                // will kick in and abort to allow this method to return.
UNCOV
2671
                if s.towerClientMgr != nil {
×
UNCOV
2672
                        if err := s.towerClientMgr.Stop(); err != nil {
×
2673
                                srvrLog.Warnf("Unable to shut down tower "+
×
2674
                                        "client manager: %v", err)
×
2675
                        }
×
2676
                }
2677

UNCOV
2678
                if s.hostAnn != nil {
×
2679
                        if err := s.hostAnn.Stop(); err != nil {
×
2680
                                srvrLog.Warnf("unable to shut down host "+
×
2681
                                        "annoucner: %v", err)
×
2682
                        }
×
2683
                }
2684

UNCOV
2685
                if s.livenessMonitor != nil {
×
UNCOV
2686
                        if err := s.livenessMonitor.Stop(); err != nil {
×
2687
                                srvrLog.Warnf("unable to shutdown liveness "+
×
2688
                                        "monitor: %v", err)
×
2689
                        }
×
2690
                }
2691

2692
                // Wait for all lingering goroutines to quit.
UNCOV
2693
                srvrLog.Debug("Waiting for server to shutdown...")
×
UNCOV
2694
                s.wg.Wait()
×
UNCOV
2695

×
UNCOV
2696
                srvrLog.Debug("Stopping buffer pools...")
×
UNCOV
2697
                s.sigPool.Stop()
×
UNCOV
2698
                s.writePool.Stop()
×
UNCOV
2699
                s.readPool.Stop()
×
2700
        })
2701

UNCOV
2702
        return nil
×
2703
}
2704

2705
// Stopped returns true if the server has been instructed to shutdown.
2706
// NOTE: This function is safe for concurrent access.
UNCOV
2707
func (s *server) Stopped() bool {
×
UNCOV
2708
        return atomic.LoadInt32(&s.stopping) != 0
×
UNCOV
2709
}
×
2710

2711
// configurePortForwarding attempts to set up port forwarding for the different
2712
// ports that the server will be listening on.
2713
//
2714
// NOTE: This should only be used when using some kind of NAT traversal to
2715
// automatically set up forwarding rules.
2716
func (s *server) configurePortForwarding(ports ...uint16) ([]string, error) {
×
2717
        ip, err := s.natTraversal.ExternalIP()
×
2718
        if err != nil {
×
2719
                return nil, err
×
2720
        }
×
2721
        s.lastDetectedIP = ip
×
2722

×
2723
        externalIPs := make([]string, 0, len(ports))
×
2724
        for _, port := range ports {
×
2725
                if err := s.natTraversal.AddPortMapping(port); err != nil {
×
2726
                        srvrLog.Debugf("Unable to forward port %d: %v", port, err)
×
2727
                        continue
×
2728
                }
2729

2730
                hostIP := fmt.Sprintf("%v:%d", ip, port)
×
2731
                externalIPs = append(externalIPs, hostIP)
×
2732
        }
2733

2734
        return externalIPs, nil
×
2735
}
2736

2737
// removePortForwarding attempts to clear the forwarding rules for the different
2738
// ports the server is currently listening on.
2739
//
2740
// NOTE: This should only be used when using some kind of NAT traversal to
2741
// automatically set up forwarding rules.
2742
func (s *server) removePortForwarding() {
×
2743
        forwardedPorts := s.natTraversal.ForwardedPorts()
×
2744
        for _, port := range forwardedPorts {
×
2745
                if err := s.natTraversal.DeletePortMapping(port); err != nil {
×
2746
                        srvrLog.Errorf("Unable to remove forwarding rules for "+
×
2747
                                "port %d: %v", port, err)
×
2748
                }
×
2749
        }
2750
}
2751

2752
// watchExternalIP continuously checks for an updated external IP address every
2753
// 15 minutes. Once a new IP address has been detected, it will automatically
2754
// handle port forwarding rules and send updated node announcements to the
2755
// currently connected peers.
2756
//
2757
// NOTE: This MUST be run as a goroutine.
2758
func (s *server) watchExternalIP() {
×
2759
        defer s.wg.Done()
×
2760

×
2761
        // Before exiting, we'll make sure to remove the forwarding rules set
×
2762
        // up by the server.
×
2763
        defer s.removePortForwarding()
×
2764

×
2765
        // Keep track of the external IPs set by the user to avoid replacing
×
2766
        // them when detecting a new IP.
×
2767
        ipsSetByUser := make(map[string]struct{})
×
2768
        for _, ip := range s.cfg.ExternalIPs {
×
2769
                ipsSetByUser[ip.String()] = struct{}{}
×
2770
        }
×
2771

2772
        forwardedPorts := s.natTraversal.ForwardedPorts()
×
2773

×
2774
        ticker := time.NewTicker(15 * time.Minute)
×
2775
        defer ticker.Stop()
×
2776
out:
×
2777
        for {
×
2778
                select {
×
2779
                case <-ticker.C:
×
2780
                        // We'll start off by making sure a new IP address has
×
2781
                        // been detected.
×
2782
                        ip, err := s.natTraversal.ExternalIP()
×
2783
                        if err != nil {
×
2784
                                srvrLog.Debugf("Unable to retrieve the "+
×
2785
                                        "external IP address: %v", err)
×
2786
                                continue
×
2787
                        }
2788

2789
                        // Periodically renew the NAT port forwarding.
2790
                        for _, port := range forwardedPorts {
×
2791
                                err := s.natTraversal.AddPortMapping(port)
×
2792
                                if err != nil {
×
2793
                                        srvrLog.Warnf("Unable to automatically "+
×
2794
                                                "re-create port forwarding using %s: %v",
×
2795
                                                s.natTraversal.Name(), err)
×
2796
                                } else {
×
2797
                                        srvrLog.Debugf("Automatically re-created "+
×
2798
                                                "forwarding for port %d using %s to "+
×
2799
                                                "advertise external IP",
×
2800
                                                port, s.natTraversal.Name())
×
2801
                                }
×
2802
                        }
2803

2804
                        if ip.Equal(s.lastDetectedIP) {
×
2805
                                continue
×
2806
                        }
2807

2808
                        srvrLog.Infof("Detected new external IP address %s", ip)
×
2809

×
2810
                        // Next, we'll craft the new addresses that will be
×
2811
                        // included in the new node announcement and advertised
×
2812
                        // to the network. Each address will consist of the new
×
2813
                        // IP detected and one of the currently advertised
×
2814
                        // ports.
×
2815
                        var newAddrs []net.Addr
×
2816
                        for _, port := range forwardedPorts {
×
2817
                                hostIP := fmt.Sprintf("%v:%d", ip, port)
×
2818
                                addr, err := net.ResolveTCPAddr("tcp", hostIP)
×
2819
                                if err != nil {
×
2820
                                        srvrLog.Debugf("Unable to resolve "+
×
2821
                                                "host %v: %v", addr, err)
×
2822
                                        continue
×
2823
                                }
2824

2825
                                newAddrs = append(newAddrs, addr)
×
2826
                        }
2827

2828
                        // Skip the update if we weren't able to resolve any of
2829
                        // the new addresses.
2830
                        if len(newAddrs) == 0 {
×
2831
                                srvrLog.Debug("Skipping node announcement " +
×
2832
                                        "update due to not being able to " +
×
2833
                                        "resolve any new addresses")
×
2834
                                continue
×
2835
                        }
2836

2837
                        // Now, we'll need to update the addresses in our node's
2838
                        // announcement in order to propagate the update
2839
                        // throughout the network. We'll only include addresses
2840
                        // that have a different IP from the previous one, as
2841
                        // the previous IP is no longer valid.
2842
                        currentNodeAnn := s.getNodeAnnouncement()
×
2843

×
2844
                        for _, addr := range currentNodeAnn.Addresses {
×
2845
                                host, _, err := net.SplitHostPort(addr.String())
×
2846
                                if err != nil {
×
2847
                                        srvrLog.Debugf("Unable to determine "+
×
2848
                                                "host from address %v: %v",
×
2849
                                                addr, err)
×
2850
                                        continue
×
2851
                                }
2852

2853
                                // We'll also make sure to include external IPs
2854
                                // set manually by the user.
2855
                                _, setByUser := ipsSetByUser[addr.String()]
×
2856
                                if setByUser || host != s.lastDetectedIP.String() {
×
2857
                                        newAddrs = append(newAddrs, addr)
×
2858
                                }
×
2859
                        }
2860

2861
                        // Then, we'll generate a new timestamped node
2862
                        // announcement with the updated addresses and broadcast
2863
                        // it to our peers.
2864
                        newNodeAnn, err := s.genNodeAnnouncement(
×
2865
                                nil, netann.NodeAnnSetAddrs(newAddrs),
×
2866
                        )
×
2867
                        if err != nil {
×
2868
                                srvrLog.Debugf("Unable to generate new node "+
×
2869
                                        "announcement: %v", err)
×
2870
                                continue
×
2871
                        }
2872

2873
                        err = s.BroadcastMessage(nil, &newNodeAnn)
×
2874
                        if err != nil {
×
2875
                                srvrLog.Debugf("Unable to broadcast new node "+
×
2876
                                        "announcement to peers: %v", err)
×
2877
                                continue
×
2878
                        }
2879

2880
                        // Finally, update the last IP seen to the current one.
2881
                        s.lastDetectedIP = ip
×
2882
                case <-s.quit:
×
2883
                        break out
×
2884
                }
2885
        }
2886
}
2887

2888
// initNetworkBootstrappers initializes a set of network peer bootstrappers
2889
// based on the server, and currently active bootstrap mechanisms as defined
2890
// within the current configuration.
2891
func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, error) {
×
2892
        srvrLog.Infof("Initializing peer network bootstrappers!")
×
2893

×
2894
        var bootStrappers []discovery.NetworkPeerBootstrapper
×
2895

×
2896
        // First, we'll create an instance of the ChannelGraphBootstrapper as
×
2897
        // this can be used by default if we've already partially seeded the
×
2898
        // network.
×
2899
        chanGraph := autopilot.ChannelGraphFromDatabase(s.graphDB)
×
2900
        graphBootstrapper, err := discovery.NewGraphBootstrapper(chanGraph)
×
2901
        if err != nil {
×
2902
                return nil, err
×
2903
        }
×
2904
        bootStrappers = append(bootStrappers, graphBootstrapper)
×
2905

×
2906
        // If this isn't simnet mode, then one of our additional bootstrapping
×
2907
        // sources will be the set of running DNS seeds.
×
2908
        if !s.cfg.Bitcoin.SimNet {
×
2909
                dnsSeeds, ok := chainreg.ChainDNSSeeds[*s.cfg.ActiveNetParams.GenesisHash]
×
2910

×
2911
                // If we have a set of DNS seeds for this chain, then we'll add
×
2912
                // it as an additional bootstrapping source.
×
2913
                if ok {
×
2914
                        srvrLog.Infof("Creating DNS peer bootstrapper with "+
×
2915
                                "seeds: %v", dnsSeeds)
×
2916

×
2917
                        dnsBootStrapper := discovery.NewDNSSeedBootstrapper(
×
2918
                                dnsSeeds, s.cfg.net, s.cfg.ConnectionTimeout,
×
2919
                        )
×
2920
                        bootStrappers = append(bootStrappers, dnsBootStrapper)
×
2921
                }
×
2922
        }
2923

2924
        return bootStrappers, nil
×
2925
}
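// Illustrative sketch added for this review, not part of server.go: it makes
// the DNS seed handling concrete by showing how the user-supplied tuples
// parsed by the setSeedList helper in Start above become the [2]string
// entries consumed here. The hostnames below are placeholders, not real
// seeds.
func exampleSeedTuples() {
        tuples := []string{
                "seed.example.org",
                "dnsseed.example.net,soa.example.net",
        }

        result := make([][2]string, len(tuples))
        for idx, tuple := range tuples {
                copy(result[idx][:], strings.Split(tuple, ","))
        }

        // result[0] == [2]string{"seed.example.org", ""}
        // result[1] == [2]string{"dnsseed.example.net", "soa.example.net"}
        chainreg.ChainDNSSeeds[chainreg.BitcoinMainnetGenesis] = result
}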
2926

2927
// createBootstrapIgnorePeers creates a map of peers that the bootstrap process
2928
// needs to ignore, which is made of three parts,
2929
//   - the node itself needs to be skipped as it doesn't make sense to connect
2930
//     to itself.
2931
//   - the peers that we already have connections with, as in s.peersByPub.
2932
//   - the peers that we are attempting to connect to, as in s.persistentPeers.
2933
func (s *server) createBootstrapIgnorePeers() map[autopilot.NodeID]struct{} {
×
2934
        s.mu.RLock()
×
2935
        defer s.mu.RUnlock()
×
2936

×
2937
        ignore := make(map[autopilot.NodeID]struct{})
×
2938

×
2939
        // We should ignore ourselves from bootstrapping.
×
2940
        selfKey := autopilot.NewNodeID(s.identityECDH.PubKey())
×
2941
        ignore[selfKey] = struct{}{}
×
2942

×
2943
        // Ignore all connected peers.
×
2944
        for _, peer := range s.peersByPub {
×
2945
                nID := autopilot.NewNodeID(peer.IdentityKey())
×
2946
                ignore[nID] = struct{}{}
×
2947
        }
×
2948

2949
        // Ignore all persistent peers as they have a dedicated reconnecting
2950
        // process.
2951
        for pubKeyStr := range s.persistentPeers {
×
2952
                var nID autopilot.NodeID
×
2953
                copy(nID[:], []byte(pubKeyStr))
×
2954
                ignore[nID] = struct{}{}
×
2955
        }
×
2956

2957
        return ignore
×
2958
}
2959

2960
// peerBootstrapper is a goroutine which is tasked with attempting to establish
2961
// and maintain a target minimum number of outbound connections. With this
2962
// invariant, we ensure that our node is connected to a diverse set of peers
2963
// and that nodes newly joining the network receive an up to date network view
2964
// as soon as possible.
2965
func (s *server) peerBootstrapper(numTargetPeers uint32,
2966
        bootstrappers []discovery.NetworkPeerBootstrapper) {
×
2967

×
2968
        defer s.wg.Done()
×
2969

×
2970
        // Before we continue, init the ignore peers map.
×
2971
        ignoreList := s.createBootstrapIgnorePeers()
×
2972

×
2973
        // We'll start off by aggressively attempting connections to peers in
×
2974
        // order to be a part of the network as soon as possible.
×
2975
        s.initialPeerBootstrap(ignoreList, numTargetPeers, bootstrappers)
×
2976

×
2977
        // Once done, we'll attempt to maintain our target minimum number of
×
2978
        // peers.
×
2979
        //
×
2980
        // We'll use a 15 second backoff, and double the time every time an
×
2981
        // epoch fails up to a ceiling.
×
2982
        backOff := time.Second * 15
×
2983

×
2984
        // We'll create a new ticker to wake us up every 15 seconds so we can
×
2985
        // see if we've reached our minimum number of peers.
×
2986
        sampleTicker := time.NewTicker(backOff)
×
2987
        defer sampleTicker.Stop()
×
2988

×
2989
        // We'll use the number of attempts and errors to determine if we need
×
2990
        // to increase the time between discovery epochs.
×
2991
        var epochErrors uint32 // To be used atomically.
×
2992
        var epochAttempts uint32
×
2993

×
2994
        for {
×
2995
                select {
×
2996
                // The ticker has just woken us up, so we'll need to check if
2997
                // we need to attempt to connect out to any more peers.
2998
                case <-sampleTicker.C:
×
2999
                        // Obtain the current number of peers, so we can gauge
×
3000
                        // if we need to sample more peers or not.
×
3001
                        s.mu.RLock()
×
3002
                        numActivePeers := uint32(len(s.peersByPub))
×
3003
                        s.mu.RUnlock()
×
3004

×
3005
                        // If we have enough peers, then we can loop back
×
3006
                        // around to the next round as we're done here.
×
3007
                        if numActivePeers >= numTargetPeers {
×
3008
                                continue
×
3009
                        }
3010

3011
                        // If all of our attempts failed during this last back
3012
                        // off period, then we'll increase our backoff to a 5
3013
                        // minute ceiling to avoid an excessive number of
3014
                        // queries
3015
                        //
3016
                        // TODO(roasbeef): add reverse policy too?
3017

3018
                        if epochAttempts > 0 &&
×
3019
                                atomic.LoadUint32(&epochErrors) >= epochAttempts {
×
3020

×
3021
                                sampleTicker.Stop()
×
3022

×
3023
                                backOff *= 2
×
3024
                                if backOff > bootstrapBackOffCeiling {
×
3025
                                        backOff = bootstrapBackOffCeiling
×
3026
                                }
×
3027

3028
                                srvrLog.Debugf("Backing off peer bootstrapper to "+
×
3029
                                        "%v", backOff)
×
3030
                                sampleTicker = time.NewTicker(backOff)
×
3031
                                continue
×
3032
                        }
3033

3034
                        atomic.StoreUint32(&epochErrors, 0)
×
3035
                        epochAttempts = 0
×
3036

×
3037
                        // Since we now need more peers, we'll compute the
×
3038
                        // exact number we need to reach our threshold.
×
3039
                        numNeeded := numTargetPeers - numActivePeers
×
3040

×
3041
                        srvrLog.Debugf("Attempting to obtain %v more network "+
×
3042
                                "peers", numNeeded)
×
3043

×
3044
                        // With the number of peers we need calculated, we'll
×
3045
                        // query the network bootstrappers to sample a set of
×
3046
                        // random addrs for us.
×
3047
                        //
×
3048
                        // Before we continue, get a copy of the ignore peers
×
3049
                        // map.
×
3050
                        ignoreList = s.createBootstrapIgnorePeers()
×
3051

×
3052
                        peerAddrs, err := discovery.MultiSourceBootstrap(
×
3053
                                ignoreList, numNeeded*2, bootstrappers...,
×
3054
                        )
×
3055
                        if err != nil {
×
3056
                                srvrLog.Errorf("Unable to retrieve bootstrap "+
×
3057
                                        "peers: %v", err)
×
3058
                                continue
×
3059
                        }
3060

3061
                        // Finally, we'll launch a new goroutine for each
3062
                        // prospective peer candidate.
3063
                        for _, addr := range peerAddrs {
×
3064
                                epochAttempts++
×
3065

×
3066
                                go func(a *lnwire.NetAddress) {
×
3067
                                        // TODO(roasbeef): can do AS, subnet,
×
3068
                                        // country diversity, etc
×
3069
                                        errChan := make(chan error, 1)
×
3070
                                        s.connectToPeer(
×
3071
                                                a, errChan,
×
3072
                                                s.cfg.ConnectionTimeout,
×
3073
                                        )
×
3074
                                        select {
×
3075
                                        case err := <-errChan:
×
3076
                                                if err == nil {
×
3077
                                                        return
×
3078
                                                }
×
3079

3080
                                                srvrLog.Errorf("Unable to "+
×
3081
                                                        "connect to %v: %v",
×
3082
                                                        a, err)
×
3083
                                                atomic.AddUint32(&epochErrors, 1)
×
3084
                                        case <-s.quit:
×
3085
                                        }
3086
                                }(addr)
3087
                        }
3088
                case <-s.quit:
×
3089
                        return
×
3090
                }
3091
        }
3092
}
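// Illustrative sketch added for this review, not part of server.go: it works
// out the back off arithmetic used by peerBootstrapper above. The helper name
// is hypothetical; with every fully failed epoch the sampling interval
// doubles and is then clamped at the 5 minute ceiling defined below.
func printBootstrapBackOffs() {
        backOff := time.Second * 15
        for i := 0; i < 6; i++ {
                fmt.Println(backOff)

                backOff *= 2
                if backOff > bootstrapBackOffCeiling {
                        backOff = bootstrapBackOffCeiling
                }
        }
        // Prints: 15s 30s 1m0s 2m0s 4m0s 5m0s
}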
3093

3094
// bootstrapBackOffCeiling is the maximum amount of time we'll wait between
3095
// failed attempts to locate a set of bootstrap peers. We'll slowly double our
3096
// query back off each time we encounter a failure.
3097
const bootstrapBackOffCeiling = time.Minute * 5
3098

3099
// initialPeerBootstrap attempts to continuously connect to peers on startup
3100
// until the target number of peers has been reached. This ensures that nodes
3101
// receive an up to date network view as soon as possible.
3102
func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
3103
        numTargetPeers uint32,
3104
        bootstrappers []discovery.NetworkPeerBootstrapper) {
×
3105

×
3106
        srvrLog.Debugf("Init bootstrap with targetPeers=%v, bootstrappers=%v, "+
×
3107
                "ignore=%v", numTargetPeers, len(bootstrappers), len(ignore))
×
3108

×
3109
        // We'll start off by waiting 2 seconds between failed attempts, then
×
3110
        // double each time we fail until we hit the bootstrapBackOffCeiling.
×
3111
        var delaySignal <-chan time.Time
×
3112
        delayTime := time.Second * 2
×
3113

×
3114
        // As we want to be more aggressive, we'll use a lower back off ceiling
×
3115
        // than the main peer bootstrap logic.
×
3116
        backOffCeiling := bootstrapBackOffCeiling / 5
×
3117

×
3118
        for attempts := 0; ; attempts++ {
×
3119
                // Check if the server has been requested to shut down in order
×
3120
                // to prevent blocking.
×
3121
                if s.Stopped() {
×
3122
                        return
×
3123
                }
×
3124

3125
                // We can exit our aggressive initial peer bootstrapping stage
3126
                // if we've reached our target number of peers.
3127
                s.mu.RLock()
×
3128
                numActivePeers := uint32(len(s.peersByPub))
×
3129
                s.mu.RUnlock()
×
3130

×
3131
                if numActivePeers >= numTargetPeers {
×
3132
                        return
×
3133
                }
×
3134

3135
                if attempts > 0 {
×
3136
                        srvrLog.Debugf("Waiting %v before trying to locate "+
×
3137
                                "bootstrap peers (attempt #%v)", delayTime,
×
3138
                                attempts)
×
3139

×
3140
                        // We've completed at least one iteration and haven't
×
3141
                        // finished, so we'll start to insert a delay period
×
3142
                        // between each attempt.
×
3143
                        delaySignal = time.After(delayTime)
×
3144
                        select {
×
3145
                        case <-delaySignal:
×
3146
                        case <-s.quit:
×
3147
                                return
×
3148
                        }
3149

3150
                        // After our delay, we'll double the time we wait up to
3151
                        // the max back off period.
3152
                        delayTime *= 2
×
3153
                        if delayTime > backOffCeiling {
×
3154
                                delayTime = backOffCeiling
×
3155
                        }
×
3156
                }
3157

3158
                // Otherwise, we'll request for the remaining number of peers
3159
                // in order to reach our target.
3160
                peersNeeded := numTargetPeers - numActivePeers
×
3161
                bootstrapAddrs, err := discovery.MultiSourceBootstrap(
×
3162
                        ignore, peersNeeded, bootstrappers...,
×
3163
                )
×
3164
                if err != nil {
×
3165
                        srvrLog.Errorf("Unable to retrieve initial bootstrap "+
×
3166
                                "peers: %v", err)
×
3167
                        continue
×
3168
                }
3169

3170
                // Then, we'll attempt to establish a connection to the
3171
                // different peer addresses retrieved by our bootstrappers.
3172
                var wg sync.WaitGroup
×
3173
                for _, bootstrapAddr := range bootstrapAddrs {
×
3174
                        wg.Add(1)
×
3175
                        go func(addr *lnwire.NetAddress) {
×
3176
                                defer wg.Done()
×
3177

×
3178
                                errChan := make(chan error, 1)
×
3179
                                go s.connectToPeer(
×
3180
                                        addr, errChan, s.cfg.ConnectionTimeout,
×
3181
                                )
×
3182

×
3183
                                // We'll only allow this connection attempt to
×
3184
                                // take up to 3 seconds. This allows us to move
×
3185
                                // quickly by discarding peers that are slowing
×
3186
                                // us down.
×
3187
                                select {
×
3188
                                case err := <-errChan:
×
3189
                                        if err == nil {
×
3190
                                                return
×
3191
                                        }
×
3192
                                        srvrLog.Errorf("Unable to connect to "+
×
3193
                                                "%v: %v", addr, err)
×
3194
                                // TODO: tune timeout? 3 seconds might be *too*
3195
                                // aggressive but works well.
3196
                                case <-time.After(3 * time.Second):
×
3197
                                        srvrLog.Tracef("Skipping peer %v due "+
×
3198
                                                "to not establishing a "+
×
3199
                                                "connection within 3 seconds",
×
3200
                                                addr)
×
3201
                                case <-s.quit:
×
3202
                                }
3203
                        }(bootstrapAddr)
3204
                }
3205

3206
                wg.Wait()
×
3207
        }
3208
}
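// Illustrative sketch added for this review, not part of server.go: the
// corresponding delay progression for the initial bootstrap loop above. The
// helper name is hypothetical; the reduced ceiling is
// bootstrapBackOffCeiling / 5 = 1 minute, and each individual connection
// attempt is additionally abandoned after 3 seconds.
func printInitialBootstrapDelays() {
        delay := time.Second * 2
        ceiling := bootstrapBackOffCeiling / 5

        for i := 0; i < 7; i++ {
                fmt.Println(delay)

                delay *= 2
                if delay > ceiling {
                        delay = ceiling
                }
        }
        // Prints: 2s 4s 8s 16s 32s 1m0s 1m0s
}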

// createNewHiddenService automatically sets up a v2 or v3 onion service in
// order to listen for inbound connections over Tor.
func (s *server) createNewHiddenService() error {
	// Determine the different ports the server is listening on. The onion
	// service's virtual port will map to these ports and one will be picked
	// at random when the onion service is being accessed.
	listenPorts := make([]int, 0, len(s.listenAddrs))
	for _, listenAddr := range s.listenAddrs {
		port := listenAddr.(*net.TCPAddr).Port
		listenPorts = append(listenPorts, port)
	}

	encrypter, err := lnencrypt.KeyRingEncrypter(s.cc.KeyRing)
	if err != nil {
		return err
	}

	// Once the port mapping has been set, we can go ahead and automatically
	// create our onion service. The service's private key will be saved to
	// disk in order to regain access to this service when restarting `lnd`.
	onionCfg := tor.AddOnionConfig{
		VirtualPort: defaultPeerPort,
		TargetPorts: listenPorts,
		Store: tor.NewOnionFile(
			s.cfg.Tor.PrivateKeyPath, 0600, s.cfg.Tor.EncryptKey,
			encrypter,
		),
	}

	switch {
	case s.cfg.Tor.V2:
		onionCfg.Type = tor.V2
	case s.cfg.Tor.V3:
		onionCfg.Type = tor.V3
	}

	addr, err := s.torController.AddOnion(onionCfg)
	if err != nil {
		return err
	}

	// Now that the onion service has been created, we'll add the onion
	// address it can be reached at to our list of advertised addresses.
	newNodeAnn, err := s.genNodeAnnouncement(
		nil, func(currentAnn *lnwire.NodeAnnouncement) {
			currentAnn.Addresses = append(currentAnn.Addresses, addr)
		},
	)
	if err != nil {
		return fmt.Errorf("unable to generate new node "+
			"announcement: %v", err)
	}

	// Finally, we'll update the on-disk version of our announcement so it
	// will eventually propagate to nodes in the network.
	selfNode := &models.LightningNode{
		HaveNodeAnnouncement: true,
		LastUpdate:           time.Unix(int64(newNodeAnn.Timestamp), 0),
		Addresses:            newNodeAnn.Addresses,
		Alias:                newNodeAnn.Alias.String(),
		Features: lnwire.NewFeatureVector(
			newNodeAnn.Features, lnwire.Features,
		),
		Color:        newNodeAnn.RGBColor,
		AuthSigBytes: newNodeAnn.Signature.ToSignatureBytes(),
	}
	copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed())
	if err := s.graphDB.SetSourceNode(selfNode); err != nil {
		return fmt.Errorf("can't set self node: %w", err)
	}

	return nil
}

// findChannel finds a channel given a public key and ChannelID. It is an
// optimization that is quicker than searching for a channel given only the
// ChannelID.
func (s *server) findChannel(node *btcec.PublicKey, chanID lnwire.ChannelID) (
	*channeldb.OpenChannel, error) {

	nodeChans, err := s.chanStateDB.FetchOpenChannels(node)
	if err != nil {
		return nil, err
	}

	for _, channel := range nodeChans {
		if chanID.IsChanPoint(&channel.FundingOutpoint) {
			return channel, nil
		}
	}

	return nil, fmt.Errorf("unable to find channel")
}
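
// Illustrative sketch only (not part of server.go): how a caller with a
// known funding outpoint might derive the wire-level ChannelID and then use
// findChannel above. The helper name lookupChanByID is hypothetical, and
// NewChanIDFromOutPoint takes a pointer in some older lnd versions.
func lookupChanByID(s *server, remoteKey *btcec.PublicKey,
	fundingOutpoint wire.OutPoint) (*channeldb.OpenChannel, error) {

	// The ChannelID is derived from the funding outpoint, so the two
	// identifiers always refer to the same channel.
	chanID := lnwire.NewChanIDFromOutPoint(fundingOutpoint)

	return s.findChannel(remoteKey, chanID)
}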

// getNodeAnnouncement fetches the current, fully signed node announcement.
func (s *server) getNodeAnnouncement() lnwire.NodeAnnouncement {
	s.mu.Lock()
	defer s.mu.Unlock()

	return *s.currentNodeAnn
}

// genNodeAnnouncement generates and returns the current fully signed node
// announcement. The time stamp of the announcement will be updated in order
// to ensure it propagates through the network.
func (s *server) genNodeAnnouncement(features *lnwire.RawFeatureVector,
	modifiers ...netann.NodeAnnModifier) (lnwire.NodeAnnouncement, error) {

	s.mu.Lock()
	defer s.mu.Unlock()

	// First, try to update our feature manager with the updated set of
	// features.
	if features != nil {
		proposedFeatures := map[feature.Set]*lnwire.RawFeatureVector{
			feature.SetNodeAnn: features,
		}
		err := s.featureMgr.UpdateFeatureSets(proposedFeatures)
		if err != nil {
			return lnwire.NodeAnnouncement{}, err
		}

		// If we could successfully update our feature manager, add
		// an update modifier to include these new features to our
		// set.
		modifiers = append(
			modifiers, netann.NodeAnnSetFeatures(features),
		)
	}

	// Always update the timestamp when refreshing to ensure the update
	// propagates.
	modifiers = append(modifiers, netann.NodeAnnSetTimestamp)

	// Apply the requested changes to the node announcement.
	for _, modifier := range modifiers {
		modifier(s.currentNodeAnn)
	}

	// Sign a new update after applying all of the passed modifiers.
	err := netann.SignNodeAnnouncement(
		s.nodeSigner, s.identityKeyLoc, s.currentNodeAnn,
	)
	if err != nil {
		return lnwire.NodeAnnouncement{}, err
	}

	return *s.currentNodeAnn, nil
}

// updateAndBroadcastSelfNode generates a new node announcement, applying the
// given modifiers and updating the time stamp to ensure it propagates through
// the network. Then it broadcasts it to the network.
func (s *server) updateAndBroadcastSelfNode(features *lnwire.RawFeatureVector,
	modifiers ...netann.NodeAnnModifier) error {

	newNodeAnn, err := s.genNodeAnnouncement(features, modifiers...)
	if err != nil {
		return fmt.Errorf("unable to generate new node "+
			"announcement: %v", err)
	}

	// Update the on-disk version of our announcement.
	// Load and modify the self node instead of creating a new instance so
	// we don't risk overwriting any existing values.
	selfNode, err := s.graphDB.SourceNode()
	if err != nil {
		return fmt.Errorf("unable to get current source node: %w", err)
	}

	selfNode.HaveNodeAnnouncement = true
	selfNode.LastUpdate = time.Unix(int64(newNodeAnn.Timestamp), 0)
	selfNode.Addresses = newNodeAnn.Addresses
	selfNode.Alias = newNodeAnn.Alias.String()
	selfNode.Features = s.featureMgr.Get(feature.SetNodeAnn)
	selfNode.Color = newNodeAnn.RGBColor
	selfNode.AuthSigBytes = newNodeAnn.Signature.ToSignatureBytes()

	copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed())

	if err := s.graphDB.SetSourceNode(selfNode); err != nil {
		return fmt.Errorf("can't set self node: %w", err)
	}

	// Finally, propagate it to the nodes in the network.
	err = s.BroadcastMessage(nil, &newNodeAnn)
	if err != nil {
		rpcsLog.Debugf("Unable to broadcast new node "+
			"announcement to peers: %v", err)
		return err
	}

	return nil
}

type nodeAddresses struct {
	pubKey    *btcec.PublicKey
	addresses []net.Addr
}

// establishPersistentConnections attempts to establish persistent connections
// to all our direct channel collaborators. In order to promote liveness of our
// active channels, we instruct the connection manager to attempt to establish
// and maintain persistent connections to all our direct channel counterparties.
func (s *server) establishPersistentConnections() error {
	// nodeAddrsMap stores the combination of node public keys and addresses
	// that we'll attempt to reconnect to. PubKey strings are used as keys
	// since other PubKey forms can't be compared.
	nodeAddrsMap := map[string]*nodeAddresses{}

	// Iterate through the list of LinkNodes to find addresses we should
	// attempt to connect to based on our set of previous connections. Set
	// the reconnection port to the default peer port.
	linkNodes, err := s.chanStateDB.LinkNodeDB().FetchAllLinkNodes()
	if err != nil && err != channeldb.ErrLinkNodesNotFound {
		return err
	}
	for _, node := range linkNodes {
		pubStr := string(node.IdentityPub.SerializeCompressed())
		nodeAddrs := &nodeAddresses{
			pubKey:    node.IdentityPub,
			addresses: node.Addresses,
		}
		nodeAddrsMap[pubStr] = nodeAddrs
	}

	// After checking our previous connections for addresses to connect to,
	// iterate through the nodes in our channel graph to find addresses
	// that have been added via NodeAnnouncement messages.
	sourceNode, err := s.graphDB.SourceNode()
	if err != nil {
		return err
	}

	// TODO(roasbeef): instead iterate over link nodes and query graph for
	// each of the nodes.
	selfPub := s.identityECDH.PubKey().SerializeCompressed()
	err = s.graphDB.ForEachNodeChannel(sourceNode.PubKeyBytes, func(
		tx kvdb.RTx,
		chanInfo *models.ChannelEdgeInfo,
		policy, _ *models.ChannelEdgePolicy) error {

		// If the remote party has announced the channel to us, but we
		// haven't yet, then we won't have a policy. However, we don't
		// need this to connect to the peer, so we'll log it and move on.
		if policy == nil {
			srvrLog.Warnf("No channel policy found for "+
				"ChannelPoint(%v): ", chanInfo.ChannelPoint)
		}

		// We'll now fetch the peer opposite from us within this
		// channel so we can queue up a direct connection to them.
		channelPeer, err := s.graphDB.FetchOtherNode(
			tx, chanInfo, selfPub,
		)
		if err != nil {
			return fmt.Errorf("unable to fetch channel peer for "+
				"ChannelPoint(%v): %v", chanInfo.ChannelPoint,
				err)
		}

		pubStr := string(channelPeer.PubKeyBytes[:])

		// Add all unique addresses from channel
		// graph/NodeAnnouncements to the list of addresses we'll
		// connect to for this peer.
		addrSet := make(map[string]net.Addr)
		for _, addr := range channelPeer.Addresses {
			switch addr.(type) {
			case *net.TCPAddr:
				addrSet[addr.String()] = addr

			// We'll only attempt to connect to Tor addresses if Tor
			// outbound support is enabled.
			case *tor.OnionAddr:
				if s.cfg.Tor.Active {
					addrSet[addr.String()] = addr
				}
			}
		}

		// If this peer is also recorded as a link node, we'll add any
		// additional addresses that have not already been selected.
		linkNodeAddrs, ok := nodeAddrsMap[pubStr]
		if ok {
			for _, lnAddress := range linkNodeAddrs.addresses {
				switch lnAddress.(type) {
				case *net.TCPAddr:
					addrSet[lnAddress.String()] = lnAddress

				// We'll only attempt to connect to Tor
				// addresses if Tor outbound support is enabled.
				case *tor.OnionAddr:
					if s.cfg.Tor.Active {
						addrSet[lnAddress.String()] = lnAddress
					}
				}
			}
		}

		// Construct a slice of the deduped addresses.
		var addrs []net.Addr
		for _, addr := range addrSet {
			addrs = append(addrs, addr)
		}

		n := &nodeAddresses{
			addresses: addrs,
		}
		n.pubKey, err = channelPeer.PubKey()
		if err != nil {
			return err
		}

		nodeAddrsMap[pubStr] = n
		return nil
	})
	if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
		return err
	}

	srvrLog.Debugf("Establishing %v persistent connections on start",
		len(nodeAddrsMap))

	// Acquire and hold server lock until all persistent connection requests
	// have been recorded and sent to the connection manager.
	s.mu.Lock()
	defer s.mu.Unlock()

	// Iterate through the combined list of addresses from prior links and
	// node announcements and attempt to reconnect to each node.
	var numOutboundConns int
	for pubStr, nodeAddr := range nodeAddrsMap {
		// Add this peer to the set of peers we should maintain a
		// persistent connection with. We set the value to false to
		// indicate that we should not continue to reconnect if the
		// number of channels returns to zero, since this peer has not
		// been requested as perm by the user.
		s.persistentPeers[pubStr] = false
		if _, ok := s.persistentPeersBackoff[pubStr]; !ok {
			s.persistentPeersBackoff[pubStr] = s.cfg.MinBackoff
		}

		for _, address := range nodeAddr.addresses {
			// Create a wrapper address which couples the IP and
			// the pubkey so the brontide authenticated connection
			// can be established.
			lnAddr := &lnwire.NetAddress{
				IdentityKey: nodeAddr.pubKey,
				Address:     address,
			}

			s.persistentPeerAddrs[pubStr] = append(
				s.persistentPeerAddrs[pubStr], lnAddr)
		}

		// We'll connect to the first 10 peers immediately, then
		// randomly stagger any remaining connections if the
		// stagger initial reconnect flag is set. This ensures
		// that mobile nodes or nodes with a small number of
		// channels obtain connectivity quickly, but larger
		// nodes are able to disperse the costs of connecting to
		// all peers at once.
		if numOutboundConns < numInstantInitReconnect ||
			!s.cfg.StaggerInitialReconnect {

			go s.connectToPersistentPeer(pubStr)
		} else {
			go s.delayInitialReconnect(pubStr)
		}

		numOutboundConns++
	}

	return nil
}
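
// The loop above folds addresses from both the link-node store and the
// channel graph into a map keyed by each address's String() form so that
// duplicates collapse naturally. The helper below is an illustrative sketch
// of that pattern only -- it is not part of server.go, and the name
// dedupAddrs is hypothetical.
func dedupAddrs(lists ...[]net.Addr) []net.Addr {
	set := make(map[string]net.Addr)
	for _, list := range lists {
		for _, addr := range list {
			// Later occurrences simply overwrite earlier ones.
			set[addr.String()] = addr
		}
	}

	addrs := make([]net.Addr, 0, len(set))
	for _, addr := range set {
		addrs = append(addrs, addr)
	}

	return addrs
}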

// delayInitialReconnect will attempt a reconnection to the given peer after
// sampling a value for the delay between 0s and the maxInitReconnectDelay.
//
// NOTE: This method MUST be run as a goroutine.
func (s *server) delayInitialReconnect(pubStr string) {
	delay := time.Duration(prand.Intn(maxInitReconnectDelay)) * time.Second
	select {
	case <-time.After(delay):
		s.connectToPersistentPeer(pubStr)
	case <-s.quit:
	}
}

// prunePersistentPeerConnection removes all internal state related to
// persistent connections to a peer within the server. This is used to avoid
// persistent connection retries to peers we do not have any open channels with.
func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) {
	pubKeyStr := string(compressedPubKey[:])

	s.mu.Lock()
	if perm, ok := s.persistentPeers[pubKeyStr]; ok && !perm {
		delete(s.persistentPeers, pubKeyStr)
		delete(s.persistentPeersBackoff, pubKeyStr)
		delete(s.persistentPeerAddrs, pubKeyStr)
		s.cancelConnReqs(pubKeyStr, nil)
		s.mu.Unlock()

		srvrLog.Infof("Pruned peer %x from persistent connections, "+
			"peer has no open channels", compressedPubKey)

		return
	}
	s.mu.Unlock()
}

// BroadcastMessage sends a request to the server to broadcast a set of
// messages to all peers other than the one specified by the `skips` parameter.
// All messages sent via BroadcastMessage will be queued for lazy delivery to
// the target peers.
//
// NOTE: This function is safe for concurrent access.
func (s *server) BroadcastMessage(skips map[route.Vertex]struct{},
	msgs ...lnwire.Message) error {

	// Filter out peers found in the skips map. We synchronize access to
	// peersByPub throughout this process to ensure we deliver messages to
	// the exact set of peers present at the time of invocation.
	s.mu.RLock()
	peers := make([]*peer.Brontide, 0, len(s.peersByPub))
	for pubStr, sPeer := range s.peersByPub {
		if skips != nil {
			if _, ok := skips[sPeer.PubKey()]; ok {
				srvrLog.Tracef("Skipping %x in broadcast with "+
					"pubStr=%x", sPeer.PubKey(), pubStr)
				continue
			}
		}

		peers = append(peers, sPeer)
	}
	s.mu.RUnlock()

	// Iterate over all known peers, dispatching a go routine to enqueue
	// all messages to each of the peers.
	var wg sync.WaitGroup
	for _, sPeer := range peers {
		srvrLog.Debugf("Sending %v messages to peer %x", len(msgs),
			sPeer.PubKey())

		// Dispatch a go routine to enqueue all messages to this peer.
		wg.Add(1)
		s.wg.Add(1)
		go func(p lnpeer.Peer) {
			defer s.wg.Done()
			defer wg.Done()

			p.SendMessageLazy(false, msgs...)
		}(sPeer)
	}

	// Wait for all messages to have been dispatched before returning to
	// caller.
	wg.Wait()

	return nil
}
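
// BroadcastMessage enqueues to every peer from its own goroutine so a single
// slow peer cannot stall the others, then waits on the WaitGroup before
// returning. The function below is an illustrative sketch of that fan-out
// pattern in isolation -- it is not part of server.go, and the name fanOut
// is hypothetical.
func fanOut(peers []lnpeer.Peer, send func(lnpeer.Peer)) {
	var wg sync.WaitGroup
	for _, p := range peers {
		wg.Add(1)
		go func(p lnpeer.Peer) {
			defer wg.Done()
			send(p)
		}(p)
	}

	// Only return once every send callback has completed.
	wg.Wait()
}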

// NotifyWhenOnline can be called by other subsystems to get notified when a
// particular peer comes online. The peer itself is sent across the peerChan.
//
// NOTE: This function is safe for concurrent access.
func (s *server) NotifyWhenOnline(peerKey [33]byte,
	peerChan chan<- lnpeer.Peer) {

	s.mu.Lock()

	// Compute the target peer's identifier.
	pubStr := string(peerKey[:])

	// Check if peer is connected.
	peer, ok := s.peersByPub[pubStr]
	if ok {
		// Unlock here so that the mutex isn't held while we are
		// waiting for the peer to become active.
		s.mu.Unlock()

		// Wait until the peer signals that it is actually active
		// rather than only in the server's maps.
		select {
		case <-peer.ActiveSignal():
		case <-peer.QuitSignal():
			// The peer quit, so we'll add the channel to the slice
			// and return.
			s.mu.Lock()
			s.peerConnectedListeners[pubStr] = append(
				s.peerConnectedListeners[pubStr], peerChan,
			)
			s.mu.Unlock()
			return
		}

		// Connected, can return early.
		srvrLog.Debugf("Notifying that peer %x is online", peerKey)

		select {
		case peerChan <- peer:
		case <-s.quit:
		}

		return
	}

	// Not connected, store this listener such that it can be notified when
	// the peer comes online.
	s.peerConnectedListeners[pubStr] = append(
		s.peerConnectedListeners[pubStr], peerChan,
	)
	s.mu.Unlock()
}

// NotifyWhenOffline delivers a notification to the caller of when the peer with
// the given public key has been disconnected. The notification is signaled by
// closing the channel returned.
func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()

	c := make(chan struct{})

	// If the peer is already offline, we can immediately trigger the
	// notification.
	peerPubKeyStr := string(peerPubKey[:])
	if _, ok := s.peersByPub[peerPubKeyStr]; !ok {
		srvrLog.Debugf("Notifying that peer %x is offline", peerPubKey)
		close(c)
		return c
	}

	// Otherwise, the peer is online, so we'll keep track of the channel to
	// trigger the notification once the server detects the peer
	// disconnects.
	s.peerDisconnectedListeners[peerPubKeyStr] = append(
		s.peerDisconnectedListeners[peerPubKeyStr], c,
	)

	return c
}
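
// Because NotifyWhenOffline signals by closing the returned channel, a
// subscriber simply selects on it. The snippet below is an illustrative
// sketch only -- it is not part of server.go, and the names
// waitForPeerOffline and cleanupPeerState are hypothetical.
func waitForPeerOffline(s *server, peerPubKey [33]byte,
	quit <-chan struct{}, cleanupPeerState func()) {

	select {
	case <-s.NotifyWhenOffline(peerPubKey):
		// The peer disconnected (or was never connected), so any
		// per-peer state can be released now.
		cleanupPeerState()

	case <-quit:
	}
}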

// FindPeer will return the peer that corresponds to the passed in public key.
// This function is used by the funding manager, allowing it to update the
// daemon's local representation of the remote peer.
//
// NOTE: This function is safe for concurrent access.
func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer.Brontide, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	pubStr := string(peerKey.SerializeCompressed())

	return s.findPeerByPubStr(pubStr)
}

// FindPeerByPubStr will return the peer that corresponds to the passed peerID,
// which should be a string representation of the peer's serialized, compressed
// public key.
//
// NOTE: This function is safe for concurrent access.
func (s *server) FindPeerByPubStr(pubStr string) (*peer.Brontide, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	return s.findPeerByPubStr(pubStr)
}

// findPeerByPubStr is an internal method that retrieves the specified peer
// from the server's internal state, using the serialized, compressed public
// key as the lookup key.
func (s *server) findPeerByPubStr(pubStr string) (*peer.Brontide, error) {
	peer, ok := s.peersByPub[pubStr]
	if !ok {
		return nil, ErrPeerNotConnected
	}

	return peer, nil
}

// nextPeerBackoff computes the next backoff duration for a peer's pubkey using
// exponential backoff. If no previous backoff was known, the default is
// returned.
func (s *server) nextPeerBackoff(pubStr string,
	startTime time.Time) time.Duration {

	// Now, determine the appropriate backoff to use for the retry.
	backoff, ok := s.persistentPeersBackoff[pubStr]
	if !ok {
		// If an existing backoff was unknown, use the default.
		return s.cfg.MinBackoff
	}

	// If the peer failed to start properly, we'll just use the previous
	// backoff to compute the subsequent randomized exponential backoff
	// duration. This will roughly double on average.
	if startTime.IsZero() {
		return computeNextBackoff(backoff, s.cfg.MaxBackoff)
	}

	// The peer succeeded in starting. If the connection didn't last long
	// enough to be considered stable, we'll continue to back off retries
	// with this peer.
	connDuration := time.Since(startTime)
	if connDuration < defaultStableConnDuration {
		return computeNextBackoff(backoff, s.cfg.MaxBackoff)
	}

	// The peer succeeded in starting and this was a stable peer, so we'll
	// reduce the timeout duration by the length of the connection after
	// applying randomized exponential backoff. We'll only apply this in the
	// case that:
	//   reb(curBackoff) - connDuration > cfg.MinBackoff
	relaxedBackoff := computeNextBackoff(backoff, s.cfg.MaxBackoff) - connDuration
	if relaxedBackoff > s.cfg.MinBackoff {
		return relaxedBackoff
	}

	// Lastly, if reb(curBackoff) - connDuration <= cfg.MinBackoff, the
	// stable connection lasted much longer than our previous backoff. To
	// reward such good behavior, we'll reconnect after the default
	// timeout.
	return s.cfg.MinBackoff
}
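
// computeNextBackoff is not shown in this excerpt. A typical randomized
// exponential backoff looks roughly like the sketch below (double the
// previous value, cap it, then add jitter so reconnecting peers don't retry
// in lockstep). This is an illustrative approximation, not necessarily lnd's
// exact implementation, and nextBackoffSketch is a hypothetical name.
func nextBackoffSketch(prev, max time.Duration) time.Duration {
	next := 2 * prev
	if next > max {
		next = max
	}

	// Add up to ~10% of jitter on top of the doubled value.
	if next >= 10 {
		next += time.Duration(prand.Int63n(int64(next / 10)))
	}

	return next
}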

// shouldDropLocalConnection determines if our local connection to a remote peer
// should be dropped in the case of concurrent connection establishment. In
// order to deterministically decide which connection should be dropped, we'll
// utilize the ordering of the local and remote public key. If we didn't use
// such a tie breaker, then we risk _both_ connections erroneously being
// dropped.
func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool {
	localPubBytes := local.SerializeCompressed()
	remotePubPbytes := remote.SerializeCompressed()

	// The connection that comes from the node with a "smaller" pubkey
	// should be kept. Therefore, if our pubkey is "greater" than theirs, we
	// should drop our established connection.
	return bytes.Compare(localPubBytes, remotePubPbytes) > 0
}
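
// Illustrative sketch only (not part of server.go): because bytes.Compare
// imposes a strict total order on the serialized keys, the two nodes in a
// duplicate-connection race reach complementary decisions, so exactly one of
// the concurrent connections survives. The function name tieBreakDemo is
// hypothetical.
func tieBreakDemo(keyA, keyB *btcec.PublicKey) {
	aDropsLocal := shouldDropLocalConnection(keyA, keyB) // node A's view
	bDropsLocal := shouldDropLocalConnection(keyB, keyA) // node B's view

	// For distinct keys, exactly one of the two is true, so only one side
	// tears down the connection it initiated.
	srvrLog.Debugf("A drops local conn=%v, B drops local conn=%v",
		aDropsLocal, bDropsLocal)
}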

// InboundPeerConnected initializes a new peer in response to a new inbound
// connection.
//
// NOTE: This function is safe for concurrent access.
func (s *server) InboundPeerConnected(conn net.Conn) {
	// Exit early if we have already been instructed to shutdown, this
	// prevents any delayed callbacks from accidentally registering peers.
	if s.Stopped() {
		return
	}

	nodePub := conn.(*brontide.Conn).RemotePub()
	pubSer := nodePub.SerializeCompressed()
	pubStr := string(pubSer)

	var pubBytes [33]byte
	copy(pubBytes[:], pubSer)

	s.mu.Lock()
	defer s.mu.Unlock()

	// If the remote node's public key is banned, drop the connection.
	shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub)
	if dcErr != nil {
		srvrLog.Errorf("Unable to check if we should disconnect "+
			"peer: %v", dcErr)
		conn.Close()

		return
	}

	if shouldDc {
		srvrLog.Debugf("Dropping connection for %v since they are "+
			"banned.", pubSer)

		conn.Close()

		return
	}

	// If we already have an outbound connection to this peer, then ignore
	// this new connection.
	if p, ok := s.outboundPeers[pubStr]; ok {
		srvrLog.Debugf("Already have outbound connection for %v, "+
			"ignoring inbound connection from local=%v, remote=%v",
			p, conn.LocalAddr(), conn.RemoteAddr())

		conn.Close()
		return
	}

	// If we already have a valid connection that is scheduled to take
	// precedence once the prior peer has finished disconnecting, we'll
	// ignore this connection.
	if p, ok := s.scheduledPeerConnection[pubStr]; ok {
		srvrLog.Debugf("Ignoring connection from %v, peer %v already "+
			"scheduled", conn.RemoteAddr(), p)
		conn.Close()
		return
	}

	srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())

	// Check to see if we already have a connection with this peer. If so,
	// we may need to drop our existing connection. This prevents us from
	// having duplicate connections to the same peer. We forgo adding a
	// default case as we expect these to be the only error values returned
	// from findPeerByPubStr.
	connectedPeer, err := s.findPeerByPubStr(pubStr)
	switch err {
	case ErrPeerNotConnected:
		// We were unable to locate an existing connection with the
		// target peer, proceed to connect.
		s.cancelConnReqs(pubStr, nil)
		s.peerConnected(conn, nil, true)

	case nil:
		// We already have a connection with the incoming peer. If the
		// connection we've already established should be kept and is
		// not of the same type of the new connection (inbound), then
		// we'll close out the new connection s.t there's only a single
		// connection between us.
		localPub := s.identityECDH.PubKey()
		if !connectedPeer.Inbound() &&
			!shouldDropLocalConnection(localPub, nodePub) {

			srvrLog.Warnf("Received inbound connection from "+
				"peer %v, but already have outbound "+
				"connection, dropping conn", connectedPeer)
			conn.Close()
			return
		}

		// Otherwise, if we should drop the connection, then we'll
		// disconnect our already connected peer.
		srvrLog.Debugf("Disconnecting stale connection to %v",
			connectedPeer)

		s.cancelConnReqs(pubStr, nil)

		// Remove the current peer from the server's internal state and
		// signal that the peer termination watcher does not need to
		// execute for this peer.
		s.removePeer(connectedPeer)
		s.ignorePeerTermination[connectedPeer] = struct{}{}
		s.scheduledPeerConnection[pubStr] = func() {
			s.peerConnected(conn, nil, true)
		}
	}
}

// OutboundPeerConnected initializes a new peer in response to a new outbound
// connection.
// NOTE: This function is safe for concurrent access.
func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) {
	// Exit early if we have already been instructed to shutdown, this
	// prevents any delayed callbacks from accidentally registering peers.
	if s.Stopped() {
		return
	}

	nodePub := conn.(*brontide.Conn).RemotePub()
	pubSer := nodePub.SerializeCompressed()
	pubStr := string(pubSer)

	var pubBytes [33]byte
	copy(pubBytes[:], pubSer)

	s.mu.Lock()
	defer s.mu.Unlock()

	// If the remote node's public key is banned, drop the connection.
	shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub)
	if dcErr != nil {
		srvrLog.Errorf("Unable to check if we should disconnect "+
			"peer: %v", dcErr)
		conn.Close()

		return
	}

	if shouldDc {
		srvrLog.Debugf("Dropping connection for %v since they are "+
			"banned.", pubSer)

		if connReq != nil {
			s.connMgr.Remove(connReq.ID())
		}

		conn.Close()

		return
	}

	// If we already have an inbound connection to this peer, then ignore
	// this new connection.
	if p, ok := s.inboundPeers[pubStr]; ok {
		srvrLog.Debugf("Already have inbound connection for %v, "+
			"ignoring outbound connection from local=%v, remote=%v",
			p, conn.LocalAddr(), conn.RemoteAddr())

		if connReq != nil {
			s.connMgr.Remove(connReq.ID())
		}
		conn.Close()
		return
	}
	if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil {
		srvrLog.Debugf("Ignoring canceled outbound connection")
		s.connMgr.Remove(connReq.ID())
		conn.Close()
		return
	}

	// If we already have a valid connection that is scheduled to take
	// precedence once the prior peer has finished disconnecting, we'll
	// ignore this connection.
	if _, ok := s.scheduledPeerConnection[pubStr]; ok {
		srvrLog.Debugf("Ignoring connection, peer already scheduled")

		if connReq != nil {
			s.connMgr.Remove(connReq.ID())
		}

		conn.Close()
		return
	}

	srvrLog.Infof("Established connection to: %x@%v", pubStr,
		conn.RemoteAddr())

	if connReq != nil {
		// A successful connection was returned by the connmgr.
		// Immediately cancel all pending requests, excluding the
		// outbound connection we just established.
		ignore := connReq.ID()
		s.cancelConnReqs(pubStr, &ignore)
	} else {
		// This was a successful connection made by some other
		// subsystem. Remove all requests being managed by the connmgr.
		s.cancelConnReqs(pubStr, nil)
	}

	// If we already have a connection with this peer, decide whether or not
	// we need to drop the stale connection. We forgo adding a default case
	// as we expect these to be the only error values returned from
	// findPeerByPubStr.
	connectedPeer, err := s.findPeerByPubStr(pubStr)
	switch err {
	case ErrPeerNotConnected:
		// We were unable to locate an existing connection with the
		// target peer, proceed to connect.
		s.peerConnected(conn, connReq, false)

	case nil:
		// We already have a connection with the incoming peer. If the
		// connection we've already established should be kept and is
		// not of the same type of the new connection (outbound), then
		// we'll close out the new connection s.t there's only a single
		// connection between us.
		localPub := s.identityECDH.PubKey()
		if connectedPeer.Inbound() &&
			shouldDropLocalConnection(localPub, nodePub) {

			srvrLog.Warnf("Established outbound connection to "+
				"peer %v, but already have inbound "+
				"connection, dropping conn", connectedPeer)
			if connReq != nil {
				s.connMgr.Remove(connReq.ID())
			}
			conn.Close()
			return
		}

		// Otherwise, _their_ connection should be dropped. So we'll
		// disconnect the peer and send the now obsolete peer to the
		// server for garbage collection.
		srvrLog.Debugf("Disconnecting stale connection to %v",
			connectedPeer)

		// Remove the current peer from the server's internal state and
		// signal that the peer termination watcher does not need to
		// execute for this peer.
		s.removePeer(connectedPeer)
		s.ignorePeerTermination[connectedPeer] = struct{}{}
		s.scheduledPeerConnection[pubStr] = func() {
			s.peerConnected(conn, connReq, false)
		}
	}
}

// UnassignedConnID is the default connection ID that a request can have before
// it actually is submitted to the connmgr.
// TODO(conner): move into connmgr package, or better, add connmgr method for
// generating atomic IDs
const UnassignedConnID uint64 = 0

// cancelConnReqs stops all persistent connection requests for a given pubkey.
// Any attempts initiated by the peerTerminationWatcher are canceled first.
// Afterwards, each connection request is removed from the connmgr. The caller
// can optionally specify a connection ID to ignore, which prevents us from
// canceling a successful request. All persistent connreqs for the provided
// pubkey are discarded after the operation.
func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
	// First, cancel any lingering persistent retry attempts, which will
	// prevent retries for any with backoffs that are still maturing.
	if cancelChan, ok := s.persistentRetryCancels[pubStr]; ok {
		close(cancelChan)
		delete(s.persistentRetryCancels, pubStr)
	}

	// Next, check to see if we have any outstanding persistent connection
	// requests to this peer. If so, then we'll remove all of these
	// connection requests, and also delete the entry from the map.
	connReqs, ok := s.persistentConnReqs[pubStr]
	if !ok {
		return
	}

	for _, connReq := range connReqs {
		srvrLog.Tracef("Canceling %s:", connReqs)

		// Atomically capture the current request identifier.
		connID := connReq.ID()

		// Skip any zero IDs, this indicates the request has not
		// yet been scheduled.
		if connID == UnassignedConnID {
			continue
		}

		// Skip a particular connection ID if instructed.
		if skip != nil && connID == *skip {
			continue
		}

		s.connMgr.Remove(connID)
	}

	delete(s.persistentConnReqs, pubStr)
}

// handleCustomMessage dispatches an incoming custom peer message to
// subscribers.
func (s *server) handleCustomMessage(peer [33]byte, msg *lnwire.Custom) error {
	srvrLog.Debugf("Custom message received: peer=%x, type=%d",
		peer, msg.Type)

	return s.customMessageServer.SendUpdate(&CustomMessage{
		Peer: peer,
		Msg:  msg,
	})
}

// SubscribeCustomMessages subscribes to a stream of incoming custom peer
// messages.
func (s *server) SubscribeCustomMessages() (*subscribe.Client, error) {
	return s.customMessageServer.Subscribe()
}
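
// A consumer of the subscription might drain the update stream as sketched
// below. This is illustrative only (not part of server.go), assumes the
// subscribe.Client exposes Updates() and Cancel() as it does elsewhere in
// lnd, and the helper name consumeCustomMessages is hypothetical.
func consumeCustomMessages(s *server) error {
	client, err := s.SubscribeCustomMessages()
	if err != nil {
		return err
	}
	defer client.Cancel()

	// Each update is the *CustomMessage sent via SendUpdate above.
	for update := range client.Updates() {
		if msg, ok := update.(*CustomMessage); ok {
			srvrLog.Debugf("custom msg type=%d from peer=%x",
				msg.Msg.Type, msg.Peer)
		}
	}

	return nil
}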

// peerConnected is a function that handles the initialization of a newly
// connected peer by adding it to the server's global list of all active
// peers, and starting all the goroutines the peer needs to function properly.
// The inbound boolean should be true if the peer initiated the connection to
// us.
func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
	inbound bool) {

	brontideConn := conn.(*brontide.Conn)
	addr := conn.RemoteAddr()
	pubKey := brontideConn.RemotePub()

	srvrLog.Infof("Finalizing connection to %x@%s, inbound=%v",
		pubKey.SerializeCompressed(), addr, inbound)

	peerAddr := &lnwire.NetAddress{
		IdentityKey: pubKey,
		Address:     addr,
		ChainNet:    s.cfg.ActiveNetParams.Net,
	}

	// With the brontide connection established, we'll now craft the feature
	// vectors to advertise to the remote node.
	initFeatures := s.featureMgr.Get(feature.SetInit)
	legacyFeatures := s.featureMgr.Get(feature.SetLegacyGlobal)

	// Lookup past error caches for the peer in the server. If no buffer is
	// found, create a fresh buffer.
	pkStr := string(peerAddr.IdentityKey.SerializeCompressed())
	errBuffer, ok := s.peerErrors[pkStr]
	if !ok {
		var err error
		errBuffer, err = queue.NewCircularBuffer(peer.ErrorBufferSize)
		if err != nil {
			srvrLog.Errorf("unable to create peer %v", err)
			return
		}
	}

	// If we directly set the peer.Config TowerClient member to the
	// s.towerClientMgr then in the case that the s.towerClientMgr is nil,
	// the peer.Config's TowerClient member will not evaluate to nil even
	// though the underlying value is nil. To avoid this gotcha which can
	// cause a panic, we need to explicitly pass nil to the peer.Config's
	// TowerClient if needed (see the illustrative sketch after this
	// function).
	var towerClient wtclient.ClientManager
	if s.towerClientMgr != nil {
		towerClient = s.towerClientMgr
	}

	thresholdSats := btcutil.Amount(s.cfg.MaxFeeExposure)
	thresholdMSats := lnwire.NewMSatFromSatoshis(thresholdSats)

	// Now that we've established a connection, create a peer, and add it to
	// the set of currently active peers. Configure the peer with the
	// incoming and outgoing broadcast deltas to prevent htlcs from being
	// accepted or offered that would trigger channel closure. In case of
	// outgoing htlcs, an extra block is added to prevent the channel from
	// being closed when the htlc is outstanding and a new block comes in.
	pCfg := peer.Config{
		Conn:                    brontideConn,
		ConnReq:                 connReq,
		Addr:                    peerAddr,
		Inbound:                 inbound,
		Features:                initFeatures,
		LegacyFeatures:          legacyFeatures,
		OutgoingCltvRejectDelta: lncfg.DefaultOutgoingCltvRejectDelta,
		ChanActiveTimeout:       s.cfg.ChanEnableTimeout,
		ErrorBuffer:             errBuffer,
		WritePool:               s.writePool,
		ReadPool:                s.readPool,
		Switch:                  s.htlcSwitch,
		InterceptSwitch:         s.interceptableSwitch,
		ChannelDB:               s.chanStateDB,
		ChannelGraph:            s.graphDB,
		ChainArb:                s.chainArb,
		AuthGossiper:            s.authGossiper,
		ChanStatusMgr:           s.chanStatusMgr,
		ChainIO:                 s.cc.ChainIO,
		FeeEstimator:            s.cc.FeeEstimator,
		Signer:                  s.cc.Wallet.Cfg.Signer,
		SigPool:                 s.sigPool,
		Wallet:                  s.cc.Wallet,
		ChainNotifier:           s.cc.ChainNotifier,
		BestBlockView:           s.cc.BestBlockTracker,
		RoutingPolicy:           s.cc.RoutingPolicy,
		Sphinx:                  s.sphinx,
		WitnessBeacon:           s.witnessBeacon,
		Invoices:                s.invoices,
		ChannelNotifier:         s.channelNotifier,
		HtlcNotifier:            s.htlcNotifier,
		TowerClient:             towerClient,
		DisconnectPeer:          s.DisconnectPeer,
		GenNodeAnnouncement: func(...netann.NodeAnnModifier) (
			lnwire.NodeAnnouncement, error) {

			return s.genNodeAnnouncement(nil)
		},

		PongBuf: s.pongBuf,

		PrunePersistentPeerConnection: s.prunePersistentPeerConnection,

		FetchLastChanUpdate: s.fetchLastChanUpdate(),

		FundingManager: s.fundingMgr,

		Hodl:                    s.cfg.Hodl,
		UnsafeReplay:            s.cfg.UnsafeReplay,
		MaxOutgoingCltvExpiry:   s.cfg.MaxOutgoingCltvExpiry,
		MaxChannelFeeAllocation: s.cfg.MaxChannelFeeAllocation,
		CoopCloseTargetConfs:    s.cfg.CoopCloseTargetConfs,
		MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte(
			s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
		ChannelCommitInterval:  s.cfg.ChannelCommitInterval,
		PendingCommitInterval:  s.cfg.PendingCommitInterval,
		ChannelCommitBatchSize: s.cfg.ChannelCommitBatchSize,
		HandleCustomMessage:    s.handleCustomMessage,
		GetAliases:             s.aliasMgr.GetAliases,
		RequestAlias:           s.aliasMgr.RequestAlias,
		AddLocalAlias:          s.aliasMgr.AddLocalAlias,
		DisallowRouteBlinding:  s.cfg.ProtocolOptions.NoRouteBlinding(),
		DisallowQuiescence:     s.cfg.ProtocolOptions.NoQuiescence(),
		MaxFeeExposure:         thresholdMSats,
		Quit:                   s.quit,
		AuxLeafStore:           s.implCfg.AuxLeafStore,
		AuxSigner:              s.implCfg.AuxSigner,
		MsgRouter:              s.implCfg.MsgRouter,
		AuxChanCloser:          s.implCfg.AuxChanCloser,
		AuxResolver:            s.implCfg.AuxContractResolver,
		AuxTrafficShaper:       s.implCfg.TrafficShaper,
		ShouldFwdExpEndorsement: func() bool {
			if s.cfg.ProtocolOptions.NoExperimentalEndorsement() {
				return false
			}

			return clock.NewDefaultClock().Now().Before(
				EndorsementExperimentEnd,
			)
		},
	}

	copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed())
	copy(pCfg.ServerPubKey[:], s.identityECDH.PubKey().SerializeCompressed())

	p := peer.NewBrontide(pCfg)

	// TODO(roasbeef): update IP address for link-node
	//  * also mark last-seen, do it one single transaction?

	s.addPeer(p)

	// Once we have successfully added the peer to the server, we can
	// delete the previous error buffer from the server's map of error
	// buffers.
	delete(s.peerErrors, pkStr)

	// Dispatch a goroutine to asynchronously start the peer. This process
	// includes sending and receiving Init messages, which would be a DOS
	// vector if we held the server's mutex throughout the procedure.
	s.wg.Add(1)
	go s.peerInitializer(p)
}
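
// The TowerClient assignment above guards against Go's typed-nil interface
// gotcha. The snippet below is an illustrative demo only (not part of
// server.go); the names nilInterfaceGotchaDemo and clientMgr are
// hypothetical.
func nilInterfaceGotchaDemo() {
	type clientMgr struct{}

	var mgr *clientMgr          // a typed, nil pointer
	var iface interface{} = mgr // an interface holding that nil pointer

	// The pointer is nil, but the interface is not: it still carries type
	// information. A plain nil check on the interface would therefore
	// wrongly report that a tower client is configured.
	fmt.Println(mgr == nil)   // prints true
	fmt.Println(iface == nil) // prints false
}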

// addPeer adds the passed peer to the server's global state of all active
// peers.
func (s *server) addPeer(p *peer.Brontide) {
	if p == nil {
		return
	}

	pubBytes := p.IdentityKey().SerializeCompressed()

	// Ignore new peers if we're shutting down.
	if s.Stopped() {
		srvrLog.Infof("Server stopped, skipped adding peer=%x",
			pubBytes)
		p.Disconnect(ErrServerShuttingDown)

		return
	}

	// Track the new peer in our indexes so we can quickly look it up either
	// according to its public key, or its peer ID.
	// TODO(roasbeef): pipe all requests through to the
	// queryHandler/peerManager

	// NOTE: This pubStr is a raw bytes to string conversion and will NOT
	// be human-readable.
	pubStr := string(pubBytes)

	s.peersByPub[pubStr] = p

	if p.Inbound() {
		s.inboundPeers[pubStr] = p
	} else {
		s.outboundPeers[pubStr] = p
	}

	// Inform the peer notifier of a peer online event so that it can be
	// reported to clients listening for peer events.
	var pubKey [33]byte
	copy(pubKey[:], pubBytes)

	s.peerNotifier.NotifyPeerOnline(pubKey)
}
4379

// peerInitializer asynchronously starts a newly connected peer after it has
// been added to the server's peer map. This method sets up a
// peerTerminationWatcher for the given peer, and ensures that it executes even
// if the peer failed to start. In the event of a successful connection, this
// method reads the negotiated, local feature-bits and spawns the appropriate
// graph synchronization method. Any registered clients of NotifyWhenOnline will
// be signaled of the new peer once the method returns.
//
// NOTE: This MUST be launched as a goroutine.
func (s *server) peerInitializer(p *peer.Brontide) {
        defer s.wg.Done()

        pubBytes := p.IdentityKey().SerializeCompressed()

        // Avoid initializing peers while the server is exiting.
        if s.Stopped() {
                srvrLog.Infof("Server stopped, skipped initializing peer=%x",
                        pubBytes)
                return
        }

        // Create a channel that will be used to signal a successful start of
        // the link. This prevents the peer termination watcher from beginning
        // its duty too early.
        ready := make(chan struct{})

        // Before starting the peer, launch a goroutine to watch for the
        // unexpected termination of this peer, which will ensure all resources
        // are properly cleaned up, and re-establish persistent connections when
        // necessary. The peer termination watcher will be short circuited if
        // the peer is ever added to the ignorePeerTermination map, indicating
        // that the server has already handled the removal of this peer.
        s.wg.Add(1)
        go s.peerTerminationWatcher(p, ready)

        // Start the peer! If an error occurs, we Disconnect the peer, which
        // will unblock the peerTerminationWatcher.
        if err := p.Start(); err != nil {
                srvrLog.Warnf("Starting peer=%x got error: %v", pubBytes, err)

                p.Disconnect(fmt.Errorf("unable to start peer: %w", err))
                return
        }

        // Otherwise, signal to the peerTerminationWatcher that the peer startup
        // was successful, and to begin watching the peer's wait group.
        close(ready)

        s.mu.Lock()
        defer s.mu.Unlock()

        // Check if there are listeners waiting for this peer to come online.
        srvrLog.Debugf("Notifying that peer %v is online", p)

        // TODO(guggero): Do a proper conversion to a string everywhere, or use
        // route.Vertex as the key type of peerConnectedListeners.
        pubStr := string(pubBytes)
        for _, peerChan := range s.peerConnectedListeners[pubStr] {
                select {
                case peerChan <- p:
                case <-s.quit:
                        return
                }
        }
        delete(s.peerConnectedListeners, pubStr)
}
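
// The sketch below is illustrative only and is not part of lnd. It distills
// the synchronization pattern used by peerInitializer and
// peerTerminationWatcher: the watcher goroutine is launched before the start
// attempt, but only begins its real work once the `ready` channel is closed
// (successful start) or the worker is torn down (failed start). The start and
// disconnect callbacks stand in for p.Start and p.Disconnect.
func exampleReadySignalPattern(start func() error, disconnect func()) {
        var wg sync.WaitGroup

        // ready is closed only if start succeeds; stopped is closed when the
        // worker is torn down, mirroring Disconnect/WaitForDisconnect.
        ready := make(chan struct{})
        stopped := make(chan struct{})

        wg.Add(1)
        go func() {
                defer wg.Done()

                // The watcher never cleans up a worker that is still in the
                // middle of starting.
                select {
                case <-ready:
                case <-stopped:
                        return
                }
                // Cleanup and reconnection logic would run here.
        }()

        if err := start(); err != nil {
                // A failed start tears the worker down, which unblocks the
                // watcher without ever signaling readiness.
                disconnect()
                close(stopped)
        } else {
                close(ready)
        }

        wg.Wait()
}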

// peerTerminationWatcher waits until a peer has been disconnected unexpectedly,
// and then cleans up all resources allocated to the peer, notifies relevant
// sub-systems of its demise, and finally handles re-connecting to the peer if
// it's persistent. If the server intentionally disconnects a peer, it should
// have a corresponding entry in the ignorePeerTermination map which will cause
// the cleanup routine to exit early. The passed `ready` chan is used to
// synchronize when WaitForDisconnect should begin watching on the peer's
// waitgroup. The ready chan should only be signaled if the peer starts
// successfully, otherwise the peer should be disconnected instead.
//
// NOTE: This MUST be launched as a goroutine.
func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
        defer s.wg.Done()

        p.WaitForDisconnect(ready)

        srvrLog.Debugf("Peer %v has been disconnected", p)

        // If the server is exiting then we can bail out early ourselves as all
        // the other sub-systems will already be shutting down.
        if s.Stopped() {
                srvrLog.Debugf("Server quitting, exit early for peer %v", p)
                return
        }

        // Next, we'll cancel all pending funding reservations with this node.
        // If we tried to initiate any funding flows that haven't yet finished,
        // then we need to unlock those committed outputs so they're still
        // available for use.
        s.fundingMgr.CancelPeerReservations(p.PubKey())

        pubKey := p.IdentityKey()

        // We'll also inform the gossiper that this peer is no longer active,
        // so we don't need to maintain sync state for it any longer.
        s.authGossiper.PruneSyncState(p.PubKey())

        // Tell the switch to remove all links associated with this peer.
        // Passing nil as the target link indicates that all links associated
        // with this interface should be closed.
        //
        // TODO(roasbeef): instead add a PurgeInterfaceLinks function?
        links, err := s.htlcSwitch.GetLinksByInterface(p.PubKey())
        if err != nil && err != htlcswitch.ErrNoLinksFound {
                srvrLog.Errorf("Unable to get channel links for %v: %v", p, err)
        }

        for _, link := range links {
                s.htlcSwitch.RemoveLink(link.ChanID())
        }

        s.mu.Lock()
        defer s.mu.Unlock()

        // If there were any notification requests for when this peer
        // disconnected, we can trigger them now.
        srvrLog.Debugf("Notifying that peer %v is offline", p)
        pubStr := string(pubKey.SerializeCompressed())
        for _, offlineChan := range s.peerDisconnectedListeners[pubStr] {
                close(offlineChan)
        }
        delete(s.peerDisconnectedListeners, pubStr)

        // If the server has already removed this peer, we can short circuit
        // the peer termination watcher and skip cleanup.
        if _, ok := s.ignorePeerTermination[p]; ok {
                delete(s.ignorePeerTermination, p)

                pubKey := p.PubKey()
                pubStr := string(pubKey[:])

                // If a connection callback is present, we'll go ahead and
                // execute it now that previous peer has fully disconnected. If
                // the callback is not present, this likely implies the peer was
                // purposefully disconnected via RPC, and that no reconnect
                // should be attempted.
                connCallback, ok := s.scheduledPeerConnection[pubStr]
                if ok {
                        delete(s.scheduledPeerConnection, pubStr)
                        connCallback()
                }
                return
        }

        // First, cleanup any remaining state the server has regarding the peer
        // in question.
        s.removePeer(p)

        // Next, check to see if this is a persistent peer or not.
        if _, ok := s.persistentPeers[pubStr]; !ok {
                return
        }

        // Get the last address that we used to connect to the peer.
        addrs := []net.Addr{
                p.NetAddress().Address,
        }

        // We'll ensure that we locate all the peers advertised addresses for
        // reconnection purposes.
        advertisedAddrs, err := s.fetchNodeAdvertisedAddrs(pubKey)
        switch {
        // We found advertised addresses, so use them.
        case err == nil:
                addrs = advertisedAddrs

        // The peer doesn't have an advertised address.
        case err == errNoAdvertisedAddr:
                // If it is an outbound peer then we fall back to the existing
                // peer address.
                if !p.Inbound() {
                        break
                }

                // Fall back to the existing peer address if we're not
                // accepting connections over Tor.
                if s.torController == nil {
                        break
                }

                // If we are, the peer's address won't be known to us (we'll
                // see a private address, which is the address used by our
                // onion service to dial to lnd), so we don't have enough
                // information to attempt a reconnect.
                srvrLog.Debugf("Ignoring reconnection attempt "+
                        "to inbound peer %v without "+
                        "advertised address", p)
                return

        // We came across an error retrieving an advertised address, log it,
        // and fall back to the existing peer address.
        default:
                srvrLog.Errorf("Unable to retrieve advertised "+
                        "address for node %x: %v", p.PubKey(),
                        err)
        }

        // Make an easy lookup map so that we can check if an address
        // is already in the address list that we have stored for this peer.
        existingAddrs := make(map[string]bool)
        for _, addr := range s.persistentPeerAddrs[pubStr] {
                existingAddrs[addr.String()] = true
        }

        // Add any missing addresses for this peer to persistentPeerAddr.
        for _, addr := range addrs {
                if existingAddrs[addr.String()] {
                        continue
                }

                s.persistentPeerAddrs[pubStr] = append(
                        s.persistentPeerAddrs[pubStr],
                        &lnwire.NetAddress{
                                IdentityKey: p.IdentityKey(),
                                Address:     addr,
                                ChainNet:    p.NetAddress().ChainNet,
                        },
                )
        }

        // Record the computed backoff in the backoff map.
        backoff := s.nextPeerBackoff(pubStr, p.StartTime())
        s.persistentPeersBackoff[pubStr] = backoff

        // Initialize a retry canceller for this peer if one does not exist.
        cancelChan, ok := s.persistentRetryCancels[pubStr]
        if !ok {
                cancelChan = make(chan struct{})
                s.persistentRetryCancels[pubStr] = cancelChan
        }

        // We choose not to wait group this go routine since the Connect call
        // can stall for arbitrarily long if we shut down while an outbound
        // connection attempt is being made.
        go func() {
                srvrLog.Debugf("Scheduling connection re-establishment to "+
                        "persistent peer %x in %s",
                        p.IdentityKey().SerializeCompressed(), backoff)

                select {
                case <-time.After(backoff):
                case <-cancelChan:
                        return
                case <-s.quit:
                        return
                }

                srvrLog.Debugf("Attempting to re-establish persistent "+
                        "connection to peer %x",
                        p.IdentityKey().SerializeCompressed())

                s.connectToPersistentPeer(pubStr)
        }()
}

// connectToPersistentPeer uses all the stored addresses for a peer to attempt
// to connect to the peer. It creates connection requests if there are
// currently none for a given address and it removes old connection requests
// if the associated address is no longer in the latest address list for the
// peer.
func (s *server) connectToPersistentPeer(pubKeyStr string) {
        s.mu.Lock()
        defer s.mu.Unlock()

        // Create an easy lookup map of the addresses we have stored for the
        // peer. We will remove entries from this map if we have existing
        // connection requests for the associated address and then any leftover
        // entries will indicate which addresses we should create new
        // connection requests for.
        addrMap := make(map[string]*lnwire.NetAddress)
        for _, addr := range s.persistentPeerAddrs[pubKeyStr] {
                addrMap[addr.String()] = addr
        }

        // Go through each of the existing connection requests and check if
        // they correspond to the latest set of addresses. If there is a
        // connection request that does not use one of the latest advertised
        // addresses then remove that connection request.
        var updatedConnReqs []*connmgr.ConnReq
        for _, connReq := range s.persistentConnReqs[pubKeyStr] {
                lnAddr := connReq.Addr.(*lnwire.NetAddress).Address.String()

                switch _, ok := addrMap[lnAddr]; ok {
                // If the existing connection request is using one of the
                // latest advertised addresses for the peer then we add it to
                // updatedConnReqs and remove the associated address from
                // addrMap so that we don't recreate this connReq later on.
                case true:
                        updatedConnReqs = append(
                                updatedConnReqs, connReq,
                        )
                        delete(addrMap, lnAddr)

                // If the existing connection request is using an address that
                // is not one of the latest advertised addresses for the peer
                // then we remove the connection request from the connection
                // manager.
                case false:
                        srvrLog.Info(
                                "Removing conn req:", connReq.Addr.String(),
                        )
                        s.connMgr.Remove(connReq.ID())
                }
        }

        s.persistentConnReqs[pubKeyStr] = updatedConnReqs

        cancelChan, ok := s.persistentRetryCancels[pubKeyStr]
        if !ok {
                cancelChan = make(chan struct{})
                s.persistentRetryCancels[pubKeyStr] = cancelChan
        }

        // Any addresses left in addrMap are new ones that we have not made
        // connection requests for. So create new connection requests for
        // those. If there is more than one address in the address map, stagger
        // the creation of the connection requests for those.
        go func() {
                ticker := time.NewTicker(multiAddrConnectionStagger)
                defer ticker.Stop()

                for _, addr := range addrMap {
                        // Send the persistent connection request to the
                        // connection manager, saving the request itself so we
                        // can cancel/restart the process as needed.
                        connReq := &connmgr.ConnReq{
                                Addr:      addr,
                                Permanent: true,
                        }

                        s.mu.Lock()
                        s.persistentConnReqs[pubKeyStr] = append(
                                s.persistentConnReqs[pubKeyStr], connReq,
                        )
                        s.mu.Unlock()

                        srvrLog.Debugf("Attempting persistent connection to "+
                                "channel peer %v", addr)

                        go s.connMgr.Connect(connReq)

                        select {
                        case <-s.quit:
                                return
                        case <-cancelChan:
                                return
                        case <-ticker.C:
                        }
                }
        }()
}

// removePeer removes the passed peer from the server's state of all active
// peers.
func (s *server) removePeer(p *peer.Brontide) {
        if p == nil {
                return
        }

        srvrLog.Debugf("removing peer %v", p)

        // As the peer is now finished, ensure that the TCP connection is
        // closed and all of its related goroutines have exited.
        p.Disconnect(fmt.Errorf("server: disconnecting peer %v", p))

        // If this peer had an active persistent connection request, remove it.
        if p.ConnReq() != nil {
                s.connMgr.Remove(p.ConnReq().ID())
        }

        // Ignore deleting peers if we're shutting down.
        if s.Stopped() {
                return
        }

        pKey := p.PubKey()
        pubSer := pKey[:]
        pubStr := string(pubSer)

        delete(s.peersByPub, pubStr)

        if p.Inbound() {
                delete(s.inboundPeers, pubStr)
        } else {
                delete(s.outboundPeers, pubStr)
        }

        // Copy the peer's error buffer across to the server if it has any
        // items in it so that we can restore peer errors across connections.
        if p.ErrorBuffer().Total() > 0 {
                s.peerErrors[pubStr] = p.ErrorBuffer()
        }

        // Inform the peer notifier of a peer offline event so that it can be
        // reported to clients listening for peer events.
        var pubKey [33]byte
        copy(pubKey[:], pubSer)

        s.peerNotifier.NotifyPeerOffline(pubKey)
}

// ConnectToPeer requests that the server connect to a Lightning Network peer
// at the specified address. This function will *block* until either a
// connection is established, or the initial handshake process fails.
//
// NOTE: This function is safe for concurrent access.
func (s *server) ConnectToPeer(addr *lnwire.NetAddress,
        perm bool, timeout time.Duration) error {

        targetPub := string(addr.IdentityKey.SerializeCompressed())

        // Acquire mutex, but use explicit unlocking instead of defer for
        // better granularity. In certain conditions, this method requires
        // making an outbound connection to a remote peer, which requires the
        // lock to be released, and subsequently reacquired.
        s.mu.Lock()

        // Ensure we're not already connected to this peer.
        peer, err := s.findPeerByPubStr(targetPub)
        if err == nil {
                s.mu.Unlock()
                return &errPeerAlreadyConnected{peer: peer}
        }

        // Peer was not found, continue to pursue connection with peer.

        // If there's already a pending connection request for this pubkey,
        // then we ignore this request to ensure we don't create a redundant
        // connection.
        if reqs, ok := s.persistentConnReqs[targetPub]; ok {
                srvrLog.Warnf("Already have %d persistent connection "+
                        "requests for %v, connecting anyway.", len(reqs), addr)
        }

        // If there's not already a pending or active connection to this node,
        // then instruct the connection manager to attempt to establish a
        // persistent connection to the peer.
        srvrLog.Debugf("Connecting to %v", addr)
        if perm {
                connReq := &connmgr.ConnReq{
                        Addr:      addr,
                        Permanent: true,
                }

                // Since the user requested a permanent connection, we'll set
                // the entry to true which will tell the server to continue
                // reconnecting even if the number of channels with this peer
                // is zero.
                s.persistentPeers[targetPub] = true
                if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
                        s.persistentPeersBackoff[targetPub] = s.cfg.MinBackoff
                }
                s.persistentConnReqs[targetPub] = append(
                        s.persistentConnReqs[targetPub], connReq,
                )
                s.mu.Unlock()

                go s.connMgr.Connect(connReq)

                return nil
        }
        s.mu.Unlock()

        // If we're not making a persistent connection, then we'll attempt to
        // connect to the target peer. If we can't make the connection, or the
        // crypto negotiation breaks down, then return an error to the caller.
        errChan := make(chan error, 1)
        s.connectToPeer(addr, errChan, timeout)

        select {
        case err := <-errChan:
                return err
        case <-s.quit:
                return ErrServerShuttingDown
        }
}
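
// The sketch below is illustrative only and is not part of lnd. It shows how
// a caller holding a *server might establish a persistent outbound connection
// with ConnectToPeer. The pubKeyHex and hostPort arguments are assumed to
// describe the remote node, and mainnet is assumed for the ChainNet field;
// the 30-second timeout is an arbitrary choice for the sketch.
func exampleConnectToPeer(s *server, pubKeyHex, hostPort string) error {
        // Parse the remote node's identity key from its hex encoding.
        keyBytes, err := hex.DecodeString(pubKeyHex)
        if err != nil {
                return err
        }
        identityKey, err := btcec.ParsePubKey(keyBytes)
        if err != nil {
                return err
        }

        // Resolve the peer's network address, e.g. "host:9735".
        tcpAddr, err := net.ResolveTCPAddr("tcp", hostPort)
        if err != nil {
                return err
        }

        addr := &lnwire.NetAddress{
                IdentityKey: identityKey,
                Address:     tcpAddr,
                ChainNet:    wire.MainNet, // Must match the node's network.
        }

        // Request a persistent connection and allow up to 30 seconds for the
        // handshake to complete.
        return s.ConnectToPeer(addr, true, 30*time.Second)
}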

// connectToPeer establishes a connection to a remote peer. errChan is used to
// notify the caller if the connection attempt has failed. Otherwise, it will
// be closed.
func (s *server) connectToPeer(addr *lnwire.NetAddress,
        errChan chan<- error, timeout time.Duration) {

        conn, err := brontide.Dial(
                s.identityECDH, addr, timeout, s.cfg.net.Dial,
        )
        if err != nil {
                srvrLog.Errorf("Unable to connect to %v: %v", addr, err)
                select {
                case errChan <- err:
                case <-s.quit:
                }
                return
        }

        close(errChan)

        srvrLog.Tracef("Brontide dialer made local=%v, remote=%v",
                conn.LocalAddr(), conn.RemoteAddr())

        s.OutboundPeerConnected(nil, conn)
}

// DisconnectPeer sends the request to the server to close the connection with
// the peer identified by public key.
//
// NOTE: This function is safe for concurrent access.
func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
        pubBytes := pubKey.SerializeCompressed()
        pubStr := string(pubBytes)

        s.mu.Lock()
        defer s.mu.Unlock()

        // Check that we're actually connected to this peer. If not, then we'll
        // exit with an error as we can't disconnect from a peer that we're not
        // currently connected to.
        peer, err := s.findPeerByPubStr(pubStr)
        if err == ErrPeerNotConnected {
                return fmt.Errorf("peer %x is not connected", pubBytes)
        }

        srvrLog.Infof("Disconnecting from %v", peer)

        s.cancelConnReqs(pubStr, nil)

        // If this peer was formerly a persistent connection, then we'll remove
        // them from this map so we don't attempt to re-connect after we
        // disconnect.
        delete(s.persistentPeers, pubStr)
        delete(s.persistentPeersBackoff, pubStr)

        // Remove the peer by calling Disconnect. Previously this was done with
        // removePeer, which bypassed the peerTerminationWatcher.
        peer.Disconnect(fmt.Errorf("server: DisconnectPeer called"))

        return nil
}

// OpenChannel sends a request to the server to open a channel to the specified
// peer identified by nodeKey with the passed channel funding parameters.
//
// NOTE: This function is safe for concurrent access.
func (s *server) OpenChannel(
        req *funding.InitFundingMsg) (chan *lnrpc.OpenStatusUpdate, chan error) {

        // The updateChan will have a buffer of 2, since we expect a ChanPending
        // + a ChanOpen update, and we want to make sure the funding process is
        // not blocked if the caller is not reading the updates.
        req.Updates = make(chan *lnrpc.OpenStatusUpdate, 2)
        req.Err = make(chan error, 1)

        // First attempt to locate the target peer to open a channel with, if
        // we're unable to locate the peer then this request will fail.
        pubKeyBytes := req.TargetPubkey.SerializeCompressed()
        s.mu.RLock()
        peer, ok := s.peersByPub[string(pubKeyBytes)]
        if !ok {
                s.mu.RUnlock()

                req.Err <- fmt.Errorf("peer %x is not online", pubKeyBytes)
                return req.Updates, req.Err
        }
        req.Peer = peer
        s.mu.RUnlock()

        // We'll wait until the peer is active before beginning the channel
        // opening process.
        select {
        case <-peer.ActiveSignal():
        case <-peer.QuitSignal():
                req.Err <- fmt.Errorf("peer %x disconnected", pubKeyBytes)
                return req.Updates, req.Err
        case <-s.quit:
                req.Err <- ErrServerShuttingDown
                return req.Updates, req.Err
        }

        // If the fee rate wasn't specified at this point we fail the funding
        // because of the missing fee rate information. The caller of the
        // `OpenChannel` method needs to make sure that default values for the
        // fee rate are set beforehand.
        if req.FundingFeePerKw == 0 {
                req.Err <- fmt.Errorf("no FundingFeePerKw specified for " +
                        "the channel opening transaction")

                return req.Updates, req.Err
        }

        // Spawn a goroutine to send the funding workflow request to the
        // funding manager. This allows the server to continue handling queries
        // instead of blocking on this request which is exported as a
        // synchronous request to the outside world.
        go s.fundingMgr.InitFundingWorkflow(req)

        return req.Updates, req.Err
}

// Peers returns a slice of all active peers.
//
// NOTE: This function is safe for concurrent access.
func (s *server) Peers() []*peer.Brontide {
        s.mu.RLock()
        defer s.mu.RUnlock()

        peers := make([]*peer.Brontide, 0, len(s.peersByPub))
        for _, peer := range s.peersByPub {
                peers = append(peers, peer)
        }

        return peers
}

// computeNextBackoff uses a truncated exponential backoff to compute the next
// backoff using the value of the existing backoff. The returned duration is
// randomized in either direction by 1/20 to prevent tight loops from
// stabilizing.
func computeNextBackoff(currBackoff, maxBackoff time.Duration) time.Duration {
        // Double the current backoff, truncating if it exceeds our maximum.
        nextBackoff := 2 * currBackoff
        if nextBackoff > maxBackoff {
                nextBackoff = maxBackoff
        }

        // Using 1/10 of our duration as a margin, compute a random offset to
        // avoid the nodes entering connection cycles.
        margin := nextBackoff / 10

        var wiggle big.Int
        wiggle.SetUint64(uint64(margin))
        if _, err := rand.Int(rand.Reader, &wiggle); err != nil {
                // Randomizing is not mission critical, so we'll just return
                // the current backoff.
                return nextBackoff
        }

        // Otherwise add in our wiggle, but subtract out half of the margin so
        // that the backoff can be tweaked by 1/20 in either direction.
        return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2)
}
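
// The sketch below is illustrative only and is not part of lnd. It walks
// computeNextBackoff through a few rounds to show the truncated exponential
// growth: starting from one second with a one-minute cap, the backoff roughly
// doubles (about 2s, 4s, 8s, ...) until it saturates at the cap, with each
// value jittered by up to 1/20 in either direction. The one-second seed and
// one-minute cap are arbitrary values chosen for the sketch.
func exampleBackoffProgression() []time.Duration {
        const maxBackoff = time.Minute

        backoff := time.Second
        progression := make([]time.Duration, 0, 8)
        for i := 0; i < 8; i++ {
                backoff = computeNextBackoff(backoff, maxBackoff)
                progression = append(progression, backoff)
        }

        return progression
}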

// errNoAdvertisedAddr is an error returned when we attempt to retrieve the
// advertised address of a node, but they don't have one.
var errNoAdvertisedAddr = errors.New("no advertised address found")

// fetchNodeAdvertisedAddrs attempts to fetch the advertised addresses of a node.
func (s *server) fetchNodeAdvertisedAddrs(pub *btcec.PublicKey) ([]net.Addr, error) {
        vertex, err := route.NewVertexFromBytes(pub.SerializeCompressed())
        if err != nil {
                return nil, err
        }

        node, err := s.graphDB.FetchLightningNode(vertex)
        if err != nil {
                return nil, err
        }

        if len(node.Addresses) == 0 {
                return nil, errNoAdvertisedAddr
        }

        return node.Addresses, nil
}

// fetchLastChanUpdate returns a function which is able to retrieve our latest
// channel update for a target channel.
func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) (
        *lnwire.ChannelUpdate1, error) {

        ourPubKey := s.identityECDH.PubKey().SerializeCompressed()
        return func(cid lnwire.ShortChannelID) (*lnwire.ChannelUpdate1, error) {
                info, edge1, edge2, err := s.graphBuilder.GetChannelByID(cid)
                if err != nil {
                        return nil, err
                }

                return netann.ExtractChannelUpdate(
                        ourPubKey[:], info, edge1, edge2,
                )
        }
}

// applyChannelUpdate applies the channel update to the different sub-systems of
// the server. The useAlias boolean denotes whether or not to send an alias in
// place of the real SCID.
func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate1,
        op *wire.OutPoint, useAlias bool) error {

        var (
                peerAlias    *lnwire.ShortChannelID
                defaultAlias lnwire.ShortChannelID
        )

        chanID := lnwire.NewChanIDFromOutPoint(*op)

        // Fetch the peer's alias from the lnwire.ChannelID so it can be used
        // in the ChannelUpdate if it hasn't been announced yet.
        if useAlias {
                foundAlias, _ := s.aliasMgr.GetPeerAlias(chanID)
                if foundAlias != defaultAlias {
                        peerAlias = &foundAlias
                }
        }

        errChan := s.authGossiper.ProcessLocalAnnouncement(
                update, discovery.RemoteAlias(peerAlias),
        )
        select {
        case err := <-errChan:
                return err
        case <-s.quit:
                return ErrServerShuttingDown
        }
}

// SendCustomMessage sends a custom message to the peer with the specified
// pubkey.
func (s *server) SendCustomMessage(peerPub [33]byte, msgType lnwire.MessageType,
        data []byte) error {

        peer, err := s.FindPeerByPubStr(string(peerPub[:]))
        if err != nil {
                return err
        }

        // We'll wait until the peer is active.
        select {
        case <-peer.ActiveSignal():
        case <-peer.QuitSignal():
                return fmt.Errorf("peer %x disconnected", peerPub)
        case <-s.quit:
                return ErrServerShuttingDown
        }

        msg, err := lnwire.NewCustom(msgType, data)
        if err != nil {
                return err
        }

        // Send the message as low-priority. For now we assume that all
        // application-defined messages are low priority.
        return peer.SendMessageLazy(true, msg)
}
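
// The sketch below is illustrative only and is not part of lnd. It shows how
// a caller holding a *server might push an application-defined message to a
// connected peer via SendCustomMessage. The message type and payload below
// are placeholders; lnd only permits message types from the custom range, so
// the exact value must respect that policy.
func exampleSendCustomMessage(s *server, peerPub [33]byte) error {
        // A placeholder type chosen well away from the standard Lightning
        // wire message types.
        const customType lnwire.MessageType = 49999

        payload := []byte("illustrative custom payload")

        return s.SendCustomMessage(peerPub, customType, payload)
}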

// newSweepPkScriptGen creates a closure that generates a new public key script
// which should be used to sweep any funds into the on-chain wallet.
// Specifically, the script generated is a version 1, pay-to-taproot (p2tr)
// output, matching the taproot address requested from the wallet.
func newSweepPkScriptGen(
        wallet lnwallet.WalletController,
        netParams *chaincfg.Params) func() fn.Result[lnwallet.AddrWithKey] {

        return func() fn.Result[lnwallet.AddrWithKey] {
                sweepAddr, err := wallet.NewAddress(
                        lnwallet.TaprootPubkey, false,
                        lnwallet.DefaultAccountName,
                )
                if err != nil {
                        return fn.Err[lnwallet.AddrWithKey](err)
                }

                addr, err := txscript.PayToAddrScript(sweepAddr)
                if err != nil {
                        return fn.Err[lnwallet.AddrWithKey](err)
                }

                internalKeyDesc, err := lnwallet.InternalKeyForAddr(
                        wallet, netParams, addr,
                )
                if err != nil {
                        return fn.Err[lnwallet.AddrWithKey](err)
                }

                return fn.Ok(lnwallet.AddrWithKey{
                        DeliveryAddress: addr,
                        InternalKey:     internalKeyDesc,
                })
        }
}
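
// The sketch below is illustrative only and is not part of lnd. It shows how
// the closure returned by newSweepPkScriptGen might be consumed, assuming
// fn.Result exposes an Unpack method that splits the result into a value and
// an error.
func exampleSweepAddr(wallet lnwallet.WalletController,
        netParams *chaincfg.Params) (lnwallet.AddrWithKey, error) {

        // Build the generator once; each invocation derives a brand new
        // wallet address along with its internal key descriptor.
        genSweepScript := newSweepPkScriptGen(wallet, netParams)

        return genSweepScript().Unpack()
}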

// shouldPeerBootstrap returns true if we should attempt to perform peer
// bootstrapping to actively seek our peers using the set of active network
// bootstrappers.
func shouldPeerBootstrap(cfg *Config) bool {
        isSimnet := cfg.Bitcoin.SimNet
        isSignet := cfg.Bitcoin.SigNet
        isRegtest := cfg.Bitcoin.RegTest
        isDevNetwork := isSimnet || isSignet || isRegtest

        // TODO(yy): remove the check on simnet/regtest such that the itest is
        // covering the bootstrapping process.
        return !cfg.NoNetBootstrap && !isDevNetwork
}

// fetchClosedChannelSCIDs returns a set of SCIDs that have their force closing
// finished.
func (s *server) fetchClosedChannelSCIDs() map[lnwire.ShortChannelID]struct{} {
        // Get a list of closed channels.
        channels, err := s.chanStateDB.FetchClosedChannels(false)
        if err != nil {
                srvrLog.Errorf("Failed to fetch closed channels: %v", err)
                return nil
        }

        // Save the SCIDs in a map.
        closedSCIDs := make(map[lnwire.ShortChannelID]struct{}, len(channels))
        for _, c := range channels {
                // If the channel is not pending, its FC has been finalized.
                if !c.IsPending {
                        closedSCIDs[c.ShortChanID] = struct{}{}
                }
        }

        // Double check whether the reported closed channel has indeed finished
        // closing.
        //
        // NOTE: There are misalignments regarding when a channel's FC is
        // marked as finalized. We double check the pending channels to make
        // sure the returned SCIDs are indeed terminated.
        //
        // TODO(yy): fix the misalignments in `FetchClosedChannels`.
        pendings, err := s.chanStateDB.FetchPendingChannels()
        if err != nil {
                srvrLog.Errorf("Failed to fetch pending channels: %v", err)
                return nil
        }

        for _, c := range pendings {
                if _, ok := closedSCIDs[c.ShortChannelID]; !ok {
                        continue
                }

                // If the channel is still reported as pending, remove it from
                // the map.
                delete(closedSCIDs, c.ShortChannelID)

                srvrLog.Warnf("Channel=%v is prematurely marked as finalized",
                        c.ShortChannelID)
        }

        return closedSCIDs
}

// getStartingBeat returns the current beat. This is used during the startup to
// initialize blockbeat consumers.
func (s *server) getStartingBeat() (*chainio.Beat, error) {
        // beat is the current blockbeat.
        var beat *chainio.Beat

        // We should get a notification with the current best block immediately
        // by passing a nil block.
        blockEpochs, err := s.cc.ChainNotifier.RegisterBlockEpochNtfn(nil)
        if err != nil {
                return beat, fmt.Errorf("register block epoch ntfn: %w", err)
        }
        defer blockEpochs.Cancel()

        // We registered for the block epochs with a nil request. The notifier
        // should send us the current best block immediately. So we need to
        // wait for it here because we need to know the current best height.
        select {
        case bestBlock := <-blockEpochs.Epochs:
                srvrLog.Infof("Received initial block %v at height %d",
                        bestBlock.Hash, bestBlock.Height)

                // Update the current blockbeat.
                beat = chainio.NewBeat(*bestBlock)

        case <-s.quit:
                srvrLog.Debug("LND shutting down")
        }

        return beat, nil
}
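
// The sketch below is illustrative only and is not part of lnd. It shows how
// the starting beat might be fetched during startup and handed to
// blockbeat-driven subsystems; the startConsumers callback is a hypothetical
// stand-in for whatever wiring delivers the beat to those consumers.
func exampleUseStartingBeat(s *server,
        startConsumers func(*chainio.Beat) error) error {

        beat, err := s.getStartingBeat()
        if err != nil {
                return fmt.Errorf("get starting beat: %w", err)
        }

        // A nil beat means the server began shutting down before the initial
        // block notification arrived, so there is nothing to dispatch.
        if beat == nil {
                return nil
        }

        return startConsumers(beat)
}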