lightningnetwork / lnd / 13567496470

27 Feb 2025 01:26PM UTC coverage: 58.757% (-0.1%) from 58.858%

Pull #9555 (github), ellemouton
graph/db: populate the graph cache in Start instead of during construction

In this commit, we move the graph cache population logic out of the
ChannelGraph constructor and into its Start method instead.
Pull Request #9555: graph: extract cache from CRUD [6]
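
The change described above follows a common Go pattern: keep the constructor cheap and free of side effects, and defer expensive work (here, populating an in-memory graph cache from the database) to an explicit Start method that can return an error. The sketch below illustrates that pattern only; the types and the populateCache helper are hypothetical stand-ins, not the actual lnd implementation.

// Sketch only: hypothetical types showing "populate in Start, not in the
// constructor". Not the actual lnd code.
package graphsketch

import "sync"

type graphCache struct {
	channels map[uint64]struct{}
}

type ChannelGraph struct {
	cache *graphCache
	start sync.Once
}

// NewChannelGraph only wires up dependencies, so construction stays cheap and
// cannot fail because of a slow or broken cache fill.
func NewChannelGraph() *ChannelGraph {
	return &ChannelGraph{
		cache: &graphCache{channels: make(map[uint64]struct{})},
	}
}

// Start performs the potentially expensive cache population exactly once and
// surfaces any error to whatever drives the daemon's startup sequence.
func (g *ChannelGraph) Start() error {
	var err error
	g.start.Do(func() {
		err = g.populateCache()
	})
	return err
}

// populateCache stands in for walking the persisted graph; it is a stub here.
func (g *ChannelGraph) populateCache() error {
	g.cache.channels[1] = struct{}{}
	return nil
}

Deferring the fill to Start also keeps construction usable in tests and lets the caller decide when to pay the population cost.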

40 of 54 new or added lines in 4 files covered. (74.07%)

307 existing lines in 27 files now uncovered.

136396 of 232137 relevant lines covered (58.76%)

19208.99 hits per line

Source File

/server.go (63.96% of lines covered)
package lnd

import (
	"bytes"
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"math/big"
	prand "math/rand"
	"net"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/btcec/v2"
	"github.com/btcsuite/btcd/btcec/v2/ecdsa"
	"github.com/btcsuite/btcd/btcutil"
	"github.com/btcsuite/btcd/chaincfg"
	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcd/connmgr"
	"github.com/btcsuite/btcd/txscript"
	"github.com/btcsuite/btcd/wire"
	"github.com/go-errors/errors"
	sphinx "github.com/lightningnetwork/lightning-onion"
	"github.com/lightningnetwork/lnd/aliasmgr"
	"github.com/lightningnetwork/lnd/autopilot"
	"github.com/lightningnetwork/lnd/brontide"
	"github.com/lightningnetwork/lnd/chainio"
	"github.com/lightningnetwork/lnd/chainreg"
	"github.com/lightningnetwork/lnd/chanacceptor"
	"github.com/lightningnetwork/lnd/chanbackup"
	"github.com/lightningnetwork/lnd/chanfitness"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/channelnotifier"
	"github.com/lightningnetwork/lnd/clock"
	"github.com/lightningnetwork/lnd/cluster"
	"github.com/lightningnetwork/lnd/contractcourt"
	"github.com/lightningnetwork/lnd/discovery"
	"github.com/lightningnetwork/lnd/feature"
	"github.com/lightningnetwork/lnd/fn/v2"
	"github.com/lightningnetwork/lnd/funding"
	"github.com/lightningnetwork/lnd/graph"
	graphdb "github.com/lightningnetwork/lnd/graph/db"
	"github.com/lightningnetwork/lnd/graph/db/models"
	"github.com/lightningnetwork/lnd/healthcheck"
	"github.com/lightningnetwork/lnd/htlcswitch"
	"github.com/lightningnetwork/lnd/htlcswitch/hop"
	"github.com/lightningnetwork/lnd/input"
	"github.com/lightningnetwork/lnd/invoices"
	"github.com/lightningnetwork/lnd/keychain"
	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/lncfg"
	"github.com/lightningnetwork/lnd/lnencrypt"
	"github.com/lightningnetwork/lnd/lnpeer"
	"github.com/lightningnetwork/lnd/lnrpc"
	"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
	"github.com/lightningnetwork/lnd/lnwallet"
	"github.com/lightningnetwork/lnd/lnwallet/chainfee"
	"github.com/lightningnetwork/lnd/lnwallet/chanfunding"
	"github.com/lightningnetwork/lnd/lnwallet/rpcwallet"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/nat"
	"github.com/lightningnetwork/lnd/netann"
	"github.com/lightningnetwork/lnd/peer"
	"github.com/lightningnetwork/lnd/peernotifier"
	"github.com/lightningnetwork/lnd/pool"
	"github.com/lightningnetwork/lnd/queue"
	"github.com/lightningnetwork/lnd/routing"
	"github.com/lightningnetwork/lnd/routing/localchans"
	"github.com/lightningnetwork/lnd/routing/route"
	"github.com/lightningnetwork/lnd/subscribe"
	"github.com/lightningnetwork/lnd/sweep"
	"github.com/lightningnetwork/lnd/ticker"
	"github.com/lightningnetwork/lnd/tor"
	"github.com/lightningnetwork/lnd/walletunlocker"
	"github.com/lightningnetwork/lnd/watchtower/blob"
	"github.com/lightningnetwork/lnd/watchtower/wtclient"
	"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
	"github.com/lightningnetwork/lnd/watchtower/wtserver"
)

const (
	// defaultMinPeers is the minimum number of peers nodes should always be
	// connected to.
	defaultMinPeers = 3

	// defaultStableConnDuration is a floor under which all reconnection
	// attempts will apply exponential randomized backoff. Connection
	// durations exceeding this value will be eligible to have their
	// backoffs reduced.
	defaultStableConnDuration = 10 * time.Minute

	// numInstantInitReconnect specifies how many persistent peers we should
	// always attempt outbound connections to immediately. After this value
	// is surpassed, the remaining peers will be randomly delayed using
	// maxInitReconnectDelay.
	numInstantInitReconnect = 10

	// maxInitReconnectDelay specifies the maximum delay in seconds we will
	// apply in attempting to reconnect to persistent peers on startup. The
	// value used for a particular peer will be chosen between 0s and this
	// value.
	maxInitReconnectDelay = 30

	// multiAddrConnectionStagger is the number of seconds to wait between
	// attempting to connect to a peer with each of its advertised addresses.
	multiAddrConnectionStagger = 10 * time.Second
)

var (
	// ErrPeerNotConnected signals that the server has no connection to the
	// given peer.
	ErrPeerNotConnected = errors.New("peer is not connected")

	// ErrServerNotActive indicates that the server has started but hasn't
	// fully finished the startup process.
	ErrServerNotActive = errors.New("server is still in the process of " +
		"starting")

	// ErrServerShuttingDown indicates that the server is in the process of
	// gracefully exiting.
	ErrServerShuttingDown = errors.New("server is shutting down")

	// MaxFundingAmount is a soft-limit of the maximum channel size
	// currently accepted within the Lightning Protocol. This is
	// defined in BOLT-0002, and serves as an initial precautionary limit
	// while implementations are battle tested in the real world.
	//
	// At the moment, this value depends on which chain is active. It is set
	// to the value under the Bitcoin chain as default.
	//
	// TODO(roasbeef): add command line param to modify.
	MaxFundingAmount = funding.MaxBtcFundingAmount

	// EndorsementExperimentEnd is the time after which nodes should stop
	// propagating experimental endorsement signals.
	//
	// Per blip04: January 1, 2026 12:00:00 AM UTC in unix seconds.
	EndorsementExperimentEnd = time.Unix(1767225600, 0)
)

// errPeerAlreadyConnected is an error returned by the server when we're
// commanded to connect to a peer, but they're already connected.
type errPeerAlreadyConnected struct {
	peer *peer.Brontide
}

// Error returns the human readable version of this error type.
//
// NOTE: Part of the error interface.
func (e *errPeerAlreadyConnected) Error() string {
	return fmt.Sprintf("already connected to peer: %v", e.peer)
}

// server is the main server of the Lightning Network Daemon. The server houses
// global state pertaining to the wallet, database, and the rpcserver.
// Additionally, the server is also used as a central messaging bus to interact
// with any of its companion objects.
type server struct {
	active   int32 // atomic
	stopping int32 // atomic

	start sync.Once
	stop  sync.Once

	cfg *Config

	implCfg *ImplementationCfg

	// identityECDH is an ECDH capable wrapper for the private key used
	// to authenticate any incoming connections.
	identityECDH keychain.SingleKeyECDH

	// identityKeyLoc is the key locator for the above wrapped identity key.
	identityKeyLoc keychain.KeyLocator

	// nodeSigner is an implementation of the MessageSigner interface
	// that's backed by the identity private key of the running lnd node.
	nodeSigner *netann.NodeSigner

	chanStatusMgr *netann.ChanStatusManager

	// listenAddrs is the list of addresses the server is currently
	// listening on.
	listenAddrs []net.Addr

	// torController is a client that will communicate with a locally
	// running Tor server. This client will handle initiating and
	// authenticating the connection to the Tor server, automatically
	// creating and setting up onion services, etc.
	torController *tor.Controller

	// natTraversal is the specific NAT traversal technique used to
	// automatically set up port forwarding rules in order to advertise to
	// the network that the node is accepting inbound connections.
	natTraversal nat.Traversal

	// lastDetectedIP is the last IP detected by the NAT traversal technique
	// above. This IP will be watched periodically in a goroutine in order
	// to handle dynamic IP changes.
	lastDetectedIP net.IP

	mu sync.RWMutex

	// peersByPub is a map of the active peers.
	//
	// NOTE: The key used here is the raw bytes of the peer's public key to
	// string conversion, which means it cannot be printed using `%s` as it
	// will just print the binary.
	//
	// TODO(yy): Use the hex string instead.
	peersByPub map[string]*peer.Brontide

	inboundPeers  map[string]*peer.Brontide
	outboundPeers map[string]*peer.Brontide

	peerConnectedListeners    map[string][]chan<- lnpeer.Peer
	peerDisconnectedListeners map[string][]chan<- struct{}

	// TODO(yy): the Brontide.Start doesn't know this value, which means it
	// will continue to send messages even if there are no active channels
	// and the value below is false. Once it's pruned, all its connections
	// will be closed, thus the Brontide.Start will return an error.
	persistentPeers        map[string]bool
	persistentPeersBackoff map[string]time.Duration
	persistentPeerAddrs    map[string][]*lnwire.NetAddress
	persistentConnReqs     map[string][]*connmgr.ConnReq
	persistentRetryCancels map[string]chan struct{}

	// peerErrors keeps a set of peer error buffers for peers that have
	// disconnected from us. This allows us to track historic peer errors
	// over connections. The string of the peer's compressed pubkey is used
	// as a key for this map.
	peerErrors map[string]*queue.CircularBuffer

	// ignorePeerTermination tracks peers for which the server has initiated
	// a disconnect. Adding a peer to this map causes the peer termination
	// watcher to short circuit in the event that peers are purposefully
	// disconnected.
	ignorePeerTermination map[*peer.Brontide]struct{}

	// scheduledPeerConnection maps a pubkey string to a callback that
	// should be executed in the peerTerminationWatcher once the prior peer
	// with the same pubkey exits. This allows the server to wait until the
	// prior peer has cleaned up successfully, before adding the new peer
	// intended to replace it.
	scheduledPeerConnection map[string]func()

	// pongBuf is a shared pong reply buffer we'll use across all active
	// peer goroutines. We know the max size of a pong message
	// (lnwire.MaxPongBytes), so we can allocate this ahead of time, and
	// avoid allocations each time we need to send a pong message.
	pongBuf []byte

	cc *chainreg.ChainControl

	fundingMgr *funding.Manager

	graphDB *graphdb.ChannelGraph

	chanStateDB *channeldb.ChannelStateDB

	addrSource channeldb.AddrSource

	// miscDB is the DB that contains all "other" databases within the main
	// channel DB that haven't been separated out yet.
	miscDB *channeldb.DB

	invoicesDB invoices.InvoiceDB

	aliasMgr *aliasmgr.Manager

	htlcSwitch *htlcswitch.Switch

	interceptableSwitch *htlcswitch.InterceptableSwitch

	invoices *invoices.InvoiceRegistry

	invoiceHtlcModifier *invoices.HtlcModificationInterceptor

	channelNotifier *channelnotifier.ChannelNotifier

	peerNotifier *peernotifier.PeerNotifier

	htlcNotifier *htlcswitch.HtlcNotifier

	witnessBeacon contractcourt.WitnessBeacon

	breachArbitrator *contractcourt.BreachArbitrator

	missionController *routing.MissionController
	defaultMC         *routing.MissionControl

	graphBuilder *graph.Builder

	chanRouter *routing.ChannelRouter

	controlTower routing.ControlTower

	authGossiper *discovery.AuthenticatedGossiper

	localChanMgr *localchans.Manager

	utxoNursery *contractcourt.UtxoNursery

	sweeper *sweep.UtxoSweeper

	chainArb *contractcourt.ChainArbitrator

	sphinx *hop.OnionProcessor

	towerClientMgr *wtclient.Manager

	connMgr *connmgr.ConnManager

	sigPool *lnwallet.SigPool

	writePool *pool.Write

	readPool *pool.Read

	tlsManager *TLSManager

	// featureMgr dispatches feature vectors for various contexts within the
	// daemon.
	featureMgr *feature.Manager

	// currentNodeAnn is the node announcement that has been broadcast to
	// the network upon startup, if the attributes of the node (us) have
	// changed since last start.
	currentNodeAnn *lnwire.NodeAnnouncement

	// chansToRestore is the set of channels that upon starting, the server
	// should attempt to restore/recover.
	chansToRestore walletunlocker.ChannelsToRecover

	// chanSubSwapper is a sub-system that will ensure our on-disk channel
	// backups are consistent at all times. It interacts with the
	// channelNotifier to be notified of newly opened and closed channels.
	chanSubSwapper *chanbackup.SubSwapper

	// chanEventStore tracks the behaviour of channels and their remote peers to
	// provide insights into their health and performance.
	chanEventStore *chanfitness.ChannelEventStore

	hostAnn *netann.HostAnnouncer

	// livenessMonitor monitors that lnd has access to critical resources.
	livenessMonitor *healthcheck.Monitor

	customMessageServer *subscribe.Server

	// txPublisher is a publisher with fee-bumping capability.
	txPublisher *sweep.TxPublisher

	// blockbeatDispatcher is a block dispatcher that notifies subscribers
	// of new blocks.
	blockbeatDispatcher *chainio.BlockbeatDispatcher

	quit chan struct{}

	wg sync.WaitGroup
}

// updatePersistentPeerAddrs subscribes to topology changes and stores
// advertised addresses for any NodeAnnouncements from our persisted peers.
func (s *server) updatePersistentPeerAddrs() error {
	graphSub, err := s.graphBuilder.SubscribeTopology()
	if err != nil {
		return err
	}

	s.wg.Add(1)
	go func() {
		defer func() {
			graphSub.Cancel()
			s.wg.Done()
		}()

		for {
			select {
			case <-s.quit:
				return

			case topChange, ok := <-graphSub.TopologyChanges:
				// If the router is shutting down, then we will
				// as well.
				if !ok {
					return
				}

				for _, update := range topChange.NodeUpdates {
					pubKeyStr := string(
						update.IdentityKey.
							SerializeCompressed(),
					)

					// We only care about updates from
					// our persistentPeers.
					s.mu.RLock()
					_, ok := s.persistentPeers[pubKeyStr]
					s.mu.RUnlock()
					if !ok {
						continue
					}

					addrs := make([]*lnwire.NetAddress, 0,
						len(update.Addresses))

					for _, addr := range update.Addresses {
						addrs = append(addrs,
							&lnwire.NetAddress{
								IdentityKey: update.IdentityKey,
								Address:     addr,
								ChainNet:    s.cfg.ActiveNetParams.Net,
							},
						)
					}

					s.mu.Lock()

					// Update the stored addresses for this
					// peer to reflect the new set.
					s.persistentPeerAddrs[pubKeyStr] = addrs

					// If there are no outstanding
					// connection requests for this peer
					// then our work is done since we are
					// not currently trying to connect to
					// them.
					if len(s.persistentConnReqs[pubKeyStr]) == 0 {
						s.mu.Unlock()
						continue
					}

					s.mu.Unlock()

					s.connectToPersistentPeer(pubKeyStr)
				}
			}
		}
	}()

	return nil
}

// CustomMessage is a custom message that is received from a peer.
type CustomMessage struct {
	// Peer is the peer pubkey.
	Peer [33]byte

	// Msg is the custom wire message.
	Msg *lnwire.Custom
}

// parseAddr parses an address from its string format to a net.Addr.
func parseAddr(address string, netCfg tor.Net) (net.Addr, error) {
	var (
		host string
		port int
	)

	// Split the address into its host and port components.
	h, p, err := net.SplitHostPort(address)
	if err != nil {
		// If a port wasn't specified, we'll assume the address only
		// contains the host so we'll use the default port.
		host = address
		port = defaultPeerPort
	} else {
		// Otherwise, we'll note both the host and ports.
		host = h
		portNum, err := strconv.Atoi(p)
		if err != nil {
			return nil, err
		}
		port = portNum
	}

	if tor.IsOnionHost(host) {
		return &tor.OnionAddr{OnionService: host, Port: port}, nil
	}

	// If the host is part of a TCP address, we'll use the network
	// specific ResolveTCPAddr function in order to resolve these
	// addresses over Tor in order to prevent leaking your real IP
	// address.
	hostPort := net.JoinHostPort(host, strconv.Itoa(port))
	return netCfg.ResolveTCPAddr("tcp", hostPort)
}

// noiseDial is a factory function which creates a connmgr compliant dialing
// function by returning a closure which includes the server's identity key.
func noiseDial(idKey keychain.SingleKeyECDH,
	netCfg tor.Net, timeout time.Duration) func(net.Addr) (net.Conn, error) {

	return func(a net.Addr) (net.Conn, error) {
		lnAddr := a.(*lnwire.NetAddress)
		return brontide.Dial(idKey, lnAddr, timeout, netCfg.Dial)
	}
}

// newServer creates a new instance of the server which is to listen using the
// passed listener address.
func newServer(cfg *Config, listenAddrs []net.Addr,
	dbs *DatabaseInstances, cc *chainreg.ChainControl,
	nodeKeyDesc *keychain.KeyDescriptor,
	chansToRestore walletunlocker.ChannelsToRecover,
	chanPredicate chanacceptor.ChannelAcceptor,
	torController *tor.Controller, tlsManager *TLSManager,
	leaderElector cluster.LeaderElector,
	implCfg *ImplementationCfg) (*server, error) {

	var (
		err         error
		nodeKeyECDH = keychain.NewPubKeyECDH(*nodeKeyDesc, cc.KeyRing)

		// We just derived the full descriptor, so we know the public
		// key is set on it.
		nodeKeySigner = keychain.NewPubKeyMessageSigner(
			nodeKeyDesc.PubKey, nodeKeyDesc.KeyLocator, cc.KeyRing,
		)
	)

	listeners := make([]net.Listener, len(listenAddrs))
	for i, listenAddr := range listenAddrs {
		// Note: though brontide.NewListener uses ResolveTCPAddr, it
		// doesn't need to call the general lndResolveTCP function
		// since we are resolving a local address.
		listeners[i], err = brontide.NewListener(
			nodeKeyECDH, listenAddr.String(),
		)
		if err != nil {
			return nil, err
		}
	}

	var serializedPubKey [33]byte
	copy(serializedPubKey[:], nodeKeyDesc.PubKey.SerializeCompressed())

	netParams := cfg.ActiveNetParams.Params

	// Initialize the sphinx router.
	replayLog := htlcswitch.NewDecayedLog(
		dbs.DecayedLogDB, cc.ChainNotifier,
	)
	sphinxRouter := sphinx.NewRouter(nodeKeyECDH, replayLog)

	writeBufferPool := pool.NewWriteBuffer(
		pool.DefaultWriteBufferGCInterval,
		pool.DefaultWriteBufferExpiryInterval,
	)

	writePool := pool.NewWrite(
		writeBufferPool, cfg.Workers.Write, pool.DefaultWorkerTimeout,
	)

	readBufferPool := pool.NewReadBuffer(
		pool.DefaultReadBufferGCInterval,
		pool.DefaultReadBufferExpiryInterval,
	)

	readPool := pool.NewRead(
		readBufferPool, cfg.Workers.Read, pool.DefaultWorkerTimeout,
	)

	// If the taproot overlay flag is set, but we don't have an aux funding
	// controller, then we'll exit as this is incompatible.
	if cfg.ProtocolOptions.TaprootOverlayChans &&
		implCfg.AuxFundingController.IsNone() {

		return nil, fmt.Errorf("taproot overlay flag set, but not " +
			"aux controllers")
	}

	//nolint:ll
	featureMgr, err := feature.NewManager(feature.Config{
		NoTLVOnion:                cfg.ProtocolOptions.LegacyOnion(),
		NoStaticRemoteKey:         cfg.ProtocolOptions.NoStaticRemoteKey(),
		NoAnchors:                 cfg.ProtocolOptions.NoAnchorCommitments(),
		NoWumbo:                   !cfg.ProtocolOptions.Wumbo(),
		NoScriptEnforcementLease:  cfg.ProtocolOptions.NoScriptEnforcementLease(),
		NoKeysend:                 !cfg.AcceptKeySend,
		NoOptionScidAlias:         !cfg.ProtocolOptions.ScidAlias(),
		NoZeroConf:                !cfg.ProtocolOptions.ZeroConf(),
		NoAnySegwit:               cfg.ProtocolOptions.NoAnySegwit(),
		CustomFeatures:            cfg.ProtocolOptions.CustomFeatures(),
		NoTaprootChans:            !cfg.ProtocolOptions.TaprootChans,
		NoTaprootOverlay:          !cfg.ProtocolOptions.TaprootOverlayChans,
		NoRouteBlinding:           cfg.ProtocolOptions.NoRouteBlinding(),
		NoExperimentalEndorsement: cfg.ProtocolOptions.NoExperimentalEndorsement(),
		NoQuiescence:              cfg.ProtocolOptions.NoQuiescence(),
	})
	if err != nil {
		return nil, err
	}

	invoiceHtlcModifier := invoices.NewHtlcModificationInterceptor()
	registryConfig := invoices.RegistryConfig{
		FinalCltvRejectDelta:        lncfg.DefaultFinalCltvRejectDelta,
		HtlcHoldDuration:            invoices.DefaultHtlcHoldDuration,
		Clock:                       clock.NewDefaultClock(),
		AcceptKeySend:               cfg.AcceptKeySend,
		AcceptAMP:                   cfg.AcceptAMP,
		GcCanceledInvoicesOnStartup: cfg.GcCanceledInvoicesOnStartup,
		GcCanceledInvoicesOnTheFly:  cfg.GcCanceledInvoicesOnTheFly,
		KeysendHoldTime:             cfg.KeysendHoldTime,
		HtlcInterceptor:             invoiceHtlcModifier,
	}

	addrSource := channeldb.NewMultiAddrSource(dbs.ChanStateDB, dbs.GraphDB)

	s := &server{
		cfg:            cfg,
		implCfg:        implCfg,
		graphDB:        dbs.GraphDB,
		chanStateDB:    dbs.ChanStateDB.ChannelStateDB(),
		addrSource:     addrSource,
		miscDB:         dbs.ChanStateDB,
		invoicesDB:     dbs.InvoiceDB,
		cc:             cc,
		sigPool:        lnwallet.NewSigPool(cfg.Workers.Sig, cc.Signer),
		writePool:      writePool,
		readPool:       readPool,
		chansToRestore: chansToRestore,

		blockbeatDispatcher: chainio.NewBlockbeatDispatcher(
			cc.ChainNotifier,
		),
		channelNotifier: channelnotifier.New(
			dbs.ChanStateDB.ChannelStateDB(),
		),

		identityECDH:   nodeKeyECDH,
		identityKeyLoc: nodeKeyDesc.KeyLocator,
		nodeSigner:     netann.NewNodeSigner(nodeKeySigner),

		listenAddrs: listenAddrs,

		// TODO(roasbeef): derive proper onion key based on rotation
		// schedule
		sphinx: hop.NewOnionProcessor(sphinxRouter),

		torController: torController,

		persistentPeers:         make(map[string]bool),
		persistentPeersBackoff:  make(map[string]time.Duration),
		persistentConnReqs:      make(map[string][]*connmgr.ConnReq),
		persistentPeerAddrs:     make(map[string][]*lnwire.NetAddress),
		persistentRetryCancels:  make(map[string]chan struct{}),
		peerErrors:              make(map[string]*queue.CircularBuffer),
		ignorePeerTermination:   make(map[*peer.Brontide]struct{}),
		scheduledPeerConnection: make(map[string]func()),
		pongBuf:                 make([]byte, lnwire.MaxPongBytes),

		peersByPub:                make(map[string]*peer.Brontide),
		inboundPeers:              make(map[string]*peer.Brontide),
		outboundPeers:             make(map[string]*peer.Brontide),
		peerConnectedListeners:    make(map[string][]chan<- lnpeer.Peer),
		peerDisconnectedListeners: make(map[string][]chan<- struct{}),

		invoiceHtlcModifier: invoiceHtlcModifier,

		customMessageServer: subscribe.NewServer(),

		tlsManager: tlsManager,

		featureMgr: featureMgr,
		quit:       make(chan struct{}),
	}

	// Start the low-level services once they are initialized.
	//
	// TODO(yy): break the server startup into four steps,
	// 1. init the low-level services.
	// 2. start the low-level services.
	// 3. init the high-level services.
	// 4. start the high-level services.
	if err := s.startLowLevelServices(); err != nil {
		return nil, err
	}

	currentHash, currentHeight, err := s.cc.ChainIO.GetBestBlock()
	if err != nil {
		return nil, err
	}

	expiryWatcher := invoices.NewInvoiceExpiryWatcher(
		clock.NewDefaultClock(), cfg.Invoices.HoldExpiryDelta,
		uint32(currentHeight), currentHash, cc.ChainNotifier,
	)
	s.invoices = invoices.NewRegistry(
		dbs.InvoiceDB, expiryWatcher, &registryConfig,
	)

	s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now)

	thresholdSats := btcutil.Amount(cfg.MaxFeeExposure)
	thresholdMSats := lnwire.NewMSatFromSatoshis(thresholdSats)

	linkUpdater := func(shortID lnwire.ShortChannelID) error {
		link, err := s.htlcSwitch.GetLinkByShortID(shortID)
		if err != nil {
			return err
		}

		s.htlcSwitch.UpdateLinkAliases(link)

		return nil
	}

	s.aliasMgr, err = aliasmgr.NewManager(dbs.ChanStateDB, linkUpdater)
	if err != nil {
		return nil, err
	}

	s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{
		DB:                   dbs.ChanStateDB,
		FetchAllOpenChannels: s.chanStateDB.FetchAllOpenChannels,
		FetchAllChannels:     s.chanStateDB.FetchAllChannels,
		FetchClosedChannels:  s.chanStateDB.FetchClosedChannels,
		LocalChannelClose: func(pubKey []byte,
			request *htlcswitch.ChanClose) {

			peer, err := s.FindPeerByPubStr(string(pubKey))
			if err != nil {
				srvrLog.Errorf("unable to close channel, peer"+
					" with %v id can't be found: %v",
					pubKey, err,
				)
				return
			}

			peer.HandleLocalCloseChanReqs(request)
		},
		FwdingLog:              dbs.ChanStateDB.ForwardingLog(),
		SwitchPackager:         channeldb.NewSwitchPackager(),
		ExtractErrorEncrypter:  s.sphinx.ExtractErrorEncrypter,
		FetchLastChannelUpdate: s.fetchLastChanUpdate(),
		Notifier:               s.cc.ChainNotifier,
		HtlcNotifier:           s.htlcNotifier,
		FwdEventTicker:         ticker.New(htlcswitch.DefaultFwdEventInterval),
		LogEventTicker:         ticker.New(htlcswitch.DefaultLogInterval),
		AckEventTicker:         ticker.New(htlcswitch.DefaultAckInterval),
		AllowCircularRoute:     cfg.AllowCircularRoute,
		RejectHTLC:             cfg.RejectHTLC,
		Clock:                  clock.NewDefaultClock(),
		MailboxDeliveryTimeout: cfg.Htlcswitch.MailboxDeliveryTimeout,
		MaxFeeExposure:         thresholdMSats,
		SignAliasUpdate:        s.signAliasUpdate,
		IsAlias:                aliasmgr.IsAlias,
	}, uint32(currentHeight))
	if err != nil {
		return nil, err
	}
	s.interceptableSwitch, err = htlcswitch.NewInterceptableSwitch(
		&htlcswitch.InterceptableSwitchConfig{
			Switch:             s.htlcSwitch,
			CltvRejectDelta:    lncfg.DefaultFinalCltvRejectDelta,
			CltvInterceptDelta: lncfg.DefaultCltvInterceptDelta,
			RequireInterceptor: s.cfg.RequireInterceptor,
			Notifier:           s.cc.ChainNotifier,
		},
	)
	if err != nil {
		return nil, err
	}

	s.witnessBeacon = newPreimageBeacon(
		dbs.ChanStateDB.NewWitnessCache(),
		s.interceptableSwitch.ForwardPacket,
	)

	chanStatusMgrCfg := &netann.ChanStatusConfig{
		ChanStatusSampleInterval: cfg.ChanStatusSampleInterval,
		ChanEnableTimeout:        cfg.ChanEnableTimeout,
		ChanDisableTimeout:       cfg.ChanDisableTimeout,
		OurPubKey:                nodeKeyDesc.PubKey,
		OurKeyLoc:                nodeKeyDesc.KeyLocator,
		MessageSigner:            s.nodeSigner,
		IsChannelActive:          s.htlcSwitch.HasActiveLink,
		ApplyChannelUpdate:       s.applyChannelUpdate,
		DB:                       s.chanStateDB,
		Graph:                    dbs.GraphDB,
	}

	chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg)
	if err != nil {
		return nil, err
	}
	s.chanStatusMgr = chanStatusMgr

	// If enabled, use either UPnP or NAT-PMP to automatically configure
	// port forwarding for users behind a NAT.
	if cfg.NAT {
		srvrLog.Info("Scanning local network for a UPnP enabled device")

		discoveryTimeout := time.Duration(10 * time.Second)

		ctx, cancel := context.WithTimeout(
			context.Background(), discoveryTimeout,
		)
		defer cancel()
		upnp, err := nat.DiscoverUPnP(ctx)
		if err == nil {
			s.natTraversal = upnp
		} else {
			// If we were not able to discover a UPnP enabled device
			// on the local network, we'll fall back to attempting
			// to discover a NAT-PMP enabled device.
			srvrLog.Errorf("Unable to discover a UPnP enabled "+
				"device on the local network: %v", err)

			srvrLog.Info("Scanning local network for a NAT-PMP " +
				"enabled device")

			pmp, err := nat.DiscoverPMP(discoveryTimeout)
			if err != nil {
				err := fmt.Errorf("unable to discover a "+
					"NAT-PMP enabled device on the local "+
					"network: %v", err)
				srvrLog.Error(err)
				return nil, err
			}

			s.natTraversal = pmp
		}
	}

	// If we were requested to automatically configure port forwarding,
	// we'll use the ports that the server will be listening on.
	externalIPStrings := make([]string, len(cfg.ExternalIPs))
	for idx, ip := range cfg.ExternalIPs {
		externalIPStrings[idx] = ip.String()
	}
	if s.natTraversal != nil {
		listenPorts := make([]uint16, 0, len(listenAddrs))
		for _, listenAddr := range listenAddrs {
			// At this point, the listen addresses should have
			// already been normalized, so it's safe to ignore the
			// errors.
			_, portStr, _ := net.SplitHostPort(listenAddr.String())
			port, _ := strconv.Atoi(portStr)

			listenPorts = append(listenPorts, uint16(port))
		}

		ips, err := s.configurePortForwarding(listenPorts...)
		if err != nil {
			srvrLog.Errorf("Unable to automatically set up port "+
				"forwarding using %s: %v",
				s.natTraversal.Name(), err)
		} else {
			srvrLog.Infof("Automatically set up port forwarding "+
				"using %s to advertise external IP",
				s.natTraversal.Name())
			externalIPStrings = append(externalIPStrings, ips...)
		}
	}

	// If external IP addresses have been specified, add those to the list
	// of this server's addresses.
	externalIPs, err := lncfg.NormalizeAddresses(
		externalIPStrings, strconv.Itoa(defaultPeerPort),
		cfg.net.ResolveTCPAddr,
	)
	if err != nil {
		return nil, err
	}

	selfAddrs := make([]net.Addr, 0, len(externalIPs))
	selfAddrs = append(selfAddrs, externalIPs...)

	// We'll now reconstruct a node announcement based on our current
	// configuration so we can send it out as a sort of heart beat within
	// the network.
	//
	// We'll start by parsing the node color from configuration.
	color, err := lncfg.ParseHexColor(cfg.Color)
	if err != nil {
		srvrLog.Errorf("unable to parse color: %v\n", err)
		return nil, err
	}

	// If no alias is provided, default to first 10 characters of public
	// key.
	alias := cfg.Alias
	if alias == "" {
		alias = hex.EncodeToString(serializedPubKey[:10])
	}
	nodeAlias, err := lnwire.NewNodeAlias(alias)
	if err != nil {
		return nil, err
	}
	selfNode := &models.LightningNode{
		HaveNodeAnnouncement: true,
		LastUpdate:           time.Now(),
		Addresses:            selfAddrs,
		Alias:                nodeAlias.String(),
		Features:             s.featureMgr.Get(feature.SetNodeAnn),
		Color:                color,
	}
	copy(selfNode.PubKeyBytes[:], nodeKeyDesc.PubKey.SerializeCompressed())

	// Based on the disk representation of the node announcement generated
	// above, we'll generate a node announcement that can go out on the
	// network so we can properly sign it.
	nodeAnn, err := selfNode.NodeAnnouncement(false)
	if err != nil {
		return nil, fmt.Errorf("unable to gen self node ann: %w", err)
	}

	// With the announcement generated, we'll sign it to properly
	// authenticate the message on the network.
	authSig, err := netann.SignAnnouncement(
		s.nodeSigner, nodeKeyDesc.KeyLocator, nodeAnn,
	)
	if err != nil {
		return nil, fmt.Errorf("unable to generate signature for "+
			"self node announcement: %v", err)
	}
	selfNode.AuthSigBytes = authSig.Serialize()
	nodeAnn.Signature, err = lnwire.NewSigFromECDSARawSignature(
		selfNode.AuthSigBytes,
	)
	if err != nil {
		return nil, err
	}

	// Finally, we'll update the representation on disk, and update our
	// cached in-memory version as well.
	if err := dbs.GraphDB.SetSourceNode(selfNode); err != nil {
		return nil, fmt.Errorf("can't set self node: %w", err)
	}
	s.currentNodeAnn = nodeAnn

	// The router will get access to the payment ID sequencer, such that it
	// can generate unique payment IDs.
	sequencer, err := htlcswitch.NewPersistentSequencer(dbs.ChanStateDB)
	if err != nil {
		return nil, err
	}

	// Instantiate mission control with config from the sub server.
	//
	// TODO(joostjager): When we are further in the process of moving to sub
	// servers, the mission control instance itself can be moved there too.
	routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC)

	// We only initialize a probability estimator if there's no custom one.
	var estimator routing.Estimator
	if cfg.Estimator != nil {
		estimator = cfg.Estimator
	} else {
		switch routingConfig.ProbabilityEstimatorType {
		case routing.AprioriEstimatorName:
			aCfg := routingConfig.AprioriConfig
			aprioriConfig := routing.AprioriConfig{
				AprioriHopProbability: aCfg.HopProbability,
				PenaltyHalfLife:       aCfg.PenaltyHalfLife,
				AprioriWeight:         aCfg.Weight,
				CapacityFraction:      aCfg.CapacityFraction,
			}

			estimator, err = routing.NewAprioriEstimator(
				aprioriConfig,
			)
			if err != nil {
				return nil, err
			}

		case routing.BimodalEstimatorName:
			bCfg := routingConfig.BimodalConfig
			bimodalConfig := routing.BimodalConfig{
				BimodalNodeWeight: bCfg.NodeWeight,
				BimodalScaleMsat: lnwire.MilliSatoshi(
					bCfg.Scale,
				),
				BimodalDecayTime: bCfg.DecayTime,
			}

			estimator, err = routing.NewBimodalEstimator(
				bimodalConfig,
			)
			if err != nil {
				return nil, err
			}

		default:
			return nil, fmt.Errorf("unknown estimator type %v",
				routingConfig.ProbabilityEstimatorType)
		}
	}

	mcCfg := &routing.MissionControlConfig{
		OnConfigUpdate:          fn.Some(s.UpdateRoutingConfig),
		Estimator:               estimator,
		MaxMcHistory:            routingConfig.MaxMcHistory,
		McFlushInterval:         routingConfig.McFlushInterval,
		MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval,
	}

	s.missionController, err = routing.NewMissionController(
		dbs.ChanStateDB, selfNode.PubKeyBytes, mcCfg,
	)
	if err != nil {
		return nil, fmt.Errorf("can't create mission control "+
			"manager: %w", err)
	}
	s.defaultMC, err = s.missionController.GetNamespacedStore(
		routing.DefaultMissionControlNamespace,
	)
	if err != nil {
		return nil, fmt.Errorf("can't create mission control in the "+
			"default namespace: %w", err)
	}

	srvrLog.Debugf("Instantiating payment session source with config: "+
		"AttemptCost=%v + %v%%, MinRouteProbability=%v",
		int64(routingConfig.AttemptCost),
		float64(routingConfig.AttemptCostPPM)/10000,
		routingConfig.MinRouteProbability)

	pathFindingConfig := routing.PathFindingConfig{
		AttemptCost: lnwire.NewMSatFromSatoshis(
			routingConfig.AttemptCost,
		),
		AttemptCostPPM: routingConfig.AttemptCostPPM,
		MinProbability: routingConfig.MinRouteProbability,
	}

	sourceNode, err := dbs.GraphDB.SourceNode()
	if err != nil {
		return nil, fmt.Errorf("error getting source node: %w", err)
	}
	paymentSessionSource := &routing.SessionSource{
		GraphSessionFactory: dbs.GraphDB,
		SourceNode:          sourceNode,
		MissionControl:      s.defaultMC,
		GetLink:             s.htlcSwitch.GetLinkByShortID,
		PathFindingConfig:   pathFindingConfig,
	}

	paymentControl := channeldb.NewPaymentControl(dbs.ChanStateDB)

	s.controlTower = routing.NewControlTower(paymentControl)

	strictPruning := cfg.Bitcoin.Node == "neutrino" ||
		cfg.Routing.StrictZombiePruning

	s.graphBuilder, err = graph.NewBuilder(&graph.Config{
		SelfNode:            selfNode.PubKeyBytes,
		Graph:               dbs.GraphDB,
		Chain:               cc.ChainIO,
		ChainView:           cc.ChainView,
		Notifier:            cc.ChainNotifier,
		ChannelPruneExpiry:  graph.DefaultChannelPruneExpiry,
		GraphPruneInterval:  time.Hour,
		FirstTimePruneDelay: graph.DefaultFirstTimePruneDelay,
		AssumeChannelValid:  cfg.Routing.AssumeChannelValid,
		StrictZombiePruning: strictPruning,
		IsAlias:             aliasmgr.IsAlias,
	})
	if err != nil {
		return nil, fmt.Errorf("can't create graph builder: %w", err)
	}

	s.chanRouter, err = routing.New(routing.Config{
		SelfNode:           selfNode.PubKeyBytes,
		RoutingGraph:       dbs.GraphDB,
		Chain:              cc.ChainIO,
		Payer:              s.htlcSwitch,
		Control:            s.controlTower,
		MissionControl:     s.defaultMC,
		SessionSource:      paymentSessionSource,
		GetLink:            s.htlcSwitch.GetLinkByShortID,
		NextPaymentID:      sequencer.NextID,
		PathFindingConfig:  pathFindingConfig,
		Clock:              clock.NewDefaultClock(),
		ApplyChannelUpdate: s.graphBuilder.ApplyChannelUpdate,
		ClosedSCIDs:        s.fetchClosedChannelSCIDs(),
		TrafficShaper:      implCfg.TrafficShaper,
	})
	if err != nil {
		return nil, fmt.Errorf("can't create router: %w", err)
	}

	chanSeries := discovery.NewChanSeries(s.graphDB)
	gossipMessageStore, err := discovery.NewMessageStore(dbs.ChanStateDB)
	if err != nil {
		return nil, err
	}
	waitingProofStore, err := channeldb.NewWaitingProofStore(dbs.ChanStateDB)
	if err != nil {
		return nil, err
	}

	scidCloserMan := discovery.NewScidCloserMan(s.graphDB, s.chanStateDB)

	s.authGossiper = discovery.New(discovery.Config{
		Graph:                 s.graphBuilder,
		ChainIO:               s.cc.ChainIO,
		Notifier:              s.cc.ChainNotifier,
		ChainHash:             *s.cfg.ActiveNetParams.GenesisHash,
		Broadcast:             s.BroadcastMessage,
		ChanSeries:            chanSeries,
		NotifyWhenOnline:      s.NotifyWhenOnline,
		NotifyWhenOffline:     s.NotifyWhenOffline,
		FetchSelfAnnouncement: s.getNodeAnnouncement,
		UpdateSelfAnnouncement: func() (lnwire.NodeAnnouncement,
			error) {

			return s.genNodeAnnouncement(nil)
		},
		ProofMatureDelta:        cfg.Gossip.AnnouncementConf,
		TrickleDelay:            time.Millisecond * time.Duration(cfg.TrickleDelay),
		RetransmitTicker:        ticker.New(time.Minute * 30),
		RebroadcastInterval:     time.Hour * 24,
		WaitingProofStore:       waitingProofStore,
		MessageStore:            gossipMessageStore,
		AnnSigner:               s.nodeSigner,
		RotateTicker:            ticker.New(discovery.DefaultSyncerRotationInterval),
		HistoricalSyncTicker:    ticker.New(cfg.HistoricalSyncInterval),
		NumActiveSyncers:        cfg.NumGraphSyncPeers,
		NoTimestampQueries:      cfg.ProtocolOptions.NoTimestampQueryOption, //nolint:ll
		MinimumBatchSize:        10,
		SubBatchDelay:           cfg.Gossip.SubBatchDelay,
		IgnoreHistoricalFilters: cfg.IgnoreHistoricalGossipFilters,
		PinnedSyncers:           cfg.Gossip.PinnedSyncers,
		MaxChannelUpdateBurst:   cfg.Gossip.MaxChannelUpdateBurst,
		ChannelUpdateInterval:   cfg.Gossip.ChannelUpdateInterval,
		IsAlias:                 aliasmgr.IsAlias,
		SignAliasUpdate:         s.signAliasUpdate,
		FindBaseByAlias:         s.aliasMgr.FindBaseSCID,
		GetAlias:                s.aliasMgr.GetPeerAlias,
		FindChannel:             s.findChannel,
		IsStillZombieChannel:    s.graphBuilder.IsZombieChannel,
		ScidCloser:              scidCloserMan,
		AssumeChannelValid:      cfg.Routing.AssumeChannelValid,
	}, nodeKeyDesc)

	selfVertex := route.Vertex(nodeKeyDesc.PubKey.SerializeCompressed())
	//nolint:ll
	s.localChanMgr = &localchans.Manager{
		SelfPub:              nodeKeyDesc.PubKey,
		DefaultRoutingPolicy: cc.RoutingPolicy,
		ForAllOutgoingChannels: func(cb func(*models.ChannelEdgeInfo,
			*models.ChannelEdgePolicy) error) error {

			return s.graphDB.ForEachNodeChannel(selfVertex,
				func(_ kvdb.RTx, c *models.ChannelEdgeInfo,
					e *models.ChannelEdgePolicy,
					_ *models.ChannelEdgePolicy) error {

					// NOTE: The invoked callback here may
					// receive a nil channel policy.
					return cb(c, e)
				},
			)
		},
		PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate,
		UpdateForwardingPolicies:  s.htlcSwitch.UpdateForwardingPolicies,
		FetchChannel:              s.chanStateDB.FetchChannel,
		AddEdge: func(edge *models.ChannelEdgeInfo) error {
			return s.graphBuilder.AddEdge(edge)
		},
	}

	utxnStore, err := contractcourt.NewNurseryStore(
		s.cfg.ActiveNetParams.GenesisHash, dbs.ChanStateDB,
	)
	if err != nil {
		srvrLog.Errorf("unable to create nursery store: %v", err)
		return nil, err
	}

	sweeperStore, err := sweep.NewSweeperStore(
		dbs.ChanStateDB, s.cfg.ActiveNetParams.GenesisHash,
	)
	if err != nil {
		srvrLog.Errorf("unable to create sweeper store: %v", err)
		return nil, err
	}

	aggregator := sweep.NewBudgetAggregator(
		cc.FeeEstimator, sweep.DefaultMaxInputsPerTx,
		s.implCfg.AuxSweeper,
	)

	s.txPublisher = sweep.NewTxPublisher(sweep.TxPublisherConfig{
		Signer:     cc.Wallet.Cfg.Signer,
		Wallet:     cc.Wallet,
		Estimator:  cc.FeeEstimator,
		Notifier:   cc.ChainNotifier,
		AuxSweeper: s.implCfg.AuxSweeper,
	})

	s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{
		FeeEstimator: cc.FeeEstimator,
		GenSweepScript: newSweepPkScriptGen(
			cc.Wallet, s.cfg.ActiveNetParams.Params,
		),
		Signer:               cc.Wallet.Cfg.Signer,
		Wallet:               newSweeperWallet(cc.Wallet),
		Mempool:              cc.MempoolNotifier,
		Notifier:             cc.ChainNotifier,
		Store:                sweeperStore,
		MaxInputsPerTx:       sweep.DefaultMaxInputsPerTx,
		MaxFeeRate:           cfg.Sweeper.MaxFeeRate,
		Aggregator:           aggregator,
		Publisher:            s.txPublisher,
		NoDeadlineConfTarget: cfg.Sweeper.NoDeadlineConfTarget,
	})

	s.utxoNursery = contractcourt.NewUtxoNursery(&contractcourt.NurseryConfig{
		ChainIO:             cc.ChainIO,
		ConfDepth:           1,
		FetchClosedChannels: s.chanStateDB.FetchClosedChannels,
		FetchClosedChannel:  s.chanStateDB.FetchClosedChannel,
		Notifier:            cc.ChainNotifier,
		PublishTransaction:  cc.Wallet.PublishTransaction,
		Store:               utxnStore,
1226
                SweepInput:          s.sweeper.SweepInput,
2✔
1227
                Budget:              s.cfg.Sweeper.Budget,
2✔
1228
        })
2✔
1229

2✔
1230
        // Construct a closure that wraps the htlcswitch's CloseLink method.
2✔
1231
        closeLink := func(chanPoint *wire.OutPoint,
2✔
1232
                closureType contractcourt.ChannelCloseType) {
4✔
1233
                // TODO(conner): Properly respect the update and error channels
2✔
1234
                // returned by CloseLink.
2✔
1235

2✔
1236
                // Instruct the switch to close the channel.  Provide no close out
2✔
1237
                // delivery script or target fee per kw because user input is not
2✔
1238
                // available when the remote peer closes the channel.
2✔
1239
                s.htlcSwitch.CloseLink(chanPoint, closureType, 0, 0, nil)
2✔
1240
        }
2✔
1241

1242
        // We will use the following channel to reliably hand off contract
1243
        // breach events from the ChannelArbitrator to the BreachArbitrator.
1244
        contractBreaches := make(chan *contractcourt.ContractBreachEvent, 1)
2✔
1245

2✔
1246
        s.breachArbitrator = contractcourt.NewBreachArbitrator(
2✔
1247
                &contractcourt.BreachConfig{
2✔
1248
                        CloseLink: closeLink,
2✔
1249
                        DB:        s.chanStateDB,
2✔
1250
                        Estimator: s.cc.FeeEstimator,
2✔
1251
                        GenSweepScript: newSweepPkScriptGen(
2✔
1252
                                cc.Wallet, s.cfg.ActiveNetParams.Params,
2✔
1253
                        ),
2✔
1254
                        Notifier:           cc.ChainNotifier,
2✔
1255
                        PublishTransaction: cc.Wallet.PublishTransaction,
2✔
1256
                        ContractBreaches:   contractBreaches,
2✔
1257
                        Signer:             cc.Wallet.Cfg.Signer,
2✔
1258
                        Store: contractcourt.NewRetributionStore(
2✔
1259
                                dbs.ChanStateDB,
2✔
1260
                        ),
2✔
1261
                        AuxSweeper: s.implCfg.AuxSweeper,
2✔
1262
                },
2✔
1263
        )
2✔
1264

2✔
1265
        //nolint:ll
2✔
1266
        s.chainArb = contractcourt.NewChainArbitrator(contractcourt.ChainArbitratorConfig{
2✔
1267
                ChainHash:              *s.cfg.ActiveNetParams.GenesisHash,
2✔
1268
                IncomingBroadcastDelta: lncfg.DefaultIncomingBroadcastDelta,
2✔
1269
                OutgoingBroadcastDelta: lncfg.DefaultOutgoingBroadcastDelta,
2✔
1270
                NewSweepAddr: func() ([]byte, error) {
2✔
1271
                        addr, err := newSweepPkScriptGen(
×
1272
                                cc.Wallet, netParams,
×
1273
                        )().Unpack()
×
1274
                        if err != nil {
×
1275
                                return nil, err
×
1276
                        }
×
1277

1278
                        return addr.DeliveryAddress, nil
×
1279
                },
1280
                PublishTx: cc.Wallet.PublishTransaction,
1281
                DeliverResolutionMsg: func(msgs ...contractcourt.ResolutionMsg) error {
2✔
1282
                        for _, msg := range msgs {
4✔
1283
                                err := s.htlcSwitch.ProcessContractResolution(msg)
2✔
1284
                                if err != nil {
2✔
1285
                                        return err
×
1286
                                }
×
1287
                        }
1288
                        return nil
2✔
1289
                },
1290
                IncubateOutputs: func(chanPoint wire.OutPoint,
1291
                        outHtlcRes fn.Option[lnwallet.OutgoingHtlcResolution],
1292
                        inHtlcRes fn.Option[lnwallet.IncomingHtlcResolution],
1293
                        broadcastHeight uint32,
1294
                        deadlineHeight fn.Option[int32]) error {
2✔
1295

2✔
1296
                        return s.utxoNursery.IncubateOutputs(
2✔
1297
                                chanPoint, outHtlcRes, inHtlcRes,
2✔
1298
                                broadcastHeight, deadlineHeight,
2✔
1299
                        )
2✔
1300
                },
2✔
1301
                PreimageDB:   s.witnessBeacon,
1302
                Notifier:     cc.ChainNotifier,
1303
                Mempool:      cc.MempoolNotifier,
1304
                Signer:       cc.Wallet.Cfg.Signer,
1305
                FeeEstimator: cc.FeeEstimator,
1306
                ChainIO:      cc.ChainIO,
1307
                MarkLinkInactive: func(chanPoint wire.OutPoint) error {
2✔
1308
                        chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
2✔
1309
                        s.htlcSwitch.RemoveLink(chanID)
2✔
1310
                        return nil
2✔
1311
                },
2✔
1312
                IsOurAddress: cc.Wallet.IsOurAddress,
1313
                ContractBreach: func(chanPoint wire.OutPoint,
1314
                        breachRet *lnwallet.BreachRetribution) error {
2✔
1315

2✔
1316
                        // processACK will handle the BreachArbitrator ACKing
2✔
1317
                        // the event.
2✔
1318
                        finalErr := make(chan error, 1)
2✔
1319
                        processACK := func(brarErr error) {
4✔
1320
                                if brarErr != nil {
2✔
1321
                                        finalErr <- brarErr
×
1322
                                        return
×
1323
                                }
×
1324

1325
                                // If the BreachArbitrator successfully handled
1326
                                // the event, we can signal that the handoff
1327
                                // was successful.
1328
                                finalErr <- nil
2✔
1329
                        }
1330

1331
                        event := &contractcourt.ContractBreachEvent{
2✔
1332
                                ChanPoint:         chanPoint,
2✔
1333
                                ProcessACK:        processACK,
2✔
1334
                                BreachRetribution: breachRet,
2✔
1335
                        }
2✔
1336

2✔
1337
                        // Send the contract breach event to the
2✔
1338
                        // BreachArbitrator.
2✔
1339
                        select {
2✔
1340
                        case contractBreaches <- event:
2✔
1341
                        case <-s.quit:
×
1342
                                return ErrServerShuttingDown
×
1343
                        }
1344

1345
                        // We'll wait for a final error to be available from
1346
                        // the BreachArbitrator.
1347
                        select {
2✔
1348
                        case err := <-finalErr:
2✔
1349
                                return err
2✔
1350
                        case <-s.quit:
×
1351
                                return ErrServerShuttingDown
×
1352
                        }
1353
                },
1354
                DisableChannel: func(chanPoint wire.OutPoint) error {
2✔
1355
                        return s.chanStatusMgr.RequestDisable(chanPoint, false)
2✔
1356
                },
2✔
1357
                Sweeper:                       s.sweeper,
1358
                Registry:                      s.invoices,
1359
                NotifyClosedChannel:           s.channelNotifier.NotifyClosedChannelEvent,
1360
                NotifyFullyResolvedChannel:    s.channelNotifier.NotifyFullyResolvedChannelEvent,
1361
                OnionProcessor:                s.sphinx,
1362
                PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod,
1363
                IsForwardedHTLC:               s.htlcSwitch.IsForwardedHTLC,
1364
                Clock:                         clock.NewDefaultClock(),
1365
                SubscribeBreachComplete:       s.breachArbitrator.SubscribeBreachComplete,
1366
                PutFinalHtlcOutcome:           s.chanStateDB.PutOnchainFinalHtlcOutcome,
1367
                HtlcNotifier:                  s.htlcNotifier,
1368
                Budget:                        *s.cfg.Sweeper.Budget,
1369

1370
                // TODO(yy): remove this hack once PaymentCircuit is interfaced.
1371
                QueryIncomingCircuit: func(
1372
                        circuit models.CircuitKey) *models.CircuitKey {
2✔
1373

2✔
1374
                        // Get the circuit map.
2✔
1375
                        circuits := s.htlcSwitch.CircuitLookup()
2✔
1376

2✔
1377
                        // Lookup the outgoing circuit.
2✔
1378
                        pc := circuits.LookupOpenCircuit(circuit)
2✔
1379
                        if pc == nil {
3✔
1380
                                return nil
1✔
1381
                        }
1✔
1382

1383
                        return &pc.Incoming
2✔
1384
                },
1385
                AuxLeafStore: implCfg.AuxLeafStore,
1386
                AuxSigner:    implCfg.AuxSigner,
1387
                AuxResolver:  implCfg.AuxContractResolver,
1388
        }, dbs.ChanStateDB)
1389

1390
        // Select the configuration and funding parameters for Bitcoin.
1391
        chainCfg := cfg.Bitcoin
2✔
1392
        minRemoteDelay := funding.MinBtcRemoteDelay
2✔
1393
        maxRemoteDelay := funding.MaxBtcRemoteDelay
2✔
1394

2✔
1395
        var chanIDSeed [32]byte
2✔
1396
        if _, err := rand.Read(chanIDSeed[:]); err != nil {
2✔
1397
                return nil, err
×
1398
        }
×
1399

1400
        // Wrap the DeleteChannelEdges method so that the funding manager can
1401
        // use it without depending on several layers of indirection.
1402
        deleteAliasEdge := func(scid lnwire.ShortChannelID) (
2✔
1403
                *models.ChannelEdgePolicy, error) {
4✔
1404

2✔
1405
                info, e1, e2, err := s.graphDB.FetchChannelEdgesByID(
2✔
1406
                        scid.ToUint64(),
2✔
1407
                )
2✔
1408
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
2✔
1409
                        // This is unlikely but there is a slim chance of this
×
1410
                        // being hit if lnd was killed via SIGKILL and the
×
1411
                        // funding manager was stepping through the delete
×
1412
                        // alias edge logic.
×
1413
                        return nil, nil
×
1414
                } else if err != nil {
2✔
1415
                        return nil, err
×
1416
                }
×
1417

1418
                // Grab our key to find our policy.
1419
                var ourKey [33]byte
2✔
1420
                copy(ourKey[:], nodeKeyDesc.PubKey.SerializeCompressed())
2✔
1421

2✔
1422
                var ourPolicy *models.ChannelEdgePolicy
2✔
1423
                if info != nil && info.NodeKey1Bytes == ourKey {
4✔
1424
                        ourPolicy = e1
2✔
1425
                } else {
4✔
1426
                        ourPolicy = e2
2✔
1427
                }
2✔
1428

1429
                if ourPolicy == nil {
2✔
1430
                        // Something is wrong, so return an error.
×
1431
                        return nil, fmt.Errorf("we don't have an edge")
×
1432
                }
×
1433

1434
                err = s.graphDB.DeleteChannelEdges(
2✔
1435
                        false, false, scid.ToUint64(),
2✔
1436
                )
2✔
1437
                return ourPolicy, err
2✔
1438
        }
1439

1440
        // For the reservationTimeout and the zombieSweeperInterval different
1441
        // values are set in case we are in a dev environment to enhance test
1442
        // capabilities.
1443
        reservationTimeout := chanfunding.DefaultReservationTimeout
2✔
1444
        zombieSweeperInterval := lncfg.DefaultZombieSweeperInterval
2✔
1445

2✔
1446
        // Get the development config for funding manager. If we are not in
2✔
1447
        // development mode, this would be nil.
2✔
1448
        var devCfg *funding.DevConfig
2✔
1449
        if lncfg.IsDevBuild() {
4✔
1450
                devCfg = &funding.DevConfig{
2✔
1451
                        ProcessChannelReadyWait: cfg.Dev.ChannelReadyWait(),
2✔
1452
                }
2✔
1453

2✔
1454
                reservationTimeout = cfg.Dev.GetReservationTimeout()
2✔
1455
                zombieSweeperInterval = cfg.Dev.GetZombieSweeperInterval()
2✔
1456

2✔
1457
                srvrLog.Debugf("Using the dev config for the fundingMgr: %v, "+
2✔
1458
                        "reservationTimeout=%v, zombieSweeperInterval=%v",
2✔
1459
                        devCfg, reservationTimeout, zombieSweeperInterval)
2✔
1460
        }
2✔
1461

1462
        //nolint:ll
1463
        s.fundingMgr, err = funding.NewFundingManager(funding.Config{
2✔
1464
                Dev:                devCfg,
2✔
1465
                NoWumboChans:       !cfg.ProtocolOptions.Wumbo(),
2✔
1466
                IDKey:              nodeKeyDesc.PubKey,
2✔
1467
                IDKeyLoc:           nodeKeyDesc.KeyLocator,
2✔
1468
                Wallet:             cc.Wallet,
2✔
1469
                PublishTransaction: cc.Wallet.PublishTransaction,
2✔
1470
                UpdateLabel: func(hash chainhash.Hash, label string) error {
4✔
1471
                        return cc.Wallet.LabelTransaction(hash, label, true)
2✔
1472
                },
2✔
1473
                Notifier:     cc.ChainNotifier,
1474
                ChannelDB:    s.chanStateDB,
1475
                FeeEstimator: cc.FeeEstimator,
1476
                SignMessage:  cc.MsgSigner.SignMessage,
1477
                CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement,
1478
                        error) {
2✔
1479

2✔
1480
                        return s.genNodeAnnouncement(nil)
2✔
1481
                },
2✔
1482
                SendAnnouncement:     s.authGossiper.ProcessLocalAnnouncement,
1483
                NotifyWhenOnline:     s.NotifyWhenOnline,
1484
                TempChanIDSeed:       chanIDSeed,
1485
                FindChannel:          s.findChannel,
1486
                DefaultRoutingPolicy: cc.RoutingPolicy,
1487
                DefaultMinHtlcIn:     cc.MinHtlcIn,
1488
                NumRequiredConfs: func(chanAmt btcutil.Amount,
1489
                        pushAmt lnwire.MilliSatoshi) uint16 {
2✔
1490
                        // For large channels we increase the number
2✔
1491
                        // of confirmations we require for the
2✔
1492
                        // channel to be considered open. As it is
2✔
1493
                        // always the responder that gets to choose
2✔
1494
                        // value, the pushAmt is value being pushed
2✔
1495
                        // to us. This means we have more to lose
2✔
1496
                        // in the case this gets re-orged out, and
2✔
1497
                        // we will require more confirmations before
2✔
1498
                        // we consider it open.
2✔
1499

2✔
1500
                        // In case the user has explicitly specified
2✔
1501
                        // a default value for the number of
2✔
1502
                        // confirmations, we use it.
2✔
1503
                        defaultConf := uint16(chainCfg.DefaultNumChanConfs)
2✔
1504
                        if defaultConf != 0 {
4✔
1505
                                return defaultConf
2✔
1506
                        }
2✔
1507

1508
                        minConf := uint64(3)
×
1509
                        maxConf := uint64(6)
×
1510

×
1511
                        // If this is a wumbo channel, then we'll require the
×
1512
                        // max amount of confirmations.
×
1513
                        if chanAmt > MaxFundingAmount {
×
1514
                                return uint16(maxConf)
×
1515
                        }
×
1516

1517
                        // If not we return a value scaled linearly
1518
                        // between 3 and 6, depending on channel size.
1519
                        // TODO(halseth): Use 1 as minimum?
1520
                        maxChannelSize := uint64(
×
1521
                                lnwire.NewMSatFromSatoshis(MaxFundingAmount))
×
1522
                        stake := lnwire.NewMSatFromSatoshis(chanAmt) + pushAmt
×
1523
                        conf := maxConf * uint64(stake) / maxChannelSize
×
1524
                        if conf < minConf {
×
1525
                                conf = minConf
×
1526
                        }
×
1527
                        if conf > maxConf {
×
1528
                                conf = maxConf
×
1529
                        }
×
1530
                        return uint16(conf)
×
1531
                },
1532
                RequiredRemoteDelay: func(chanAmt btcutil.Amount) uint16 {
2✔
1533
                        // We scale the remote CSV delay (the time the
2✔
1534
                        // remote has to claim funds in case of a unilateral
2✔
1535
                        // close) linearly from minRemoteDelay blocks
2✔
1536
                        // for small channels, to maxRemoteDelay blocks
2✔
1537
                        // for channels of size MaxFundingAmount.
2✔
1538

2✔
1539
                        // In case the user has explicitly specified
2✔
1540
                        // a default value for the remote delay, we
2✔
1541
                        // use it.
2✔
1542
                        defaultDelay := uint16(chainCfg.DefaultRemoteDelay)
2✔
1543
                        if defaultDelay > 0 {
4✔
1544
                                return defaultDelay
2✔
1545
                        }
2✔
1546

1547
                        // If this is a wumbo channel, then we'll require the
1548
                        // max value.
1549
                        if chanAmt > MaxFundingAmount {
×
1550
                                return maxRemoteDelay
×
1551
                        }
×
1552

1553
                        // If not we scale according to channel size.
1554
                        delay := uint16(btcutil.Amount(maxRemoteDelay) *
×
1555
                                chanAmt / MaxFundingAmount)
×
1556
                        if delay < minRemoteDelay {
×
1557
                                delay = minRemoteDelay
×
1558
                        }
×
1559
                        if delay > maxRemoteDelay {
×
1560
                                delay = maxRemoteDelay
×
1561
                        }
×
1562
                        return delay
×
1563
                },
1564
                WatchNewChannel: func(channel *channeldb.OpenChannel,
1565
                        peerKey *btcec.PublicKey) error {
2✔
1566

2✔
1567
                        // First, we'll mark this new peer as a persistent peer
2✔
1568
                        // for re-connection purposes. If the peer is not yet
2✔
1569
                        // tracked or the user hasn't requested it to be perm,
2✔
1570
                        // we'll set false to prevent the server from continuing
2✔
1571
                        // to connect to this peer even if the number of
2✔
1572
                        // channels with this peer is zero.
2✔
1573
                        s.mu.Lock()
2✔
1574
                        pubStr := string(peerKey.SerializeCompressed())
2✔
1575
                        if _, ok := s.persistentPeers[pubStr]; !ok {
4✔
1576
                                s.persistentPeers[pubStr] = false
2✔
1577
                        }
2✔
1578
                        s.mu.Unlock()
2✔
1579

2✔
1580
                        // With that taken care of, we'll send this channel to
2✔
1581
                        // the chain arb so it can react to on-chain events.
2✔
1582
                        return s.chainArb.WatchNewChannel(channel)
2✔
1583
                },
1584
                ReportShortChanID: func(chanPoint wire.OutPoint) error {
2✔
1585
                        cid := lnwire.NewChanIDFromOutPoint(chanPoint)
2✔
1586
                        return s.htlcSwitch.UpdateShortChanID(cid)
2✔
1587
                },
2✔
1588
                RequiredRemoteChanReserve: func(chanAmt,
1589
                        dustLimit btcutil.Amount) btcutil.Amount {
2✔
1590

2✔
1591
                        // By default, we'll require the remote peer to maintain
2✔
1592
                        // at least 1% of the total channel capacity at all
2✔
1593
                        // times. If this value ends up dipping below the dust
2✔
1594
                        // limit, then we'll use the dust limit itself as the
2✔
1595
                        // reserve as required by BOLT #2.
2✔
1596
                        reserve := chanAmt / 100
2✔
1597
                        if reserve < dustLimit {
4✔
1598
                                reserve = dustLimit
2✔
1599
                        }
2✔
1600

1601
                        return reserve
2✔
1602
                },
1603
                RequiredRemoteMaxValue: func(chanAmt btcutil.Amount) lnwire.MilliSatoshi {
2✔
1604
                        // By default, we'll allow the remote peer to fully
2✔
1605
                        // utilize the full bandwidth of the channel, minus our
2✔
1606
                        // required reserve.
2✔
1607
                        reserve := lnwire.NewMSatFromSatoshis(chanAmt / 100)
2✔
1608
                        return lnwire.NewMSatFromSatoshis(chanAmt) - reserve
2✔
1609
                },
2✔
1610
                RequiredRemoteMaxHTLCs: func(chanAmt btcutil.Amount) uint16 {
2✔
1611
                        if cfg.DefaultRemoteMaxHtlcs > 0 {
4✔
1612
                                return cfg.DefaultRemoteMaxHtlcs
2✔
1613
                        }
2✔
1614

1615
                        // By default, we'll permit them to utilize the full
1616
                        // channel bandwidth.
1617
                        return uint16(input.MaxHTLCNumber / 2)
×
1618
                },
1619
                ZombieSweeperInterval:         zombieSweeperInterval,
1620
                ReservationTimeout:            reservationTimeout,
1621
                MinChanSize:                   btcutil.Amount(cfg.MinChanSize),
1622
                MaxChanSize:                   btcutil.Amount(cfg.MaxChanSize),
1623
                MaxPendingChannels:            cfg.MaxPendingChannels,
1624
                RejectPush:                    cfg.RejectPush,
1625
                MaxLocalCSVDelay:              chainCfg.MaxLocalDelay,
1626
                NotifyOpenChannelEvent:        s.channelNotifier.NotifyOpenChannelEvent,
1627
                OpenChannelPredicate:          chanPredicate,
1628
                NotifyPendingOpenChannelEvent: s.channelNotifier.NotifyPendingOpenChannelEvent,
1629
                EnableUpfrontShutdown:         cfg.EnableUpfrontShutdown,
1630
                MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte(
1631
                        s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
1632
                DeleteAliasEdge:      deleteAliasEdge,
1633
                AliasManager:         s.aliasMgr,
1634
                IsSweeperOutpoint:    s.sweeper.IsSweeperOutpoint,
1635
                AuxFundingController: implCfg.AuxFundingController,
1636
                AuxSigner:            implCfg.AuxSigner,
1637
                AuxResolver:          implCfg.AuxContractResolver,
1638
        })
1639
        if err != nil {
2✔
1640
                return nil, err
×
1641
        }
×
1642
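        // Illustrative aside (not part of this file): how the 1% remote
        // channel reserve rule configured in RequiredRemoteChanReserve above
        // behaves at the extremes. The channel sizes are hypothetical.
        //
        //	reserve := chanAmt / 100 // 1 BTC channel   -> 1,000,000 sat
        //	if reserve < dustLimit { // 40,000 sat chan -> 400 sat, below a
        //		reserve = dustLimit // 546 sat dust limit, so 546 sat is
        //	}                           // used instead, as required by BOLT #2.
        //
        // RequiredRemoteMaxValue then allows the full channel amount minus
        // that same 1% reserve to be in flight.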

1643
        // Next, we'll assemble the sub-system that will maintain an on-disk
1644
        // static backup of the latest channel state.
1645
        chanNotifier := &channelNotifier{
2✔
1646
                chanNotifier: s.channelNotifier,
2✔
1647
                addrs:        s.addrSource,
2✔
1648
        }
2✔
1649
        backupFile := chanbackup.NewMultiFile(
2✔
1650
                cfg.BackupFilePath, cfg.NoBackupArchive,
2✔
1651
        )
2✔
1652
        startingChans, err := chanbackup.FetchStaticChanBackups(
2✔
1653
                s.chanStateDB, s.addrSource,
2✔
1654
        )
2✔
1655
        if err != nil {
2✔
1656
                return nil, err
×
1657
        }
×
1658
        s.chanSubSwapper, err = chanbackup.NewSubSwapper(
2✔
1659
                startingChans, chanNotifier, s.cc.KeyRing, backupFile,
2✔
1660
        )
2✔
1661
        if err != nil {
2✔
1662
                return nil, err
×
1663
        }
×
1664

1665
        // Assemble a peer notifier which will provide clients with subscriptions
1666
        // to peer online and offline events.
1667
        s.peerNotifier = peernotifier.New()
2✔
1668

2✔
1669
        // Create a channel event store which monitors all open channels.
2✔
1670
        s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{
2✔
1671
                SubscribeChannelEvents: func() (subscribe.Subscription, error) {
4✔
1672
                        return s.channelNotifier.SubscribeChannelEvents()
2✔
1673
                },
2✔
1674
                SubscribePeerEvents: func() (subscribe.Subscription, error) {
2✔
1675
                        return s.peerNotifier.SubscribePeerEvents()
2✔
1676
                },
2✔
1677
                GetOpenChannels: s.chanStateDB.FetchAllOpenChannels,
1678
                Clock:           clock.NewDefaultClock(),
1679
                ReadFlapCount:   s.miscDB.ReadFlapCount,
1680
                WriteFlapCount:  s.miscDB.WriteFlapCounts,
1681
                FlapCountTicker: ticker.New(chanfitness.FlapCountFlushRate),
1682
        })
1683

1684
        if cfg.WtClient.Active {
4✔
1685
                policy := wtpolicy.DefaultPolicy()
2✔
1686
                policy.MaxUpdates = cfg.WtClient.MaxUpdates
2✔
1687

2✔
1688
                // We expose the sweep fee rate in sat/vbyte, but the tower
2✔
1689
                // protocol operates on sat/kw.
2✔
1690
                sweepRateSatPerVByte := chainfee.SatPerKVByte(
2✔
1691
                        1000 * cfg.WtClient.SweepFeeRate,
2✔
1692
                )
2✔
1693

2✔
1694
                policy.SweepFeeRate = sweepRateSatPerVByte.FeePerKWeight()
2✔
1695
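                // Illustrative aside (not part of this file): with a
                // hypothetical SweepFeeRate of 10 sat/vbyte, the conversion
                // above works out as follows (the witness scale factor is 4).
                //
                //	rate := chainfee.SatPerKVByte(1000 * 10) // 10,000 sat/kvB
                //	_ = rate.FeePerKWeight()                 // 2,500 sat/kw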

2✔
1696
                if err := policy.Validate(); err != nil {
2✔
1697
                        return nil, err
×
1698
                }
×
1699

1700
                // authDial is a wrapper around brontide.Dial for the
1701
                // watchtower.
1702
                authDial := func(localKey keychain.SingleKeyECDH,
2✔
1703
                        netAddr *lnwire.NetAddress,
2✔
1704
                        dialer tor.DialFunc) (wtserver.Peer, error) {
4✔
1705

2✔
1706
                        return brontide.Dial(
2✔
1707
                                localKey, netAddr, cfg.ConnectionTimeout, dialer,
2✔
1708
                        )
2✔
1709
                }
2✔
1710

1711
                // buildBreachRetribution is a call-back that can be used to
1712
                // query the BreachRetribution info and channel type given a
1713
                // channel ID and commitment height.
1714
                buildBreachRetribution := func(chanID lnwire.ChannelID,
2✔
1715
                        commitHeight uint64) (*lnwallet.BreachRetribution,
2✔
1716
                        channeldb.ChannelType, error) {
4✔
1717

2✔
1718
                        channel, err := s.chanStateDB.FetchChannelByID(
2✔
1719
                                nil, chanID,
2✔
1720
                        )
2✔
1721
                        if err != nil {
2✔
1722
                                return nil, 0, err
×
1723
                        }
×
1724

1725
                        br, err := lnwallet.NewBreachRetribution(
2✔
1726
                                channel, commitHeight, 0, nil,
2✔
1727
                                implCfg.AuxLeafStore,
2✔
1728
                                implCfg.AuxContractResolver,
2✔
1729
                        )
2✔
1730
                        if err != nil {
2✔
1731
                                return nil, 0, err
×
1732
                        }
×
1733

1734
                        return br, channel.ChanType, nil
2✔
1735
                }
1736

1737
                fetchClosedChannel := s.chanStateDB.FetchClosedChannelForID
2✔
1738

2✔
1739
                // Copy the policy for legacy channels and set the blob flag
2✔
1740
                // signalling support for anchor channels.
2✔
1741
                anchorPolicy := policy
2✔
1742
                anchorPolicy.BlobType |= blob.Type(blob.FlagAnchorChannel)
2✔
1743

2✔
1744
                // Copy the policy for legacy channels and set the blob flag
2✔
1745
                // signalling support for taproot channels.
2✔
1746
                taprootPolicy := policy
2✔
1747
                taprootPolicy.TxPolicy.BlobType |= blob.Type(
2✔
1748
                        blob.FlagTaprootChannel,
2✔
1749
                )
2✔
1750

2✔
1751
                s.towerClientMgr, err = wtclient.NewManager(&wtclient.Config{
2✔
1752
                        FetchClosedChannel:     fetchClosedChannel,
2✔
1753
                        BuildBreachRetribution: buildBreachRetribution,
2✔
1754
                        SessionCloseRange:      cfg.WtClient.SessionCloseRange,
2✔
1755
                        ChainNotifier:          s.cc.ChainNotifier,
2✔
1756
                        SubscribeChannelEvents: func() (subscribe.Subscription,
2✔
1757
                                error) {
4✔
1758

2✔
1759
                                return s.channelNotifier.
2✔
1760
                                        SubscribeChannelEvents()
2✔
1761
                        },
2✔
1762
                        Signer: cc.Wallet.Cfg.Signer,
1763
                        NewAddress: func() ([]byte, error) {
2✔
1764
                                addr, err := newSweepPkScriptGen(
2✔
1765
                                        cc.Wallet, netParams,
2✔
1766
                                )().Unpack()
2✔
1767
                                if err != nil {
2✔
1768
                                        return nil, err
×
1769
                                }
×
1770

1771
                                return addr.DeliveryAddress, nil
2✔
1772
                        },
1773
                        SecretKeyRing:      s.cc.KeyRing,
1774
                        Dial:               cfg.net.Dial,
1775
                        AuthDial:           authDial,
1776
                        DB:                 dbs.TowerClientDB,
1777
                        ChainHash:          *s.cfg.ActiveNetParams.GenesisHash,
1778
                        MinBackoff:         10 * time.Second,
1779
                        MaxBackoff:         5 * time.Minute,
1780
                        MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue,
1781
                }, policy, anchorPolicy, taprootPolicy)
1782
                if err != nil {
2✔
1783
                        return nil, err
×
1784
                }
×
1785
        }
1786

1787
        if len(cfg.ExternalHosts) != 0 {
2✔
1788
                advertisedIPs := make(map[string]struct{})
×
1789
                for _, addr := range s.currentNodeAnn.Addresses {
×
1790
                        advertisedIPs[addr.String()] = struct{}{}
×
1791
                }
×
1792

1793
                s.hostAnn = netann.NewHostAnnouncer(netann.HostAnnouncerConfig{
×
1794
                        Hosts:         cfg.ExternalHosts,
×
1795
                        RefreshTicker: ticker.New(defaultHostSampleInterval),
×
1796
                        LookupHost: func(host string) (net.Addr, error) {
×
1797
                                return lncfg.ParseAddressString(
×
1798
                                        host, strconv.Itoa(defaultPeerPort),
×
1799
                                        cfg.net.ResolveTCPAddr,
×
1800
                                )
×
1801
                        },
×
1802
                        AdvertisedIPs: advertisedIPs,
1803
                        AnnounceNewIPs: netann.IPAnnouncer(
1804
                                func(modifier ...netann.NodeAnnModifier) (
1805
                                        lnwire.NodeAnnouncement, error) {
×
1806

×
1807
                                        return s.genNodeAnnouncement(
×
1808
                                                nil, modifier...,
×
1809
                                        )
×
1810
                                }),
×
1811
                })
1812
        }
1813

1814
        // Create liveness monitor.
1815
        s.createLivenessMonitor(cfg, cc, leaderElector)
2✔
1816

2✔
1817
        // Create the connection manager which will be responsible for
2✔
1818
        // maintaining persistent outbound connections and also accepting new
2✔
1819
        // incoming connections.
2✔
1820
        cmgr, err := connmgr.New(&connmgr.Config{
2✔
1821
                Listeners:      listeners,
2✔
1822
                OnAccept:       s.InboundPeerConnected,
2✔
1823
                RetryDuration:  time.Second * 5,
2✔
1824
                TargetOutbound: 100,
2✔
1825
                Dial: noiseDial(
2✔
1826
                        nodeKeyECDH, s.cfg.net, s.cfg.ConnectionTimeout,
2✔
1827
                ),
2✔
1828
                OnConnection: s.OutboundPeerConnected,
2✔
1829
        })
2✔
1830
        if err != nil {
2✔
1831
                return nil, err
×
1832
        }
×
1833
        s.connMgr = cmgr
2✔
1834

2✔
1835
        // Finally, register the subsystems in blockbeat.
2✔
1836
        s.registerBlockConsumers()
2✔
1837

2✔
1838
        return s, nil
2✔
1839
}
1840

1841
// UpdateRoutingConfig is a callback function to update the routing config
1842
// values in the main cfg.
1843
func (s *server) UpdateRoutingConfig(cfg *routing.MissionControlConfig) {
2✔
1844
        routerCfg := s.cfg.SubRPCServers.RouterRPC
2✔
1845

2✔
1846
        switch c := cfg.Estimator.Config().(type) {
2✔
1847
        case routing.AprioriConfig:
2✔
1848
                routerCfg.ProbabilityEstimatorType =
2✔
1849
                        routing.AprioriEstimatorName
2✔
1850

2✔
1851
                targetCfg := routerCfg.AprioriConfig
2✔
1852
                targetCfg.PenaltyHalfLife = c.PenaltyHalfLife
2✔
1853
                targetCfg.Weight = c.AprioriWeight
2✔
1854
                targetCfg.CapacityFraction = c.CapacityFraction
2✔
1855
                targetCfg.HopProbability = c.AprioriHopProbability
2✔
1856

1857
        case routing.BimodalConfig:
2✔
1858
                routerCfg.ProbabilityEstimatorType =
2✔
1859
                        routing.BimodalEstimatorName
2✔
1860

2✔
1861
                targetCfg := routerCfg.BimodalConfig
2✔
1862
                targetCfg.Scale = int64(c.BimodalScaleMsat)
2✔
1863
                targetCfg.NodeWeight = c.BimodalNodeWeight
2✔
1864
                targetCfg.DecayTime = c.BimodalDecayTime
2✔
1865
        }
1866

1867
        routerCfg.MaxMcHistory = cfg.MaxMcHistory
2✔
1868
}
1869
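// Illustrative sketch (not part of this file): feeding a mission control
// config that uses the apriori estimator back into the sub-server config via
// UpdateRoutingConfig. The `estimator` value is hypothetical; only the fields
// read by the method above are shown.
//
//	s.UpdateRoutingConfig(&routing.MissionControlConfig{
//		Estimator:    estimator, // Config() returns a routing.AprioriConfig.
//		MaxMcHistory: 1000,
//	})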

1870
// registerBlockConsumers registers the subsystems that consume block events.
1871
// By calling `RegisterQueue`, a list of subsystems are registered in the
1872
// blockbeat for block notifications. When a new block arrives, the subsystems
1873
// in the same queue are notified sequentially, and different queues are
1874
// notified concurrently.
1875
//
1876
// NOTE: To put a subsystem in a different queue, create a slice and pass it to
1877
// a new `RegisterQueue` call.
1878
func (s *server) registerBlockConsumers() {
2✔
1879
        // In this queue, when a new block arrives, it will be received and
2✔
1880
        // processed in this order: chainArb -> sweeper -> txPublisher.
2✔
1881
        consumers := []chainio.Consumer{
2✔
1882
                s.chainArb,
2✔
1883
                s.sweeper,
2✔
1884
                s.txPublisher,
2✔
1885
        }
2✔
1886
        s.blockbeatDispatcher.RegisterQueue(consumers)
2✔
1887
}
2✔
1888
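// Illustrative sketch (not part of this file): consumers registered in a
// single RegisterQueue call are notified about a new block sequentially,
// while separate calls create queues that are notified concurrently. The
// grouping below is hypothetical.
//
//	s.blockbeatDispatcher.RegisterQueue(
//		[]chainio.Consumer{s.chainArb, s.sweeper},
//	)
//	s.blockbeatDispatcher.RegisterQueue(
//		[]chainio.Consumer{s.txPublisher},
//	)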

1889
// signAliasUpdate takes a ChannelUpdate and returns the signature. This is
1890
// used for option_scid_alias channels where the ChannelUpdate to be sent back
1891
// may differ from what is on disk.
1892
func (s *server) signAliasUpdate(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
1893
        error) {
2✔
1894

2✔
1895
        data, err := u.DataToSign()
2✔
1896
        if err != nil {
2✔
1897
                return nil, err
×
1898
        }
×
1899

1900
        return s.cc.MsgSigner.SignMessage(s.identityKeyLoc, data, true)
2✔
1901
}
1902
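// Illustrative sketch (not part of this file): re-signing an alias
// ChannelUpdate and attaching the fresh signature. The `update` value is a
// hypothetical *lnwire.ChannelUpdate1 and error handling is abbreviated.
//
//	sig, err := s.signAliasUpdate(update)
//	if err != nil {
//		return err
//	}
//	update.Signature, err = lnwire.NewSigFromSignature(sig)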

1903
// createLivenessMonitor creates a set of health checks using our configured
1904
// values and uses these checks to create a liveness monitor. Available
1905
// health checks,
1906
//   - chainHealthCheck (will be disabled for --nochainbackend mode)
1907
//   - diskCheck
1908
//   - tlsHealthCheck
1909
//   - torController, only created when tor is enabled.
1910
//
1911
// If a health check has been disabled by setting attempts to 0, our monitor
1912
// will not run it.
1913
func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl,
1914
        leaderElector cluster.LeaderElector) {
2✔
1915

2✔
1916
        chainBackendAttempts := cfg.HealthChecks.ChainCheck.Attempts
2✔
1917
        if cfg.Bitcoin.Node == "nochainbackend" {
2✔
1918
                srvrLog.Info("Disabling chain backend checks for " +
×
1919
                        "nochainbackend mode")
×
1920

×
1921
                chainBackendAttempts = 0
×
1922
        }
×
1923

1924
        chainHealthCheck := healthcheck.NewObservation(
2✔
1925
                "chain backend",
2✔
1926
                cc.HealthCheck,
2✔
1927
                cfg.HealthChecks.ChainCheck.Interval,
2✔
1928
                cfg.HealthChecks.ChainCheck.Timeout,
2✔
1929
                cfg.HealthChecks.ChainCheck.Backoff,
2✔
1930
                chainBackendAttempts,
2✔
1931
        )
2✔
1932

2✔
1933
        diskCheck := healthcheck.NewObservation(
2✔
1934
                "disk space",
2✔
1935
                func() error {
2✔
1936
                        free, err := healthcheck.AvailableDiskSpaceRatio(
×
1937
                                cfg.LndDir,
×
1938
                        )
×
1939
                        if err != nil {
×
1940
                                return err
×
1941
                        }
×
1942

1943
                        // If we have more free space than we require,
1944
                        // we return a nil error.
1945
                        if free > cfg.HealthChecks.DiskCheck.RequiredRemaining {
×
1946
                                return nil
×
1947
                        }
×
1948

1949
                        return fmt.Errorf("require: %v free space, got: %v",
×
1950
                                cfg.HealthChecks.DiskCheck.RequiredRemaining,
×
1951
                                free)
×
1952
                },
1953
                cfg.HealthChecks.DiskCheck.Interval,
1954
                cfg.HealthChecks.DiskCheck.Timeout,
1955
                cfg.HealthChecks.DiskCheck.Backoff,
1956
                cfg.HealthChecks.DiskCheck.Attempts,
1957
        )
1958

1959
        tlsHealthCheck := healthcheck.NewObservation(
2✔
1960
                "tls",
2✔
1961
                func() error {
2✔
1962
                        expired, expTime, err := s.tlsManager.IsCertExpired(
×
1963
                                s.cc.KeyRing,
×
1964
                        )
×
1965
                        if err != nil {
×
1966
                                return err
×
1967
                        }
×
1968
                        if expired {
×
1969
                                return fmt.Errorf("TLS certificate is "+
×
1970
                                        "expired as of %v", expTime)
×
1971
                        }
×
1972

1973
                        // If the certificate is not outdated, no error needs
1974
                        // to be returned.
1975
                        return nil
×
1976
                },
1977
                cfg.HealthChecks.TLSCheck.Interval,
1978
                cfg.HealthChecks.TLSCheck.Timeout,
1979
                cfg.HealthChecks.TLSCheck.Backoff,
1980
                cfg.HealthChecks.TLSCheck.Attempts,
1981
        )
1982

1983
        checks := []*healthcheck.Observation{
2✔
1984
                chainHealthCheck, diskCheck, tlsHealthCheck,
2✔
1985
        }
2✔
1986

2✔
1987
        // If Tor is enabled, add the healthcheck for tor connection.
2✔
1988
        if s.torController != nil {
2✔
1989
                torConnectionCheck := healthcheck.NewObservation(
×
1990
                        "tor connection",
×
1991
                        func() error {
×
1992
                                return healthcheck.CheckTorServiceStatus(
×
1993
                                        s.torController,
×
1994
                                        s.createNewHiddenService,
×
1995
                                )
×
1996
                        },
×
1997
                        cfg.HealthChecks.TorConnection.Interval,
1998
                        cfg.HealthChecks.TorConnection.Timeout,
1999
                        cfg.HealthChecks.TorConnection.Backoff,
2000
                        cfg.HealthChecks.TorConnection.Attempts,
2001
                )
2002
                checks = append(checks, torConnectionCheck)
×
2003
        }
2004

2005
        // If remote signing is enabled, add the healthcheck for the remote
2006
        // signing RPC interface.
2007
        if s.cfg.RemoteSigner != nil && s.cfg.RemoteSigner.Enable {
4✔
2008
                // Because we have two cascading timeouts here, we need to add
2✔
2009
                // some slack to the "outer" one of them in case the "inner"
2✔
2010
                // returns exactly on time.
2✔
2011
                overhead := time.Millisecond * 10
2✔
2012

2✔
2013
                remoteSignerConnectionCheck := healthcheck.NewObservation(
2✔
2014
                        "remote signer connection",
2✔
2015
                        rpcwallet.HealthCheck(
2✔
2016
                                s.cfg.RemoteSigner,
2✔
2017

2✔
2018
                                // For the health check we might want to be even
2✔
2019
                                // stricter than the initial/normal connect, so
2✔
2020
                                // we use the health check timeout here.
2✔
2021
                                cfg.HealthChecks.RemoteSigner.Timeout,
2✔
2022
                        ),
2✔
2023
                        cfg.HealthChecks.RemoteSigner.Interval,
2✔
2024
                        cfg.HealthChecks.RemoteSigner.Timeout+overhead,
2✔
2025
                        cfg.HealthChecks.RemoteSigner.Backoff,
2✔
2026
                        cfg.HealthChecks.RemoteSigner.Attempts,
2✔
2027
                )
2✔
2028
                checks = append(checks, remoteSignerConnectionCheck)
2✔
2029
        }
2✔
2030

2031
        // If we have a leader elector, we add a health check to ensure we are
2032
        // still the leader. During normal operation, we should always be the
2033
        // leader, but there are circumstances where this may change, such as
2034
        // when we lose network connectivity for long enough expiring out lease.
2035
        if leaderElector != nil {
2✔
2036
                leaderCheck := healthcheck.NewObservation(
×
2037
                        "leader status",
×
2038
                        func() error {
×
2039
                                // Check if we are still the leader. Note that
×
2040
                                // we don't need to use a timeout context here
×
2041
                                // as the healthcheck observer will handle the
×
2042
                                // timeout case for us.
×
2043
                                timeoutCtx, cancel := context.WithTimeout(
×
2044
                                        context.Background(),
×
2045
                                        cfg.HealthChecks.LeaderCheck.Timeout,
×
2046
                                )
×
2047
                                defer cancel()
×
2048

×
2049
                                leader, err := leaderElector.IsLeader(
×
2050
                                        timeoutCtx,
×
2051
                                )
×
2052
                                if err != nil {
×
2053
                                        return fmt.Errorf("unable to check if "+
×
2054
                                                "still leader: %v", err)
×
2055
                                }
×
2056

2057
                                if !leader {
×
2058
                                        srvrLog.Debug("Not the current leader")
×
2059
                                        return fmt.Errorf("not the current " +
×
2060
                                                "leader")
×
2061
                                }
×
2062

2063
                                return nil
×
2064
                        },
2065
                        cfg.HealthChecks.LeaderCheck.Interval,
2066
                        cfg.HealthChecks.LeaderCheck.Timeout,
2067
                        cfg.HealthChecks.LeaderCheck.Backoff,
2068
                        cfg.HealthChecks.LeaderCheck.Attempts,
2069
                )
2070

2071
                checks = append(checks, leaderCheck)
×
2072
        }
2073

2074
        // If we have not disabled all of our health checks, we create a
2075
        // liveness monitor with our configured checks.
2076
        s.livenessMonitor = healthcheck.NewMonitor(
2✔
2077
                &healthcheck.Config{
2✔
2078
                        Checks:   checks,
2✔
2079
                        Shutdown: srvrLog.Criticalf,
2✔
2080
                },
2✔
2081
        )
2✔
2082
}
2083
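// Illustrative sketch (not part of this file): a further observation could be
// appended to the checks slice above before the monitor is built, using the
// same healthcheck.NewObservation signature. The check body and timing values
// are hypothetical.
//
//	peerCheck := healthcheck.NewObservation(
//		"peer count",
//		func() error {
//			if len(s.Peers()) == 0 {
//				return fmt.Errorf("no peers connected")
//			}
//			return nil
//		},
//		time.Minute, 10*time.Second, 30*time.Second, 3,
//	)
//	checks = append(checks, peerCheck)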

2084
// Started returns true if the server has been started, and false otherwise.
2085
// NOTE: This function is safe for concurrent access.
2086
func (s *server) Started() bool {
2✔
2087
        return atomic.LoadInt32(&s.active) != 0
2✔
2088
}
2✔
2089

2090
// cleaner is used to aggregate "cleanup" functions during an operation that
2091
// starts several subsystems. In case one of the subsystem fails to start
2092
// and a proper resource cleanup is required, the "run" method achieves this
2093
// by running all these added "cleanup" functions.
2094
type cleaner []func() error
2095

2096
// add is used to add a cleanup function to be called when
2097
// the run function is executed.
2098
func (c cleaner) add(cleanup func() error) cleaner {
2✔
2099
        return append(c, cleanup)
2✔
2100
}
2✔
2101

2102
// run is used to run all the previously added cleanup functions.
2103
func (c cleaner) run() {
×
2104
        for i := len(c) - 1; i >= 0; i-- {
×
2105
                if err := c[i](); err != nil {
×
2106
                        srvrLog.Infof("Cleanup failed: %v", err)
×
2107
                }
×
2108
        }
2109
}
2110
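// A minimal usage sketch (the real pattern appears in startLowLevelServices
// and Start below); `svc` is a hypothetical subsystem with Start/Stop methods:
//
//	cleanup := cleaner{}
//	cleanup = cleanup.add(svc.Stop)
//	if err := svc.Start(); err != nil {
//		cleanup.run()
//		return err
//	}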

2111
// startLowLevelServices starts the low-level services of the server. These
2112
// services must be started successfully before running the main server. The
2113
// services are,
2114
// 1. the chain notifier.
2115
//
2116
// TODO(yy): identify and add more low-level services here.
2117
func (s *server) startLowLevelServices() error {
2✔
2118
        var startErr error
2✔
2119

2✔
2120
        cleanup := cleaner{}
2✔
2121

2✔
2122
        cleanup = cleanup.add(s.cc.ChainNotifier.Stop)
2✔
2123
        if err := s.cc.ChainNotifier.Start(); err != nil {
2✔
2124
                startErr = err
×
2125
        }
×
2126

2127
        if startErr != nil {
2✔
2128
                cleanup.run()
×
2129
        }
×
2130

2131
        return startErr
2✔
2132
}
2133

2134
// Start starts the main daemon server, all requested listeners, and any helper
2135
// goroutines.
2136
// NOTE: This function is safe for concurrent access.
2137
//
2138
//nolint:funlen
2139
func (s *server) Start() error {
2✔
2140
        // Get the current blockbeat.
2✔
2141
        beat, err := s.getStartingBeat()
2✔
2142
        if err != nil {
2✔
2143
                return err
×
2144
        }
×
2145

2146
        var startErr error
2✔
2147

2✔
2148
        // If one sub system fails to start, the following code ensures that the
2✔
2149
        // previous started ones are stopped. It also ensures a proper wallet
2✔
2150
        // shutdown which is important for releasing its resources (boltdb, etc...)
2✔
2151
        cleanup := cleaner{}
2✔
2152

2✔
2153
        s.start.Do(func() {
4✔
2154
                cleanup = cleanup.add(s.customMessageServer.Stop)
2✔
2155
                if err := s.customMessageServer.Start(); err != nil {
2✔
2156
                        startErr = err
×
2157
                        return
×
2158
                }
×
2159

2160
                if s.hostAnn != nil {
2✔
2161
                        cleanup = cleanup.add(s.hostAnn.Stop)
×
2162
                        if err := s.hostAnn.Start(); err != nil {
×
2163
                                startErr = err
×
2164
                                return
×
2165
                        }
×
2166
                }
2167

2168
                if s.livenessMonitor != nil {
4✔
2169
                        cleanup = cleanup.add(s.livenessMonitor.Stop)
2✔
2170
                        if err := s.livenessMonitor.Start(); err != nil {
2✔
2171
                                startErr = err
×
2172
                                return
×
2173
                        }
×
2174
                }
2175

2176
                // Start the notification server. This is used so channel
2177
                // management goroutines can be notified when a funding
2178
                // transaction reaches a sufficient number of confirmations, or
2179
                // when the input for the funding transaction is spent in an
2180
                // attempt at an uncooperative close by the counterparty.
2181
                cleanup = cleanup.add(s.sigPool.Stop)
2✔
2182
                if err := s.sigPool.Start(); err != nil {
2✔
2183
                        startErr = err
×
2184
                        return
×
2185
                }
×
2186

2187
                cleanup = cleanup.add(s.writePool.Stop)
2✔
2188
                if err := s.writePool.Start(); err != nil {
2✔
2189
                        startErr = err
×
2190
                        return
×
2191
                }
×
2192

2193
                cleanup = cleanup.add(s.readPool.Stop)
2✔
2194
                if err := s.readPool.Start(); err != nil {
2✔
2195
                        startErr = err
×
2196
                        return
×
2197
                }
×
2198

2199
                cleanup = cleanup.add(s.cc.BestBlockTracker.Stop)
2✔
2200
                if err := s.cc.BestBlockTracker.Start(); err != nil {
2✔
2201
                        startErr = err
×
2202
                        return
×
2203
                }
×
2204

2205
                cleanup = cleanup.add(s.channelNotifier.Stop)
2✔
2206
                if err := s.channelNotifier.Start(); err != nil {
2✔
2207
                        startErr = err
×
2208
                        return
×
2209
                }
×
2210

2211
                cleanup = cleanup.add(func() error {
2✔
2212
                        return s.peerNotifier.Stop()
×
2213
                })
×
2214
                if err := s.peerNotifier.Start(); err != nil {
2✔
2215
                        startErr = err
×
2216
                        return
×
2217
                }
×
2218

2219
                cleanup = cleanup.add(s.htlcNotifier.Stop)
2✔
2220
                if err := s.htlcNotifier.Start(); err != nil {
2✔
2221
                        startErr = err
×
2222
                        return
×
2223
                }
×
2224

2225
                if s.towerClientMgr != nil {
4✔
2226
                        cleanup = cleanup.add(s.towerClientMgr.Stop)
2✔
2227
                        if err := s.towerClientMgr.Start(); err != nil {
2✔
2228
                                startErr = err
×
2229
                                return
×
2230
                        }
×
2231
                }
2232

2233
                cleanup = cleanup.add(s.txPublisher.Stop)
2✔
2234
                if err := s.txPublisher.Start(beat); err != nil {
2✔
2235
                        startErr = err
×
2236
                        return
×
2237
                }
×
2238

2239
                cleanup = cleanup.add(s.sweeper.Stop)
2✔
2240
                if err := s.sweeper.Start(beat); err != nil {
2✔
2241
                        startErr = err
×
2242
                        return
×
2243
                }
×
2244

2245
                cleanup = cleanup.add(s.utxoNursery.Stop)
2✔
2246
                if err := s.utxoNursery.Start(); err != nil {
2✔
2247
                        startErr = err
×
2248
                        return
×
2249
                }
×
2250

2251
                cleanup = cleanup.add(s.breachArbitrator.Stop)
2✔
2252
                if err := s.breachArbitrator.Start(); err != nil {
2✔
2253
                        startErr = err
×
2254
                        return
×
2255
                }
×
2256

2257
                cleanup = cleanup.add(s.fundingMgr.Stop)
2✔
2258
                if err := s.fundingMgr.Start(); err != nil {
2✔
2259
                        startErr = err
×
2260
                        return
×
2261
                }
×
2262

2263
                // htlcSwitch must be started before chainArb since the latter
2264
                // relies on htlcSwitch to deliver resolution messages upon
2265
                // start.
2266
                cleanup = cleanup.add(s.htlcSwitch.Stop)
2✔
2267
                if err := s.htlcSwitch.Start(); err != nil {
2✔
2268
                        startErr = err
×
2269
                        return
×
2270
                }
×
2271

2272
                cleanup = cleanup.add(s.interceptableSwitch.Stop)
2✔
2273
                if err := s.interceptableSwitch.Start(); err != nil {
2✔
2274
                        startErr = err
×
2275
                        return
×
2276
                }
×
2277

2278
                cleanup = cleanup.add(s.invoiceHtlcModifier.Stop)
2✔
2279
                if err := s.invoiceHtlcModifier.Start(); err != nil {
2✔
2280
                        startErr = err
×
2281
                        return
×
2282
                }
×
2283

2284
                cleanup = cleanup.add(s.chainArb.Stop)
2✔
2285
                if err := s.chainArb.Start(beat); err != nil {
2✔
2286
                        startErr = err
×
2287
                        return
×
2288
                }
×
2289

2290
                cleanup = cleanup.add(s.graphDB.Stop)
2✔
2291
                if err := s.graphDB.Start(); err != nil {
2✔
NEW
2292
                        startErr = err
×
NEW
2293
                        return
×
NEW
2294
                }
×
2295

2296
                cleanup = cleanup.add(s.graphBuilder.Stop)
2✔
2297
                if err := s.graphBuilder.Start(); err != nil {
2✔
2298
                        startErr = err
×
2299
                        return
×
2300
                }
×
2301

2302
                cleanup = cleanup.add(s.chanRouter.Stop)
2✔
2303
                if err := s.chanRouter.Start(); err != nil {
2✔
2304
                        startErr = err
×
2305
                        return
×
2306
                }
×
2307
                // The authGossiper depends on the chanRouter and therefore
2308
                // should be started after it.
2309
                cleanup = cleanup.add(s.authGossiper.Stop)
2✔
2310
                if err := s.authGossiper.Start(); err != nil {
2✔
2311
                        startErr = err
×
2312
                        return
×
2313
                }
×
2314

2315
                cleanup = cleanup.add(s.invoices.Stop)
2✔
2316
                if err := s.invoices.Start(); err != nil {
2✔
2317
                        startErr = err
×
2318
                        return
×
2319
                }
×
2320

2321
                cleanup = cleanup.add(s.sphinx.Stop)
2✔
2322
                if err := s.sphinx.Start(); err != nil {
2✔
2323
                        startErr = err
×
2324
                        return
×
2325
                }
×
2326

2327
                cleanup = cleanup.add(s.chanStatusMgr.Stop)
2✔
2328
                if err := s.chanStatusMgr.Start(); err != nil {
2✔
2329
                        startErr = err
×
2330
                        return
×
2331
                }
×
2332

2333
                cleanup = cleanup.add(s.chanEventStore.Stop)
2✔
2334
                if err := s.chanEventStore.Start(); err != nil {
2✔
2335
                        startErr = err
×
2336
                        return
×
2337
                }
×
2338

2339
                cleanup.add(func() error {
2✔
2340
                        s.missionController.StopStoreTickers()
×
2341
                        return nil
×
2342
                })
×
2343
                s.missionController.RunStoreTickers()
2✔
2344

2✔
2345
                // Before we start the connMgr, we'll check to see if we have
2✔
2346
                // any backups to recover. We do this now as we want to ensure
2✔
2347
                // that we have all the information we need to handle channel
2✔
2348
                // recovery _before_ we even accept connections from any peers.
2✔
2349
                chanRestorer := &chanDBRestorer{
2✔
2350
                        db:         s.chanStateDB,
2✔
2351
                        secretKeys: s.cc.KeyRing,
2✔
2352
                        chainArb:   s.chainArb,
2✔
2353
                }
2✔
2354
                if len(s.chansToRestore.PackedSingleChanBackups) != 0 {
2✔
2355
                        _, err := chanbackup.UnpackAndRecoverSingles(
×
2356
                                s.chansToRestore.PackedSingleChanBackups,
×
2357
                                s.cc.KeyRing, chanRestorer, s,
×
2358
                        )
×
2359
                        if err != nil {
×
2360
                                startErr = fmt.Errorf("unable to unpack single "+
×
2361
                                        "backups: %v", err)
×
2362
                                return
×
2363
                        }
×
2364
                }
2365
                if len(s.chansToRestore.PackedMultiChanBackup) != 0 {
4✔
2366
                        _, err := chanbackup.UnpackAndRecoverMulti(
2✔
2367
                                s.chansToRestore.PackedMultiChanBackup,
2✔
2368
                                s.cc.KeyRing, chanRestorer, s,
2✔
2369
                        )
2✔
2370
                        if err != nil {
2✔
2371
                                startErr = fmt.Errorf("unable to unpack chan "+
×
2372
                                        "backup: %v", err)
×
2373
                                return
×
2374
                        }
×
2375
                }
2376

2377
                // chanSubSwapper must be started after the `channelNotifier`
2378
                // because it depends on channel events as a synchronization
2379
                // point.
2380
                cleanup = cleanup.add(s.chanSubSwapper.Stop)
2✔
2381
                if err := s.chanSubSwapper.Start(); err != nil {
2✔
2382
                        startErr = err
×
2383
                        return
×
2384
                }
×
2385

2386
                if s.torController != nil {
2✔
2387
                        cleanup = cleanup.add(s.torController.Stop)
×
2388
                        if err := s.createNewHiddenService(); err != nil {
×
2389
                                startErr = err
×
2390
                                return
×
2391
                        }
×
2392
                }
2393

2394
                if s.natTraversal != nil {
2✔
2395
                        s.wg.Add(1)
×
2396
                        go s.watchExternalIP()
×
2397
                }
×
2398

2399
                // Start connmgr last to prevent connections before init.
2400
                cleanup = cleanup.add(func() error {
2✔
2401
                        s.connMgr.Stop()
×
2402
                        return nil
×
2403
                })
×
2404
                s.connMgr.Start()
2✔
2405

2✔
2406
                // If peers are specified as a config option, we'll add those
2✔
2407
                // peers first.
2✔
2408
                for _, peerAddrCfg := range s.cfg.AddPeers {
4✔
2409
                        parsedPubkey, parsedHost, err := lncfg.ParseLNAddressPubkey(
2✔
2410
                                peerAddrCfg,
2✔
2411
                        )
2✔
2412
                        if err != nil {
2✔
2413
                                startErr = fmt.Errorf("unable to parse peer "+
×
2414
                                        "pubkey from config: %v", err)
×
2415
                                return
×
2416
                        }
×
2417
                        addr, err := parseAddr(parsedHost, s.cfg.net)
2✔
2418
                        if err != nil {
2✔
2419
                                startErr = fmt.Errorf("unable to parse peer "+
×
2420
                                        "address provided as a config option: "+
×
2421
                                        "%v", err)
×
2422
                                return
×
2423
                        }
×
2424

2425
                        peerAddr := &lnwire.NetAddress{
2✔
2426
                                IdentityKey: parsedPubkey,
2✔
2427
                                Address:     addr,
2✔
2428
                                ChainNet:    s.cfg.ActiveNetParams.Net,
2✔
2429
                        }
2✔
2430

2✔
2431
                        err = s.ConnectToPeer(
2✔
2432
                                peerAddr, true,
2✔
2433
                                s.cfg.ConnectionTimeout,
2✔
2434
                        )
2✔
2435
                        if err != nil {
2✔
2436
                                startErr = fmt.Errorf("unable to connect to "+
×
2437
                                        "peer address provided as a config "+
×
2438
                                        "option: %v", err)
×
2439
                                return
×
2440
                        }
×
2441
                }
2442

2443
                // Subscribe to NodeAnnouncements that advertise new addresses
2444
                // for our persistent peers.
2445
                if err := s.updatePersistentPeerAddrs(); err != nil {
2✔
2446
                        startErr = err
×
2447
                        return
×
2448
                }
×
2449

2450
                // With all the relevant sub-systems started, we'll now attempt
2451
                // to establish persistent connections to our direct channel
2452
                // collaborators within the network. Before doing so however,
2453
                // we'll prune our set of link nodes found within the database
2454
                // to ensure we don't reconnect to any nodes we no longer have
2455
                // open channels with.
2456
                if err := s.chanStateDB.PruneLinkNodes(); err != nil {
2✔
2457
                        startErr = err
×
2458
                        return
×
2459
                }
×
2460
                if err := s.establishPersistentConnections(); err != nil {
2✔
2461
                        startErr = err
×
2462
                        return
×
2463
                }
×
2464

2465
                // setSeedList is a helper function that turns multiple DNS seed
2466
                // server tuples from the command line or config file into the
2467
                // data structure we need and does a basic formal sanity check
2468
                // in the process.
2469
                setSeedList := func(tuples []string, genesisHash chainhash.Hash) {
2✔
2470
                        if len(tuples) == 0 {
×
2471
                                return
×
2472
                        }
×
2473

2474
                        result := make([][2]string, len(tuples))
×
2475
                        for idx, tuple := range tuples {
×
2476
                                tuple = strings.TrimSpace(tuple)
×
2477
                                if len(tuple) == 0 {
×
2478
                                        return
×
2479
                                }
×
2480

2481
                                servers := strings.Split(tuple, ",")
×
2482
                                if len(servers) > 2 || len(servers) == 0 {
×
2483
                                        srvrLog.Warnf("Ignoring invalid DNS "+
×
2484
                                                "seed tuple: %v", servers)
×
2485
                                        return
×
2486
                                }
×
2487

2488
                                copy(result[idx][:], servers)
×
2489
                        }
2490

2491
                        chainreg.ChainDNSSeeds[genesisHash] = result
×
2492
                }
2493

2494
                // Let users overwrite the DNS seed nodes. We only allow them
2495
                // for bitcoin mainnet/testnet/signet.
2496
                if s.cfg.Bitcoin.MainNet {
2✔
2497
                        setSeedList(
×
2498
                                s.cfg.Bitcoin.DNSSeeds,
×
2499
                                chainreg.BitcoinMainnetGenesis,
×
2500
                        )
×
2501
                }
×
2502
                if s.cfg.Bitcoin.TestNet3 {
2✔
2503
                        setSeedList(
×
2504
                                s.cfg.Bitcoin.DNSSeeds,
×
2505
                                chainreg.BitcoinTestnetGenesis,
×
2506
                        )
×
2507
                }
×
2508
                if s.cfg.Bitcoin.SigNet {
2✔
2509
                        setSeedList(
×
2510
                                s.cfg.Bitcoin.DNSSeeds,
×
2511
                                chainreg.BitcoinSignetGenesis,
×
2512
                        )
×
2513
                }
×
2514

2515
                // If network bootstrapping hasn't been disabled, then we'll
2516
                // configure the set of active bootstrappers, and launch a
2517
                // dedicated goroutine to maintain a set of persistent
2518
                // connections.
2519
                if shouldPeerBootstrap(s.cfg) {
2✔
2520
                        bootstrappers, err := initNetworkBootstrappers(s)
×
2521
                        if err != nil {
×
2522
                                startErr = err
×
2523
                                return
×
2524
                        }
×
2525

2526
                        s.wg.Add(1)
×
2527
                        go s.peerBootstrapper(defaultMinPeers, bootstrappers)
×
2528
                } else {
2✔
2529
                        srvrLog.Infof("Auto peer bootstrapping is disabled")
2✔
2530
                }
2✔
2531

2532
                // Start the blockbeat after all other subsystems have been
2533
                // started so they are ready to receive new blocks.
2534
                cleanup = cleanup.add(func() error {
2✔
2535
                        s.blockbeatDispatcher.Stop()
×
2536
                        return nil
×
2537
                })
×
2538
                if err := s.blockbeatDispatcher.Start(); err != nil {
2✔
2539
                        startErr = err
×
2540
                        return
×
2541
                }
×
2542

2543
                // Set the active flag now that we've completed the full
2544
                // startup.
2545
                atomic.StoreInt32(&s.active, 1)
2✔
2546
        })
2547

2548
        if startErr != nil {
2✔
2549
                cleanup.run()
×
2550
        }
×
2551
        return startErr
2✔
2552
}
2553
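// Illustrative sketch (not part of server.go): every subsystem started above
// first registers its Stop callback via "cleanup = cleanup.add(...)", so a
// failure partway through Start can unwind whatever has already come up. The
// real cleanup type is defined elsewhere in this file and its details (such
// as the order run executes callbacks in) may differ; the hypothetical
// cleanupStack below only shows the general shape of the pattern.
type cleanupStack []func() error

// add appends another stop callback and returns the extended stack.
func (c cleanupStack) add(fn func() error) cleanupStack {
        return append(c, fn)
}

// run tears subsystems down in reverse registration order, logging rather
// than aborting on individual failures.
func (c cleanupStack) run() {
        for i := len(c) - 1; i >= 0; i-- {
                if err := c[i](); err != nil {
                        srvrLog.Warnf("Cleanup callback %d failed: %v", i, err)
                }
        }
}
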

2554
// Stop gracefully shuts down the main daemon server. This function will signal
2555
// any active goroutines, or helper objects to exit, then blocks until they've
2556
// all successfully exited. Additionally, any/all listeners are closed.
2557
// NOTE: This function is safe for concurrent access.
2558
func (s *server) Stop() error {
2✔
2559
        s.stop.Do(func() {
4✔
2560
                atomic.StoreInt32(&s.stopping, 1)
2✔
2561

2✔
2562
                close(s.quit)
2✔
2563

2✔
2564
                // Shut down connMgr first to prevent new connections during
                // shutdown.
2✔
2565
                s.connMgr.Stop()
2✔
2566

2✔
2567
                // Stop dispatching blocks to other systems immediately.
2✔
2568
                s.blockbeatDispatcher.Stop()
2✔
2569

2✔
2570
                // Shutdown the wallet, funding manager, and the rpc server.
2✔
2571
                if err := s.chanStatusMgr.Stop(); err != nil {
2✔
2572
                        srvrLog.Warnf("failed to stop chanStatusMgr: %v", err)
×
2573
                }
×
2574
                if err := s.htlcSwitch.Stop(); err != nil {
2✔
2575
                        srvrLog.Warnf("failed to stop htlcSwitch: %v", err)
×
2576
                }
×
2577
                if err := s.sphinx.Stop(); err != nil {
2✔
2578
                        srvrLog.Warnf("failed to stop sphinx: %v", err)
×
2579
                }
×
2580
                if err := s.invoices.Stop(); err != nil {
2✔
2581
                        srvrLog.Warnf("failed to stop invoices: %v", err)
×
2582
                }
×
2583
                if err := s.interceptableSwitch.Stop(); err != nil {
2✔
2584
                        srvrLog.Warnf("failed to stop interceptable "+
×
2585
                                "switch: %v", err)
×
2586
                }
×
2587
                if err := s.invoiceHtlcModifier.Stop(); err != nil {
2✔
2588
                        srvrLog.Warnf("failed to stop htlc invoices "+
×
2589
                                "modifier: %v", err)
×
2590
                }
×
2591
                if err := s.chanRouter.Stop(); err != nil {
2✔
2592
                        srvrLog.Warnf("failed to stop chanRouter: %v", err)
×
2593
                }
×
2594
                if err := s.graphBuilder.Stop(); err != nil {
2✔
2595
                        srvrLog.Warnf("failed to stop graphBuilder %v", err)
×
2596
                }
×
2597
                if err := s.graphDB.Stop(); err != nil {
2✔
NEW
2598
                        srvrLog.Warnf("failed to stop graphDB %v", err)
×
NEW
2599
                }
×
2600
                if err := s.chainArb.Stop(); err != nil {
2✔
2601
                        srvrLog.Warnf("failed to stop chainArb: %v", err)
×
2602
                }
×
2603
                if err := s.fundingMgr.Stop(); err != nil {
2✔
2604
                        srvrLog.Warnf("failed to stop fundingMgr: %v", err)
×
2605
                }
×
2606
                if err := s.breachArbitrator.Stop(); err != nil {
2✔
2607
                        srvrLog.Warnf("failed to stop breachArbitrator: %v",
×
2608
                                err)
×
2609
                }
×
2610
                if err := s.utxoNursery.Stop(); err != nil {
2✔
2611
                        srvrLog.Warnf("failed to stop utxoNursery: %v", err)
×
2612
                }
×
2613
                if err := s.authGossiper.Stop(); err != nil {
2✔
2614
                        srvrLog.Warnf("failed to stop authGossiper: %v", err)
×
2615
                }
×
2616
                if err := s.sweeper.Stop(); err != nil {
2✔
2617
                        srvrLog.Warnf("failed to stop sweeper: %v", err)
×
2618
                }
×
2619
                if err := s.txPublisher.Stop(); err != nil {
2✔
2620
                        srvrLog.Warnf("failed to stop txPublisher: %v", err)
×
2621
                }
×
2622
                if err := s.channelNotifier.Stop(); err != nil {
2✔
2623
                        srvrLog.Warnf("failed to stop channelNotifier: %v", err)
×
2624
                }
×
2625
                if err := s.peerNotifier.Stop(); err != nil {
2✔
2626
                        srvrLog.Warnf("failed to stop peerNotifier: %v", err)
×
2627
                }
×
2628
                if err := s.htlcNotifier.Stop(); err != nil {
2✔
2629
                        srvrLog.Warnf("failed to stop htlcNotifier: %v", err)
×
2630
                }
×
2631

2632
                // Update channel.backup file. Make sure to do it before
2633
                // stopping chanSubSwapper.
2634
                singles, err := chanbackup.FetchStaticChanBackups(
2✔
2635
                        s.chanStateDB, s.addrSource,
2✔
2636
                )
2✔
2637
                if err != nil {
2✔
2638
                        srvrLog.Warnf("failed to fetch channel states: %v",
×
2639
                                err)
×
2640
                } else {
2✔
2641
                        err := s.chanSubSwapper.ManualUpdate(singles)
2✔
2642
                        if err != nil {
4✔
2643
                                srvrLog.Warnf("Manual update of channel "+
2✔
2644
                                        "backup failed: %v", err)
2✔
2645
                        }
2✔
2646
                }
2647

2648
                if err := s.chanSubSwapper.Stop(); err != nil {
2✔
2649
                        srvrLog.Warnf("failed to stop chanSubSwapper: %v", err)
×
2650
                }
×
2651
                if err := s.cc.ChainNotifier.Stop(); err != nil {
2✔
2652
                        srvrLog.Warnf("Unable to stop ChainNotifier: %v", err)
×
2653
                }
×
2654
                if err := s.cc.BestBlockTracker.Stop(); err != nil {
2✔
2655
                        srvrLog.Warnf("Unable to stop BestBlockTracker: %v",
×
2656
                                err)
×
2657
                }
×
2658
                if err := s.chanEventStore.Stop(); err != nil {
2✔
2659
                        srvrLog.Warnf("Unable to stop ChannelEventStore: %v",
×
2660
                                err)
×
2661
                }
×
2662
                s.missionController.StopStoreTickers()
2✔
2663

2✔
2664
                // Disconnect from each active peer to ensure that
2✔
2665
                // peerTerminationWatchers signal completion to each peer.
2✔
2666
                for _, peer := range s.Peers() {
4✔
2667
                        err := s.DisconnectPeer(peer.IdentityKey())
2✔
2668
                        if err != nil {
2✔
2669
                                srvrLog.Warnf("could not disconnect peer: %v"+
×
2670
                                        "received error: %v", peer.IdentityKey(),
×
2671
                                        err,
×
2672
                                )
×
2673
                        }
×
2674
                }
2675

2676
                // Now that all connections have been torn down, stop the tower
2677
                // client which will reliably flush all queued states to the
2678
                // tower. If this is halted for any reason, the force quit timer
2679
                // will kick in and abort to allow this method to return.
2680
                if s.towerClientMgr != nil {
4✔
2681
                        if err := s.towerClientMgr.Stop(); err != nil {
2✔
2682
                                srvrLog.Warnf("Unable to shut down tower "+
×
2683
                                        "client manager: %v", err)
×
2684
                        }
×
2685
                }
2686

2687
                if s.hostAnn != nil {
2✔
2688
                        if err := s.hostAnn.Stop(); err != nil {
×
2689
                                srvrLog.Warnf("unable to shut down host "+
×
2690
                                        "annoucner: %v", err)
×
2691
                        }
×
2692
                }
2693

2694
                if s.livenessMonitor != nil {
4✔
2695
                        if err := s.livenessMonitor.Stop(); err != nil {
2✔
2696
                                srvrLog.Warnf("unable to shutdown liveness "+
×
2697
                                        "monitor: %v", err)
×
2698
                        }
×
2699
                }
2700

2701
                // Wait for all lingering goroutines to quit.
2702
                srvrLog.Debug("Waiting for server to shutdown...")
2✔
2703
                s.wg.Wait()
2✔
2704

2✔
2705
                srvrLog.Debug("Stopping buffer pools...")
2✔
2706
                s.sigPool.Stop()
2✔
2707
                s.writePool.Stop()
2✔
2708
                s.readPool.Stop()
2✔
2709
        })
2710

2711
        return nil
2✔
2712
}
2713
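// Illustrative sketch (not part of server.go): Stop above combines a
// sync.Once (the shutdown body runs at most once), an atomic flag (so
// Stopped can be polled cheaply from any goroutine), and a quit channel whose
// close is observed by every long-lived goroutine before s.wg.Wait() returns.
// A self-contained, hypothetical version of that wiring, using only the
// standard library:
type miniShutdown struct {
        stop     sync.Once
        stopping int32
        quit     chan struct{}
        wg       sync.WaitGroup
}

// newMiniShutdown initializes the quit channel that Stop will later close.
func newMiniShutdown() *miniShutdown {
        return &miniShutdown{quit: make(chan struct{})}
}

func (m *miniShutdown) Stop() {
        m.stop.Do(func() {
                atomic.StoreInt32(&m.stopping, 1)

                // Closing quit is a broadcast: every select on <-m.quit
                // unblocks at once.
                close(m.quit)

                // Wait for all goroutines registered on wg to exit.
                m.wg.Wait()
        })
}

func (m *miniShutdown) Stopped() bool {
        return atomic.LoadInt32(&m.stopping) != 0
}
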

2714
// Stopped returns true if the server has been instructed to shutdown.
2715
// NOTE: This function is safe for concurrent access.
2716
func (s *server) Stopped() bool {
2✔
2717
        return atomic.LoadInt32(&s.stopping) != 0
2✔
2718
}
2✔
2719

2720
// configurePortForwarding attempts to set up port forwarding for the different
2721
// ports that the server will be listening on.
2722
//
2723
// NOTE: This should only be used when using some kind of NAT traversal to
2724
// automatically set up forwarding rules.
2725
func (s *server) configurePortForwarding(ports ...uint16) ([]string, error) {
×
2726
        ip, err := s.natTraversal.ExternalIP()
×
2727
        if err != nil {
×
2728
                return nil, err
×
2729
        }
×
2730
        s.lastDetectedIP = ip
×
2731

×
2732
        externalIPs := make([]string, 0, len(ports))
×
2733
        for _, port := range ports {
×
2734
                if err := s.natTraversal.AddPortMapping(port); err != nil {
×
2735
                        srvrLog.Debugf("Unable to forward port %d: %v", port, err)
×
2736
                        continue
×
2737
                }
2738

2739
                hostIP := fmt.Sprintf("%v:%d", ip, port)
×
2740
                externalIPs = append(externalIPs, hostIP)
×
2741
        }
2742

2743
        return externalIPs, nil
×
2744
}
2745
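// Illustrative sketch (not part of server.go): configurePortForwarding above
// returns plain "ip:port" strings and deliberately skips ports that could not
// be mapped instead of failing the whole call. A caller that needs net.Addr
// values could resolve the survivors with a hypothetical helper like this,
// again dropping entries on a best-effort basis:
func resolveExternalIPs(hostPorts []string) []net.Addr {
        addrs := make([]net.Addr, 0, len(hostPorts))
        for _, hp := range hostPorts {
                tcpAddr, err := net.ResolveTCPAddr("tcp", hp)
                if err != nil {
                        // Best effort: skip entries that do not resolve.
                        srvrLog.Debugf("Skipping unresolvable address %v: %v",
                                hp, err)
                        continue
                }
                addrs = append(addrs, tcpAddr)
        }
        return addrs
}
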

2746
// removePortForwarding attempts to clear the forwarding rules for the different
2747
// ports the server is currently listening on.
2748
//
2749
// NOTE: This should only be used when using some kind of NAT traversal to
2750
// automatically set up forwarding rules.
2751
func (s *server) removePortForwarding() {
×
2752
        forwardedPorts := s.natTraversal.ForwardedPorts()
×
2753
        for _, port := range forwardedPorts {
×
2754
                if err := s.natTraversal.DeletePortMapping(port); err != nil {
×
2755
                        srvrLog.Errorf("Unable to remove forwarding rules for "+
×
2756
                                "port %d: %v", port, err)
×
2757
                }
×
2758
        }
2759
}
2760

2761
// watchExternalIP continuously checks for an updated external IP address every
2762
// 15 minutes. Once a new IP address has been detected, it will automatically
2763
// handle port forwarding rules and send updated node announcements to the
2764
// currently connected peers.
2765
//
2766
// NOTE: This MUST be run as a goroutine.
2767
func (s *server) watchExternalIP() {
×
2768
        defer s.wg.Done()
×
2769

×
2770
        // Before exiting, we'll make sure to remove the forwarding rules set
×
2771
        // up by the server.
×
2772
        defer s.removePortForwarding()
×
2773

×
2774
        // Keep track of the external IPs set by the user to avoid replacing
×
2775
        // them when detecting a new IP.
×
2776
        ipsSetByUser := make(map[string]struct{})
×
2777
        for _, ip := range s.cfg.ExternalIPs {
×
2778
                ipsSetByUser[ip.String()] = struct{}{}
×
2779
        }
×
2780

2781
        forwardedPorts := s.natTraversal.ForwardedPorts()
×
2782

×
2783
        ticker := time.NewTicker(15 * time.Minute)
×
2784
        defer ticker.Stop()
×
2785
out:
×
2786
        for {
×
2787
                select {
×
2788
                case <-ticker.C:
×
2789
                        // We'll start off by making sure a new IP address has
×
2790
                        // been detected.
×
2791
                        ip, err := s.natTraversal.ExternalIP()
×
2792
                        if err != nil {
×
2793
                                srvrLog.Debugf("Unable to retrieve the "+
×
2794
                                        "external IP address: %v", err)
×
2795
                                continue
×
2796
                        }
2797

2798
                        // Periodically renew the NAT port forwarding.
2799
                        for _, port := range forwardedPorts {
×
2800
                                err := s.natTraversal.AddPortMapping(port)
×
2801
                                if err != nil {
×
2802
                                        srvrLog.Warnf("Unable to automatically "+
×
2803
                                                "re-create port forwarding using %s: %v",
×
2804
                                                s.natTraversal.Name(), err)
×
2805
                                } else {
×
2806
                                        srvrLog.Debugf("Automatically re-created "+
×
2807
                                                "forwarding for port %d using %s to "+
×
2808
                                                "advertise external IP",
×
2809
                                                port, s.natTraversal.Name())
×
2810
                                }
×
2811
                        }
2812

2813
                        if ip.Equal(s.lastDetectedIP) {
×
2814
                                continue
×
2815
                        }
2816

2817
                        srvrLog.Infof("Detected new external IP address %s", ip)
×
2818

×
2819
                        // Next, we'll craft the new addresses that will be
×
2820
                        // included in the new node announcement and advertised
×
2821
                        // to the network. Each address will consist of the new
×
2822
                        // IP detected and one of the currently advertised
×
2823
                        // ports.
×
2824
                        var newAddrs []net.Addr
×
2825
                        for _, port := range forwardedPorts {
×
2826
                                hostIP := fmt.Sprintf("%v:%d", ip, port)
×
2827
                                addr, err := net.ResolveTCPAddr("tcp", hostIP)
×
2828
                                if err != nil {
×
2829
                                        srvrLog.Debugf("Unable to resolve "+
×
2830
                                                "host %v: %v", addr, err)
×
2831
                                        continue
×
2832
                                }
2833

2834
                                newAddrs = append(newAddrs, addr)
×
2835
                        }
2836

2837
                        // Skip the update if we weren't able to resolve any of
2838
                        // the new addresses.
2839
                        if len(newAddrs) == 0 {
×
2840
                                srvrLog.Debug("Skipping node announcement " +
×
2841
                                        "update due to not being able to " +
×
2842
                                        "resolve any new addresses")
×
2843
                                continue
×
2844
                        }
2845

2846
                        // Now, we'll need to update the addresses in our node's
2847
                        // announcement in order to propagate the update
2848
                        // throughout the network. We'll only include addresses
2849
                        // that have a different IP from the previous one, as
2850
                        // the previous IP is no longer valid.
2851
                        currentNodeAnn := s.getNodeAnnouncement()
×
2852

×
2853
                        for _, addr := range currentNodeAnn.Addresses {
×
2854
                                host, _, err := net.SplitHostPort(addr.String())
×
2855
                                if err != nil {
×
2856
                                        srvrLog.Debugf("Unable to determine "+
×
2857
                                                "host from address %v: %v",
×
2858
                                                addr, err)
×
2859
                                        continue
×
2860
                                }
2861

2862
                                // We'll also make sure to include external IPs
2863
                                // set manually by the user.
2864
                                _, setByUser := ipsSetByUser[addr.String()]
×
2865
                                if setByUser || host != s.lastDetectedIP.String() {
×
2866
                                        newAddrs = append(newAddrs, addr)
×
2867
                                }
×
2868
                        }
2869

2870
                        // Then, we'll generate a new timestamped node
2871
                        // announcement with the updated addresses and broadcast
2872
                        // it to our peers.
2873
                        newNodeAnn, err := s.genNodeAnnouncement(
×
2874
                                nil, netann.NodeAnnSetAddrs(newAddrs),
×
2875
                        )
×
2876
                        if err != nil {
×
2877
                                srvrLog.Debugf("Unable to generate new node "+
×
2878
                                        "announcement: %v", err)
×
2879
                                continue
×
2880
                        }
2881

2882
                        err = s.BroadcastMessage(nil, &newNodeAnn)
×
2883
                        if err != nil {
×
2884
                                srvrLog.Debugf("Unable to broadcast new node "+
×
2885
                                        "announcement to peers: %v", err)
×
2886
                                continue
×
2887
                        }
2888

2889
                        // Finally, update the last IP seen to the current one.
2890
                        s.lastDetectedIP = ip
×
2891
                case <-s.quit:
×
2892
                        break out
×
2893
                }
2894
        }
2895
}
2896
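// Illustrative sketch (not part of server.go): watchExternalIP above follows
// a common goroutine shape in this file: tick on a fixed interval, do
// best-effort work (re-detect the IP, renew mappings, rebroadcast the
// announcement), and exit as soon as the server's quit channel closes.
// Stripped of the NAT specifics, the skeleton is just this hypothetical
// helper:
func pollUntilQuit(interval time.Duration, quit <-chan struct{},
        poll func()) {

        ticker := time.NewTicker(interval)
        defer ticker.Stop()

        for {
                select {
                case <-ticker.C:
                        // Errors inside poll are handled by the callee and do
                        // not stop the loop; only quit does.
                        poll()

                case <-quit:
                        return
                }
        }
}
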

2897
// initNetworkBootstrappers initializes a set of network peer bootstrappers
2898
// based on the server, and currently active bootstrap mechanisms as defined
2899
// within the current configuration.
2900
func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, error) {
×
2901
        srvrLog.Infof("Initializing peer network bootstrappers!")
×
2902

×
2903
        var bootStrappers []discovery.NetworkPeerBootstrapper
×
2904

×
2905
        // First, we'll create an instance of the ChannelGraphBootstrapper as
×
2906
        // this can be used by default if we've already partially seeded the
×
2907
        // network.
×
2908
        chanGraph := autopilot.ChannelGraphFromDatabase(s.graphDB)
×
2909
        graphBootstrapper, err := discovery.NewGraphBootstrapper(chanGraph)
×
2910
        if err != nil {
×
2911
                return nil, err
×
2912
        }
×
2913
        bootStrappers = append(bootStrappers, graphBootstrapper)
×
2914

×
2915
        // If this isn't simnet mode, then one of our additional bootstrapping
×
2916
        // sources will be the set of running DNS seeds.
×
2917
        if !s.cfg.Bitcoin.SimNet {
×
2918
                dnsSeeds, ok := chainreg.ChainDNSSeeds[*s.cfg.ActiveNetParams.GenesisHash]
×
2919

×
2920
                // If we have a set of DNS seeds for this chain, then we'll add
×
2921
                // it as an additional bootstrapping source.
×
2922
                if ok {
×
2923
                        srvrLog.Infof("Creating DNS peer bootstrapper with "+
×
2924
                                "seeds: %v", dnsSeeds)
×
2925

×
2926
                        dnsBootStrapper := discovery.NewDNSSeedBootstrapper(
×
2927
                                dnsSeeds, s.cfg.net, s.cfg.ConnectionTimeout,
×
2928
                        )
×
2929
                        bootStrappers = append(bootStrappers, dnsBootStrapper)
×
2930
                }
×
2931
        }
2932

2933
        return bootStrappers, nil
×
2934
}
2935
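// Illustrative sketch (not part of server.go): the slice of
// discovery.NetworkPeerBootstrapper values built above lets several
// independent address sources (the channel graph, DNS seeds) be queried
// through one common interface, and more sources can be appended without the
// caller changing. The hypothetical interface and combiner below mirror that
// shape only; the real interface in the discovery package has its own method
// set.
type addrSource interface {
        // Sample returns up to numAddrs candidate peer addresses.
        Sample(numAddrs uint32) ([]*lnwire.NetAddress, error)
}

// sampleFromAll queries each source in turn until enough addresses have been
// collected, skipping sources that error.
func sampleFromAll(numAddrs uint32,
        sources ...addrSource) []*lnwire.NetAddress {

        var out []*lnwire.NetAddress
        for _, src := range sources {
                if uint32(len(out)) >= numAddrs {
                        break
                }
                addrs, err := src.Sample(numAddrs - uint32(len(out)))
                if err != nil {
                        srvrLog.Warnf("Bootstrap source failed: %v", err)
                        continue
                }
                out = append(out, addrs...)
        }
        return out
}
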

2936
// createBootstrapIgnorePeers creates a map of peers that the bootstrap process
2937
// needs to ignore, which is made of three parts,
2938
//   - the node itself needs to be skipped as it doesn't make sense to connect
2939
//     to itself.
2940
//   - the peers that we already have connections with, as in s.peersByPub.
2941
//   - the peers that we are attempting to connect to, as in s.persistentPeers.
2942
func (s *server) createBootstrapIgnorePeers() map[autopilot.NodeID]struct{} {
×
2943
        s.mu.RLock()
×
2944
        defer s.mu.RUnlock()
×
2945

×
2946
        ignore := make(map[autopilot.NodeID]struct{})
×
2947

×
2948
        // We should ignore ourselves from bootstrapping.
×
2949
        selfKey := autopilot.NewNodeID(s.identityECDH.PubKey())
×
2950
        ignore[selfKey] = struct{}{}
×
2951

×
2952
        // Ignore all connected peers.
×
2953
        for _, peer := range s.peersByPub {
×
2954
                nID := autopilot.NewNodeID(peer.IdentityKey())
×
2955
                ignore[nID] = struct{}{}
×
2956
        }
×
2957

2958
        // Ignore all persistent peers as they have a dedicated reconnecting
2959
        // process.
2960
        for pubKeyStr := range s.persistentPeers {
×
2961
                var nID autopilot.NodeID
×
2962
                copy(nID[:], []byte(pubKeyStr))
×
2963
                ignore[nID] = struct{}{}
×
2964
        }
×
2965

2966
        return ignore
×
2967
}
2968
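// Illustrative sketch (not part of server.go): createBootstrapIgnorePeers
// above can use autopilot.NodeID directly as a map key because it is a
// comparable value type, unlike *btcec.PublicKey (a pointer) or a byte slice.
// The hypothetical peerSet below shows the same set-of-keys idiom with a
// stand-in key type; the real autopilot.NodeID may be defined differently.
// A peerSet must be created with make(peerSet) before use.
type peerKey [33]byte

type peerSet map[peerKey]struct{}

// add marks a serialized public key as present in the set.
func (p peerSet) add(serializedPub []byte) {
        var key peerKey
        copy(key[:], serializedPub)
        p[key] = struct{}{}
}

// contains reports whether a serialized public key is in the set.
func (p peerSet) contains(serializedPub []byte) bool {
        var key peerKey
        copy(key[:], serializedPub)
        _, ok := p[key]
        return ok
}
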

2969
// peerBootstrapper is a goroutine which is tasked with attempting to establish
2970
// and maintain a target minimum number of outbound connections. With this
2971
// invariant, we ensure that our node is connected to a diverse set of peers
2972
// and that nodes newly joining the network receive an up to date network view
2973
// as soon as possible.
2974
func (s *server) peerBootstrapper(numTargetPeers uint32,
2975
        bootstrappers []discovery.NetworkPeerBootstrapper) {
×
2976

×
2977
        defer s.wg.Done()
×
2978

×
2979
        // Before we continue, init the ignore peers map.
×
2980
        ignoreList := s.createBootstrapIgnorePeers()
×
2981

×
2982
        // We'll start off by aggressively attempting connections to peers in
×
2983
        // order to be a part of the network as soon as possible.
×
2984
        s.initialPeerBootstrap(ignoreList, numTargetPeers, bootstrappers)
×
2985

×
2986
        // Once done, we'll attempt to maintain our target minimum number of
×
2987
        // peers.
×
2988
        //
×
2989
        // We'll use a 15 second backoff, and double the time every time an
×
2990
        // epoch fails up to a ceiling.
×
2991
        backOff := time.Second * 15
×
2992

×
2993
        // We'll create a new ticker to wake us up every 15 seconds so we can
×
2994
        // see if we've reached our minimum number of peers.
×
2995
        sampleTicker := time.NewTicker(backOff)
×
2996
        defer sampleTicker.Stop()
×
2997

×
2998
        // We'll use the number of attempts and errors to determine if we need
×
2999
        // to increase the time between discovery epochs.
×
3000
        var epochErrors uint32 // To be used atomically.
×
3001
        var epochAttempts uint32
×
3002

×
3003
        for {
×
3004
                select {
×
3005
                // The ticker has just woken us up, so we'll need to check if
3006
                // we need to attempt to connect out to any more peers.
3007
                case <-sampleTicker.C:
×
3008
                        // Obtain the current number of peers, so we can gauge
×
3009
                        // if we need to sample more peers or not.
×
3010
                        s.mu.RLock()
×
3011
                        numActivePeers := uint32(len(s.peersByPub))
×
3012
                        s.mu.RUnlock()
×
3013

×
3014
                        // If we have enough peers, then we can loop back
×
3015
                        // around to the next round as we're done here.
×
3016
                        if numActivePeers >= numTargetPeers {
×
3017
                                continue
×
3018
                        }
3019

3020
                        // If all of our attempts failed during this last back
3021
                        // off period, then we'll increase our backoff to the 5
3022
                        // minute ceiling to avoid an excessive number of
3023
                        // queries
3024
                        //
3025
                        // TODO(roasbeef): add reverse policy too?
3026

3027
                        if epochAttempts > 0 &&
×
3028
                                atomic.LoadUint32(&epochErrors) >= epochAttempts {
×
3029

×
3030
                                sampleTicker.Stop()
×
3031

×
3032
                                backOff *= 2
×
3033
                                if backOff > bootstrapBackOffCeiling {
×
3034
                                        backOff = bootstrapBackOffCeiling
×
3035
                                }
×
3036

3037
                                srvrLog.Debugf("Backing off peer bootstrapper to "+
×
3038
                                        "%v", backOff)
×
3039
                                sampleTicker = time.NewTicker(backOff)
×
3040
                                continue
×
3041
                        }
3042

3043
                        atomic.StoreUint32(&epochErrors, 0)
×
3044
                        epochAttempts = 0
×
3045

×
3046
                        // Since we now need more peers, we'll compute the
×
3047
                        // exact number we need to reach our threshold.
×
3048
                        numNeeded := numTargetPeers - numActivePeers
×
3049

×
3050
                        srvrLog.Debugf("Attempting to obtain %v more network "+
×
3051
                                "peers", numNeeded)
×
3052

×
3053
                        // With the number of peers we need calculated, we'll
×
3054
                        // query the network bootstrappers to sample a set of
×
3055
                        // random addrs for us.
×
3056
                        //
×
3057
                        // Before we continue, get a copy of the ignore peers
×
3058
                        // map.
×
3059
                        ignoreList = s.createBootstrapIgnorePeers()
×
3060

×
3061
                        peerAddrs, err := discovery.MultiSourceBootstrap(
×
3062
                                ignoreList, numNeeded*2, bootstrappers...,
×
3063
                        )
×
3064
                        if err != nil {
×
3065
                                srvrLog.Errorf("Unable to retrieve bootstrap "+
×
3066
                                        "peers: %v", err)
×
3067
                                continue
×
3068
                        }
3069

3070
                        // Finally, we'll launch a new goroutine for each
3071
                        // prospective peer candidate.
3072
                        for _, addr := range peerAddrs {
×
3073
                                epochAttempts++
×
3074

×
3075
                                go func(a *lnwire.NetAddress) {
×
3076
                                        // TODO(roasbeef): can do AS, subnet,
×
3077
                                        // country diversity, etc
×
3078
                                        errChan := make(chan error, 1)
×
3079
                                        s.connectToPeer(
×
3080
                                                a, errChan,
×
3081
                                                s.cfg.ConnectionTimeout,
×
3082
                                        )
×
3083
                                        select {
×
3084
                                        case err := <-errChan:
×
3085
                                                if err == nil {
×
3086
                                                        return
×
3087
                                                }
×
3088

3089
                                                srvrLog.Errorf("Unable to "+
×
3090
                                                        "connect to %v: %v",
×
3091
                                                        a, err)
×
3092
                                                atomic.AddUint32(&epochErrors, 1)
×
3093
                                        case <-s.quit:
×
3094
                                        }
3095
                                }(addr)
3096
                        }
3097
                case <-s.quit:
×
3098
                        return
×
3099
                }
3100
        }
3101
}
3102
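// Illustrative sketch (not part of server.go): the backoff logic above
// doubles the sampling interval whenever every connection attempt in an epoch
// failed, capping it at bootstrapBackOffCeiling. Isolated from the ticker
// bookkeeping, the step function amounts to this hypothetical helper:
func nextBootstrapBackoff(current time.Duration, attempts,
        errors uint32) time.Duration {

        // Only back off when we attempted something and everything failed.
        if attempts == 0 || errors < attempts {
                return current
        }

        next := current * 2
        if next > bootstrapBackOffCeiling {
                next = bootstrapBackOffCeiling
        }
        return next
}
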

3103
// bootstrapBackOffCeiling is the maximum amount of time we'll wait between
3104
// failed attempts to locate a set of bootstrap peers. We'll slowly double our
3105
// query back off each time we encounter a failure.
3106
const bootstrapBackOffCeiling = time.Minute * 5
3107

3108
// initialPeerBootstrap attempts to continuously connect to peers on startup
3109
// until the target number of peers has been reached. This ensures that nodes
3110
// receive an up to date network view as soon as possible.
3111
func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
3112
        numTargetPeers uint32,
3113
        bootstrappers []discovery.NetworkPeerBootstrapper) {
×
3114

×
3115
        srvrLog.Debugf("Init bootstrap with targetPeers=%v, bootstrappers=%v, "+
×
3116
                "ignore=%v", numTargetPeers, len(bootstrappers), len(ignore))
×
3117

×
3118
        // We'll start off by waiting 2 seconds between failed attempts, then
×
3119
        // double each time we fail until we hit the bootstrapBackOffCeiling.
×
3120
        var delaySignal <-chan time.Time
×
3121
        delayTime := time.Second * 2
×
3122

×
3123
        // As we want to be more aggressive, we'll use a lower back off ceiling
×
3124
        // than the main peer bootstrap logic.
×
3125
        backOffCeiling := bootstrapBackOffCeiling / 5
×
3126

×
3127
        for attempts := 0; ; attempts++ {
×
3128
                // Check if the server has been requested to shut down in order
×
3129
                // to prevent blocking.
×
3130
                if s.Stopped() {
×
3131
                        return
×
3132
                }
×
3133

3134
                // We can exit our aggressive initial peer bootstrapping stage
3135
                // if we've reached our target number of peers.
3136
                s.mu.RLock()
×
3137
                numActivePeers := uint32(len(s.peersByPub))
×
3138
                s.mu.RUnlock()
×
3139

×
3140
                if numActivePeers >= numTargetPeers {
×
3141
                        return
×
3142
                }
×
3143

3144
                if attempts > 0 {
×
3145
                        srvrLog.Debugf("Waiting %v before trying to locate "+
×
3146
                                "bootstrap peers (attempt #%v)", delayTime,
×
3147
                                attempts)
×
3148

×
3149
                        // We've completed at least one iteration and haven't
×
3150
                        // finished, so we'll start to insert a delay period
×
3151
                        // between each attempt.
×
3152
                        delaySignal = time.After(delayTime)
×
3153
                        select {
×
3154
                        case <-delaySignal:
×
3155
                        case <-s.quit:
×
3156
                                return
×
3157
                        }
3158

3159
                        // After our delay, we'll double the time we wait up to
3160
                        // the max back off period.
3161
                        delayTime *= 2
×
3162
                        if delayTime > backOffCeiling {
×
3163
                                delayTime = backOffCeiling
×
3164
                        }
×
3165
                }
3166

3167
                // Otherwise, we'll request the remaining number of peers
3168
                // in order to reach our target.
3169
                peersNeeded := numTargetPeers - numActivePeers
×
3170
                bootstrapAddrs, err := discovery.MultiSourceBootstrap(
×
3171
                        ignore, peersNeeded, bootstrappers...,
×
3172
                )
×
3173
                if err != nil {
×
3174
                        srvrLog.Errorf("Unable to retrieve initial bootstrap "+
×
3175
                                "peers: %v", err)
×
3176
                        continue
×
3177
                }
3178

3179
                // Then, we'll attempt to establish a connection to the
3180
                // different peer addresses retrieved by our bootstrappers.
3181
                var wg sync.WaitGroup
×
3182
                for _, bootstrapAddr := range bootstrapAddrs {
×
3183
                        wg.Add(1)
×
3184
                        go func(addr *lnwire.NetAddress) {
×
3185
                                defer wg.Done()
×
3186

×
3187
                                errChan := make(chan error, 1)
×
3188
                                go s.connectToPeer(
×
3189
                                        addr, errChan, s.cfg.ConnectionTimeout,
×
3190
                                )
×
3191

×
3192
                                // We'll only allow this connection attempt to
×
3193
                                // take up to 3 seconds. This allows us to move
×
3194
                                // quickly by discarding peers that are slowing
×
3195
                                // us down.
×
3196
                                select {
×
3197
                                case err := <-errChan:
×
3198
                                        if err == nil {
×
3199
                                                return
×
3200
                                        }
×
3201
                                        srvrLog.Errorf("Unable to connect to "+
×
3202
                                                "%v: %v", addr, err)
×
3203
                                // TODO: tune timeout? 3 seconds might be *too*
3204
                                // aggressive but works well.
3205
                                case <-time.After(3 * time.Second):
×
3206
                                        srvrLog.Tracef("Skipping peer %v due "+
×
3207
                                                "to not establishing a "+
×
3208
                                                "connection within 3 seconds",
×
3209
                                                addr)
×
3210
                                case <-s.quit:
×
3211
                                }
3212
                        }(bootstrapAddr)
3213
                }
3214

3215
                wg.Wait()
×
3216
        }
3217
}
3218
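// Illustrative sketch (not part of server.go): each bootstrap connection
// above races an error channel against a 3 second timer and the server quit
// channel, so one slow peer cannot stall the whole round. The same
// wait-with-deadline idiom, reduced to a hypothetical helper:
func waitForConnResult(errChan <-chan error, timeout time.Duration,
        quit <-chan struct{}) error {

        select {
        case err := <-errChan:
                // A nil error means the connection attempt succeeded.
                return err

        case <-time.After(timeout):
                return fmt.Errorf("no connection within %v", timeout)

        case <-quit:
                return fmt.Errorf("server shutting down")
        }
}
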

3219
// createNewHiddenService automatically sets up a v2 or v3 onion service in
3220
// order to listen for inbound connections over Tor.
3221
func (s *server) createNewHiddenService() error {
×
3222
        // Determine the different ports the server is listening on. The onion
×
3223
        // service's virtual port will map to these ports and one will be picked
×
3224
        // at random when the onion service is being accessed.
×
3225
        listenPorts := make([]int, 0, len(s.listenAddrs))
×
3226
        for _, listenAddr := range s.listenAddrs {
×
3227
                port := listenAddr.(*net.TCPAddr).Port
×
3228
                listenPorts = append(listenPorts, port)
×
3229
        }
×
3230

3231
        encrypter, err := lnencrypt.KeyRingEncrypter(s.cc.KeyRing)
×
3232
        if err != nil {
×
3233
                return err
×
3234
        }
×
3235

3236
        // Once the port mapping has been set, we can go ahead and automatically
3237
        // create our onion service. The service's private key will be saved to
3238
        // disk in order to regain access to this service when restarting `lnd`.
3239
        onionCfg := tor.AddOnionConfig{
×
3240
                VirtualPort: defaultPeerPort,
×
3241
                TargetPorts: listenPorts,
×
3242
                Store: tor.NewOnionFile(
×
3243
                        s.cfg.Tor.PrivateKeyPath, 0600, s.cfg.Tor.EncryptKey,
×
3244
                        encrypter,
×
3245
                ),
×
3246
        }
×
3247

×
3248
        switch {
×
3249
        case s.cfg.Tor.V2:
×
3250
                onionCfg.Type = tor.V2
×
3251
        case s.cfg.Tor.V3:
×
3252
                onionCfg.Type = tor.V3
×
3253
        }
3254

3255
        addr, err := s.torController.AddOnion(onionCfg)
×
3256
        if err != nil {
×
3257
                return err
×
3258
        }
×
3259

3260
        // Now that the onion service has been created, we'll add the onion
3261
        // address it can be reached at to our list of advertised addresses.
3262
        newNodeAnn, err := s.genNodeAnnouncement(
×
3263
                nil, func(currentAnn *lnwire.NodeAnnouncement) {
×
3264
                        currentAnn.Addresses = append(currentAnn.Addresses, addr)
×
3265
                },
×
3266
        )
3267
        if err != nil {
×
3268
                return fmt.Errorf("unable to generate new node "+
×
3269
                        "announcement: %v", err)
×
3270
        }
×
3271

3272
        // Finally, we'll update the on-disk version of our announcement so it
3273
        // will eventually propagate to nodes in the network.
3274
        selfNode := &models.LightningNode{
×
3275
                HaveNodeAnnouncement: true,
×
3276
                LastUpdate:           time.Unix(int64(newNodeAnn.Timestamp), 0),
×
3277
                Addresses:            newNodeAnn.Addresses,
×
3278
                Alias:                newNodeAnn.Alias.String(),
×
3279
                Features: lnwire.NewFeatureVector(
×
3280
                        newNodeAnn.Features, lnwire.Features,
×
3281
                ),
×
3282
                Color:        newNodeAnn.RGBColor,
×
3283
                AuthSigBytes: newNodeAnn.Signature.ToSignatureBytes(),
×
3284
        }
×
3285
        copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed())
×
3286
        if err := s.graphDB.SetSourceNode(selfNode); err != nil {
×
3287
                return fmt.Errorf("can't set self node: %w", err)
×
3288
        }
×
3289

3290
        return nil
×
3291
}
3292

3293
// findChannel finds a channel given a public key and ChannelID. It is an
3294
// optimization that is quicker than seeking for a channel given only the
3295
// ChannelID.
3296
func (s *server) findChannel(node *btcec.PublicKey, chanID lnwire.ChannelID) (
3297
        *channeldb.OpenChannel, error) {
2✔
3298

2✔
3299
        nodeChans, err := s.chanStateDB.FetchOpenChannels(node)
2✔
3300
        if err != nil {
2✔
3301
                return nil, err
×
3302
        }
×
3303

3304
        for _, channel := range nodeChans {
4✔
3305
                if chanID.IsChanPoint(&channel.FundingOutpoint) {
4✔
3306
                        return channel, nil
2✔
3307
                }
2✔
3308
        }
3309

3310
        return nil, fmt.Errorf("unable to find channel")
2✔
3311
}
3312

3313
// getNodeAnnouncement fetches the current, fully signed node announcement.
3314
func (s *server) getNodeAnnouncement() lnwire.NodeAnnouncement {
2✔
3315
        s.mu.Lock()
2✔
3316
        defer s.mu.Unlock()
2✔
3317

2✔
3318
        return *s.currentNodeAnn
2✔
3319
}
2✔
3320
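// Illustrative sketch (not part of server.go): getNodeAnnouncement above
// dereferences s.currentNodeAnn while holding the mutex and returns the value
// by (shallow) copy, so callers never hold a pointer into state that
// genNodeAnnouncement may mutate later. The same copy-under-lock idiom with
// hypothetical types:
type annHolder struct {
        mu  sync.Mutex
        ann lnwire.NodeAnnouncement
}

// current returns a snapshot of the announcement that remains usable after
// the lock has been released.
func (a *annHolder) current() lnwire.NodeAnnouncement {
        a.mu.Lock()
        defer a.mu.Unlock()

        return a.ann
}
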

3321
// genNodeAnnouncement generates and returns the current fully signed node
3322
// announcement. The time stamp of the announcement will be updated in order
3323
// to ensure it propagates through the network.
3324
func (s *server) genNodeAnnouncement(features *lnwire.RawFeatureVector,
3325
        modifiers ...netann.NodeAnnModifier) (lnwire.NodeAnnouncement, error) {
2✔
3326

2✔
3327
        s.mu.Lock()
2✔
3328
        defer s.mu.Unlock()
2✔
3329

2✔
3330
        // First, try to update our feature manager with the updated set of
2✔
3331
        // features.
2✔
3332
        if features != nil {
4✔
3333
                proposedFeatures := map[feature.Set]*lnwire.RawFeatureVector{
2✔
3334
                        feature.SetNodeAnn: features,
2✔
3335
                }
2✔
3336
                err := s.featureMgr.UpdateFeatureSets(proposedFeatures)
2✔
3337
                if err != nil {
4✔
3338
                        return lnwire.NodeAnnouncement{}, err
2✔
3339
                }
2✔
3340

3341
                // If we could successfully update our feature manager, add
3342
                // an update modifier to include these new features to our
3343
                // set.
3344
                modifiers = append(
2✔
3345
                        modifiers, netann.NodeAnnSetFeatures(features),
2✔
3346
                )
2✔
3347
        }
3348

3349
        // Always update the timestamp when refreshing to ensure the update
3350
        // propagates.
3351
        modifiers = append(modifiers, netann.NodeAnnSetTimestamp)
2✔
3352

2✔
3353
        // Apply the requested changes to the node announcement.
2✔
3354
        for _, modifier := range modifiers {
4✔
3355
                modifier(s.currentNodeAnn)
2✔
3356
        }
2✔
3357

3358
        // Sign a new update after applying all of the passed modifiers.
3359
        err := netann.SignNodeAnnouncement(
2✔
3360
                s.nodeSigner, s.identityKeyLoc, s.currentNodeAnn,
2✔
3361
        )
2✔
3362
        if err != nil {
2✔
3363
                return lnwire.NodeAnnouncement{}, err
×
3364
        }
×
3365

3366
        return *s.currentNodeAnn, nil
2✔
3367
}
3368
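// Illustrative sketch (not part of server.go): genNodeAnnouncement above
// threads the announcement through a list of netann.NodeAnnModifier functions
// before re-signing it, a functional-options style of mutation. The
// hypothetical modifier type and combinator below show the shape of that
// pattern; the real modifier type and helpers live in the netann package.
type annModifier func(*lnwire.NodeAnnouncement)

// withExtraAddress returns a modifier that appends one more advertised
// address, mirroring the inline modifier used in createNewHiddenService.
func withExtraAddress(addr net.Addr) annModifier {
        return func(ann *lnwire.NodeAnnouncement) {
                ann.Addresses = append(ann.Addresses, addr)
        }
}

// applyAnnModifiers runs each modifier over the announcement in order.
func applyAnnModifiers(ann *lnwire.NodeAnnouncement, mods ...annModifier) {
        for _, mod := range mods {
                mod(ann)
        }
}
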

3369
// updateAndBroadcastSelfNode generates a new node announcement
3370
// applying the given modifiers and updating the time stamp
3371
// to ensure it propagates through the network. Then it broadcasts
3372
// it to the network.
3373
func (s *server) updateAndBroadcastSelfNode(features *lnwire.RawFeatureVector,
3374
        modifiers ...netann.NodeAnnModifier) error {
2✔
3375

2✔
3376
        newNodeAnn, err := s.genNodeAnnouncement(features, modifiers...)
2✔
3377
        if err != nil {
4✔
3378
                return fmt.Errorf("unable to generate new node "+
2✔
3379
                        "announcement: %v", err)
2✔
3380
        }
2✔
3381

3382
        // Update the on-disk version of our announcement.
3383
        // Load and modify self node instead of creating a new instance so we
3384
        // don't risk overwriting any existing values.
3385
        selfNode, err := s.graphDB.SourceNode()
2✔
3386
        if err != nil {
2✔
3387
                return fmt.Errorf("unable to get current source node: %w", err)
×
3388
        }
×
3389

3390
        selfNode.HaveNodeAnnouncement = true
2✔
3391
        selfNode.LastUpdate = time.Unix(int64(newNodeAnn.Timestamp), 0)
2✔
3392
        selfNode.Addresses = newNodeAnn.Addresses
2✔
3393
        selfNode.Alias = newNodeAnn.Alias.String()
2✔
3394
        selfNode.Features = s.featureMgr.Get(feature.SetNodeAnn)
2✔
3395
        selfNode.Color = newNodeAnn.RGBColor
2✔
3396
        selfNode.AuthSigBytes = newNodeAnn.Signature.ToSignatureBytes()
2✔
3397

2✔
3398
        copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed())
2✔
3399

2✔
3400
        if err := s.graphDB.SetSourceNode(selfNode); err != nil {
2✔
3401
                return fmt.Errorf("can't set self node: %w", err)
×
3402
        }
×
3403

3404
        // Finally, propagate it to the nodes in the network.
3405
        err = s.BroadcastMessage(nil, &newNodeAnn)
2✔
3406
        if err != nil {
2✔
3407
                rpcsLog.Debugf("Unable to broadcast new node "+
×
3408
                        "announcement to peers: %v", err)
×
3409
                return err
×
3410
        }
×
3411

3412
        return nil
2✔
3413
}
3414

3415
type nodeAddresses struct {
3416
        pubKey    *btcec.PublicKey
3417
        addresses []net.Addr
3418
}
3419

3420
// establishPersistentConnections attempts to establish persistent connections
3421
// to all our direct channel collaborators. In order to promote liveness of our
3422
// active channels, we instruct the connection manager to attempt to establish
3423
// and maintain persistent connections to all our direct channel counterparties.
3424
func (s *server) establishPersistentConnections() error {
2✔
3425
        // nodeAddrsMap stores the combination of node public keys and addresses
2✔
3426
        // that we'll attempt to reconnect to. PubKey strings are used as keys
2✔
3427
        // since other PubKey forms can't be compared.
2✔
3428
        nodeAddrsMap := map[string]*nodeAddresses{}
2✔
3429

2✔
3430
        // Iterate through the list of LinkNodes to find addresses we should
2✔
3431
        // attempt to connect to based on our set of previous connections. Set
2✔
3432
        // the reconnection port to the default peer port.
2✔
3433
        linkNodes, err := s.chanStateDB.LinkNodeDB().FetchAllLinkNodes()
2✔
3434
        if err != nil && err != channeldb.ErrLinkNodesNotFound {
2✔
3435
                return err
×
3436
        }
×
3437
        for _, node := range linkNodes {
4✔
3438
                pubStr := string(node.IdentityPub.SerializeCompressed())
2✔
3439
                nodeAddrs := &nodeAddresses{
2✔
3440
                        pubKey:    node.IdentityPub,
2✔
3441
                        addresses: node.Addresses,
2✔
3442
                }
2✔
3443
                nodeAddrsMap[pubStr] = nodeAddrs
2✔
3444
        }
2✔
3445

3446
        // After checking our previous connections for addresses to connect to,
3447
        // iterate through the nodes in our channel graph to find addresses
3448
        // that have been added via NodeAnnouncement messages.
3449
        sourceNode, err := s.graphDB.SourceNode()
2✔
3450
        if err != nil {
2✔
3451
                return err
×
3452
        }
×
3453

3454
        // TODO(roasbeef): instead iterate over link nodes and query graph for
3455
        // each of the nodes.
3456
        selfPub := s.identityECDH.PubKey().SerializeCompressed()
2✔
3457
        err = s.graphDB.ForEachNodeChannel(sourceNode.PubKeyBytes, func(
2✔
3458
                tx kvdb.RTx,
2✔
3459
                chanInfo *models.ChannelEdgeInfo,
2✔
3460
                policy, _ *models.ChannelEdgePolicy) error {
4✔
3461

2✔
3462
                // If the remote party has announced the channel to us, but we
2✔
3463
                // haven't yet, then we won't have a policy. However, we don't
2✔
3464
                // need this to connect to the peer, so we'll log it and move on.
2✔
3465
                if policy == nil {
2✔
3466
                        srvrLog.Warnf("No channel policy found for "+
×
3467
                                "ChannelPoint(%v): ", chanInfo.ChannelPoint)
×
3468
                }
×
3469

3470
                // We'll now fetch the peer opposite from us within this
3471
                // channel so we can queue up a direct connection to them.
3472
                channelPeer, err := s.graphDB.FetchOtherNode(
2✔
3473
                        tx, chanInfo, selfPub,
2✔
3474
                )
2✔
3475
                if err != nil {
2✔
3476
                        return fmt.Errorf("unable to fetch channel peer for "+
×
3477
                                "ChannelPoint(%v): %v", chanInfo.ChannelPoint,
×
3478
                                err)
×
3479
                }
×
3480

3481
                pubStr := string(channelPeer.PubKeyBytes[:])
2✔
3482

2✔
3483
                // Add all unique addresses from channel
2✔
3484
                // graph/NodeAnnouncements to the list of addresses we'll
2✔
3485
                // connect to for this peer.
2✔
3486
                addrSet := make(map[string]net.Addr)
2✔
3487
                for _, addr := range channelPeer.Addresses {
4✔
3488
                        switch addr.(type) {
2✔
3489
                        case *net.TCPAddr:
2✔
3490
                                addrSet[addr.String()] = addr
2✔
3491

3492
                        // We'll only attempt to connect to Tor addresses if Tor
3493
                        // outbound support is enabled.
3494
                        case *tor.OnionAddr:
×
3495
                                if s.cfg.Tor.Active {
×
3496
                                        addrSet[addr.String()] = addr
×
3497
                                }
×
3498
                        }
3499
                }
3500

3501
                // If this peer is also recorded as a link node, we'll add any
3502
                // additional addresses that have not already been selected.
3503
                linkNodeAddrs, ok := nodeAddrsMap[pubStr]
2✔
3504
                if ok {
4✔
3505
                        for _, lnAddress := range linkNodeAddrs.addresses {
4✔
3506
                                switch lnAddress.(type) {
2✔
3507
                                case *net.TCPAddr:
2✔
3508
                                        addrSet[lnAddress.String()] = lnAddress
2✔
3509

3510
                                // We'll only attempt to connect to Tor
3511
                                // addresses if Tor outbound support is enabled.
3512
                                case *tor.OnionAddr:
×
3513
                                        if s.cfg.Tor.Active {
×
3514
                                                addrSet[lnAddress.String()] = lnAddress
×
3515
                                        }
×
3516
                                }
3517
                        }
3518
                }
3519

3520
                // Construct a slice of the deduped addresses.
3521
                var addrs []net.Addr
2✔
3522
                for _, addr := range addrSet {
4✔
3523
                        addrs = append(addrs, addr)
2✔
3524
                }
2✔
3525

3526
                n := &nodeAddresses{
2✔
3527
                        addresses: addrs,
2✔
3528
                }
2✔
3529
                n.pubKey, err = channelPeer.PubKey()
2✔
3530
                if err != nil {
2✔
3531
                        return err
×
3532
                }
×
3533

3534
                nodeAddrsMap[pubStr] = n
2✔
3535
                return nil
2✔
3536
        })
3537
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
2✔
3538
                return err
×
3539
        }
×
3540

3541
        srvrLog.Debugf("Establishing %v persistent connections on start",
2✔
3542
                len(nodeAddrsMap))
2✔
3543

2✔
3544
        // Acquire and hold server lock until all persistent connection requests
2✔
3545
        // have been recorded and sent to the connection manager.
2✔
3546
        s.mu.Lock()
2✔
3547
        defer s.mu.Unlock()
2✔
3548

2✔
3549
        // Iterate through the combined list of addresses from prior links and
2✔
3550
        // node announcements and attempt to reconnect to each node.
2✔
3551
        var numOutboundConns int
2✔
3552
        for pubStr, nodeAddr := range nodeAddrsMap {
4✔
3553
                // Add this peer to the set of peers we should maintain a
2✔
3554
                // persistent connection with. We set the value to false to
2✔
3555
                // indicate that we should not continue to reconnect if the
2✔
3556
                // number of channels returns to zero, since this peer has not
2✔
3557
                // been requested as perm by the user.
2✔
3558
                s.persistentPeers[pubStr] = false
2✔
3559
                if _, ok := s.persistentPeersBackoff[pubStr]; !ok {
4✔
3560
                        s.persistentPeersBackoff[pubStr] = s.cfg.MinBackoff
2✔
3561
                }
2✔
3562

3563
                for _, address := range nodeAddr.addresses {
4✔
3564
                        // Create a wrapper address which couples the IP and
2✔
3565
                        // the pubkey so the brontide authenticated connection
2✔
3566
                        // can be established.
2✔
3567
                        lnAddr := &lnwire.NetAddress{
2✔
3568
                                IdentityKey: nodeAddr.pubKey,
2✔
3569
                                Address:     address,
2✔
3570
                        }
2✔
3571

2✔
3572
                        s.persistentPeerAddrs[pubStr] = append(
2✔
3573
                                s.persistentPeerAddrs[pubStr], lnAddr)
2✔
3574
                }
2✔
3575

3576
                // We'll connect to the first 10 peers immediately, then
3577
                // randomly stagger any remaining connections if the
3578
                // stagger initial reconnect flag is set. This ensures
3579
                // that mobile nodes or nodes with a small number of
3580
                // channels obtain connectivity quickly, but larger
3581
                // nodes are able to disperse the costs of connecting to
3582
                // all peers at once.
3583
                if numOutboundConns < numInstantInitReconnect ||
2✔
3584
                        !s.cfg.StaggerInitialReconnect {
4✔
3585

2✔
3586
                        go s.connectToPersistentPeer(pubStr)
2✔
3587
                } else {
2✔
3588
                        go s.delayInitialReconnect(pubStr)
×
3589
                }
×
3590

3591
                numOutboundConns++
2✔
3592
        }
3593

3594
        return nil
2✔
3595
}
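A minimal sketch of the address de-duplication used above, for illustration only; graphAddrs and linkAddrs are hypothetical slices standing in for the channel-graph and link-node address sources, and the dedup is keyed on each address's string form exactly as in the loop bodies above.

        addrSet := make(map[string]net.Addr)
        for _, addr := range graphAddrs {
                addrSet[addr.String()] = addr
        }
        for _, addr := range linkAddrs {
                addrSet[addr.String()] = addr
        }
        addrs := make([]net.Addr, 0, len(addrSet))
        for _, addr := range addrSet {
                addrs = append(addrs, addr)
        }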
3596

3597
// delayInitialReconnect will attempt a reconnection to the given peer after
3598
// sampling a value for the delay between 0s and the maxInitReconnectDelay.
3599
//
3600
// NOTE: This method MUST be run as a goroutine.
3601
func (s *server) delayInitialReconnect(pubStr string) {
×
3602
        delay := time.Duration(prand.Intn(maxInitReconnectDelay)) * time.Second
×
3603
        select {
×
3604
        case <-time.After(delay):
×
3605
                s.connectToPersistentPeer(pubStr)
×
3606
        case <-s.quit:
×
3607
        }
3608
}
3609

3610
// prunePersistentPeerConnection removes all internal state related to
3611
// persistent connections to a peer within the server. This is used to avoid
3612
// persistent connection retries to peers we do not have any open channels with.
3613
func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) {
2✔
3614
        pubKeyStr := string(compressedPubKey[:])
2✔
3615

2✔
3616
        s.mu.Lock()
2✔
3617
        if perm, ok := s.persistentPeers[pubKeyStr]; ok && !perm {
4✔
3618
                delete(s.persistentPeers, pubKeyStr)
2✔
3619
                delete(s.persistentPeersBackoff, pubKeyStr)
2✔
3620
                delete(s.persistentPeerAddrs, pubKeyStr)
2✔
3621
                s.cancelConnReqs(pubKeyStr, nil)
2✔
3622
                s.mu.Unlock()
2✔
3623

2✔
3624
                srvrLog.Infof("Pruned peer %x from persistent connections, "+
2✔
3625
                        "peer has no open channels", compressedPubKey)
2✔
3626

2✔
3627
                return
2✔
3628
        }
2✔
3629
        s.mu.Unlock()
2✔
3630
}
3631

3632
// BroadcastMessage sends a request to the server to broadcast a set of
3633
// messages to all peers other than the one specified by the `skips` parameter.
3634
// All messages sent via BroadcastMessage will be queued for lazy delivery to
3635
// the target peers.
3636
//
3637
// NOTE: This function is safe for concurrent access.
3638
func (s *server) BroadcastMessage(skips map[route.Vertex]struct{},
3639
        msgs ...lnwire.Message) error {
2✔
3640

2✔
3641
        // Filter out peers found in the skips map. We synchronize access to
2✔
3642
        // peersByPub throughout this process to ensure we deliver messages to
2✔
3643
        // exact set of peers present at the time of invocation.
2✔
3644
        s.mu.RLock()
2✔
3645
        peers := make([]*peer.Brontide, 0, len(s.peersByPub))
2✔
3646
        for pubStr, sPeer := range s.peersByPub {
4✔
3647
                if skips != nil {
4✔
3648
                        if _, ok := skips[sPeer.PubKey()]; ok {
4✔
3649
                                srvrLog.Tracef("Skipping %x in broadcast with "+
2✔
3650
                                        "pubStr=%x", sPeer.PubKey(), pubStr)
2✔
3651
                                continue
2✔
3652
                        }
3653
                }
3654

3655
                peers = append(peers, sPeer)
2✔
3656
        }
3657
        s.mu.RUnlock()
2✔
3658

2✔
3659
        // Iterate over all known peers, dispatching a go routine to enqueue
2✔
3660
        // all messages to each of the peers.
2✔
3661
        var wg sync.WaitGroup
2✔
3662
        for _, sPeer := range peers {
4✔
3663
                srvrLog.Debugf("Sending %v messages to peer %x", len(msgs),
2✔
3664
                        sPeer.PubKey())
2✔
3665

2✔
3666
                // Dispatch a go routine to enqueue all messages to this peer.
2✔
3667
                wg.Add(1)
2✔
3668
                s.wg.Add(1)
2✔
3669
                go func(p lnpeer.Peer) {
4✔
3670
                        defer s.wg.Done()
2✔
3671
                        defer wg.Done()
2✔
3672

2✔
3673
                        p.SendMessageLazy(false, msgs...)
2✔
3674
                }(sPeer)
2✔
3675
        }
3676

3677
        // Wait for all messages to have been dispatched before returning to
3678
        // caller.
3679
        wg.Wait()
2✔
3680

2✔
3681
        return nil
2✔
3682
}
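Illustrative usage sketch for BroadcastMessage, assuming the caller already has a route.Vertex for the peer to exclude; originVertex and chanUpdate are hypothetical placeholders for the originating peer and an lnwire message to relay.

        skips := map[route.Vertex]struct{}{
                originVertex: {},
        }
        if err := s.BroadcastMessage(skips, &chanUpdate); err != nil {
                srvrLog.Warnf("unable to broadcast channel update: %v", err)
        }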
3683

3684
// NotifyWhenOnline can be called by other subsystems to get notified when a
3685
// particular peer comes online. The peer itself is sent across the peerChan.
3686
//
3687
// NOTE: This function is safe for concurrent access.
3688
func (s *server) NotifyWhenOnline(peerKey [33]byte,
3689
        peerChan chan<- lnpeer.Peer) {
2✔
3690

2✔
3691
        s.mu.Lock()
2✔
3692

2✔
3693
        // Compute the target peer's identifier.
2✔
3694
        pubStr := string(peerKey[:])
2✔
3695

2✔
3696
        // Check if peer is connected.
2✔
3697
        peer, ok := s.peersByPub[pubStr]
2✔
3698
        if ok {
4✔
3699
                // Unlock here so that the mutex isn't held while we are
2✔
3700
                // waiting for the peer to become active.
2✔
3701
                s.mu.Unlock()
2✔
3702

2✔
3703
                // Wait until the peer signals that it is actually active
2✔
3704
                // rather than only in the server's maps.
2✔
3705
                select {
2✔
3706
                case <-peer.ActiveSignal():
2✔
3707
                case <-peer.QuitSignal():
×
3708
                        // The peer quit, so we'll add the channel to the slice
×
3709
                        // and return.
×
3710
                        s.mu.Lock()
×
3711
                        s.peerConnectedListeners[pubStr] = append(
×
3712
                                s.peerConnectedListeners[pubStr], peerChan,
×
3713
                        )
×
3714
                        s.mu.Unlock()
×
3715
                        return
×
3716
                }
3717

3718
                // Connected, can return early.
3719
                srvrLog.Debugf("Notifying that peer %x is online", peerKey)
2✔
3720

2✔
3721
                select {
2✔
3722
                case peerChan <- peer:
2✔
3723
                case <-s.quit:
×
3724
                }
3725

3726
                return
2✔
3727
        }
3728

3729
        // Not connected, store this listener such that it can be notified when
3730
        // the peer comes online.
3731
        s.peerConnectedListeners[pubStr] = append(
2✔
3732
                s.peerConnectedListeners[pubStr], peerChan,
2✔
3733
        )
2✔
3734
        s.mu.Unlock()
2✔
3735
}
3736

3737
// NotifyWhenOffline delivers a notification to the caller of when the peer with
3738
// the given public key has been disconnected. The notification is signaled by
3739
// closing the channel returned.
3740
func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} {
2✔
3741
        s.mu.Lock()
2✔
3742
        defer s.mu.Unlock()
2✔
3743

2✔
3744
        c := make(chan struct{})
2✔
3745

2✔
3746
        // If the peer is already offline, we can immediately trigger the
2✔
3747
        // notification.
2✔
3748
        peerPubKeyStr := string(peerPubKey[:])
2✔
3749
        if _, ok := s.peersByPub[peerPubKeyStr]; !ok {
2✔
3750
                srvrLog.Debugf("Notifying that peer %x is offline", peerPubKey)
×
3751
                close(c)
×
3752
                return c
×
3753
        }
×
3754

3755
        // Otherwise, the peer is online, so we'll keep track of the channel to
3756
        // trigger the notification once the server detects the peer
3757
        // disconnects.
3758
        s.peerDisconnectedListeners[peerPubKeyStr] = append(
2✔
3759
                s.peerDisconnectedListeners[peerPubKeyStr], c,
2✔
3760
        )
2✔
3761

2✔
3762
        return c
2✔
3763
}
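Illustrative usage sketch combining the two notification hooks above; peerPub is a hypothetical [33]byte compressed public key.

        // Wait for the peer to come online, or bail out on shutdown.
        peerChan := make(chan lnpeer.Peer, 1)
        s.NotifyWhenOnline(peerPub, peerChan)
        select {
        case p := <-peerChan:
                srvrLog.Debugf("peer %x is online", p.PubKey())
        case <-s.quit:
                return
        }

        // Block until that same peer disconnects again.
        <-s.NotifyWhenOffline(peerPub)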
3764

3765
// FindPeer will return the peer that corresponds to the passed in public key.
3766
// This function is used by the funding manager, allowing it to update the
3767
// daemon's local representation of the remote peer.
3768
//
3769
// NOTE: This function is safe for concurrent access.
3770
func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer.Brontide, error) {
2✔
3771
        s.mu.RLock()
2✔
3772
        defer s.mu.RUnlock()
2✔
3773

2✔
3774
        pubStr := string(peerKey.SerializeCompressed())
2✔
3775

2✔
3776
        return s.findPeerByPubStr(pubStr)
2✔
3777
}
2✔
3778

3779
// FindPeerByPubStr will return the peer that corresponds to the passed peerID,
3780
// which should be a string representation of the peer's serialized, compressed
3781
// public key.
3782
//
3783
// NOTE: This function is safe for concurrent access.
3784
func (s *server) FindPeerByPubStr(pubStr string) (*peer.Brontide, error) {
2✔
3785
        s.mu.RLock()
2✔
3786
        defer s.mu.RUnlock()
2✔
3787

2✔
3788
        return s.findPeerByPubStr(pubStr)
2✔
3789
}
2✔
3790

3791
// findPeerByPubStr is an internal method that retrieves the specified peer from
3792
// the server's internal state using its public key string.
3793
func (s *server) findPeerByPubStr(pubStr string) (*peer.Brontide, error) {
2✔
3794
        peer, ok := s.peersByPub[pubStr]
2✔
3795
        if !ok {
4✔
3796
                return nil, ErrPeerNotConnected
2✔
3797
        }
2✔
3798

3799
        return peer, nil
2✔
3800
}
3801

3802
// nextPeerBackoff computes the next backoff duration for a peer's pubkey using
3803
// exponential backoff. If no previous backoff was known, the default is
3804
// returned.
3805
func (s *server) nextPeerBackoff(pubStr string,
3806
        startTime time.Time) time.Duration {
2✔
3807

2✔
3808
        // Now, determine the appropriate backoff to use for the retry.
2✔
3809
        backoff, ok := s.persistentPeersBackoff[pubStr]
2✔
3810
        if !ok {
4✔
3811
                // If an existing backoff was unknown, use the default.
2✔
3812
                return s.cfg.MinBackoff
2✔
3813
        }
2✔
3814

3815
        // If the peer failed to start properly, we'll just use the previous
3816
        // backoff to compute the subsequent randomized exponential backoff
3817
        // duration. This will roughly double on average.
3818
        if startTime.IsZero() {
2✔
3819
                return computeNextBackoff(backoff, s.cfg.MaxBackoff)
×
3820
        }
×
3821

3822
        // The peer succeeded in starting. If the connection didn't last long
3823
        // enough to be considered stable, we'll continue to back off retries
3824
        // with this peer.
3825
        connDuration := time.Since(startTime)
2✔
3826
        if connDuration < defaultStableConnDuration {
4✔
3827
                return computeNextBackoff(backoff, s.cfg.MaxBackoff)
2✔
3828
        }
2✔
3829

3830
        // The peer succeeded in starting and this was a stable connection, so we'll
3831
        // reduce the timeout duration by the length of the connection after
3832
        // applying randomized exponential backoff. We'll only apply this in the
3833
        // case that:
3834
        //   reb(curBackoff) - connDuration > cfg.MinBackoff
3835
        relaxedBackoff := computeNextBackoff(backoff, s.cfg.MaxBackoff) - connDuration
×
3836
        if relaxedBackoff > s.cfg.MinBackoff {
×
3837
                return relaxedBackoff
×
3838
        }
×
3839

3840
        // Lastly, if reb(currBackoff) - connDuration <= cfg.MinBackoff, then
3841
        // the stable connection lasted much longer than our previous backoff.
3842
        // To reward such good behavior, we'll reconnect after the default
3843
        // timeout.
3844
        return s.cfg.MinBackoff
×
3845
}
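A minimal sketch of how the policy above plays out across repeated failed attempts; the loop is purely illustrative and only reuses computeNextBackoff as it is called in this function (roughly doubling with jitter, capped at MaxBackoff).

        backoff := s.cfg.MinBackoff
        for i := 0; i < 5; i++ {
                backoff = computeNextBackoff(backoff, s.cfg.MaxBackoff)
                srvrLog.Debugf("retry #%d would wait ~%v", i+1, backoff)
        }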
3846

3847
// shouldDropLocalConnection determines if our local connection to a remote peer
3848
// should be dropped in the case of concurrent connection establishment. In
3849
// order to deterministically decide which connection should be dropped, we'll
3850
// utilize the ordering of the local and remote public key. If we didn't use
3851
// such a tie breaker, then we risk _both_ connections erroneously being
3852
// dropped.
3853
func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool {
×
3854
        localPubBytes := local.SerializeCompressed()
×
3855
        remotePubBytes := remote.SerializeCompressed()
×
3856

×
3857
        // The connection that comes from the node with a "smaller" pubkey
×
3858
        // should be kept. Therefore, if our pubkey is "greater" than theirs, we
×
3859
        // should drop our established connection.
×
3860
        return bytes.Compare(localPubBytes, remotePubBytes) > 0
×
3861
}
×
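Worked example of the tie-break above, for illustration: if our compressed key begins 0x02... and the remote key begins 0x03..., bytes.Compare reports ours as smaller, so we keep our connection and expect the remote side to drop theirs. A hypothetical call site mirrors the usages further down.

        if shouldDropLocalConnection(s.identityECDH.PubKey(), nodePub) {
                // Our key sorts after the remote key, so our connection is
                // the one to give up.
                conn.Close()
                return
        }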
3862

3863
// InboundPeerConnected initializes a new peer in response to a new inbound
3864
// connection.
3865
//
3866
// NOTE: This function is safe for concurrent access.
3867
func (s *server) InboundPeerConnected(conn net.Conn) {
2✔
3868
        // Exit early if we have already been instructed to shutdown, this
2✔
3869
        // prevents any delayed callbacks from accidentally registering peers.
2✔
3870
        if s.Stopped() {
2✔
3871
                return
×
3872
        }
×
3873

3874
        nodePub := conn.(*brontide.Conn).RemotePub()
2✔
3875
        pubSer := nodePub.SerializeCompressed()
2✔
3876
        pubStr := string(pubSer)
2✔
3877

2✔
3878
        var pubBytes [33]byte
2✔
3879
        copy(pubBytes[:], pubSer)
2✔
3880

2✔
3881
        s.mu.Lock()
2✔
3882
        defer s.mu.Unlock()
2✔
3883

2✔
3884
        // If the remote node's public key is banned, drop the connection.
2✔
3885
        shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub)
2✔
3886
        if dcErr != nil {
2✔
3887
                srvrLog.Errorf("Unable to check if we should disconnect "+
×
3888
                        "peer: %v", dcErr)
×
3889
                conn.Close()
×
3890

×
3891
                return
×
3892
        }
×
3893

3894
        if shouldDc {
2✔
3895
                srvrLog.Debugf("Dropping connection for %v since they are "+
×
3896
                        "banned.", pubSer)
×
3897

×
3898
                conn.Close()
×
3899

×
3900
                return
×
3901
        }
×
3902

3903
        // If we already have an outbound connection to this peer, then ignore
3904
        // this new connection.
3905
        if p, ok := s.outboundPeers[pubStr]; ok {
4✔
3906
                srvrLog.Debugf("Already have outbound connection for %v, "+
2✔
3907
                        "ignoring inbound connection from local=%v, remote=%v",
2✔
3908
                        p, conn.LocalAddr(), conn.RemoteAddr())
2✔
3909

2✔
3910
                conn.Close()
2✔
3911
                return
2✔
3912
        }
2✔
3913

3914
        // If we already have a valid connection that is scheduled to take
3915
        // precedence once the prior peer has finished disconnecting, we'll
3916
        // ignore this connection.
3917
        if p, ok := s.scheduledPeerConnection[pubStr]; ok {
2✔
3918
                srvrLog.Debugf("Ignoring connection from %v, peer %v already "+
×
3919
                        "scheduled", conn.RemoteAddr(), p)
×
3920
                conn.Close()
×
3921
                return
×
3922
        }
×
3923

3924
        srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())
2✔
3925

2✔
3926
        // Check to see if we already have a connection with this peer. If so,
2✔
3927
        // we may need to drop our existing connection. This prevents us from
2✔
3928
        // having duplicate connections to the same peer. We forgo adding a
2✔
3929
        // default case as we expect these to be the only error values returned
2✔
3930
        // from findPeerByPubStr.
2✔
3931
        connectedPeer, err := s.findPeerByPubStr(pubStr)
2✔
3932
        switch err {
2✔
3933
        case ErrPeerNotConnected:
2✔
3934
                // We were unable to locate an existing connection with the
2✔
3935
                // target peer, proceed to connect.
2✔
3936
                s.cancelConnReqs(pubStr, nil)
2✔
3937
                s.peerConnected(conn, nil, true)
2✔
3938

3939
        case nil:
×
3940
                // We already have a connection with the incoming peer. If the
×
3941
                // connection we've already established should be kept and is
×
3942
                // not of the same type as the new connection (inbound), then
×
3943
                // we'll close out the new connection s.t there's only a single
×
3944
                // connection between us.
×
3945
                localPub := s.identityECDH.PubKey()
×
3946
                if !connectedPeer.Inbound() &&
×
3947
                        !shouldDropLocalConnection(localPub, nodePub) {
×
3948

×
3949
                        srvrLog.Warnf("Received inbound connection from "+
×
3950
                                "peer %v, but already have outbound "+
×
3951
                                "connection, dropping conn", connectedPeer)
×
3952
                        conn.Close()
×
3953
                        return
×
3954
                }
×
3955

3956
                // Otherwise, if we should drop the connection, then we'll
3957
                // disconnect our already connected peer.
3958
                srvrLog.Debugf("Disconnecting stale connection to %v",
×
3959
                        connectedPeer)
×
3960

×
3961
                s.cancelConnReqs(pubStr, nil)
×
3962

×
3963
                // Remove the current peer from the server's internal state and
×
3964
                // signal that the peer termination watcher does not need to
×
3965
                // execute for this peer.
×
3966
                s.removePeer(connectedPeer)
×
3967
                s.ignorePeerTermination[connectedPeer] = struct{}{}
×
3968
                s.scheduledPeerConnection[pubStr] = func() {
×
3969
                        s.peerConnected(conn, nil, true)
×
3970
                }
×
3971
        }
3972
}
3973

3974
// OutboundPeerConnected initializes a new peer in response to a new outbound
3975
// connection.
3976
// NOTE: This function is safe for concurrent access.
3977
func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) {
2✔
3978
        // Exit early if we have already been instructed to shutdown, this
2✔
3979
        // prevents any delayed callbacks from accidentally registering peers.
2✔
3980
        if s.Stopped() {
2✔
3981
                return
×
3982
        }
×
3983

3984
        nodePub := conn.(*brontide.Conn).RemotePub()
2✔
3985
        pubSer := nodePub.SerializeCompressed()
2✔
3986
        pubStr := string(pubSer)
2✔
3987

2✔
3988
        var pubBytes [33]byte
2✔
3989
        copy(pubBytes[:], pubSer)
2✔
3990

2✔
3991
        s.mu.Lock()
2✔
3992
        defer s.mu.Unlock()
2✔
3993

2✔
3994
        // If the remote node's public key is banned, drop the connection.
2✔
3995
        shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub)
2✔
3996
        if dcErr != nil {
2✔
3997
                srvrLog.Errorf("Unable to check if we should disconnect "+
×
3998
                        "peer: %v", dcErr)
×
3999
                conn.Close()
×
4000

×
4001
                return
×
4002
        }
×
4003

4004
        if shouldDc {
2✔
4005
                srvrLog.Debugf("Dropping connection for %v since they are "+
×
4006
                        "banned.", pubSer)
×
4007

×
4008
                if connReq != nil {
×
4009
                        s.connMgr.Remove(connReq.ID())
×
4010
                }
×
4011

4012
                conn.Close()
×
4013

×
4014
                return
×
4015
        }
4016

4017
        // If we already have an inbound connection to this peer, then ignore
4018
        // this new connection.
4019
        if p, ok := s.inboundPeers[pubStr]; ok {
4✔
4020
                srvrLog.Debugf("Already have inbound connection for %v, "+
2✔
4021
                        "ignoring outbound connection from local=%v, remote=%v",
2✔
4022
                        p, conn.LocalAddr(), conn.RemoteAddr())
2✔
4023

2✔
4024
                if connReq != nil {
4✔
4025
                        s.connMgr.Remove(connReq.ID())
2✔
4026
                }
2✔
4027
                conn.Close()
2✔
4028
                return
2✔
4029
        }
4030
        if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil {
2✔
4031
                srvrLog.Debugf("Ignoring canceled outbound connection")
×
4032
                s.connMgr.Remove(connReq.ID())
×
4033
                conn.Close()
×
4034
                return
×
4035
        }
×
4036

4037
        // If we already have a valid connection that is scheduled to take
4038
        // precedence once the prior peer has finished disconnecting, we'll
4039
        // ignore this connection.
4040
        if _, ok := s.scheduledPeerConnection[pubStr]; ok {
2✔
4041
                srvrLog.Debugf("Ignoring connection, peer already scheduled")
×
4042

×
4043
                if connReq != nil {
×
4044
                        s.connMgr.Remove(connReq.ID())
×
4045
                }
×
4046

4047
                conn.Close()
×
4048
                return
×
4049
        }
4050

4051
        srvrLog.Infof("Established connection to: %x@%v", pubStr,
2✔
4052
                conn.RemoteAddr())
2✔
4053

2✔
4054
        if connReq != nil {
4✔
4055
                // A successful connection was returned by the connmgr.
2✔
4056
                // Immediately cancel all pending requests, excluding the
2✔
4057
                // outbound connection we just established.
2✔
4058
                ignore := connReq.ID()
2✔
4059
                s.cancelConnReqs(pubStr, &ignore)
2✔
4060
        } else {
4✔
4061
                // This was a successful connection made by some other
2✔
4062
                // subsystem. Remove all requests being managed by the connmgr.
2✔
4063
                s.cancelConnReqs(pubStr, nil)
2✔
4064
        }
2✔
4065

4066
        // If we already have a connection with this peer, decide whether or not
4067
        // we need to drop the stale connection. We forgo adding a default case
4068
        // as we expect these to be the only error values returned from
4069
        // findPeerByPubStr.
4070
        connectedPeer, err := s.findPeerByPubStr(pubStr)
2✔
4071
        switch err {
2✔
4072
        case ErrPeerNotConnected:
2✔
4073
                // We were unable to locate an existing connection with the
2✔
4074
                // target peer, proceed to connect.
2✔
4075
                s.peerConnected(conn, connReq, false)
2✔
4076

4077
        case nil:
×
4078
                // We already have a connection with the incoming peer. If the
×
4079
                // connection we've already established should be kept and is
×
4080
                // not of the same type as the new connection (outbound), then
×
4081
                // we'll close out the new connection s.t there's only a single
×
4082
                // connection between us.
×
4083
                localPub := s.identityECDH.PubKey()
×
4084
                if connectedPeer.Inbound() &&
×
4085
                        shouldDropLocalConnection(localPub, nodePub) {
×
4086

×
4087
                        srvrLog.Warnf("Established outbound connection to "+
×
4088
                                "peer %v, but already have inbound "+
×
4089
                                "connection, dropping conn", connectedPeer)
×
4090
                        if connReq != nil {
×
4091
                                s.connMgr.Remove(connReq.ID())
×
4092
                        }
×
4093
                        conn.Close()
×
4094
                        return
×
4095
                }
4096

4097
                // Otherwise, _their_ connection should be dropped. So we'll
4098
                // disconnect the peer and send the now obsolete peer to the
4099
                // server for garbage collection.
4100
                srvrLog.Debugf("Disconnecting stale connection to %v",
×
4101
                        connectedPeer)
×
4102

×
4103
                // Remove the current peer from the server's internal state and
×
4104
                // signal that the peer termination watcher does not need to
×
4105
                // execute for this peer.
×
4106
                s.removePeer(connectedPeer)
×
4107
                s.ignorePeerTermination[connectedPeer] = struct{}{}
×
4108
                s.scheduledPeerConnection[pubStr] = func() {
×
4109
                        s.peerConnected(conn, connReq, false)
×
4110
                }
×
4111
        }
4112
}
4113

4114
// UnassignedConnID is the default connection ID that a request can have before
4115
// it actually is submitted to the connmgr.
4116
// TODO(conner): move into connmgr package, or better, add connmgr method for
4117
// generating atomic IDs
4118
const UnassignedConnID uint64 = 0
4119

4120
// cancelConnReqs stops all persistent connection requests for a given pubkey.
4121
// Any attempts initiated by the peerTerminationWatcher are canceled first.
4122
// Afterwards, each connection request is removed from the connmgr. The caller can
4123
// optionally specify a connection ID to ignore, which prevents us from
4124
// canceling a successful request. All persistent connreqs for the provided
4125
// pubkey are discarded after the operation.
4126
func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
2✔
4127
        // First, cancel any lingering persistent retry attempts, which will
2✔
4128
        // prevent retries for any with backoffs that are still maturing.
2✔
4129
        if cancelChan, ok := s.persistentRetryCancels[pubStr]; ok {
4✔
4130
                close(cancelChan)
2✔
4131
                delete(s.persistentRetryCancels, pubStr)
2✔
4132
        }
2✔
4133

4134
        // Next, check to see if we have any outstanding persistent connection
4135
        // requests to this peer. If so, then we'll remove all of these
4136
        // connection requests, and also delete the entry from the map.
4137
        connReqs, ok := s.persistentConnReqs[pubStr]
2✔
4138
        if !ok {
4✔
4139
                return
2✔
4140
        }
2✔
4141

4142
        for _, connReq := range connReqs {
4✔
4143
                srvrLog.Tracef("Canceling %s:", connReqs)
2✔
4144

2✔
4145
                // Atomically capture the current request identifier.
2✔
4146
                connID := connReq.ID()
2✔
4147

2✔
4148
                // Skip any zero IDs, as this indicates the request has not
2✔
4149
                // yet been scheduled.
2✔
4150
                if connID == UnassignedConnID {
2✔
4151
                        continue
×
4152
                }
4153

4154
                // Skip a particular connection ID if instructed.
4155
                if skip != nil && connID == *skip {
4✔
4156
                        continue
2✔
4157
                }
4158

4159
                s.connMgr.Remove(connID)
2✔
4160
        }
4161

4162
        delete(s.persistentConnReqs, pubStr)
2✔
4163
}
4164

4165
// handleCustomMessage dispatches an incoming custom peers message to
4166
// subscribers.
4167
func (s *server) handleCustomMessage(peer [33]byte, msg *lnwire.Custom) error {
2✔
4168
        srvrLog.Debugf("Custom message received: peer=%x, type=%d",
2✔
4169
                peer, msg.Type)
2✔
4170

2✔
4171
        return s.customMessageServer.SendUpdate(&CustomMessage{
2✔
4172
                Peer: peer,
2✔
4173
                Msg:  msg,
2✔
4174
        })
2✔
4175
}
2✔
4176

4177
// SubscribeCustomMessages subscribes to a stream of incoming custom peer
4178
// messages.
4179
func (s *server) SubscribeCustomMessages() (*subscribe.Client, error) {
2✔
4180
        return s.customMessageServer.Subscribe()
2✔
4181
}
2✔
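Illustrative consumer sketch for the custom-message stream; it assumes the subscribe.Client returned above exposes Updates() and Cancel() as used elsewhere in lnd, and that updates carry the *CustomMessage values sent by handleCustomMessage.

        client, err := s.SubscribeCustomMessages()
        if err != nil {
                return err
        }
        defer client.Cancel()

        for update := range client.Updates() {
                msg, ok := update.(*CustomMessage)
                if !ok {
                        continue
                }
                srvrLog.Debugf("custom message type=%d from peer=%x",
                        msg.Msg.Type, msg.Peer)
        }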
4182

4183
// peerConnected is a function that handles initializing a newly connected
4184
// peer by adding it to the server's global list of all active peers, and
4185
// starting all the goroutines the peer needs to function properly. The inbound
4186
// boolean should be true if the peer initiated the connection to us.
4187
func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
4188
        inbound bool) {
2✔
4189

2✔
4190
        brontideConn := conn.(*brontide.Conn)
2✔
4191
        addr := conn.RemoteAddr()
2✔
4192
        pubKey := brontideConn.RemotePub()
2✔
4193

2✔
4194
        srvrLog.Infof("Finalizing connection to %x@%s, inbound=%v",
2✔
4195
                pubKey.SerializeCompressed(), addr, inbound)
2✔
4196

2✔
4197
        peerAddr := &lnwire.NetAddress{
2✔
4198
                IdentityKey: pubKey,
2✔
4199
                Address:     addr,
2✔
4200
                ChainNet:    s.cfg.ActiveNetParams.Net,
2✔
4201
        }
2✔
4202

2✔
4203
        // With the brontide connection established, we'll now craft the feature
2✔
4204
        // vectors to advertise to the remote node.
2✔
4205
        initFeatures := s.featureMgr.Get(feature.SetInit)
2✔
4206
        legacyFeatures := s.featureMgr.Get(feature.SetLegacyGlobal)
2✔
4207

2✔
4208
        // Look up past error caches for the peer in the server. If no buffer is
2✔
4209
        // found, create a fresh buffer.
2✔
4210
        pkStr := string(peerAddr.IdentityKey.SerializeCompressed())
2✔
4211
        errBuffer, ok := s.peerErrors[pkStr]
2✔
4212
        if !ok {
4✔
4213
                var err error
2✔
4214
                errBuffer, err = queue.NewCircularBuffer(peer.ErrorBufferSize)
2✔
4215
                if err != nil {
2✔
4216
                        srvrLog.Errorf("unable to create peer %v", err)
×
4217
                        return
×
4218
                }
×
4219
        }
4220

4221
        // If we directly set the peer.Config TowerClient member to the
4222
        // s.towerClientMgr then in the case that the s.towerClientMgr is nil,
4223
        // the peer.Config's TowerClient member will not evaluate to nil even
4224
        // though the underlying value is nil. To avoid this gotcha which can
4225
        // cause a panic, we need to explicitly pass nil to the peer.Config's
4226
        // TowerClient if needed.
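        //
        // For illustration (hedged sketch using a hypothetical concrete type):
        //
        //      var mgr *someConcreteManager          // nil pointer
        //      var c wtclient.ClientManager = mgr    // c != nil from here on
        //
        // A caller that only checks c != nil would proceed and then panic on
        // use, which is why the guard below only assigns the interface when
        // s.towerClientMgr is actually non-nil.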
4227
        var towerClient wtclient.ClientManager
2✔
4228
        if s.towerClientMgr != nil {
4✔
4229
                towerClient = s.towerClientMgr
2✔
4230
        }
2✔
4231

4232
        thresholdSats := btcutil.Amount(s.cfg.MaxFeeExposure)
2✔
4233
        thresholdMSats := lnwire.NewMSatFromSatoshis(thresholdSats)
2✔
4234

2✔
4235
        // Now that we've established a connection, create a peer, and add it to the
2✔
4236
        // set of currently active peers. Configure the peer with the incoming
2✔
4237
        // and outgoing broadcast deltas to prevent htlcs from being accepted or
2✔
4238
        // offered that would trigger channel closure. In case of outgoing
2✔
4239
        // htlcs, an extra block is added to prevent the channel from being
2✔
4240
        // closed when the htlc is outstanding and a new block comes in.
2✔
4241
        pCfg := peer.Config{
2✔
4242
                Conn:                    brontideConn,
2✔
4243
                ConnReq:                 connReq,
2✔
4244
                Addr:                    peerAddr,
2✔
4245
                Inbound:                 inbound,
2✔
4246
                Features:                initFeatures,
2✔
4247
                LegacyFeatures:          legacyFeatures,
2✔
4248
                OutgoingCltvRejectDelta: lncfg.DefaultOutgoingCltvRejectDelta,
2✔
4249
                ChanActiveTimeout:       s.cfg.ChanEnableTimeout,
2✔
4250
                ErrorBuffer:             errBuffer,
2✔
4251
                WritePool:               s.writePool,
2✔
4252
                ReadPool:                s.readPool,
2✔
4253
                Switch:                  s.htlcSwitch,
2✔
4254
                InterceptSwitch:         s.interceptableSwitch,
2✔
4255
                ChannelDB:               s.chanStateDB,
2✔
4256
                ChannelGraph:            s.graphDB,
2✔
4257
                ChainArb:                s.chainArb,
2✔
4258
                AuthGossiper:            s.authGossiper,
2✔
4259
                ChanStatusMgr:           s.chanStatusMgr,
2✔
4260
                ChainIO:                 s.cc.ChainIO,
2✔
4261
                FeeEstimator:            s.cc.FeeEstimator,
2✔
4262
                Signer:                  s.cc.Wallet.Cfg.Signer,
2✔
4263
                SigPool:                 s.sigPool,
2✔
4264
                Wallet:                  s.cc.Wallet,
2✔
4265
                ChainNotifier:           s.cc.ChainNotifier,
2✔
4266
                BestBlockView:           s.cc.BestBlockTracker,
2✔
4267
                RoutingPolicy:           s.cc.RoutingPolicy,
2✔
4268
                Sphinx:                  s.sphinx,
2✔
4269
                WitnessBeacon:           s.witnessBeacon,
2✔
4270
                Invoices:                s.invoices,
2✔
4271
                ChannelNotifier:         s.channelNotifier,
2✔
4272
                HtlcNotifier:            s.htlcNotifier,
2✔
4273
                TowerClient:             towerClient,
2✔
4274
                DisconnectPeer:          s.DisconnectPeer,
2✔
4275
                GenNodeAnnouncement: func(...netann.NodeAnnModifier) (
2✔
4276
                        lnwire.NodeAnnouncement, error) {
4✔
4277

2✔
4278
                        return s.genNodeAnnouncement(nil)
2✔
4279
                },
2✔
4280

4281
                PongBuf: s.pongBuf,
4282

4283
                PrunePersistentPeerConnection: s.prunePersistentPeerConnection,
4284

4285
                FetchLastChanUpdate: s.fetchLastChanUpdate(),
4286

4287
                FundingManager: s.fundingMgr,
4288

4289
                Hodl:                    s.cfg.Hodl,
4290
                UnsafeReplay:            s.cfg.UnsafeReplay,
4291
                MaxOutgoingCltvExpiry:   s.cfg.MaxOutgoingCltvExpiry,
4292
                MaxChannelFeeAllocation: s.cfg.MaxChannelFeeAllocation,
4293
                CoopCloseTargetConfs:    s.cfg.CoopCloseTargetConfs,
4294
                MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte(
4295
                        s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
4296
                ChannelCommitInterval:  s.cfg.ChannelCommitInterval,
4297
                PendingCommitInterval:  s.cfg.PendingCommitInterval,
4298
                ChannelCommitBatchSize: s.cfg.ChannelCommitBatchSize,
4299
                HandleCustomMessage:    s.handleCustomMessage,
4300
                GetAliases:             s.aliasMgr.GetAliases,
4301
                RequestAlias:           s.aliasMgr.RequestAlias,
4302
                AddLocalAlias:          s.aliasMgr.AddLocalAlias,
4303
                DisallowRouteBlinding:  s.cfg.ProtocolOptions.NoRouteBlinding(),
4304
                DisallowQuiescence:     s.cfg.ProtocolOptions.NoQuiescence(),
4305
                MaxFeeExposure:         thresholdMSats,
4306
                Quit:                   s.quit,
4307
                AuxLeafStore:           s.implCfg.AuxLeafStore,
4308
                AuxSigner:              s.implCfg.AuxSigner,
4309
                MsgRouter:              s.implCfg.MsgRouter,
4310
                AuxChanCloser:          s.implCfg.AuxChanCloser,
4311
                AuxResolver:            s.implCfg.AuxContractResolver,
4312
                AuxTrafficShaper:       s.implCfg.TrafficShaper,
4313
                ShouldFwdExpEndorsement: func() bool {
2✔
4314
                        if s.cfg.ProtocolOptions.NoExperimentalEndorsement() {
4✔
4315
                                return false
2✔
4316
                        }
2✔
4317

4318
                        return clock.NewDefaultClock().Now().Before(
2✔
4319
                                EndorsementExperimentEnd,
2✔
4320
                        )
2✔
4321
                },
4322
        }
4323

4324
        copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed())
2✔
4325
        copy(pCfg.ServerPubKey[:], s.identityECDH.PubKey().SerializeCompressed())
2✔
4326

2✔
4327
        p := peer.NewBrontide(pCfg)
2✔
4328

2✔
4329
        // TODO(roasbeef): update IP address for link-node
2✔
4330
        //  * also mark last-seen, do it one single transaction?
2✔
4331

2✔
4332
        s.addPeer(p)
2✔
4333

2✔
4334
        // Once we have successfully added the peer to the server, we can
2✔
4335
        // delete the previous error buffer from the server's map of error
2✔
4336
        // buffers.
2✔
4337
        delete(s.peerErrors, pkStr)
2✔
4338

2✔
4339
        // Dispatch a goroutine to asynchronously start the peer. This process
2✔
4340
        // includes sending and receiving Init messages, which would be a DOS
2✔
4341
        // vector if we held the server's mutex throughout the procedure.
2✔
4342
        s.wg.Add(1)
2✔
4343
        go s.peerInitializer(p)
2✔
4344
}
4345

4346
// addPeer adds the passed peer to the server's global state of all active
4347
// peers.
4348
func (s *server) addPeer(p *peer.Brontide) {
2✔
4349
        if p == nil {
2✔
4350
                return
×
4351
        }
×
4352

4353
        pubBytes := p.IdentityKey().SerializeCompressed()
2✔
4354

2✔
4355
        // Ignore new peers if we're shutting down.
2✔
4356
        if s.Stopped() {
2✔
4357
                srvrLog.Infof("Server stopped, skipped adding peer=%x",
×
4358
                        pubBytes)
×
4359
                p.Disconnect(ErrServerShuttingDown)
×
4360

×
4361
                return
×
4362
        }
×
4363

4364
        // Track the new peer in our indexes so we can quickly look it up either
4365
        // according to its public key, or its peer ID.
4366
        // TODO(roasbeef): pipe all requests through to the
4367
        // queryHandler/peerManager
4368

4369
        // NOTE: This pubStr is a raw bytes to string conversion and will NOT
4370
        // be human-readable.
4371
        pubStr := string(pubBytes)
2✔
4372

2✔
4373
        s.peersByPub[pubStr] = p
2✔
4374

2✔
4375
        if p.Inbound() {
4✔
4376
                s.inboundPeers[pubStr] = p
2✔
4377
        } else {
4✔
4378
                s.outboundPeers[pubStr] = p
2✔
4379
        }
2✔
4380

4381
        // Inform the peer notifier of a peer online event so that it can be reported
4382
        // to clients listening for peer events.
4383
        var pubKey [33]byte
2✔
4384
        copy(pubKey[:], pubBytes)
2✔
4385

2✔
4386
        s.peerNotifier.NotifyPeerOnline(pubKey)
2✔
4387
}
4388

4389
// peerInitializer asynchronously starts a newly connected peer after it has
4390
// been added to the server's peer map. This method sets up a
4391
// peerTerminationWatcher for the given peer, and ensures that it executes even
4392
// if the peer failed to start. In the event of a successful connection, this
4393
// method reads the negotiated, local feature-bits and spawns the appropriate
4394
// graph synchronization method. Any registered clients of NotifyWhenOnline will
4395
// be signaled of the new peer once the method returns.
4396
//
4397
// NOTE: This MUST be launched as a goroutine.
4398
func (s *server) peerInitializer(p *peer.Brontide) {
2✔
4399
        defer s.wg.Done()
2✔
4400

2✔
4401
        pubBytes := p.IdentityKey().SerializeCompressed()
2✔
4402

2✔
4403
        // Avoid initializing peers while the server is exiting.
2✔
4404
        if s.Stopped() {
2✔
4405
                srvrLog.Infof("Server stopped, skipped initializing peer=%x",
×
4406
                        pubBytes)
×
4407
                return
×
4408
        }
×
4409

4410
        // Create a channel that will be used to signal a successful start of
4411
        // the link. This prevents the peer termination watcher from beginning
4412
        // its duty too early.
4413
        ready := make(chan struct{})
2✔
4414

2✔
4415
        // Before starting the peer, launch a goroutine to watch for the
2✔
4416
        // unexpected termination of this peer, which will ensure all resources
2✔
4417
        // are properly cleaned up, and re-establish persistent connections when
2✔
4418
        // necessary. The peer termination watcher will be short circuited if
2✔
4419
        // the peer is ever added to the ignorePeerTermination map, indicating
2✔
4420
        // that the server has already handled the removal of this peer.
2✔
4421
        s.wg.Add(1)
2✔
4422
        go s.peerTerminationWatcher(p, ready)
2✔
4423

2✔
4424
        // Start the peer! If an error occurs, we Disconnect the peer, which
2✔
4425
        // will unblock the peerTerminationWatcher.
2✔
4426
        if err := p.Start(); err != nil {
3✔
4427
                srvrLog.Warnf("Starting peer=%x got error: %v", pubBytes, err)
1✔
4428

1✔
4429
                p.Disconnect(fmt.Errorf("unable to start peer: %w", err))
1✔
4430
                return
1✔
4431
        }
1✔
4432

4433
        // Otherwise, signal to the peerTerminationWatcher that the peer startup
4434
        // was successful, and to begin watching the peer's wait group.
4435
        close(ready)
2✔
4436

2✔
4437
        s.mu.Lock()
2✔
4438
        defer s.mu.Unlock()
2✔
4439

2✔
4440
        // Check if there are listeners waiting for this peer to come online.
2✔
4441
        srvrLog.Debugf("Notifying that peer %v is online", p)
2✔
4442

2✔
4443
        // TODO(guggero): Do a proper conversion to a string everywhere, or use
2✔
4444
        // route.Vertex as the key type of peerConnectedListeners.
2✔
4445
        pubStr := string(pubBytes)
2✔
4446
        for _, peerChan := range s.peerConnectedListeners[pubStr] {
4✔
4447
                select {
2✔
4448
                case peerChan <- p:
2✔
4449
                case <-s.quit:
×
4450
                        return
×
4451
                }
4452
        }
4453
        delete(s.peerConnectedListeners, pubStr)
2✔
4454
}
4455

4456
// peerTerminationWatcher waits until a peer has been disconnected unexpectedly,
4457
// and then cleans up all resources allocated to the peer, notifies relevant
4458
// sub-systems of its demise, and finally handles re-connecting to the peer if
4459
// it's persistent. If the server intentionally disconnects a peer, it should
4460
// have a corresponding entry in the ignorePeerTermination map which will cause
4461
// the cleanup routine to exit early. The passed `ready` chan is used to
4462
// synchronize when WaitForDisconnect should begin watching on the peer's
4463
// waitgroup. The ready chan should only be signaled if the peer starts
4464
// successfully, otherwise the peer should be disconnected instead.
4465
//
4466
// NOTE: This MUST be launched as a goroutine.
4467
func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
2✔
4468
        defer s.wg.Done()
2✔
4469

2✔
4470
        p.WaitForDisconnect(ready)
2✔
4471

2✔
4472
        srvrLog.Debugf("Peer %v has been disconnected", p)
2✔
4473

2✔
4474
        // If the server is exiting then we can bail out early ourselves as all
2✔
4475
        // the other sub-systems will already be shutting down.
2✔
4476
        if s.Stopped() {
4✔
4477
                srvrLog.Debugf("Server quitting, exit early for peer %v", p)
2✔
4478
                return
2✔
4479
        }
2✔
4480

4481
        // Next, we'll cancel all pending funding reservations with this node.
4482
        // If we tried to initiate any funding flows that haven't yet finished,
4483
        // then we need to unlock those committed outputs so they're still
4484
        // available for use.
4485
        s.fundingMgr.CancelPeerReservations(p.PubKey())
2✔
4486

2✔
4487
        pubKey := p.IdentityKey()
2✔
4488

2✔
4489
        // We'll also inform the gossiper that this peer is no longer active,
2✔
4490
        // so we don't need to maintain sync state for it any longer.
2✔
4491
        s.authGossiper.PruneSyncState(p.PubKey())
2✔
4492

2✔
4493
        // Tell the switch to remove all links associated with this peer.
2✔
4494
        // Passing nil as the target link indicates that all links associated
2✔
4495
        // with this interface should be closed.
2✔
4496
        //
2✔
4497
        // TODO(roasbeef): instead add a PurgeInterfaceLinks function?
2✔
4498
        links, err := s.htlcSwitch.GetLinksByInterface(p.PubKey())
2✔
4499
        if err != nil && err != htlcswitch.ErrNoLinksFound {
2✔
4500
                srvrLog.Errorf("Unable to get channel links for %v: %v", p, err)
×
4501
        }
×
4502

4503
        for _, link := range links {
4✔
4504
                s.htlcSwitch.RemoveLink(link.ChanID())
2✔
4505
        }
2✔
4506

4507
        s.mu.Lock()
2✔
4508
        defer s.mu.Unlock()
2✔
4509

2✔
4510
        // If there were any notification requests for when this peer
2✔
4511
        // disconnected, we can trigger them now.
2✔
4512
        srvrLog.Debugf("Notifying that peer %v is offline", p)
2✔
4513
        pubStr := string(pubKey.SerializeCompressed())
2✔
4514
        for _, offlineChan := range s.peerDisconnectedListeners[pubStr] {
4✔
4515
                close(offlineChan)
2✔
4516
        }
2✔
4517
        delete(s.peerDisconnectedListeners, pubStr)
2✔
4518

2✔
4519
        // If the server has already removed this peer, we can short circuit the
2✔
4520
        // peer termination watcher and skip cleanup.
2✔
4521
        if _, ok := s.ignorePeerTermination[p]; ok {
2✔
4522
                delete(s.ignorePeerTermination, p)
×
4523

×
4524
                pubKey := p.PubKey()
×
4525
                pubStr := string(pubKey[:])
×
4526

×
4527
                // If a connection callback is present, we'll go ahead and
×
4528
                // execute it now that previous peer has fully disconnected. If
×
4529
                // the callback is not present, this likely implies the peer was
×
4530
                // purposefully disconnected via RPC, and that no reconnect
×
4531
                // should be attempted.
×
4532
                connCallback, ok := s.scheduledPeerConnection[pubStr]
×
4533
                if ok {
×
4534
                        delete(s.scheduledPeerConnection, pubStr)
×
4535
                        connCallback()
×
4536
                }
×
4537
                return
×
4538
        }
4539

4540
        // First, cleanup any remaining state the server has regarding the peer
4541
        // in question.
4542
        s.removePeer(p)
2✔
4543

2✔
4544
        // Next, check to see if this is a persistent peer or not.
2✔
4545
        if _, ok := s.persistentPeers[pubStr]; !ok {
4✔
4546
                return
2✔
4547
        }
2✔
4548

4549
        // Get the last address that we used to connect to the peer.
4550
        addrs := []net.Addr{
2✔
4551
                p.NetAddress().Address,
2✔
4552
        }
2✔
4553

2✔
4554
        // We'll ensure that we locate all of the peer's advertised addresses for
2✔
4555
        // reconnection purposes.
2✔
4556
        advertisedAddrs, err := s.fetchNodeAdvertisedAddrs(pubKey)
2✔
4557
        switch {
2✔
4558
        // We found advertised addresses, so use them.
4559
        case err == nil:
2✔
4560
                addrs = advertisedAddrs
2✔
4561

4562
        // The peer doesn't have an advertised address.
4563
        case err == errNoAdvertisedAddr:
2✔
4564
                // If it is an outbound peer then we fall back to the existing
2✔
4565
                // peer address.
2✔
4566
                if !p.Inbound() {
4✔
4567
                        break
2✔
4568
                }
4569

4570
                // Fall back to the existing peer address if
4571
                // we're not accepting connections over Tor.
4572
                if s.torController == nil {
4✔
4573
                        break
2✔
4574
                }
4575

4576
                // If we are, the peer's address won't be known
4577
                // to us (we'll see a private address, which is
4578
                // the address used by our onion service to dial
4579
                // to lnd), so we don't have enough information
4580
                // to attempt a reconnect.
4581
                srvrLog.Debugf("Ignoring reconnection attempt "+
×
4582
                        "to inbound peer %v without "+
×
4583
                        "advertised address", p)
×
4584
                return
×
4585

4586
        // We came across an error retrieving an advertised
4587
        // address, log it, and fall back to the existing peer
4588
        // address.
4589
        default:
2✔
4590
                srvrLog.Errorf("Unable to retrieve advertised "+
2✔
4591
                        "address for node %x: %v", p.PubKey(),
2✔
4592
                        err)
2✔
4593
        }
4594

4595
        // Make an easy lookup map so that we can check if an address
4596
        // is already in the address list that we have stored for this peer.
4597
        existingAddrs := make(map[string]bool)
2✔
4598
        for _, addr := range s.persistentPeerAddrs[pubStr] {
4✔
4599
                existingAddrs[addr.String()] = true
2✔
4600
        }
2✔
4601

4602
        // Add any missing addresses for this peer to persistentPeerAddr.
4603
        for _, addr := range addrs {
4✔
4604
                if existingAddrs[addr.String()] {
2✔
4605
                        continue
×
4606
                }
4607

4608
                s.persistentPeerAddrs[pubStr] = append(
2✔
4609
                        s.persistentPeerAddrs[pubStr],
2✔
4610
                        &lnwire.NetAddress{
2✔
4611
                                IdentityKey: p.IdentityKey(),
2✔
4612
                                Address:     addr,
2✔
4613
                                ChainNet:    p.NetAddress().ChainNet,
2✔
4614
                        },
2✔
4615
                )
2✔
4616
        }
4617

4618
        // Record the computed backoff in the backoff map.
4619
        backoff := s.nextPeerBackoff(pubStr, p.StartTime())
2✔
4620
        s.persistentPeersBackoff[pubStr] = backoff
2✔
4621

2✔
4622
        // Initialize a retry canceller for this peer if one does not
2✔
4623
        // exist.
2✔
4624
        cancelChan, ok := s.persistentRetryCancels[pubStr]
2✔
4625
        if !ok {
4✔
4626
                cancelChan = make(chan struct{})
2✔
4627
                s.persistentRetryCancels[pubStr] = cancelChan
2✔
4628
        }
2✔
4629

4630
        // We choose not to wait group this go routine since the Connect
4631
        // call can stall for arbitrarily long if we shutdown while an
4632
        // outbound connection attempt is being made.
4633
        go func() {
4✔
4634
                srvrLog.Debugf("Scheduling connection re-establishment to "+
2✔
4635
                        "persistent peer %x in %s",
2✔
4636
                        p.IdentityKey().SerializeCompressed(), backoff)
2✔
4637

2✔
4638
                select {
2✔
4639
                case <-time.After(backoff):
2✔
4640
                case <-cancelChan:
2✔
4641
                        return
2✔
4642
                case <-s.quit:
2✔
4643
                        return
2✔
4644
                }
4645

4646
                srvrLog.Debugf("Attempting to re-establish persistent "+
2✔
4647
                        "connection to peer %x",
2✔
4648
                        p.IdentityKey().SerializeCompressed())
2✔
4649

2✔
4650
                s.connectToPersistentPeer(pubStr)
2✔
4651
        }()
4652
}
4653

4654
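The tail end of the cleanup above schedules the reconnection as a cancellable, backoff-delayed goroutine: sleep for the backoff, then retry, unless a newer attempt or a server shutdown cancels it first. A minimal standalone sketch of that select-based pattern (not lnd code; scheduleRetry, cancel and quit are illustrative names):

        package main

        import (
                "fmt"
                "time"
        )

        // scheduleRetry waits out the backoff and then runs retry, unless the
        // attempt is cancelled or the process begins shutting down first.
        func scheduleRetry(backoff time.Duration, cancel, quit <-chan struct{},
                retry func()) {

                go func() {
                        select {
                        case <-time.After(backoff):
                                retry()
                        case <-cancel:
                                // Superseded by a newer attempt.
                        case <-quit:
                                // Shutting down.
                        }
                }()
        }

        func main() {
                cancel := make(chan struct{})
                quit := make(chan struct{})

                scheduleRetry(50*time.Millisecond, cancel, quit, func() {
                        fmt.Println("re-establishing connection...")
                })

                // Give the scheduled retry time to fire before exiting.
                time.Sleep(100 * time.Millisecond)
        }
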
// connectToPersistentPeer uses all the stored addresses for a peer to attempt
// to connect to the peer. It creates connection requests if there are
// currently none for a given address and it removes old connection requests
// if the associated address is no longer in the latest address list for the
// peer.
func (s *server) connectToPersistentPeer(pubKeyStr string) {
        s.mu.Lock()
        defer s.mu.Unlock()

        // Create an easy lookup map of the addresses we have stored for the
        // peer. We will remove entries from this map if we have existing
        // connection requests for the associated address and then any leftover
        // entries will indicate which addresses we should create new
        // connection requests for.
        addrMap := make(map[string]*lnwire.NetAddress)
        for _, addr := range s.persistentPeerAddrs[pubKeyStr] {
                addrMap[addr.String()] = addr
        }

        // Go through each of the existing connection requests and
        // check if they correspond to the latest set of addresses. If
        // there is a connection request that does not use one of the latest
        // advertised addresses then remove that connection request.
        var updatedConnReqs []*connmgr.ConnReq
        for _, connReq := range s.persistentConnReqs[pubKeyStr] {
                lnAddr := connReq.Addr.(*lnwire.NetAddress).Address.String()

                switch _, ok := addrMap[lnAddr]; ok {
                // If the existing connection request is using one of the
                // latest advertised addresses for the peer then we add it to
                // updatedConnReqs and remove the associated address from
                // addrMap so that we don't recreate this connReq later on.
                case true:
                        updatedConnReqs = append(
                                updatedConnReqs, connReq,
                        )
                        delete(addrMap, lnAddr)

                // If the existing connection request is using an address that
                // is not one of the latest advertised addresses for the peer
                // then we remove the connection request from the connection
                // manager.
                case false:
                        srvrLog.Info(
                                "Removing conn req:", connReq.Addr.String(),
                        )
                        s.connMgr.Remove(connReq.ID())
                }
        }

        s.persistentConnReqs[pubKeyStr] = updatedConnReqs

        cancelChan, ok := s.persistentRetryCancels[pubKeyStr]
        if !ok {
                cancelChan = make(chan struct{})
                s.persistentRetryCancels[pubKeyStr] = cancelChan
        }

        // Any addresses left in addrMap are new ones that we have not made
        // connection requests for. So create new connection requests for those.
        // If there is more than one address in the address map, stagger the
        // creation of the connection requests for those.
        go func() {
                ticker := time.NewTicker(multiAddrConnectionStagger)
                defer ticker.Stop()

                for _, addr := range addrMap {
                        // Send the persistent connection request to the
                        // connection manager, saving the request itself so we
                        // can cancel/restart the process as needed.
                        connReq := &connmgr.ConnReq{
                                Addr:      addr,
                                Permanent: true,
                        }

                        s.mu.Lock()
                        s.persistentConnReqs[pubKeyStr] = append(
                                s.persistentConnReqs[pubKeyStr], connReq,
                        )
                        s.mu.Unlock()

                        srvrLog.Debugf("Attempting persistent connection to "+
                                "channel peer %v", addr)

                        go s.connMgr.Connect(connReq)

                        select {
                        case <-s.quit:
                                return
                        case <-cancelChan:
                                return
                        case <-ticker.C:
                        }
                }
        }()
}

// removePeer removes the passed peer from the server's state of all active
// peers.
func (s *server) removePeer(p *peer.Brontide) {
        if p == nil {
                return
        }

        srvrLog.Debugf("removing peer %v", p)

        // As the peer is now finished, ensure that the TCP connection is
        // closed and all of its related goroutines have exited.
        p.Disconnect(fmt.Errorf("server: disconnecting peer %v", p))

        // If this peer had an active persistent connection request, remove it.
        if p.ConnReq() != nil {
                s.connMgr.Remove(p.ConnReq().ID())
        }

        // Ignore deleting peers if we're shutting down.
        if s.Stopped() {
                return
        }

        pKey := p.PubKey()
        pubSer := pKey[:]
        pubStr := string(pubSer)

        delete(s.peersByPub, pubStr)

        if p.Inbound() {
                delete(s.inboundPeers, pubStr)
        } else {
                delete(s.outboundPeers, pubStr)
        }

        // Copy the peer's error buffer across to the server if it has any items
        // in it so that we can restore peer errors across connections.
        if p.ErrorBuffer().Total() > 0 {
                s.peerErrors[pubStr] = p.ErrorBuffer()
        }

        // Inform the peer notifier of a peer offline event so that it can be
        // reported to clients listening for peer events.
        var pubKey [33]byte
        copy(pubKey[:], pubSer)

        s.peerNotifier.NotifyPeerOffline(pubKey)
}

// ConnectToPeer requests that the server connect to a Lightning Network peer
// at the specified address. This function will *block* until either a
// connection is established, or the initial handshake process fails.
//
// NOTE: This function is safe for concurrent access.
func (s *server) ConnectToPeer(addr *lnwire.NetAddress,
        perm bool, timeout time.Duration) error {

        targetPub := string(addr.IdentityKey.SerializeCompressed())

        // Acquire mutex, but use explicit unlocking instead of defer for
        // better granularity.  In certain conditions, this method requires
        // making an outbound connection to a remote peer, which requires the
        // lock to be released, and subsequently reacquired.
        s.mu.Lock()

        // Ensure we're not already connected to this peer.
        peer, err := s.findPeerByPubStr(targetPub)
        if err == nil {
                s.mu.Unlock()
                return &errPeerAlreadyConnected{peer: peer}
        }

        // Peer was not found, continue to pursue connection with peer.

        // If there are already pending connection requests for this pubkey,
        // we'll log a warning, but still continue with the connection attempt.
        if reqs, ok := s.persistentConnReqs[targetPub]; ok {
                srvrLog.Warnf("Already have %d persistent connection "+
                        "requests for %v, connecting anyway.", len(reqs), addr)
        }

        // If there's not already a pending or active connection to this node,
        // then instruct the connection manager to attempt to establish a
        // persistent connection to the peer.
        srvrLog.Debugf("Connecting to %v", addr)
        if perm {
                connReq := &connmgr.ConnReq{
                        Addr:      addr,
                        Permanent: true,
                }

                // Since the user requested a permanent connection, we'll set
                // the entry to true which will tell the server to continue
                // reconnecting even if the number of channels with this peer is
                // zero.
                s.persistentPeers[targetPub] = true
                if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
                        s.persistentPeersBackoff[targetPub] = s.cfg.MinBackoff
                }
                s.persistentConnReqs[targetPub] = append(
                        s.persistentConnReqs[targetPub], connReq,
                )
                s.mu.Unlock()

                go s.connMgr.Connect(connReq)

                return nil
        }
        s.mu.Unlock()

        // If we're not making a persistent connection, then we'll attempt to
        // connect to the target peer. If we can't make the connection, or
        // the crypto negotiation breaks down, then return an error to the
        // caller.
        errChan := make(chan error, 1)
        s.connectToPeer(addr, errChan, timeout)

        select {
        case err := <-errChan:
                return err
        case <-s.quit:
                return ErrServerShuttingDown
        }
}

// connectToPeer establishes a connection to a remote peer. errChan is used to
// notify the caller if the connection attempt has failed. Otherwise, it will be
// closed.
func (s *server) connectToPeer(addr *lnwire.NetAddress,
        errChan chan<- error, timeout time.Duration) {

        conn, err := brontide.Dial(
                s.identityECDH, addr, timeout, s.cfg.net.Dial,
        )
        if err != nil {
                srvrLog.Errorf("Unable to connect to %v: %v", addr, err)
                select {
                case errChan <- err:
                case <-s.quit:
                }
                return
        }

        close(errChan)

        srvrLog.Tracef("Brontide dialer made local=%v, remote=%v",
                conn.LocalAddr(), conn.RemoteAddr())

        s.OutboundPeerConnected(nil, conn)
}

// DisconnectPeer sends the request to the server to close the connection with
// the peer identified by public key.
//
// NOTE: This function is safe for concurrent access.
func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
        pubBytes := pubKey.SerializeCompressed()
        pubStr := string(pubBytes)

        s.mu.Lock()
        defer s.mu.Unlock()

        // Check that we're actually connected to this peer. If not, then we'll
        // exit with an error as we can't disconnect from a peer that we're not
        // currently connected to.
        peer, err := s.findPeerByPubStr(pubStr)
        if err == ErrPeerNotConnected {
                return fmt.Errorf("peer %x is not connected", pubBytes)
        }

        srvrLog.Infof("Disconnecting from %v", peer)

        s.cancelConnReqs(pubStr, nil)

        // If this peer was formerly a persistent connection, then we'll remove
        // them from this map so we don't attempt to re-connect after we
        // disconnect.
        delete(s.persistentPeers, pubStr)
        delete(s.persistentPeersBackoff, pubStr)

        // Remove the peer by calling Disconnect. Previously this was done with
        // removePeer, which bypassed the peerTerminationWatcher.
        peer.Disconnect(fmt.Errorf("server: DisconnectPeer called"))

        return nil
}

// OpenChannel sends a request to the server to open a channel to the specified
// peer identified by nodeKey with the passed channel funding parameters.
//
// NOTE: This function is safe for concurrent access.
func (s *server) OpenChannel(
        req *funding.InitFundingMsg) (chan *lnrpc.OpenStatusUpdate, chan error) {

        // The updateChan will have a buffer of 2, since we expect a ChanPending
        // + a ChanOpen update, and we want to make sure the funding process is
        // not blocked if the caller is not reading the updates.
        req.Updates = make(chan *lnrpc.OpenStatusUpdate, 2)
        req.Err = make(chan error, 1)

        // First attempt to locate the target peer to open a channel with. If
        // we're unable to locate the peer then this request will fail.
        pubKeyBytes := req.TargetPubkey.SerializeCompressed()
        s.mu.RLock()
        peer, ok := s.peersByPub[string(pubKeyBytes)]
        if !ok {
                s.mu.RUnlock()

                req.Err <- fmt.Errorf("peer %x is not online", pubKeyBytes)
                return req.Updates, req.Err
        }
        req.Peer = peer
        s.mu.RUnlock()

        // We'll wait until the peer is active before beginning the channel
        // opening process.
        select {
        case <-peer.ActiveSignal():
        case <-peer.QuitSignal():
                req.Err <- fmt.Errorf("peer %x disconnected", pubKeyBytes)
                return req.Updates, req.Err
        case <-s.quit:
                req.Err <- ErrServerShuttingDown
                return req.Updates, req.Err
        }

        // If the fee rate wasn't specified at this point we fail the funding
        // because of the missing fee rate information. The caller of the
        // `OpenChannel` method needs to make sure that default values for the
        // fee rate are set beforehand.
        if req.FundingFeePerKw == 0 {
                req.Err <- fmt.Errorf("no FundingFeePerKw specified for " +
                        "the channel opening transaction")

                return req.Updates, req.Err
        }

        // Spawn a goroutine to send the funding workflow request to the funding
        // manager. This allows the server to continue handling queries instead
        // of blocking on this request which is exported as a synchronous
        // request to the outside world.
        go s.fundingMgr.InitFundingWorkflow(req)

        return req.Updates, req.Err
}

// Peers returns a slice of all active peers.
//
// NOTE: This function is safe for concurrent access.
func (s *server) Peers() []*peer.Brontide {
        s.mu.RLock()
        defer s.mu.RUnlock()

        peers := make([]*peer.Brontide, 0, len(s.peersByPub))
        for _, peer := range s.peersByPub {
                peers = append(peers, peer)
        }

        return peers
}

// computeNextBackoff uses a truncated exponential backoff to compute the next
// backoff using the value of the existing backoff. The returned duration is
// randomized in either direction by 1/20 to prevent tight loops from
// stabilizing.
func computeNextBackoff(currBackoff, maxBackoff time.Duration) time.Duration {
        // Double the current backoff, truncating if it exceeds our maximum.
        nextBackoff := 2 * currBackoff
        if nextBackoff > maxBackoff {
                nextBackoff = maxBackoff
        }

        // Using 1/10 of our duration as a margin, compute a random offset to
        // avoid the nodes entering connection cycles.
        margin := nextBackoff / 10

        maxWiggle := new(big.Int).SetUint64(uint64(margin))
        wiggle, err := rand.Int(rand.Reader, maxWiggle)
        if err != nil {
                // Randomizing is not mission critical, so we'll just return the
                // current backoff.
                return nextBackoff
        }

        // Otherwise add in our wiggle, but subtract out half of the margin so
        // that the backoff can be tweaked by 1/20 in either direction.
        return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2)
}

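As a quick check of the arithmetic above: the wiggle is drawn from [0, nextBackoff/10) and then recentered by subtracting half of that margin, so the result always lands within roughly 1/20 of the doubled (and possibly capped) backoff. A minimal standalone sketch of the resulting bounds (not lnd code):

        package main

        import (
                "fmt"
                "time"
        )

        func main() {
                // For a current backoff of 4s (below the cap), the next
                // backoff is doubled to 8s and then jittered by up to
                // roughly 1/20 of that value in either direction.
                next := 2 * (4 * time.Second)
                margin := next / 10

                low := next - margin/2
                high := next + margin/2
                fmt.Printf("next backoff in [%v, %v)\n", low, high)
                // Prints: next backoff in [7.6s, 8.4s)
        }
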
// errNoAdvertisedAddr is an error returned when we attempt to retrieve the
// advertised address of a node, but they don't have one.
var errNoAdvertisedAddr = errors.New("no advertised address found")

// fetchNodeAdvertisedAddrs attempts to fetch the advertised addresses of a node.
func (s *server) fetchNodeAdvertisedAddrs(pub *btcec.PublicKey) ([]net.Addr, error) {
        vertex, err := route.NewVertexFromBytes(pub.SerializeCompressed())
        if err != nil {
                return nil, err
        }

        node, err := s.graphDB.FetchLightningNode(vertex)
        if err != nil {
                return nil, err
        }

        if len(node.Addresses) == 0 {
                return nil, errNoAdvertisedAddr
        }

        return node.Addresses, nil
}

// fetchLastChanUpdate returns a function which is able to retrieve our latest
// channel update for a target channel.
func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) (
        *lnwire.ChannelUpdate1, error) {

        ourPubKey := s.identityECDH.PubKey().SerializeCompressed()
        return func(cid lnwire.ShortChannelID) (*lnwire.ChannelUpdate1, error) {
                info, edge1, edge2, err := s.graphBuilder.GetChannelByID(cid)
                if err != nil {
                        return nil, err
                }

                return netann.ExtractChannelUpdate(
                        ourPubKey[:], info, edge1, edge2,
                )
        }
}

// applyChannelUpdate applies the channel update to the different sub-systems of
// the server. The useAlias boolean denotes whether or not to send an alias in
// place of the real SCID.
func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate1,
        op *wire.OutPoint, useAlias bool) error {

        var (
                peerAlias    *lnwire.ShortChannelID
                defaultAlias lnwire.ShortChannelID
        )

        chanID := lnwire.NewChanIDFromOutPoint(*op)

        // Fetch the peer's alias from the lnwire.ChannelID so it can be used
        // in the ChannelUpdate if it hasn't been announced yet.
        if useAlias {
                foundAlias, _ := s.aliasMgr.GetPeerAlias(chanID)
                if foundAlias != defaultAlias {
                        peerAlias = &foundAlias
                }
        }

        errChan := s.authGossiper.ProcessLocalAnnouncement(
                update, discovery.RemoteAlias(peerAlias),
        )
        select {
        case err := <-errChan:
                return err
        case <-s.quit:
                return ErrServerShuttingDown
        }
}

// SendCustomMessage sends a custom message to the peer with the specified
// pubkey.
func (s *server) SendCustomMessage(peerPub [33]byte, msgType lnwire.MessageType,
        data []byte) error {

        peer, err := s.FindPeerByPubStr(string(peerPub[:]))
        if err != nil {
                return err
        }

        // We'll wait until the peer is active.
        select {
        case <-peer.ActiveSignal():
        case <-peer.QuitSignal():
                return fmt.Errorf("peer %x disconnected", peerPub)
        case <-s.quit:
                return ErrServerShuttingDown
        }

        msg, err := lnwire.NewCustom(msgType, data)
        if err != nil {
                return err
        }

        // Send the message as low-priority. For now we assume that all
        // application-defined messages are low priority.
        return peer.SendMessageLazy(true, msg)
}

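lnwire.NewCustom, used above, only accepts message types in the custom range (at or above lnwire.CustomTypeStart) unless the type has been explicitly registered as an override via configuration. A minimal standalone sketch of building such a message directly with lnwire (the payload and type offset are illustrative):

        package main

        import (
                "fmt"

                "github.com/lightningnetwork/lnd/lnwire"
        )

        func main() {
                // Pick a message type in the custom range accepted by NewCustom.
                msgType := lnwire.CustomTypeStart + 1

                msg, err := lnwire.NewCustom(msgType, []byte{0xde, 0xad, 0xbe, 0xef})
                if err != nil {
                        fmt.Println("could not build custom message:", err)
                        return
                }

                fmt.Printf("built custom message with type %d\n", msg.MsgType())
        }
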
// newSweepPkScriptGen creates a closure that generates a new public key script
// which should be used to sweep any funds into the on-chain wallet.
// Specifically, the script generated pays to a newly derived taproot (P2TR)
// wallet address.
func newSweepPkScriptGen(
        wallet lnwallet.WalletController,
        netParams *chaincfg.Params) func() fn.Result[lnwallet.AddrWithKey] {

        return func() fn.Result[lnwallet.AddrWithKey] {
                sweepAddr, err := wallet.NewAddress(
                        lnwallet.TaprootPubkey, false,
                        lnwallet.DefaultAccountName,
                )
                if err != nil {
                        return fn.Err[lnwallet.AddrWithKey](err)
                }

                addr, err := txscript.PayToAddrScript(sweepAddr)
                if err != nil {
                        return fn.Err[lnwallet.AddrWithKey](err)
                }

                internalKeyDesc, err := lnwallet.InternalKeyForAddr(
                        wallet, netParams, addr,
                )
                if err != nil {
                        return fn.Err[lnwallet.AddrWithKey](err)
                }

                return fn.Ok(lnwallet.AddrWithKey{
                        DeliveryAddress: addr,
                        InternalKey:     internalKeyDesc,
                })
        }
}

// shouldPeerBootstrap returns true if we should attempt to perform peer
// bootstrapping to actively seek our peers using the set of active network
// bootstrappers.
func shouldPeerBootstrap(cfg *Config) bool {
        isSimnet := cfg.Bitcoin.SimNet
        isSignet := cfg.Bitcoin.SigNet
        isRegtest := cfg.Bitcoin.RegTest
        isDevNetwork := isSimnet || isSignet || isRegtest

        // TODO(yy): remove the check on simnet/regtest such that the itest is
        // covering the bootstrapping process.
        return !cfg.NoNetBootstrap && !isDevNetwork
}

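A standalone sketch of the same decision, using a stand-in struct rather than lnd's real Config type, to show which combinations disable bootstrapping (illustrative only):

        package main

        import "fmt"

        // bootstrapCfg stands in for the handful of Config fields the check reads.
        type bootstrapCfg struct {
                NoNetBootstrap          bool
                SimNet, SigNet, RegTest bool
        }

        func shouldBootstrap(cfg bootstrapCfg) bool {
                isDevNetwork := cfg.SimNet || cfg.SigNet || cfg.RegTest
                return !cfg.NoNetBootstrap && !isDevNetwork
        }

        func main() {
                // true: public-network defaults.
                fmt.Println(shouldBootstrap(bootstrapCfg{}))

                // false: dev network, bootstrapping is skipped.
                fmt.Println(shouldBootstrap(bootstrapCfg{RegTest: true}))

                // false: explicitly disabled.
                fmt.Println(shouldBootstrap(bootstrapCfg{NoNetBootstrap: true}))
        }
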
// fetchClosedChannelSCIDs returns the set of SCIDs for channels whose force
// close has finished.
func (s *server) fetchClosedChannelSCIDs() map[lnwire.ShortChannelID]struct{} {
        // Get a list of closed channels.
        channels, err := s.chanStateDB.FetchClosedChannels(false)
        if err != nil {
                srvrLog.Errorf("Failed to fetch closed channels: %v", err)
                return nil
        }

        // Save the SCIDs in a map.
        closedSCIDs := make(map[lnwire.ShortChannelID]struct{}, len(channels))
        for _, c := range channels {
                // If the channel is not pending, its FC has been finalized.
                if !c.IsPending {
                        closedSCIDs[c.ShortChanID] = struct{}{}
                }
        }

        // Double check whether the reported closed channel has indeed finished
        // closing.
        //
        // NOTE: There are misalignments regarding when a channel's FC is
        // marked as finalized. We double check the pending channels to make
        // sure the returned SCIDs are indeed terminated.
        //
        // TODO(yy): fix the misalignments in `FetchClosedChannels`.
        pendings, err := s.chanStateDB.FetchPendingChannels()
        if err != nil {
                srvrLog.Errorf("Failed to fetch pending channels: %v", err)
                return nil
        }

        for _, c := range pendings {
                if _, ok := closedSCIDs[c.ShortChannelID]; !ok {
                        continue
                }

                // If the channel is still reported as pending, remove it from
                // the map.
                delete(closedSCIDs, c.ShortChannelID)

                srvrLog.Warnf("Channel=%v is prematurely marked as finalized",
                        c.ShortChannelID)
        }

        return closedSCIDs
}

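The double check above is effectively set reconciliation: start from the channels reported as fully closed and drop any SCID that the pending-channel query still returns. A minimal standalone sketch of that reconciliation (not lnd code; plain strings stand in for SCIDs):

        package main

        import "fmt"

        func main() {
                // Channels the closed-channel query reported as finalized.
                closed := map[string]struct{}{
                        "700000:1:0": {},
                        "700123:5:1": {},
                }

                // Channels the pending-channel query still reports as in flight.
                pending := []string{"700123:5:1"}

                // Any SCID present in both sets was prematurely marked as
                // finalized, so remove it from the result.
                for _, scid := range pending {
                        if _, ok := closed[scid]; ok {
                                delete(closed, scid)
                                fmt.Printf("channel %v is prematurely marked as finalized\n", scid)
                        }
                }

                fmt.Println("finalized channels:", len(closed))
        }
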
// getStartingBeat returns the current beat. This is used during startup to
// initialize blockbeat consumers.
func (s *server) getStartingBeat() (*chainio.Beat, error) {
        // beat is the current blockbeat.
        var beat *chainio.Beat

        // We should get a notification with the current best block immediately
        // by passing a nil block.
        blockEpochs, err := s.cc.ChainNotifier.RegisterBlockEpochNtfn(nil)
        if err != nil {
                return beat, fmt.Errorf("register block epoch ntfn: %w", err)
        }
        defer blockEpochs.Cancel()

        // We registered for the block epochs with a nil request. The notifier
        // should send us the current best block immediately. So we need to
        // wait for it here because we need to know the current best height.
        select {
        case bestBlock := <-blockEpochs.Epochs:
                srvrLog.Infof("Received initial block %v at height %d",
                        bestBlock.Hash, bestBlock.Height)

                // Update the current blockbeat.
                beat = chainio.NewBeat(*bestBlock)

        case <-s.quit:
                srvrLog.Debug("LND shutting down")
        }

        return beat, nil
}