lightningnetwork / lnd / 12583319996

02 Jan 2025 01:38PM UTC coverage: 57.522% (-1.1%) from 58.598%

Pull #9361

starius
fn/ContextGuard: use context.AfterFunc to wait

Simplifies context cancellation handling by using context.AfterFunc instead of a
goroutine to wait for context cancellation. This approach avoids the overhead of
a goroutine during the waiting period.

For ctxQuitUnsafe, since g.quit is closed only in the Quit method (which also
cancels all associated contexts), waiting on context cancellation ensures the
same behavior without unnecessary dependency on g.quit.

Added a test to ensure that the Create method does not launch any goroutines.
Pull Request #9361: fn: optimize context guard
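As a rough illustration of the technique the PR describes (a minimal sketch, not the actual fn.ContextGuard code; context.AfterFunc requires Go 1.21+), the following contrasts a goroutine parked on ctx.Done() for the whole waiting period with an AfterFunc callback that fires on cancellation without a dedicated goroutine. The helper names are hypothetical and only serve the comparison:

package main

import (
        "context"
        "fmt"
        "time"
)

// waitWithGoroutine is the pattern the PR replaces: a goroutine blocks on
// ctx.Done() for the entire waiting period just to close quit.
func waitWithGoroutine(ctx context.Context, quit chan struct{}) {
        go func() {
                <-ctx.Done()
                close(quit)
        }()
}

// waitWithAfterFunc registers a callback that closes quit once ctx is
// cancelled. No goroutine is parked while waiting; the returned stop
// function can deregister the callback if it is no longer needed.
func waitWithAfterFunc(ctx context.Context, quit chan struct{}) (stop func() bool) {
        return context.AfterFunc(ctx, func() {
                close(quit)
        })
}

func main() {
        ctx, cancel := context.WithCancel(context.Background())
        quit := make(chan struct{})

        stop := waitWithAfterFunc(ctx, quit)
        defer stop()

        cancel()

        select {
        case <-quit:
                fmt.Println("context cancelled, quit closed")
        case <-time.After(time.Second):
                fmt.Println("timed out waiting for quit")
        }
}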

102587 of 178344 relevant lines covered (57.52%)

24734.33 hits per line

Source File

/server.go
1
package lnd
2

3
import (
4
        "bytes"
5
        "context"
6
        "crypto/rand"
7
        "encoding/hex"
8
        "fmt"
9
        "math/big"
10
        prand "math/rand"
11
        "net"
12
        "strconv"
13
        "strings"
14
        "sync"
15
        "sync/atomic"
16
        "time"
17

18
        "github.com/btcsuite/btcd/btcec/v2"
19
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
20
        "github.com/btcsuite/btcd/btcutil"
21
        "github.com/btcsuite/btcd/chaincfg"
22
        "github.com/btcsuite/btcd/chaincfg/chainhash"
23
        "github.com/btcsuite/btcd/connmgr"
24
        "github.com/btcsuite/btcd/txscript"
25
        "github.com/btcsuite/btcd/wire"
26
        "github.com/go-errors/errors"
27
        sphinx "github.com/lightningnetwork/lightning-onion"
28
        "github.com/lightningnetwork/lnd/aliasmgr"
29
        "github.com/lightningnetwork/lnd/autopilot"
30
        "github.com/lightningnetwork/lnd/brontide"
31
        "github.com/lightningnetwork/lnd/chainio"
32
        "github.com/lightningnetwork/lnd/chainreg"
33
        "github.com/lightningnetwork/lnd/chanacceptor"
34
        "github.com/lightningnetwork/lnd/chanbackup"
35
        "github.com/lightningnetwork/lnd/chanfitness"
36
        "github.com/lightningnetwork/lnd/channeldb"
37
        "github.com/lightningnetwork/lnd/channelnotifier"
38
        "github.com/lightningnetwork/lnd/clock"
39
        "github.com/lightningnetwork/lnd/cluster"
40
        "github.com/lightningnetwork/lnd/contractcourt"
41
        "github.com/lightningnetwork/lnd/discovery"
42
        "github.com/lightningnetwork/lnd/feature"
43
        "github.com/lightningnetwork/lnd/fn/v2"
44
        "github.com/lightningnetwork/lnd/funding"
45
        "github.com/lightningnetwork/lnd/graph"
46
        graphdb "github.com/lightningnetwork/lnd/graph/db"
47
        "github.com/lightningnetwork/lnd/graph/db/models"
48
        "github.com/lightningnetwork/lnd/graph/graphsession"
49
        "github.com/lightningnetwork/lnd/healthcheck"
50
        "github.com/lightningnetwork/lnd/htlcswitch"
51
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
52
        "github.com/lightningnetwork/lnd/input"
53
        "github.com/lightningnetwork/lnd/invoices"
54
        "github.com/lightningnetwork/lnd/keychain"
55
        "github.com/lightningnetwork/lnd/kvdb"
56
        "github.com/lightningnetwork/lnd/lncfg"
57
        "github.com/lightningnetwork/lnd/lnencrypt"
58
        "github.com/lightningnetwork/lnd/lnpeer"
59
        "github.com/lightningnetwork/lnd/lnrpc"
60
        "github.com/lightningnetwork/lnd/lnrpc/routerrpc"
61
        "github.com/lightningnetwork/lnd/lnwallet"
62
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
63
        "github.com/lightningnetwork/lnd/lnwallet/chanfunding"
64
        "github.com/lightningnetwork/lnd/lnwallet/rpcwallet"
65
        "github.com/lightningnetwork/lnd/lnwire"
66
        "github.com/lightningnetwork/lnd/nat"
67
        "github.com/lightningnetwork/lnd/netann"
68
        "github.com/lightningnetwork/lnd/peer"
69
        "github.com/lightningnetwork/lnd/peernotifier"
70
        "github.com/lightningnetwork/lnd/pool"
71
        "github.com/lightningnetwork/lnd/queue"
72
        "github.com/lightningnetwork/lnd/routing"
73
        "github.com/lightningnetwork/lnd/routing/localchans"
74
        "github.com/lightningnetwork/lnd/routing/route"
75
        "github.com/lightningnetwork/lnd/subscribe"
76
        "github.com/lightningnetwork/lnd/sweep"
77
        "github.com/lightningnetwork/lnd/ticker"
78
        "github.com/lightningnetwork/lnd/tor"
79
        "github.com/lightningnetwork/lnd/walletunlocker"
80
        "github.com/lightningnetwork/lnd/watchtower/blob"
81
        "github.com/lightningnetwork/lnd/watchtower/wtclient"
82
        "github.com/lightningnetwork/lnd/watchtower/wtpolicy"
83
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
84
)
85

86
const (
87
        // defaultMinPeers is the minimum number of peers nodes should always be
88
        // connected to.
89
        defaultMinPeers = 3
90

91
        // defaultStableConnDuration is a floor under which all reconnection
92
        // attempts will apply exponential randomized backoff. Connection
93
        // durations exceeding this value will be eligible to have their
94
        // backoffs reduced.
95
        defaultStableConnDuration = 10 * time.Minute
96

97
        // numInstantInitReconnect specifies how many persistent peers we should
98
        // always attempt outbound connections to immediately. After this value
99
        // is surpassed, the remaining peers will be randomly delayed using
100
        // maxInitReconnectDelay.
101
        numInstantInitReconnect = 10
102

103
        // maxInitReconnectDelay specifies the maximum delay in seconds we will
104
        // apply in attempting to reconnect to persistent peers on startup. The
105
        // value used for a particular peer will be chosen between 0s and this
106
        // value.
107
        maxInitReconnectDelay = 30
108

109
        // multiAddrConnectionStagger is the number of seconds to wait between
110
        // attempting to connect to a peer with each of its advertised addresses.
111
        multiAddrConnectionStagger = 10 * time.Second
112
)
113

114
var (
115
        // ErrPeerNotConnected signals that the server has no connection to the
116
        // given peer.
117
        ErrPeerNotConnected = errors.New("peer is not connected")
118

119
        // ErrServerNotActive indicates that the server has started but hasn't
120
        // fully finished the startup process.
121
        ErrServerNotActive = errors.New("server is still in the process of " +
122
                "starting")
123

124
        // ErrServerShuttingDown indicates that the server is in the process of
125
        // gracefully exiting.
126
        ErrServerShuttingDown = errors.New("server is shutting down")
127

128
        // MaxFundingAmount is a soft-limit of the maximum channel size
129
        // currently accepted within the Lightning Protocol. This is
130
        // defined in BOLT-0002, and serves as an initial precautionary limit
131
        // while implementations are battle tested in the real world.
132
        //
133
        // At the moment, this value depends on which chain is active. It is set
134
        // to the value under the Bitcoin chain as default.
135
        //
136
        // TODO(roasbeef): add command line param to modify.
137
        MaxFundingAmount = funding.MaxBtcFundingAmount
138

139
        // EndorsementExperimentEnd is the time after which nodes should stop
140
        // propagating experimental endorsement signals.
141
        //
142
        // Per blip04: January 1, 2026 12:00:00 AM UTC in unix seconds.
143
        EndorsementExperimentEnd = time.Unix(1767225600, 0)
144
)
145

146
// errPeerAlreadyConnected is an error returned by the server when we're
147
// commanded to connect to a peer, but they're already connected.
148
type errPeerAlreadyConnected struct {
149
        peer *peer.Brontide
150
}
151

152
// Error returns the human readable version of this error type.
153
//
154
// NOTE: Part of the error interface.
155
func (e *errPeerAlreadyConnected) Error() string {
×
156
        return fmt.Sprintf("already connected to peer: %v", e.peer)
×
157
}
×
158

159
// server is the main server of the Lightning Network Daemon. The server houses
160
// global state pertaining to the wallet, database, and the rpcserver.
161
// Additionally, the server is also used as a central messaging bus to interact
162
// with any of its companion objects.
163
type server struct {
164
        active   int32 // atomic
165
        stopping int32 // atomic
166

167
        start sync.Once
168
        stop  sync.Once
169

170
        cfg *Config
171

172
        implCfg *ImplementationCfg
173

174
        // identityECDH is an ECDH capable wrapper for the private key used
175
        // to authenticate any incoming connections.
176
        identityECDH keychain.SingleKeyECDH
177

178
        // identityKeyLoc is the key locator for the above wrapped identity key.
179
        identityKeyLoc keychain.KeyLocator
180

181
        // nodeSigner is an implementation of the MessageSigner interface
182
        // that's backed by the identity private key of the running lnd node.
183
        nodeSigner *netann.NodeSigner
184

185
        chanStatusMgr *netann.ChanStatusManager
186

187
        // listenAddrs is the list of addresses the server is currently
188
        // listening on.
189
        listenAddrs []net.Addr
190

191
        // torController is a client that will communicate with a locally
192
        // running Tor server. This client will handle initiating and
193
        // authenticating the connection to the Tor server, automatically
194
        // creating and setting up onion services, etc.
195
        torController *tor.Controller
196

197
        // natTraversal is the specific NAT traversal technique used to
198
        // automatically set up port forwarding rules in order to advertise to
199
        // the network that the node is accepting inbound connections.
200
        natTraversal nat.Traversal
201

202
        // lastDetectedIP is the last IP detected by the NAT traversal technique
203
        // above. This IP will be watched periodically in a goroutine in order
204
        // to handle dynamic IP changes.
205
        lastDetectedIP net.IP
206

207
        mu sync.RWMutex
208

209
        // peersByPub is a map of the active peers.
210
        //
211
        // NOTE: The key used here is the raw bytes of the peer's public key to
212
        // string conversion, which means it cannot be printed using `%s` as it
213
        // will just print the binary.
214
        //
215
        // TODO(yy): Use the hex string instead.
216
        peersByPub map[string]*peer.Brontide
217

218
        inboundPeers  map[string]*peer.Brontide
219
        outboundPeers map[string]*peer.Brontide
220

221
        peerConnectedListeners    map[string][]chan<- lnpeer.Peer
222
        peerDisconnectedListeners map[string][]chan<- struct{}
223

224
        // TODO(yy): the Brontide.Start doesn't know this value, which means it
225
        // will continue to send messages even if there are no active channels
226
        // and the value below is false. Once it's pruned, all its connections
227
        // will be closed, thus the Brontide.Start will return an error.
228
        persistentPeers        map[string]bool
229
        persistentPeersBackoff map[string]time.Duration
230
        persistentPeerAddrs    map[string][]*lnwire.NetAddress
231
        persistentConnReqs     map[string][]*connmgr.ConnReq
232
        persistentRetryCancels map[string]chan struct{}
233

234
        // peerErrors keeps a set of peer error buffers for peers that have
235
        // disconnected from us. This allows us to track historic peer errors
236
        // over connections. The string of the peer's compressed pubkey is used
237
        // as a key for this map.
238
        peerErrors map[string]*queue.CircularBuffer
239

240
        // ignorePeerTermination tracks peers for which the server has initiated
241
        // a disconnect. Adding a peer to this map causes the peer termination
242
        // watcher to short circuit in the event that peers are purposefully
243
        // disconnected.
244
        ignorePeerTermination map[*peer.Brontide]struct{}
245

246
        // scheduledPeerConnection maps a pubkey string to a callback that
247
        // should be executed in the peerTerminationWatcher once the prior peer with
248
        // the same pubkey exits.  This allows the server to wait until the
249
        // prior peer has cleaned up successfully, before adding the new peer
250
        // intended to replace it.
251
        scheduledPeerConnection map[string]func()
252

253
        // pongBuf is a shared pong reply buffer we'll use across all active
254
        // peer goroutines. We know the max size of a pong message
255
        // (lnwire.MaxPongBytes), so we can allocate this ahead of time, and
256
        // avoid allocations each time we need to send a pong message.
257
        pongBuf []byte
258

259
        cc *chainreg.ChainControl
260

261
        fundingMgr *funding.Manager
262

263
        graphDB *graphdb.ChannelGraph
264

265
        chanStateDB *channeldb.ChannelStateDB
266

267
        addrSource channeldb.AddrSource
268

269
        // miscDB is the DB that contains all "other" databases within the main
270
        // channel DB that haven't been separated out yet.
271
        miscDB *channeldb.DB
272

273
        invoicesDB invoices.InvoiceDB
274

275
        aliasMgr *aliasmgr.Manager
276

277
        htlcSwitch *htlcswitch.Switch
278

279
        interceptableSwitch *htlcswitch.InterceptableSwitch
280

281
        invoices *invoices.InvoiceRegistry
282

283
        invoiceHtlcModifier *invoices.HtlcModificationInterceptor
284

285
        channelNotifier *channelnotifier.ChannelNotifier
286

287
        peerNotifier *peernotifier.PeerNotifier
288

289
        htlcNotifier *htlcswitch.HtlcNotifier
290

291
        witnessBeacon contractcourt.WitnessBeacon
292

293
        breachArbitrator *contractcourt.BreachArbitrator
294

295
        missionController *routing.MissionController
296
        defaultMC         *routing.MissionControl
297

298
        graphBuilder *graph.Builder
299

300
        chanRouter *routing.ChannelRouter
301

302
        controlTower routing.ControlTower
303

304
        authGossiper *discovery.AuthenticatedGossiper
305

306
        localChanMgr *localchans.Manager
307

308
        utxoNursery *contractcourt.UtxoNursery
309

310
        sweeper *sweep.UtxoSweeper
311

312
        chainArb *contractcourt.ChainArbitrator
313

314
        sphinx *hop.OnionProcessor
315

316
        towerClientMgr *wtclient.Manager
317

318
        connMgr *connmgr.ConnManager
319

320
        sigPool *lnwallet.SigPool
321

322
        writePool *pool.Write
323

324
        readPool *pool.Read
325

326
        tlsManager *TLSManager
327

328
        // featureMgr dispatches feature vectors for various contexts within the
329
        // daemon.
330
        featureMgr *feature.Manager
331

332
        // currentNodeAnn is the node announcement that has been broadcast to
333
        // the network upon startup, if the attributes of the node (us) have
334
        // changed since last start.
335
        currentNodeAnn *lnwire.NodeAnnouncement
336

337
        // chansToRestore is the set of channels that upon starting, the server
338
        // should attempt to restore/recover.
339
        chansToRestore walletunlocker.ChannelsToRecover
340

341
        // chanSubSwapper is a sub-system that will ensure our on-disk channel
342
        // backups are consistent at all times. It interacts with the
343
        // channelNotifier to be notified of newly opened and closed channels.
344
        chanSubSwapper *chanbackup.SubSwapper
345

346
        // chanEventStore tracks the behaviour of channels and their remote peers to
347
        // provide insights into their health and performance.
348
        chanEventStore *chanfitness.ChannelEventStore
349

350
        hostAnn *netann.HostAnnouncer
351

352
        // livenessMonitor monitors that lnd has access to critical resources.
353
        livenessMonitor *healthcheck.Monitor
354

355
        customMessageServer *subscribe.Server
356

357
        // txPublisher is a publisher with fee-bumping capability.
358
        txPublisher *sweep.TxPublisher
359

360
        // blockbeatDispatcher is a block dispatcher that notifies subscribers
361
        // of new blocks.
362
        blockbeatDispatcher *chainio.BlockbeatDispatcher
363

364
        quit chan struct{}
365

366
        wg sync.WaitGroup
367
}
368

369
// updatePersistentPeerAddrs subscribes to topology changes and stores
370
// advertised addresses for any NodeAnnouncements from our persisted peers.
371
func (s *server) updatePersistentPeerAddrs() error {
×
372
        graphSub, err := s.graphBuilder.SubscribeTopology()
×
373
        if err != nil {
×
374
                return err
×
375
        }
×
376

377
        s.wg.Add(1)
×
378
        go func() {
×
379
                defer func() {
×
380
                        graphSub.Cancel()
×
381
                        s.wg.Done()
×
382
                }()
×
383

384
                for {
×
385
                        select {
×
386
                        case <-s.quit:
×
387
                                return
×
388

389
                        case topChange, ok := <-graphSub.TopologyChanges:
×
390
                                // If the router is shutting down, then we will
×
391
                                // as well.
×
392
                                if !ok {
×
393
                                        return
×
394
                                }
×
395

396
                                for _, update := range topChange.NodeUpdates {
×
397
                                        pubKeyStr := string(
×
398
                                                update.IdentityKey.
×
399
                                                        SerializeCompressed(),
×
400
                                        )
×
401

×
402
                                        // We only care about updates from
×
403
                                        // our persistentPeers.
×
404
                                        s.mu.RLock()
×
405
                                        _, ok := s.persistentPeers[pubKeyStr]
×
406
                                        s.mu.RUnlock()
×
407
                                        if !ok {
×
408
                                                continue
×
409
                                        }
410

411
                                        addrs := make([]*lnwire.NetAddress, 0,
×
412
                                                len(update.Addresses))
×
413

×
414
                                        for _, addr := range update.Addresses {
×
415
                                                addrs = append(addrs,
×
416
                                                        &lnwire.NetAddress{
×
417
                                                                IdentityKey: update.IdentityKey,
×
418
                                                                Address:     addr,
×
419
                                                                ChainNet:    s.cfg.ActiveNetParams.Net,
×
420
                                                        },
×
421
                                                )
×
422
                                        }
×
423

424
                                        s.mu.Lock()
×
425

×
426
                                        // Update the stored addresses for this
×
427
                                        // peer to reflect the new set.
×
428
                                        s.persistentPeerAddrs[pubKeyStr] = addrs
×
429

×
430
                                        // If there are no outstanding
×
431
                                        // connection requests for this peer
×
432
                                        // then our work is done since we are
×
433
                                        // not currently trying to connect to
×
434
                                        // them.
×
435
                                        if len(s.persistentConnReqs[pubKeyStr]) == 0 {
×
436
                                                s.mu.Unlock()
×
437
                                                continue
×
438
                                        }
439

440
                                        s.mu.Unlock()
×
441

×
442
                                        s.connectToPersistentPeer(pubKeyStr)
×
443
                                }
444
                        }
445
                }
446
        }()
447

448
        return nil
×
449
}
450

451
// CustomMessage is a custom message that is received from a peer.
452
type CustomMessage struct {
453
        // Peer is the peer pubkey
454
        Peer [33]byte
455

456
        // Msg is the custom wire message.
457
        Msg *lnwire.Custom
458
}
459

460
// parseAddr parses an address from its string format to a net.Addr.
461
func parseAddr(address string, netCfg tor.Net) (net.Addr, error) {
×
462
        var (
×
463
                host string
×
464
                port int
×
465
        )
×
466

×
467
        // Split the address into its host and port components.
×
468
        h, p, err := net.SplitHostPort(address)
×
469
        if err != nil {
×
470
                // If a port wasn't specified, we'll assume the address only
×
471
                // contains the host so we'll use the default port.
×
472
                host = address
×
473
                port = defaultPeerPort
×
474
        } else {
×
475
                // Otherwise, we'll note both the host and ports.
×
476
                host = h
×
477
                portNum, err := strconv.Atoi(p)
×
478
                if err != nil {
×
479
                        return nil, err
×
480
                }
×
481
                port = portNum
×
482
        }
483

484
        if tor.IsOnionHost(host) {
×
485
                return &tor.OnionAddr{OnionService: host, Port: port}, nil
×
486
        }
×
487

488
        // If the host is part of a TCP address, we'll use the network
489
        // specific ResolveTCPAddr function in order to resolve these
490
        // addresses over Tor in order to prevent leaking your real IP
491
        // address.
492
        hostPort := net.JoinHostPort(host, strconv.Itoa(port))
×
493
        return netCfg.ResolveTCPAddr("tcp", hostPort)
×
494
}
495

496
// noiseDial is a factory function which creates a connmgr compliant dialing
497
// function by returning a closure which includes the server's identity key.
498
func noiseDial(idKey keychain.SingleKeyECDH,
499
        netCfg tor.Net, timeout time.Duration) func(net.Addr) (net.Conn, error) {
×
500

×
501
        return func(a net.Addr) (net.Conn, error) {
×
502
                lnAddr := a.(*lnwire.NetAddress)
×
503
                return brontide.Dial(idKey, lnAddr, timeout, netCfg.Dial)
×
504
        }
×
505
}
506

507
// newServer creates a new instance of the server which is to listen using the
508
// passed listener address.
509
func newServer(cfg *Config, listenAddrs []net.Addr,
510
        dbs *DatabaseInstances, cc *chainreg.ChainControl,
511
        nodeKeyDesc *keychain.KeyDescriptor,
512
        chansToRestore walletunlocker.ChannelsToRecover,
513
        chanPredicate chanacceptor.ChannelAcceptor,
514
        torController *tor.Controller, tlsManager *TLSManager,
515
        leaderElector cluster.LeaderElector,
516
        implCfg *ImplementationCfg) (*server, error) {
×
517

×
518
        var (
×
519
                err         error
×
520
                nodeKeyECDH = keychain.NewPubKeyECDH(*nodeKeyDesc, cc.KeyRing)
×
521

×
522
                // We just derived the full descriptor, so we know the public
×
523
                // key is set on it.
×
524
                nodeKeySigner = keychain.NewPubKeyMessageSigner(
×
525
                        nodeKeyDesc.PubKey, nodeKeyDesc.KeyLocator, cc.KeyRing,
×
526
                )
×
527
        )
×
528

×
529
        listeners := make([]net.Listener, len(listenAddrs))
×
530
        for i, listenAddr := range listenAddrs {
×
531
                // Note: though brontide.NewListener uses ResolveTCPAddr, it
×
532
                // doesn't need to call the general lndResolveTCP function
×
533
                // since we are resolving a local address.
×
534
                listeners[i], err = brontide.NewListener(
×
535
                        nodeKeyECDH, listenAddr.String(),
×
536
                )
×
537
                if err != nil {
×
538
                        return nil, err
×
539
                }
×
540
        }
541

542
        var serializedPubKey [33]byte
×
543
        copy(serializedPubKey[:], nodeKeyDesc.PubKey.SerializeCompressed())
×
544

×
545
        netParams := cfg.ActiveNetParams.Params
×
546

×
547
        // Initialize the sphinx router.
×
548
        replayLog := htlcswitch.NewDecayedLog(
×
549
                dbs.DecayedLogDB, cc.ChainNotifier,
×
550
        )
×
551
        sphinxRouter := sphinx.NewRouter(nodeKeyECDH, replayLog)
×
552

×
553
        writeBufferPool := pool.NewWriteBuffer(
×
554
                pool.DefaultWriteBufferGCInterval,
×
555
                pool.DefaultWriteBufferExpiryInterval,
×
556
        )
×
557

×
558
        writePool := pool.NewWrite(
×
559
                writeBufferPool, cfg.Workers.Write, pool.DefaultWorkerTimeout,
×
560
        )
×
561

×
562
        readBufferPool := pool.NewReadBuffer(
×
563
                pool.DefaultReadBufferGCInterval,
×
564
                pool.DefaultReadBufferExpiryInterval,
×
565
        )
×
566

×
567
        readPool := pool.NewRead(
×
568
                readBufferPool, cfg.Workers.Read, pool.DefaultWorkerTimeout,
×
569
        )
×
570

×
571
        // If the taproot overlay flag is set, but we don't have an aux funding
×
572
        // controller, then we'll exit as this is incompatible.
×
573
        if cfg.ProtocolOptions.TaprootOverlayChans &&
×
574
                implCfg.AuxFundingController.IsNone() {
×
575

×
576
                return nil, fmt.Errorf("taproot overlay flag set, but not " +
×
577
                        "aux controllers")
×
578
        }
×
579

580
        //nolint:ll
581
        featureMgr, err := feature.NewManager(feature.Config{
×
582
                NoTLVOnion:                cfg.ProtocolOptions.LegacyOnion(),
×
583
                NoStaticRemoteKey:         cfg.ProtocolOptions.NoStaticRemoteKey(),
×
584
                NoAnchors:                 cfg.ProtocolOptions.NoAnchorCommitments(),
×
585
                NoWumbo:                   !cfg.ProtocolOptions.Wumbo(),
×
586
                NoScriptEnforcementLease:  cfg.ProtocolOptions.NoScriptEnforcementLease(),
×
587
                NoKeysend:                 !cfg.AcceptKeySend,
×
588
                NoOptionScidAlias:         !cfg.ProtocolOptions.ScidAlias(),
×
589
                NoZeroConf:                !cfg.ProtocolOptions.ZeroConf(),
×
590
                NoAnySegwit:               cfg.ProtocolOptions.NoAnySegwit(),
×
591
                CustomFeatures:            cfg.ProtocolOptions.CustomFeatures(),
×
592
                NoTaprootChans:            !cfg.ProtocolOptions.TaprootChans,
×
593
                NoTaprootOverlay:          !cfg.ProtocolOptions.TaprootOverlayChans,
×
594
                NoRouteBlinding:           cfg.ProtocolOptions.NoRouteBlinding(),
×
595
                NoExperimentalEndorsement: cfg.ProtocolOptions.NoExperimentalEndorsement(),
×
596
                NoQuiescence:              cfg.ProtocolOptions.NoQuiescence(),
×
597
        })
×
598
        if err != nil {
×
599
                return nil, err
×
600
        }
×
601

602
        invoiceHtlcModifier := invoices.NewHtlcModificationInterceptor()
×
603
        registryConfig := invoices.RegistryConfig{
×
604
                FinalCltvRejectDelta:        lncfg.DefaultFinalCltvRejectDelta,
×
605
                HtlcHoldDuration:            invoices.DefaultHtlcHoldDuration,
×
606
                Clock:                       clock.NewDefaultClock(),
×
607
                AcceptKeySend:               cfg.AcceptKeySend,
×
608
                AcceptAMP:                   cfg.AcceptAMP,
×
609
                GcCanceledInvoicesOnStartup: cfg.GcCanceledInvoicesOnStartup,
×
610
                GcCanceledInvoicesOnTheFly:  cfg.GcCanceledInvoicesOnTheFly,
×
611
                KeysendHoldTime:             cfg.KeysendHoldTime,
×
612
                HtlcInterceptor:             invoiceHtlcModifier,
×
613
        }
×
614

×
615
        addrSource := channeldb.NewMultiAddrSource(dbs.ChanStateDB, dbs.GraphDB)
×
616

×
617
        s := &server{
×
618
                cfg:            cfg,
×
619
                implCfg:        implCfg,
×
620
                graphDB:        dbs.GraphDB,
×
621
                chanStateDB:    dbs.ChanStateDB.ChannelStateDB(),
×
622
                addrSource:     addrSource,
×
623
                miscDB:         dbs.ChanStateDB,
×
624
                invoicesDB:     dbs.InvoiceDB,
×
625
                cc:             cc,
×
626
                sigPool:        lnwallet.NewSigPool(cfg.Workers.Sig, cc.Signer),
×
627
                writePool:      writePool,
×
628
                readPool:       readPool,
×
629
                chansToRestore: chansToRestore,
×
630

×
631
                blockbeatDispatcher: chainio.NewBlockbeatDispatcher(
×
632
                        cc.ChainNotifier,
×
633
                ),
×
634
                channelNotifier: channelnotifier.New(
×
635
                        dbs.ChanStateDB.ChannelStateDB(),
×
636
                ),
×
637

×
638
                identityECDH:   nodeKeyECDH,
×
639
                identityKeyLoc: nodeKeyDesc.KeyLocator,
×
640
                nodeSigner:     netann.NewNodeSigner(nodeKeySigner),
×
641

×
642
                listenAddrs: listenAddrs,
×
643

×
644
                // TODO(roasbeef): derive proper onion key based on rotation
×
645
                // schedule
×
646
                sphinx: hop.NewOnionProcessor(sphinxRouter),
×
647

×
648
                torController: torController,
×
649

×
650
                persistentPeers:         make(map[string]bool),
×
651
                persistentPeersBackoff:  make(map[string]time.Duration),
×
652
                persistentConnReqs:      make(map[string][]*connmgr.ConnReq),
×
653
                persistentPeerAddrs:     make(map[string][]*lnwire.NetAddress),
×
654
                persistentRetryCancels:  make(map[string]chan struct{}),
×
655
                peerErrors:              make(map[string]*queue.CircularBuffer),
×
656
                ignorePeerTermination:   make(map[*peer.Brontide]struct{}),
×
657
                scheduledPeerConnection: make(map[string]func()),
×
658
                pongBuf:                 make([]byte, lnwire.MaxPongBytes),
×
659

×
660
                peersByPub:                make(map[string]*peer.Brontide),
×
661
                inboundPeers:              make(map[string]*peer.Brontide),
×
662
                outboundPeers:             make(map[string]*peer.Brontide),
×
663
                peerConnectedListeners:    make(map[string][]chan<- lnpeer.Peer),
×
664
                peerDisconnectedListeners: make(map[string][]chan<- struct{}),
×
665

×
666
                invoiceHtlcModifier: invoiceHtlcModifier,
×
667

×
668
                customMessageServer: subscribe.NewServer(),
×
669

×
670
                tlsManager: tlsManager,
×
671

×
672
                featureMgr: featureMgr,
×
673
                quit:       make(chan struct{}),
×
674
        }
×
675

×
676
        // Start the low-level services once they are initialized.
×
677
        //
×
678
        // TODO(yy): break the server startup into four steps,
×
679
        // 1. init the low-level services.
×
680
        // 2. start the low-level services.
×
681
        // 3. init the high-level services.
×
682
        // 4. start the high-level services.
×
683
        if err := s.startLowLevelServices(); err != nil {
×
684
                return nil, err
×
685
        }
×
686

687
        currentHash, currentHeight, err := s.cc.ChainIO.GetBestBlock()
×
688
        if err != nil {
×
689
                return nil, err
×
690
        }
×
691

692
        expiryWatcher := invoices.NewInvoiceExpiryWatcher(
×
693
                clock.NewDefaultClock(), cfg.Invoices.HoldExpiryDelta,
×
694
                uint32(currentHeight), currentHash, cc.ChainNotifier,
×
695
        )
×
696
        s.invoices = invoices.NewRegistry(
×
697
                dbs.InvoiceDB, expiryWatcher, &registryConfig,
×
698
        )
×
699

×
700
        s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now)
×
701

×
702
        thresholdSats := btcutil.Amount(cfg.MaxFeeExposure)
×
703
        thresholdMSats := lnwire.NewMSatFromSatoshis(thresholdSats)
×
704

×
705
        linkUpdater := func(shortID lnwire.ShortChannelID) error {
×
706
                link, err := s.htlcSwitch.GetLinkByShortID(shortID)
×
707
                if err != nil {
×
708
                        return err
×
709
                }
×
710

711
                s.htlcSwitch.UpdateLinkAliases(link)
×
712

×
713
                return nil
×
714
        }
715

716
        s.aliasMgr, err = aliasmgr.NewManager(dbs.ChanStateDB, linkUpdater)
×
717
        if err != nil {
×
718
                return nil, err
×
719
        }
×
720

721
        s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{
×
722
                DB:                   dbs.ChanStateDB,
×
723
                FetchAllOpenChannels: s.chanStateDB.FetchAllOpenChannels,
×
724
                FetchAllChannels:     s.chanStateDB.FetchAllChannels,
×
725
                FetchClosedChannels:  s.chanStateDB.FetchClosedChannels,
×
726
                LocalChannelClose: func(pubKey []byte,
×
727
                        request *htlcswitch.ChanClose) {
×
728

×
729
                        peer, err := s.FindPeerByPubStr(string(pubKey))
×
730
                        if err != nil {
×
731
                                srvrLog.Errorf("unable to close channel, peer"+
×
732
                                        " with %v id can't be found: %v",
×
733
                                        pubKey, err,
×
734
                                )
×
735
                                return
×
736
                        }
×
737

738
                        peer.HandleLocalCloseChanReqs(request)
×
739
                },
740
                FwdingLog:              dbs.ChanStateDB.ForwardingLog(),
741
                SwitchPackager:         channeldb.NewSwitchPackager(),
742
                ExtractErrorEncrypter:  s.sphinx.ExtractErrorEncrypter,
743
                FetchLastChannelUpdate: s.fetchLastChanUpdate(),
744
                Notifier:               s.cc.ChainNotifier,
745
                HtlcNotifier:           s.htlcNotifier,
746
                FwdEventTicker:         ticker.New(htlcswitch.DefaultFwdEventInterval),
747
                LogEventTicker:         ticker.New(htlcswitch.DefaultLogInterval),
748
                AckEventTicker:         ticker.New(htlcswitch.DefaultAckInterval),
749
                AllowCircularRoute:     cfg.AllowCircularRoute,
750
                RejectHTLC:             cfg.RejectHTLC,
751
                Clock:                  clock.NewDefaultClock(),
752
                MailboxDeliveryTimeout: cfg.Htlcswitch.MailboxDeliveryTimeout,
753
                MaxFeeExposure:         thresholdMSats,
754
                SignAliasUpdate:        s.signAliasUpdate,
755
                IsAlias:                aliasmgr.IsAlias,
756
        }, uint32(currentHeight))
757
        if err != nil {
×
758
                return nil, err
×
759
        }
×
760
        s.interceptableSwitch, err = htlcswitch.NewInterceptableSwitch(
×
761
                &htlcswitch.InterceptableSwitchConfig{
×
762
                        Switch:             s.htlcSwitch,
×
763
                        CltvRejectDelta:    lncfg.DefaultFinalCltvRejectDelta,
×
764
                        CltvInterceptDelta: lncfg.DefaultCltvInterceptDelta,
×
765
                        RequireInterceptor: s.cfg.RequireInterceptor,
×
766
                        Notifier:           s.cc.ChainNotifier,
×
767
                },
×
768
        )
×
769
        if err != nil {
×
770
                return nil, err
×
771
        }
×
772

773
        s.witnessBeacon = newPreimageBeacon(
×
774
                dbs.ChanStateDB.NewWitnessCache(),
×
775
                s.interceptableSwitch.ForwardPacket,
×
776
        )
×
777

×
778
        chanStatusMgrCfg := &netann.ChanStatusConfig{
×
779
                ChanStatusSampleInterval: cfg.ChanStatusSampleInterval,
×
780
                ChanEnableTimeout:        cfg.ChanEnableTimeout,
×
781
                ChanDisableTimeout:       cfg.ChanDisableTimeout,
×
782
                OurPubKey:                nodeKeyDesc.PubKey,
×
783
                OurKeyLoc:                nodeKeyDesc.KeyLocator,
×
784
                MessageSigner:            s.nodeSigner,
×
785
                IsChannelActive:          s.htlcSwitch.HasActiveLink,
×
786
                ApplyChannelUpdate:       s.applyChannelUpdate,
×
787
                DB:                       s.chanStateDB,
×
788
                Graph:                    dbs.GraphDB,
×
789
        }
×
790

×
791
        chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg)
×
792
        if err != nil {
×
793
                return nil, err
×
794
        }
×
795
        s.chanStatusMgr = chanStatusMgr
×
796

×
797
        // If enabled, use either UPnP or NAT-PMP to automatically configure
×
798
        // port forwarding for users behind a NAT.
×
799
        if cfg.NAT {
×
800
                srvrLog.Info("Scanning local network for a UPnP enabled device")
×
801

×
802
                discoveryTimeout := time.Duration(10 * time.Second)
×
803

×
804
                ctx, cancel := context.WithTimeout(
×
805
                        context.Background(), discoveryTimeout,
×
806
                )
×
807
                defer cancel()
×
808
                upnp, err := nat.DiscoverUPnP(ctx)
×
809
                if err == nil {
×
810
                        s.natTraversal = upnp
×
811
                } else {
×
812
                        // If we were not able to discover a UPnP enabled device
×
813
                        // on the local network, we'll fall back to attempting
×
814
                        // to discover a NAT-PMP enabled device.
×
815
                        srvrLog.Errorf("Unable to discover a UPnP enabled "+
×
816
                                "device on the local network: %v", err)
×
817

×
818
                        srvrLog.Info("Scanning local network for a NAT-PMP " +
×
819
                                "enabled device")
×
820

×
821
                        pmp, err := nat.DiscoverPMP(discoveryTimeout)
×
822
                        if err != nil {
×
823
                                err := fmt.Errorf("unable to discover a "+
×
824
                                        "NAT-PMP enabled device on the local "+
×
825
                                        "network: %v", err)
×
826
                                srvrLog.Error(err)
×
827
                                return nil, err
×
828
                        }
×
829

830
                        s.natTraversal = pmp
×
831
                }
832
        }
833

834
        // If we were requested to automatically configure port forwarding,
835
        // we'll use the ports that the server will be listening on.
836
        externalIPStrings := make([]string, len(cfg.ExternalIPs))
×
837
        for idx, ip := range cfg.ExternalIPs {
×
838
                externalIPStrings[idx] = ip.String()
×
839
        }
×
840
        if s.natTraversal != nil {
×
841
                listenPorts := make([]uint16, 0, len(listenAddrs))
×
842
                for _, listenAddr := range listenAddrs {
×
843
                        // At this point, the listen addresses should have
×
844
                        // already been normalized, so it's safe to ignore the
×
845
                        // errors.
×
846
                        _, portStr, _ := net.SplitHostPort(listenAddr.String())
×
847
                        port, _ := strconv.Atoi(portStr)
×
848

×
849
                        listenPorts = append(listenPorts, uint16(port))
×
850
                }
×
851

852
                ips, err := s.configurePortForwarding(listenPorts...)
×
853
                if err != nil {
×
854
                        srvrLog.Errorf("Unable to automatically set up port "+
×
855
                                "forwarding using %s: %v",
×
856
                                s.natTraversal.Name(), err)
×
857
                } else {
×
858
                        srvrLog.Infof("Automatically set up port forwarding "+
×
859
                                "using %s to advertise external IP",
×
860
                                s.natTraversal.Name())
×
861
                        externalIPStrings = append(externalIPStrings, ips...)
×
862
                }
×
863
        }
864

865
        // If external IP addresses have been specified, add those to the list
866
        // of this server's addresses.
867
        externalIPs, err := lncfg.NormalizeAddresses(
×
868
                externalIPStrings, strconv.Itoa(defaultPeerPort),
×
869
                cfg.net.ResolveTCPAddr,
×
870
        )
×
871
        if err != nil {
×
872
                return nil, err
×
873
        }
×
874

875
        selfAddrs := make([]net.Addr, 0, len(externalIPs))
×
876
        selfAddrs = append(selfAddrs, externalIPs...)
×
877

×
878
        // We'll now reconstruct a node announcement based on our current
×
879
        // configuration so we can send it out as a sort of heart beat within
×
880
        // the network.
×
881
        //
×
882
        // We'll start by parsing the node color from configuration.
×
883
        color, err := lncfg.ParseHexColor(cfg.Color)
×
884
        if err != nil {
×
885
                srvrLog.Errorf("unable to parse color: %v\n", err)
×
886
                return nil, err
×
887
        }
×
888

889
        // If no alias is provided, default to first 10 characters of public
890
        // key.
891
        alias := cfg.Alias
×
892
        if alias == "" {
×
893
                alias = hex.EncodeToString(serializedPubKey[:10])
×
894
        }
×
895
        nodeAlias, err := lnwire.NewNodeAlias(alias)
×
896
        if err != nil {
×
897
                return nil, err
×
898
        }
×
899
        selfNode := &models.LightningNode{
×
900
                HaveNodeAnnouncement: true,
×
901
                LastUpdate:           time.Now(),
×
902
                Addresses:            selfAddrs,
×
903
                Alias:                nodeAlias.String(),
×
904
                Features:             s.featureMgr.Get(feature.SetNodeAnn),
×
905
                Color:                color,
×
906
        }
×
907
        copy(selfNode.PubKeyBytes[:], nodeKeyDesc.PubKey.SerializeCompressed())
×
908

×
909
        // Based on the disk representation of the node announcement generated
×
910
        // above, we'll generate a node announcement that can go out on the
×
911
        // network so we can properly sign it.
×
912
        nodeAnn, err := selfNode.NodeAnnouncement(false)
×
913
        if err != nil {
×
914
                return nil, fmt.Errorf("unable to gen self node ann: %w", err)
×
915
        }
×
916

917
        // With the announcement generated, we'll sign it to properly
918
        // authenticate the message on the network.
919
        authSig, err := netann.SignAnnouncement(
×
920
                s.nodeSigner, nodeKeyDesc.KeyLocator, nodeAnn,
×
921
        )
×
922
        if err != nil {
×
923
                return nil, fmt.Errorf("unable to generate signature for "+
×
924
                        "self node announcement: %v", err)
×
925
        }
×
926
        selfNode.AuthSigBytes = authSig.Serialize()
×
927
        nodeAnn.Signature, err = lnwire.NewSigFromECDSARawSignature(
×
928
                selfNode.AuthSigBytes,
×
929
        )
×
930
        if err != nil {
×
931
                return nil, err
×
932
        }
×
933

934
        // Finally, we'll update the representation on disk, and update our
935
        // cached in-memory version as well.
936
        if err := dbs.GraphDB.SetSourceNode(selfNode); err != nil {
×
937
                return nil, fmt.Errorf("can't set self node: %w", err)
×
938
        }
×
939
        s.currentNodeAnn = nodeAnn
×
940

×
941
        // The router will get access to the payment ID sequencer, such that it
×
942
        // can generate unique payment IDs.
×
943
        sequencer, err := htlcswitch.NewPersistentSequencer(dbs.ChanStateDB)
×
944
        if err != nil {
×
945
                return nil, err
×
946
        }
×
947

948
        // Instantiate mission control with config from the sub server.
949
        //
950
        // TODO(joostjager): When we are further in the process of moving to sub
951
        // servers, the mission control instance itself can be moved there too.
952
        routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC)
×
953

×
954
        // We only initialize a probability estimator if there's no custom one.
×
955
        var estimator routing.Estimator
×
956
        if cfg.Estimator != nil {
×
957
                estimator = cfg.Estimator
×
958
        } else {
×
959
                switch routingConfig.ProbabilityEstimatorType {
×
960
                case routing.AprioriEstimatorName:
×
961
                        aCfg := routingConfig.AprioriConfig
×
962
                        aprioriConfig := routing.AprioriConfig{
×
963
                                AprioriHopProbability: aCfg.HopProbability,
×
964
                                PenaltyHalfLife:       aCfg.PenaltyHalfLife,
×
965
                                AprioriWeight:         aCfg.Weight,
×
966
                                CapacityFraction:      aCfg.CapacityFraction,
×
967
                        }
×
968

×
969
                        estimator, err = routing.NewAprioriEstimator(
×
970
                                aprioriConfig,
×
971
                        )
×
972
                        if err != nil {
×
973
                                return nil, err
×
974
                        }
×
975

976
                case routing.BimodalEstimatorName:
×
977
                        bCfg := routingConfig.BimodalConfig
×
978
                        bimodalConfig := routing.BimodalConfig{
×
979
                                BimodalNodeWeight: bCfg.NodeWeight,
×
980
                                BimodalScaleMsat: lnwire.MilliSatoshi(
×
981
                                        bCfg.Scale,
×
982
                                ),
×
983
                                BimodalDecayTime: bCfg.DecayTime,
×
984
                        }
×
985

×
986
                        estimator, err = routing.NewBimodalEstimator(
×
987
                                bimodalConfig,
×
988
                        )
×
989
                        if err != nil {
×
990
                                return nil, err
×
991
                        }
×
992

993
                default:
×
994
                        return nil, fmt.Errorf("unknown estimator type %v",
×
995
                                routingConfig.ProbabilityEstimatorType)
×
996
                }
997
        }
998

999
        mcCfg := &routing.MissionControlConfig{
×
1000
                OnConfigUpdate:          fn.Some(s.UpdateRoutingConfig),
×
1001
                Estimator:               estimator,
×
1002
                MaxMcHistory:            routingConfig.MaxMcHistory,
×
1003
                McFlushInterval:         routingConfig.McFlushInterval,
×
1004
                MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval,
×
1005
        }
×
1006

×
1007
        s.missionController, err = routing.NewMissionController(
×
1008
                dbs.ChanStateDB, selfNode.PubKeyBytes, mcCfg,
×
1009
        )
×
1010
        if err != nil {
×
1011
                return nil, fmt.Errorf("can't create mission control "+
×
1012
                        "manager: %w", err)
×
1013
        }
×
1014
        s.defaultMC, err = s.missionController.GetNamespacedStore(
×
1015
                routing.DefaultMissionControlNamespace,
×
1016
        )
×
1017
        if err != nil {
×
1018
                return nil, fmt.Errorf("can't create mission control in the "+
×
1019
                        "default namespace: %w", err)
×
1020
        }
×
1021

1022
        srvrLog.Debugf("Instantiating payment session source with config: "+
×
1023
                "AttemptCost=%v + %v%%, MinRouteProbability=%v",
×
1024
                int64(routingConfig.AttemptCost),
×
1025
                float64(routingConfig.AttemptCostPPM)/10000,
×
1026
                routingConfig.MinRouteProbability)
×
1027

×
1028
        pathFindingConfig := routing.PathFindingConfig{
×
1029
                AttemptCost: lnwire.NewMSatFromSatoshis(
×
1030
                        routingConfig.AttemptCost,
×
1031
                ),
×
1032
                AttemptCostPPM: routingConfig.AttemptCostPPM,
×
1033
                MinProbability: routingConfig.MinRouteProbability,
×
1034
        }
×
1035

×
1036
        sourceNode, err := dbs.GraphDB.SourceNode()
×
1037
        if err != nil {
×
1038
                return nil, fmt.Errorf("error getting source node: %w", err)
×
1039
        }
×
1040
        paymentSessionSource := &routing.SessionSource{
×
1041
                GraphSessionFactory: graphsession.NewGraphSessionFactory(
×
1042
                        dbs.GraphDB,
×
1043
                ),
×
1044
                SourceNode:        sourceNode,
×
1045
                MissionControl:    s.defaultMC,
×
1046
                GetLink:           s.htlcSwitch.GetLinkByShortID,
×
1047
                PathFindingConfig: pathFindingConfig,
×
1048
        }
×
1049

×
1050
        paymentControl := channeldb.NewPaymentControl(dbs.ChanStateDB)
×
1051

×
1052
        s.controlTower = routing.NewControlTower(paymentControl)
×
1053

×
1054
        strictPruning := cfg.Bitcoin.Node == "neutrino" ||
×
1055
                cfg.Routing.StrictZombiePruning
×
1056

×
1057
        s.graphBuilder, err = graph.NewBuilder(&graph.Config{
×
1058
                SelfNode:            selfNode.PubKeyBytes,
×
1059
                Graph:               dbs.GraphDB,
×
1060
                Chain:               cc.ChainIO,
×
1061
                ChainView:           cc.ChainView,
×
1062
                Notifier:            cc.ChainNotifier,
×
1063
                ChannelPruneExpiry:  graph.DefaultChannelPruneExpiry,
×
1064
                GraphPruneInterval:  time.Hour,
×
1065
                FirstTimePruneDelay: graph.DefaultFirstTimePruneDelay,
×
1066
                AssumeChannelValid:  cfg.Routing.AssumeChannelValid,
×
1067
                StrictZombiePruning: strictPruning,
×
1068
                IsAlias:             aliasmgr.IsAlias,
×
1069
        })
×
1070
        if err != nil {
×
1071
                return nil, fmt.Errorf("can't create graph builder: %w", err)
×
1072
        }
×
1073

1074
        s.chanRouter, err = routing.New(routing.Config{
×
1075
                SelfNode:           selfNode.PubKeyBytes,
×
1076
                RoutingGraph:       graphsession.NewRoutingGraph(dbs.GraphDB),
×
1077
                Chain:              cc.ChainIO,
×
1078
                Payer:              s.htlcSwitch,
×
1079
                Control:            s.controlTower,
×
1080
                MissionControl:     s.defaultMC,
×
1081
                SessionSource:      paymentSessionSource,
×
1082
                GetLink:            s.htlcSwitch.GetLinkByShortID,
×
1083
                NextPaymentID:      sequencer.NextID,
×
1084
                PathFindingConfig:  pathFindingConfig,
×
1085
                Clock:              clock.NewDefaultClock(),
×
1086
                ApplyChannelUpdate: s.graphBuilder.ApplyChannelUpdate,
×
1087
                ClosedSCIDs:        s.fetchClosedChannelSCIDs(),
×
1088
                TrafficShaper:      implCfg.TrafficShaper,
×
1089
        })
×
1090
        if err != nil {
×
1091
                return nil, fmt.Errorf("can't create router: %w", err)
×
1092
        }
×
1093

1094
        chanSeries := discovery.NewChanSeries(s.graphDB)
×
1095
        gossipMessageStore, err := discovery.NewMessageStore(dbs.ChanStateDB)
×
1096
        if err != nil {
×
1097
                return nil, err
×
1098
        }
×
1099
        waitingProofStore, err := channeldb.NewWaitingProofStore(dbs.ChanStateDB)
×
1100
        if err != nil {
×
1101
                return nil, err
×
1102
        }
×
1103

1104
        scidCloserMan := discovery.NewScidCloserMan(s.graphDB, s.chanStateDB)
×
1105

×
1106
        s.authGossiper = discovery.New(discovery.Config{
×
1107
                Graph:                 s.graphBuilder,
×
1108
                ChainIO:               s.cc.ChainIO,
×
1109
                Notifier:              s.cc.ChainNotifier,
×
1110
                ChainHash:             *s.cfg.ActiveNetParams.GenesisHash,
×
1111
                Broadcast:             s.BroadcastMessage,
×
1112
                ChanSeries:            chanSeries,
×
1113
                NotifyWhenOnline:      s.NotifyWhenOnline,
×
1114
                NotifyWhenOffline:     s.NotifyWhenOffline,
×
1115
                FetchSelfAnnouncement: s.getNodeAnnouncement,
×
1116
                UpdateSelfAnnouncement: func() (lnwire.NodeAnnouncement,
×
1117
                        error) {
×
1118

×
1119
                        return s.genNodeAnnouncement(nil)
×
1120
                },
×
1121
                ProofMatureDelta:        0,
1122
                TrickleDelay:            time.Millisecond * time.Duration(cfg.TrickleDelay),
1123
                RetransmitTicker:        ticker.New(time.Minute * 30),
1124
                RebroadcastInterval:     time.Hour * 24,
1125
                WaitingProofStore:       waitingProofStore,
1126
                MessageStore:            gossipMessageStore,
1127
                AnnSigner:               s.nodeSigner,
1128
                RotateTicker:            ticker.New(discovery.DefaultSyncerRotationInterval),
1129
                HistoricalSyncTicker:    ticker.New(cfg.HistoricalSyncInterval),
1130
                NumActiveSyncers:        cfg.NumGraphSyncPeers,
1131
                NoTimestampQueries:      cfg.ProtocolOptions.NoTimestampQueryOption, //nolint:ll
1132
                MinimumBatchSize:        10,
1133
                SubBatchDelay:           cfg.Gossip.SubBatchDelay,
1134
                IgnoreHistoricalFilters: cfg.IgnoreHistoricalGossipFilters,
1135
                PinnedSyncers:           cfg.Gossip.PinnedSyncers,
1136
                MaxChannelUpdateBurst:   cfg.Gossip.MaxChannelUpdateBurst,
1137
                ChannelUpdateInterval:   cfg.Gossip.ChannelUpdateInterval,
1138
                IsAlias:                 aliasmgr.IsAlias,
1139
                SignAliasUpdate:         s.signAliasUpdate,
1140
                FindBaseByAlias:         s.aliasMgr.FindBaseSCID,
1141
                GetAlias:                s.aliasMgr.GetPeerAlias,
1142
                FindChannel:             s.findChannel,
1143
                IsStillZombieChannel:    s.graphBuilder.IsZombieChannel,
1144
                ScidCloser:              scidCloserMan,
1145
        }, nodeKeyDesc)
1146

1147
        selfVertex := route.Vertex(nodeKeyDesc.PubKey.SerializeCompressed())
×
1148
        //nolint:ll
×
1149
        s.localChanMgr = &localchans.Manager{
×
1150
                SelfPub:              nodeKeyDesc.PubKey,
×
1151
                DefaultRoutingPolicy: cc.RoutingPolicy,
×
1152
                ForAllOutgoingChannels: func(cb func(*models.ChannelEdgeInfo,
×
1153
                        *models.ChannelEdgePolicy) error) error {
×
1154

×
1155
                        return s.graphDB.ForEachNodeChannel(selfVertex,
×
1156
                                func(_ kvdb.RTx, c *models.ChannelEdgeInfo,
×
1157
                                        e *models.ChannelEdgePolicy,
×
1158
                                        _ *models.ChannelEdgePolicy) error {
×
1159

×
1160
                                        // NOTE: The invoked callback here may
×
1161
                                        // receive a nil channel policy.
×
1162
                                        return cb(c, e)
×
1163
                                },
×
1164
                        )
1165
                },
1166
                PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate,
1167
                UpdateForwardingPolicies:  s.htlcSwitch.UpdateForwardingPolicies,
1168
                FetchChannel:              s.chanStateDB.FetchChannel,
1169
                AddEdge: func(edge *models.ChannelEdgeInfo) error {
×
1170
                        return s.graphBuilder.AddEdge(edge)
×
1171
                },
×
1172
        }
1173

1174
        utxnStore, err := contractcourt.NewNurseryStore(
×
1175
                s.cfg.ActiveNetParams.GenesisHash, dbs.ChanStateDB,
×
1176
        )
×
1177
        if err != nil {
×
1178
                srvrLog.Errorf("unable to create nursery store: %v", err)
×
1179
                return nil, err
×
1180
        }
×
1181

1182
        sweeperStore, err := sweep.NewSweeperStore(
×
1183
                dbs.ChanStateDB, s.cfg.ActiveNetParams.GenesisHash,
×
1184
        )
×
1185
        if err != nil {
×
1186
                srvrLog.Errorf("unable to create sweeper store: %v", err)
×
1187
                return nil, err
×
1188
        }
×
1189

1190
        aggregator := sweep.NewBudgetAggregator(
×
1191
                cc.FeeEstimator, sweep.DefaultMaxInputsPerTx,
×
1192
                s.implCfg.AuxSweeper,
×
1193
        )
×
1194

×
1195
        s.txPublisher = sweep.NewTxPublisher(sweep.TxPublisherConfig{
×
1196
                Signer:     cc.Wallet.Cfg.Signer,
×
1197
                Wallet:     cc.Wallet,
×
1198
                Estimator:  cc.FeeEstimator,
×
1199
                Notifier:   cc.ChainNotifier,
×
1200
                AuxSweeper: s.implCfg.AuxSweeper,
×
1201
        })
×
1202

×
1203
        s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{
×
1204
                FeeEstimator: cc.FeeEstimator,
×
1205
                GenSweepScript: newSweepPkScriptGen(
×
1206
                        cc.Wallet, s.cfg.ActiveNetParams.Params,
×
1207
                ),
×
1208
                Signer:               cc.Wallet.Cfg.Signer,
×
1209
                Wallet:               newSweeperWallet(cc.Wallet),
×
1210
                Mempool:              cc.MempoolNotifier,
×
1211
                Notifier:             cc.ChainNotifier,
×
1212
                Store:                sweeperStore,
×
1213
                MaxInputsPerTx:       sweep.DefaultMaxInputsPerTx,
×
1214
                MaxFeeRate:           cfg.Sweeper.MaxFeeRate,
×
1215
                Aggregator:           aggregator,
×
1216
                Publisher:            s.txPublisher,
×
1217
                NoDeadlineConfTarget: cfg.Sweeper.NoDeadlineConfTarget,
×
1218
        })
×
1219

×
1220
        s.utxoNursery = contractcourt.NewUtxoNursery(&contractcourt.NurseryConfig{
×
1221
                ChainIO:             cc.ChainIO,
×
1222
                ConfDepth:           1,
×
1223
                FetchClosedChannels: s.chanStateDB.FetchClosedChannels,
×
1224
                FetchClosedChannel:  s.chanStateDB.FetchClosedChannel,
×
1225
                Notifier:            cc.ChainNotifier,
×
1226
                PublishTransaction:  cc.Wallet.PublishTransaction,
×
1227
                Store:               utxnStore,
×
1228
                SweepInput:          s.sweeper.SweepInput,
×
1229
                Budget:              s.cfg.Sweeper.Budget,
×
1230
        })
×
1231

×
1232
        // Construct a closure that wraps the htlcswitch's CloseLink method.
×
1233
        closeLink := func(chanPoint *wire.OutPoint,
×
1234
                closureType contractcourt.ChannelCloseType) {
×
1235
                // TODO(conner): Properly respect the update and error channels
×
1236
                // returned by CloseLink.
×
1237

×
1238
                // Instruct the switch to close the channel.  Provide no close out
×
1239
                // delivery script or target fee per kw because user input is not
×
1240
                // available when the remote peer closes the channel.
×
1241
                s.htlcSwitch.CloseLink(chanPoint, closureType, 0, 0, nil)
×
1242
        }
×
1243

1244
        // We will use the following channel to reliably hand off contract
1245
        // breach events from the ChannelArbitrator to the BreachArbitrator,
1246
        contractBreaches := make(chan *contractcourt.ContractBreachEvent, 1)
×
1247

×
1248
        s.breachArbitrator = contractcourt.NewBreachArbitrator(
×
1249
                &contractcourt.BreachConfig{
×
1250
                        CloseLink: closeLink,
×
1251
                        DB:        s.chanStateDB,
×
1252
                        Estimator: s.cc.FeeEstimator,
×
1253
                        GenSweepScript: newSweepPkScriptGen(
×
1254
                                cc.Wallet, s.cfg.ActiveNetParams.Params,
×
1255
                        ),
×
1256
                        Notifier:           cc.ChainNotifier,
×
1257
                        PublishTransaction: cc.Wallet.PublishTransaction,
×
1258
                        ContractBreaches:   contractBreaches,
×
1259
                        Signer:             cc.Wallet.Cfg.Signer,
×
1260
                        Store: contractcourt.NewRetributionStore(
×
1261
                                dbs.ChanStateDB,
×
1262
                        ),
×
1263
                        AuxSweeper: s.implCfg.AuxSweeper,
×
1264
                },
×
1265
        )
×
1266

×
1267
        //nolint:ll
×
1268
        s.chainArb = contractcourt.NewChainArbitrator(contractcourt.ChainArbitratorConfig{
×
1269
                ChainHash:              *s.cfg.ActiveNetParams.GenesisHash,
×
1270
                IncomingBroadcastDelta: lncfg.DefaultIncomingBroadcastDelta,
×
1271
                OutgoingBroadcastDelta: lncfg.DefaultOutgoingBroadcastDelta,
×
1272
                NewSweepAddr: func() ([]byte, error) {
×
1273
                        addr, err := newSweepPkScriptGen(
×
1274
                                cc.Wallet, netParams,
×
1275
                        )().Unpack()
×
1276
                        if err != nil {
×
1277
                                return nil, err
×
1278
                        }
×
1279

1280
                        return addr.DeliveryAddress, nil
×
1281
                },
1282
                PublishTx: cc.Wallet.PublishTransaction,
1283
                DeliverResolutionMsg: func(msgs ...contractcourt.ResolutionMsg) error {
×
1284
                        for _, msg := range msgs {
×
1285
                                err := s.htlcSwitch.ProcessContractResolution(msg)
×
1286
                                if err != nil {
×
1287
                                        return err
×
1288
                                }
×
1289
                        }
1290
                        return nil
×
1291
                },
1292
                IncubateOutputs: func(chanPoint wire.OutPoint,
1293
                        outHtlcRes fn.Option[lnwallet.OutgoingHtlcResolution],
1294
                        inHtlcRes fn.Option[lnwallet.IncomingHtlcResolution],
1295
                        broadcastHeight uint32,
1296
                        deadlineHeight fn.Option[int32]) error {
×
1297

×
1298
                        return s.utxoNursery.IncubateOutputs(
×
1299
                                chanPoint, outHtlcRes, inHtlcRes,
×
1300
                                broadcastHeight, deadlineHeight,
×
1301
                        )
×
1302
                },
×
1303
                PreimageDB:   s.witnessBeacon,
1304
                Notifier:     cc.ChainNotifier,
1305
                Mempool:      cc.MempoolNotifier,
1306
                Signer:       cc.Wallet.Cfg.Signer,
1307
                FeeEstimator: cc.FeeEstimator,
1308
                ChainIO:      cc.ChainIO,
1309
                MarkLinkInactive: func(chanPoint wire.OutPoint) error {
×
1310
                        chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
×
1311
                        s.htlcSwitch.RemoveLink(chanID)
×
1312
                        return nil
×
1313
                },
×
1314
                IsOurAddress: cc.Wallet.IsOurAddress,
1315
                ContractBreach: func(chanPoint wire.OutPoint,
1316
                        breachRet *lnwallet.BreachRetribution) error {
×
1317

×
1318
                        // processACK will handle the BreachArbitrator ACKing
×
1319
                        // the event.
×
1320
                        finalErr := make(chan error, 1)
×
1321
                        processACK := func(brarErr error) {
×
1322
                                if brarErr != nil {
×
1323
                                        finalErr <- brarErr
×
1324
                                        return
×
1325
                                }
×
1326

1327
                                // If the BreachArbitrator successfully handled
1328
                                // the event, we can signal that the handoff
1329
                                // was successful.
1330
                                finalErr <- nil
×
1331
                        }
1332

1333
                        event := &contractcourt.ContractBreachEvent{
×
1334
                                ChanPoint:         chanPoint,
×
1335
                                ProcessACK:        processACK,
×
1336
                                BreachRetribution: breachRet,
×
1337
                        }
×
1338

×
1339
                        // Send the contract breach event to the
×
1340
                        // BreachArbitrator.
×
1341
                        select {
×
1342
                        case contractBreaches <- event:
×
1343
                        case <-s.quit:
×
1344
                                return ErrServerShuttingDown
×
1345
                        }
1346

1347
                        // We'll wait for a final error to be available from
1348
                        // the BreachArbitrator.
1349
                        select {
×
1350
                        case err := <-finalErr:
×
1351
                                return err
×
1352
                        case <-s.quit:
×
1353
                                return ErrServerShuttingDown
×
1354
                        }
1355
                },
1356
                DisableChannel: func(chanPoint wire.OutPoint) error {
×
1357
                        return s.chanStatusMgr.RequestDisable(chanPoint, false)
×
1358
                },
×
1359
                Sweeper:                       s.sweeper,
1360
                Registry:                      s.invoices,
1361
                NotifyClosedChannel:           s.channelNotifier.NotifyClosedChannelEvent,
1362
                NotifyFullyResolvedChannel:    s.channelNotifier.NotifyFullyResolvedChannelEvent,
1363
                OnionProcessor:                s.sphinx,
1364
                PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod,
1365
                IsForwardedHTLC:               s.htlcSwitch.IsForwardedHTLC,
1366
                Clock:                         clock.NewDefaultClock(),
1367
                SubscribeBreachComplete:       s.breachArbitrator.SubscribeBreachComplete,
1368
                PutFinalHtlcOutcome:           s.chanStateDB.PutOnchainFinalHtlcOutcome,
1369
                HtlcNotifier:                  s.htlcNotifier,
1370
                Budget:                        *s.cfg.Sweeper.Budget,
1371

1372
                // TODO(yy): remove this hack once PaymentCircuit is interfaced.
1373
                QueryIncomingCircuit: func(
1374
                        circuit models.CircuitKey) *models.CircuitKey {
×
1375

×
1376
                        // Get the circuit map.
×
1377
                        circuits := s.htlcSwitch.CircuitLookup()
×
1378

×
1379
                        // Lookup the outgoing circuit.
×
1380
                        pc := circuits.LookupOpenCircuit(circuit)
×
1381
                        if pc == nil {
×
1382
                                return nil
×
1383
                        }
×
1384

1385
                        return &pc.Incoming
×
1386
                },
1387
                AuxLeafStore: implCfg.AuxLeafStore,
1388
                AuxSigner:    implCfg.AuxSigner,
1389
                AuxResolver:  implCfg.AuxContractResolver,
1390
        }, dbs.ChanStateDB)
1391

1392
        // Select the configuration and funding parameters for Bitcoin.
1393
        chainCfg := cfg.Bitcoin
×
1394
        minRemoteDelay := funding.MinBtcRemoteDelay
×
1395
        maxRemoteDelay := funding.MaxBtcRemoteDelay
×
1396

×
1397
        var chanIDSeed [32]byte
×
1398
        if _, err := rand.Read(chanIDSeed[:]); err != nil {
×
1399
                return nil, err
×
1400
        }
×
1401

1402
        // Wrap the DeleteChannelEdges method so that the funding manager can
1403
        // use it without depending on several layers of indirection.
1404
        deleteAliasEdge := func(scid lnwire.ShortChannelID) (
×
1405
                *models.ChannelEdgePolicy, error) {
×
1406

×
1407
                info, e1, e2, err := s.graphDB.FetchChannelEdgesByID(
×
1408
                        scid.ToUint64(),
×
1409
                )
×
1410
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
×
1411
                        // This is unlikely but there is a slim chance of this
×
1412
                        // being hit if lnd was killed via SIGKILL and the
×
1413
                        // funding manager was stepping through the delete
×
1414
                        // alias edge logic.
×
1415
                        return nil, nil
×
1416
                } else if err != nil {
×
1417
                        return nil, err
×
1418
                }
×
1419

1420
                // Grab our key to find our policy.
1421
                var ourKey [33]byte
×
1422
                copy(ourKey[:], nodeKeyDesc.PubKey.SerializeCompressed())
×
1423

×
1424
                var ourPolicy *models.ChannelEdgePolicy
×
1425
                if info != nil && info.NodeKey1Bytes == ourKey {
×
1426
                        ourPolicy = e1
×
1427
                } else {
×
1428
                        ourPolicy = e2
×
1429
                }
×
1430

1431
                if ourPolicy == nil {
×
1432
                        // Something is wrong, so return an error.
×
1433
                        return nil, fmt.Errorf("we don't have an edge")
×
1434
                }
×
1435

1436
                err = s.graphDB.DeleteChannelEdges(
×
1437
                        false, false, scid.ToUint64(),
×
1438
                )
×
1439
                return ourPolicy, err
×
1440
        }
1441

1442
        // For the reservationTimeout and the zombieSweeperInterval different
1443
        // values are set in case we are in a dev environment so enhance test
1444
        // capacilities.
1445
        reservationTimeout := chanfunding.DefaultReservationTimeout
×
1446
        zombieSweeperInterval := lncfg.DefaultZombieSweeperInterval
×
1447

×
1448
        // Get the development config for funding manager. If we are not in
×
1449
        // development mode, this would be nil.
×
1450
        var devCfg *funding.DevConfig
×
1451
        if lncfg.IsDevBuild() {
×
1452
                devCfg = &funding.DevConfig{
×
1453
                        ProcessChannelReadyWait: cfg.Dev.ChannelReadyWait(),
×
1454
                }
×
1455

×
1456
                reservationTimeout = cfg.Dev.GetReservationTimeout()
×
1457
                zombieSweeperInterval = cfg.Dev.GetZombieSweeperInterval()
×
1458

×
1459
                srvrLog.Debugf("Using the dev config for the fundingMgr: %v, "+
×
1460
                        "reservationTimeout=%v, zombieSweeperInterval=%v",
×
1461
                        devCfg, reservationTimeout, zombieSweeperInterval)
×
1462
        }
×
1463

1464
        //nolint:ll
1465
        s.fundingMgr, err = funding.NewFundingManager(funding.Config{
×
1466
                Dev:                devCfg,
×
1467
                NoWumboChans:       !cfg.ProtocolOptions.Wumbo(),
×
1468
                IDKey:              nodeKeyDesc.PubKey,
×
1469
                IDKeyLoc:           nodeKeyDesc.KeyLocator,
×
1470
                Wallet:             cc.Wallet,
×
1471
                PublishTransaction: cc.Wallet.PublishTransaction,
×
1472
                UpdateLabel: func(hash chainhash.Hash, label string) error {
×
1473
                        return cc.Wallet.LabelTransaction(hash, label, true)
×
1474
                },
×
1475
                Notifier:     cc.ChainNotifier,
1476
                ChannelDB:    s.chanStateDB,
1477
                FeeEstimator: cc.FeeEstimator,
1478
                SignMessage:  cc.MsgSigner.SignMessage,
1479
                CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement,
1480
                        error) {
×
1481

×
1482
                        return s.genNodeAnnouncement(nil)
×
1483
                },
×
1484
                SendAnnouncement:     s.authGossiper.ProcessLocalAnnouncement,
1485
                NotifyWhenOnline:     s.NotifyWhenOnline,
1486
                TempChanIDSeed:       chanIDSeed,
1487
                FindChannel:          s.findChannel,
1488
                DefaultRoutingPolicy: cc.RoutingPolicy,
1489
                DefaultMinHtlcIn:     cc.MinHtlcIn,
1490
                NumRequiredConfs: func(chanAmt btcutil.Amount,
1491
                        pushAmt lnwire.MilliSatoshi) uint16 {
×
1492
                        // For large channels we increase the number
×
1493
                        // of confirmations we require for the
×
1494
                        // channel to be considered open. As it is
×
1495
                        // always the responder that gets to choose
×
1496
                        // value, the pushAmt is value being pushed
×
1497
                        // to us. This means we have more to lose
×
1498
                        // in the case this gets re-orged out, and
×
1499
                        // we will require more confirmations before
×
1500
                        // we consider it open.
×
1501

×
1502
                        // In case the user has explicitly specified
×
1503
                        // a default value for the number of
×
1504
                        // confirmations, we use it.
×
1505
                        defaultConf := uint16(chainCfg.DefaultNumChanConfs)
×
1506
                        if defaultConf != 0 {
×
1507
                                return defaultConf
×
1508
                        }
×
1509

1510
                        minConf := uint64(3)
×
1511
                        maxConf := uint64(6)
×
1512

×
1513
                        // If this is a wumbo channel, then we'll require the
×
1514
                        // max amount of confirmations.
×
1515
                        if chanAmt > MaxFundingAmount {
×
1516
                                return uint16(maxConf)
×
1517
                        }
×
1518

1519
                        // If not we return a value scaled linearly
1520
                        // between 3 and 6, depending on channel size.
1521
                        // TODO(halseth): Use 1 as minimum?
1522
                        maxChannelSize := uint64(
×
1523
                                lnwire.NewMSatFromSatoshis(MaxFundingAmount))
×
1524
                        stake := lnwire.NewMSatFromSatoshis(chanAmt) + pushAmt
×
1525
                        conf := maxConf * uint64(stake) / maxChannelSize
×
1526
                        if conf < minConf {
×
1527
                                conf = minConf
×
1528
                        }
×
1529
                        if conf > maxConf {
×
1530
                                conf = maxConf
×
1531
                        }
×
1532
                        return uint16(conf)
×
1533
                },
1534
                RequiredRemoteDelay: func(chanAmt btcutil.Amount) uint16 {
×
1535
                        // We scale the remote CSV delay (the time the
×
1536
                        // remote have to claim funds in case of a unilateral
×
1537
                        // close) linearly from minRemoteDelay blocks
×
1538
                        // for small channels, to maxRemoteDelay blocks
×
1539
                        // for channels of size MaxFundingAmount.
×
1540

×
1541
                        // In case the user has explicitly specified
×
1542
                        // a default value for the remote delay, we
×
1543
                        // use it.
×
1544
                        defaultDelay := uint16(chainCfg.DefaultRemoteDelay)
×
1545
                        if defaultDelay > 0 {
×
1546
                                return defaultDelay
×
1547
                        }
×
1548

1549
                        // If this is a wumbo channel, then we'll require the
1550
                        // max value.
1551
                        if chanAmt > MaxFundingAmount {
×
1552
                                return maxRemoteDelay
×
1553
                        }
×
1554

1555
                        // If not we scale according to channel size.
1556
                        delay := uint16(btcutil.Amount(maxRemoteDelay) *
×
1557
                                chanAmt / MaxFundingAmount)
×
1558
                        if delay < minRemoteDelay {
×
1559
                                delay = minRemoteDelay
×
1560
                        }
×
1561
                        if delay > maxRemoteDelay {
×
1562
                                delay = maxRemoteDelay
×
1563
                        }
×
1564
                        return delay
×
1565
                },
1566
                WatchNewChannel: func(channel *channeldb.OpenChannel,
1567
                        peerKey *btcec.PublicKey) error {
×
1568

×
1569
                        // First, we'll mark this new peer as a persistent peer
×
1570
                        // for re-connection purposes. If the peer is not yet
×
1571
                        // tracked or the user hasn't requested it to be perm,
×
1572
                        // we'll set false to prevent the server from continuing
×
1573
                        // to connect to this peer even if the number of
×
1574
                        // channels with this peer is zero.
×
1575
                        s.mu.Lock()
×
1576
                        pubStr := string(peerKey.SerializeCompressed())
×
1577
                        if _, ok := s.persistentPeers[pubStr]; !ok {
×
1578
                                s.persistentPeers[pubStr] = false
×
1579
                        }
×
1580
                        s.mu.Unlock()
×
1581

×
1582
                        // With that taken care of, we'll send this channel to
×
1583
                        // the chain arb so it can react to on-chain events.
×
1584
                        return s.chainArb.WatchNewChannel(channel)
×
1585
                },
1586
                ReportShortChanID: func(chanPoint wire.OutPoint) error {
×
1587
                        cid := lnwire.NewChanIDFromOutPoint(chanPoint)
×
1588
                        return s.htlcSwitch.UpdateShortChanID(cid)
×
1589
                },
×
1590
                RequiredRemoteChanReserve: func(chanAmt,
1591
                        dustLimit btcutil.Amount) btcutil.Amount {
×
1592

×
1593
                        // By default, we'll require the remote peer to maintain
×
1594
                        // at least 1% of the total channel capacity at all
×
1595
                        // times. If this value ends up dipping below the dust
×
1596
                        // limit, then we'll use the dust limit itself as the
×
1597
                        // reserve as required by BOLT #2.
×
1598
                        reserve := chanAmt / 100
×
1599
                        if reserve < dustLimit {
×
1600
                                reserve = dustLimit
×
1601
                        }
×
1602

1603
                        return reserve
×
1604
                },
1605
                RequiredRemoteMaxValue: func(chanAmt btcutil.Amount) lnwire.MilliSatoshi {
×
1606
                        // By default, we'll allow the remote peer to fully
×
1607
                        // utilize the full bandwidth of the channel, minus our
×
1608
                        // required reserve.
×
1609
                        reserve := lnwire.NewMSatFromSatoshis(chanAmt / 100)
×
1610
                        return lnwire.NewMSatFromSatoshis(chanAmt) - reserve
×
1611
                },
×
1612
                RequiredRemoteMaxHTLCs: func(chanAmt btcutil.Amount) uint16 {
×
1613
                        if cfg.DefaultRemoteMaxHtlcs > 0 {
×
1614
                                return cfg.DefaultRemoteMaxHtlcs
×
1615
                        }
×
1616

1617
                        // By default, we'll permit them to utilize the full
1618
                        // channel bandwidth.
1619
                        return uint16(input.MaxHTLCNumber / 2)
×
1620
                },
1621
                ZombieSweeperInterval:         zombieSweeperInterval,
1622
                ReservationTimeout:            reservationTimeout,
1623
                MinChanSize:                   btcutil.Amount(cfg.MinChanSize),
1624
                MaxChanSize:                   btcutil.Amount(cfg.MaxChanSize),
1625
                MaxPendingChannels:            cfg.MaxPendingChannels,
1626
                RejectPush:                    cfg.RejectPush,
1627
                MaxLocalCSVDelay:              chainCfg.MaxLocalDelay,
1628
                NotifyOpenChannelEvent:        s.channelNotifier.NotifyOpenChannelEvent,
1629
                OpenChannelPredicate:          chanPredicate,
1630
                NotifyPendingOpenChannelEvent: s.channelNotifier.NotifyPendingOpenChannelEvent,
1631
                EnableUpfrontShutdown:         cfg.EnableUpfrontShutdown,
1632
                MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte(
1633
                        s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
1634
                DeleteAliasEdge:      deleteAliasEdge,
1635
                AliasManager:         s.aliasMgr,
1636
                IsSweeperOutpoint:    s.sweeper.IsSweeperOutpoint,
1637
                AuxFundingController: implCfg.AuxFundingController,
1638
                AuxSigner:            implCfg.AuxSigner,
1639
                AuxResolver:          implCfg.AuxContractResolver,
1640
        })
1641
        if err != nil {
×
1642
                return nil, err
×
1643
        }
×
1644

1645
        // Next, we'll assemble the sub-system that will maintain an on-disk
1646
        // static backup of the latest channel state.
1647
        chanNotifier := &channelNotifier{
×
1648
                chanNotifier: s.channelNotifier,
×
1649
                addrs:        s.addrSource,
×
1650
        }
×
1651
        backupFile := chanbackup.NewMultiFile(cfg.BackupFilePath)
×
1652
        startingChans, err := chanbackup.FetchStaticChanBackups(
×
1653
                s.chanStateDB, s.addrSource,
×
1654
        )
×
1655
        if err != nil {
×
1656
                return nil, err
×
1657
        }
×
1658
        s.chanSubSwapper, err = chanbackup.NewSubSwapper(
×
1659
                startingChans, chanNotifier, s.cc.KeyRing, backupFile,
×
1660
        )
×
1661
        if err != nil {
×
1662
                return nil, err
×
1663
        }
×
1664

1665
        // Assemble a peer notifier which will provide clients with subscriptions
1666
        // to peer online and offline events.
1667
        s.peerNotifier = peernotifier.New()
×
1668

×
1669
        // Create a channel event store which monitors all open channels.
×
1670
        s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{
×
1671
                SubscribeChannelEvents: func() (subscribe.Subscription, error) {
×
1672
                        return s.channelNotifier.SubscribeChannelEvents()
×
1673
                },
×
1674
                SubscribePeerEvents: func() (subscribe.Subscription, error) {
×
1675
                        return s.peerNotifier.SubscribePeerEvents()
×
1676
                },
×
1677
                GetOpenChannels: s.chanStateDB.FetchAllOpenChannels,
1678
                Clock:           clock.NewDefaultClock(),
1679
                ReadFlapCount:   s.miscDB.ReadFlapCount,
1680
                WriteFlapCount:  s.miscDB.WriteFlapCounts,
1681
                FlapCountTicker: ticker.New(chanfitness.FlapCountFlushRate),
1682
        })
1683

1684
        if cfg.WtClient.Active {
×
1685
                policy := wtpolicy.DefaultPolicy()
×
1686
                policy.MaxUpdates = cfg.WtClient.MaxUpdates
×
1687

×
1688
                // We expose the sweep fee rate in sat/vbyte, but the tower
×
1689
                // protocol operations on sat/kw.
×
1690
                sweepRateSatPerVByte := chainfee.SatPerKVByte(
×
1691
                        1000 * cfg.WtClient.SweepFeeRate,
×
1692
                )
×
1693

×
1694
                policy.SweepFeeRate = sweepRateSatPerVByte.FeePerKWeight()
×
1695

×
1696
                if err := policy.Validate(); err != nil {
×
1697
                        return nil, err
×
1698
                }
×
1699

1700
                // authDial is the wrapper around the btrontide.Dial for the
1701
                // watchtower.
1702
                authDial := func(localKey keychain.SingleKeyECDH,
×
1703
                        netAddr *lnwire.NetAddress,
×
1704
                        dialer tor.DialFunc) (wtserver.Peer, error) {
×
1705

×
1706
                        return brontide.Dial(
×
1707
                                localKey, netAddr, cfg.ConnectionTimeout, dialer,
×
1708
                        )
×
1709
                }
×
1710

1711
                // buildBreachRetribution is a call-back that can be used to
1712
                // query the BreachRetribution info and channel type given a
1713
                // channel ID and commitment height.
1714
                buildBreachRetribution := func(chanID lnwire.ChannelID,
×
1715
                        commitHeight uint64) (*lnwallet.BreachRetribution,
×
1716
                        channeldb.ChannelType, error) {
×
1717

×
1718
                        channel, err := s.chanStateDB.FetchChannelByID(
×
1719
                                nil, chanID,
×
1720
                        )
×
1721
                        if err != nil {
×
1722
                                return nil, 0, err
×
1723
                        }
×
1724

1725
                        br, err := lnwallet.NewBreachRetribution(
×
1726
                                channel, commitHeight, 0, nil,
×
1727
                                implCfg.AuxLeafStore,
×
1728
                                implCfg.AuxContractResolver,
×
1729
                        )
×
1730
                        if err != nil {
×
1731
                                return nil, 0, err
×
1732
                        }
×
1733

1734
                        return br, channel.ChanType, nil
×
1735
                }
1736

1737
                fetchClosedChannel := s.chanStateDB.FetchClosedChannelForID
×
1738

×
1739
                // Copy the policy for legacy channels and set the blob flag
×
1740
                // signalling support for anchor channels.
×
1741
                anchorPolicy := policy
×
1742
                anchorPolicy.BlobType |= blob.Type(blob.FlagAnchorChannel)
×
1743

×
1744
                // Copy the policy for legacy channels and set the blob flag
×
1745
                // signalling support for taproot channels.
×
1746
                taprootPolicy := policy
×
1747
                taprootPolicy.TxPolicy.BlobType |= blob.Type(
×
1748
                        blob.FlagTaprootChannel,
×
1749
                )
×
1750

×
1751
                s.towerClientMgr, err = wtclient.NewManager(&wtclient.Config{
×
1752
                        FetchClosedChannel:     fetchClosedChannel,
×
1753
                        BuildBreachRetribution: buildBreachRetribution,
×
1754
                        SessionCloseRange:      cfg.WtClient.SessionCloseRange,
×
1755
                        ChainNotifier:          s.cc.ChainNotifier,
×
1756
                        SubscribeChannelEvents: func() (subscribe.Subscription,
×
1757
                                error) {
×
1758

×
1759
                                return s.channelNotifier.
×
1760
                                        SubscribeChannelEvents()
×
1761
                        },
×
1762
                        Signer: cc.Wallet.Cfg.Signer,
1763
                        NewAddress: func() ([]byte, error) {
×
1764
                                addr, err := newSweepPkScriptGen(
×
1765
                                        cc.Wallet, netParams,
×
1766
                                )().Unpack()
×
1767
                                if err != nil {
×
1768
                                        return nil, err
×
1769
                                }
×
1770

1771
                                return addr.DeliveryAddress, nil
×
1772
                        },
1773
                        SecretKeyRing:      s.cc.KeyRing,
1774
                        Dial:               cfg.net.Dial,
1775
                        AuthDial:           authDial,
1776
                        DB:                 dbs.TowerClientDB,
1777
                        ChainHash:          *s.cfg.ActiveNetParams.GenesisHash,
1778
                        MinBackoff:         10 * time.Second,
1779
                        MaxBackoff:         5 * time.Minute,
1780
                        MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue,
1781
                }, policy, anchorPolicy, taprootPolicy)
1782
                if err != nil {
×
1783
                        return nil, err
×
1784
                }
×
1785
        }
1786

1787
        if len(cfg.ExternalHosts) != 0 {
×
1788
                advertisedIPs := make(map[string]struct{})
×
1789
                for _, addr := range s.currentNodeAnn.Addresses {
×
1790
                        advertisedIPs[addr.String()] = struct{}{}
×
1791
                }
×
1792

1793
                s.hostAnn = netann.NewHostAnnouncer(netann.HostAnnouncerConfig{
×
1794
                        Hosts:         cfg.ExternalHosts,
×
1795
                        RefreshTicker: ticker.New(defaultHostSampleInterval),
×
1796
                        LookupHost: func(host string) (net.Addr, error) {
×
1797
                                return lncfg.ParseAddressString(
×
1798
                                        host, strconv.Itoa(defaultPeerPort),
×
1799
                                        cfg.net.ResolveTCPAddr,
×
1800
                                )
×
1801
                        },
×
1802
                        AdvertisedIPs: advertisedIPs,
1803
                        AnnounceNewIPs: netann.IPAnnouncer(
1804
                                func(modifier ...netann.NodeAnnModifier) (
1805
                                        lnwire.NodeAnnouncement, error) {
×
1806

×
1807
                                        return s.genNodeAnnouncement(
×
1808
                                                nil, modifier...,
×
1809
                                        )
×
1810
                                }),
×
1811
                })
1812
        }
1813

1814
        // Create liveness monitor.
1815
        s.createLivenessMonitor(cfg, cc, leaderElector)
×
1816

×
1817
        // Create the connection manager which will be responsible for
×
1818
        // maintaining persistent outbound connections and also accepting new
×
1819
        // incoming connections
×
1820
        cmgr, err := connmgr.New(&connmgr.Config{
×
1821
                Listeners:      listeners,
×
1822
                OnAccept:       s.InboundPeerConnected,
×
1823
                RetryDuration:  time.Second * 5,
×
1824
                TargetOutbound: 100,
×
1825
                Dial: noiseDial(
×
1826
                        nodeKeyECDH, s.cfg.net, s.cfg.ConnectionTimeout,
×
1827
                ),
×
1828
                OnConnection: s.OutboundPeerConnected,
×
1829
        })
×
1830
        if err != nil {
×
1831
                return nil, err
×
1832
        }
×
1833
        s.connMgr = cmgr
×
1834

×
1835
        // Finally, register the subsystems in blockbeat.
×
1836
        s.registerBlockConsumers()
×
1837

×
1838
        return s, nil
×
1839
}
1840

1841
// UpdateRoutingConfig is a callback function to update the routing config
1842
// values in the main cfg.
1843
func (s *server) UpdateRoutingConfig(cfg *routing.MissionControlConfig) {
×
1844
        routerCfg := s.cfg.SubRPCServers.RouterRPC
×
1845

×
1846
        switch c := cfg.Estimator.Config().(type) {
×
1847
        case routing.AprioriConfig:
×
1848
                routerCfg.ProbabilityEstimatorType =
×
1849
                        routing.AprioriEstimatorName
×
1850

×
1851
                targetCfg := routerCfg.AprioriConfig
×
1852
                targetCfg.PenaltyHalfLife = c.PenaltyHalfLife
×
1853
                targetCfg.Weight = c.AprioriWeight
×
1854
                targetCfg.CapacityFraction = c.CapacityFraction
×
1855
                targetCfg.HopProbability = c.AprioriHopProbability
×
1856

1857
        case routing.BimodalConfig:
×
1858
                routerCfg.ProbabilityEstimatorType =
×
1859
                        routing.BimodalEstimatorName
×
1860

×
1861
                targetCfg := routerCfg.BimodalConfig
×
1862
                targetCfg.Scale = int64(c.BimodalScaleMsat)
×
1863
                targetCfg.NodeWeight = c.BimodalNodeWeight
×
1864
                targetCfg.DecayTime = c.BimodalDecayTime
×
1865
        }
1866

1867
        routerCfg.MaxMcHistory = cfg.MaxMcHistory
×
1868
}
1869

1870
// registerBlockConsumers registers the subsystems that consume block events.
1871
// By calling `RegisterQueue`, a list of subsystems are registered in the
1872
// blockbeat for block notifications. When a new block arrives, the subsystems
1873
// in the same queue are notified sequentially, and different queues are
1874
// notified concurrently.
1875
//
1876
// NOTE: To put a subsystem in a different queue, create a slice and pass it to
1877
// a new `RegisterQueue` call.
1878
func (s *server) registerBlockConsumers() {
×
1879
        // In this queue, when a new block arrives, it will be received and
×
1880
        // processed in this order: chainArb -> sweeper -> txPublisher.
×
1881
        consumers := []chainio.Consumer{
×
1882
                s.chainArb,
×
1883
                s.sweeper,
×
1884
                s.txPublisher,
×
1885
        }
×
1886
        s.blockbeatDispatcher.RegisterQueue(consumers)
×
1887
}
×
1888

1889
// signAliasUpdate takes a ChannelUpdate and returns the signature. This is
1890
// used for option_scid_alias channels where the ChannelUpdate to be sent back
1891
// may differ from what is on disk.
1892
func (s *server) signAliasUpdate(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
1893
        error) {
×
1894

×
1895
        data, err := u.DataToSign()
×
1896
        if err != nil {
×
1897
                return nil, err
×
1898
        }
×
1899

1900
        return s.cc.MsgSigner.SignMessage(s.identityKeyLoc, data, true)
×
1901
}
1902

1903
// createLivenessMonitor creates a set of health checks using our configured
1904
// values and uses these checks to create a liveness monitor. Available
1905
// health checks,
1906
//   - chainHealthCheck (will be disabled for --nochainbackend mode)
1907
//   - diskCheck
1908
//   - tlsHealthCheck
1909
//   - torController, only created when tor is enabled.
1910
//
1911
// If a health check has been disabled by setting attempts to 0, our monitor
1912
// will not run it.
1913
func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl,
1914
        leaderElector cluster.LeaderElector) {
×
1915

×
1916
        chainBackendAttempts := cfg.HealthChecks.ChainCheck.Attempts
×
1917
        if cfg.Bitcoin.Node == "nochainbackend" {
×
1918
                srvrLog.Info("Disabling chain backend checks for " +
×
1919
                        "nochainbackend mode")
×
1920

×
1921
                chainBackendAttempts = 0
×
1922
        }
×
1923

1924
        chainHealthCheck := healthcheck.NewObservation(
×
1925
                "chain backend",
×
1926
                cc.HealthCheck,
×
1927
                cfg.HealthChecks.ChainCheck.Interval,
×
1928
                cfg.HealthChecks.ChainCheck.Timeout,
×
1929
                cfg.HealthChecks.ChainCheck.Backoff,
×
1930
                chainBackendAttempts,
×
1931
        )
×
1932

×
1933
        diskCheck := healthcheck.NewObservation(
×
1934
                "disk space",
×
1935
                func() error {
×
1936
                        free, err := healthcheck.AvailableDiskSpaceRatio(
×
1937
                                cfg.LndDir,
×
1938
                        )
×
1939
                        if err != nil {
×
1940
                                return err
×
1941
                        }
×
1942

1943
                        // If we have more free space than we require,
1944
                        // we return a nil error.
1945
                        if free > cfg.HealthChecks.DiskCheck.RequiredRemaining {
×
1946
                                return nil
×
1947
                        }
×
1948

1949
                        return fmt.Errorf("require: %v free space, got: %v",
×
1950
                                cfg.HealthChecks.DiskCheck.RequiredRemaining,
×
1951
                                free)
×
1952
                },
1953
                cfg.HealthChecks.DiskCheck.Interval,
1954
                cfg.HealthChecks.DiskCheck.Timeout,
1955
                cfg.HealthChecks.DiskCheck.Backoff,
1956
                cfg.HealthChecks.DiskCheck.Attempts,
1957
        )
1958

1959
        tlsHealthCheck := healthcheck.NewObservation(
×
1960
                "tls",
×
1961
                func() error {
×
1962
                        expired, expTime, err := s.tlsManager.IsCertExpired(
×
1963
                                s.cc.KeyRing,
×
1964
                        )
×
1965
                        if err != nil {
×
1966
                                return err
×
1967
                        }
×
1968
                        if expired {
×
1969
                                return fmt.Errorf("TLS certificate is "+
×
1970
                                        "expired as of %v", expTime)
×
1971
                        }
×
1972

1973
                        // If the certificate is not outdated, no error needs
1974
                        // to be returned
1975
                        return nil
×
1976
                },
1977
                cfg.HealthChecks.TLSCheck.Interval,
1978
                cfg.HealthChecks.TLSCheck.Timeout,
1979
                cfg.HealthChecks.TLSCheck.Backoff,
1980
                cfg.HealthChecks.TLSCheck.Attempts,
1981
        )
1982

1983
        checks := []*healthcheck.Observation{
×
1984
                chainHealthCheck, diskCheck, tlsHealthCheck,
×
1985
        }
×
1986

×
1987
        // If Tor is enabled, add the healthcheck for tor connection.
×
1988
        if s.torController != nil {
×
1989
                torConnectionCheck := healthcheck.NewObservation(
×
1990
                        "tor connection",
×
1991
                        func() error {
×
1992
                                return healthcheck.CheckTorServiceStatus(
×
1993
                                        s.torController,
×
1994
                                        s.createNewHiddenService,
×
1995
                                )
×
1996
                        },
×
1997
                        cfg.HealthChecks.TorConnection.Interval,
1998
                        cfg.HealthChecks.TorConnection.Timeout,
1999
                        cfg.HealthChecks.TorConnection.Backoff,
2000
                        cfg.HealthChecks.TorConnection.Attempts,
2001
                )
2002
                checks = append(checks, torConnectionCheck)
×
2003
        }
2004

2005
        // If remote signing is enabled, add the healthcheck for the remote
2006
        // signing RPC interface.
2007
        if s.cfg.RemoteSigner != nil && s.cfg.RemoteSigner.Enable {
×
2008
                // Because we have two cascading timeouts here, we need to add
×
2009
                // some slack to the "outer" one of them in case the "inner"
×
2010
                // returns exactly on time.
×
2011
                overhead := time.Millisecond * 10
×
2012

×
2013
                remoteSignerConnectionCheck := healthcheck.NewObservation(
×
2014
                        "remote signer connection",
×
2015
                        rpcwallet.HealthCheck(
×
2016
                                s.cfg.RemoteSigner,
×
2017

×
2018
                                // For the health check we might to be even
×
2019
                                // stricter than the initial/normal connect, so
×
2020
                                // we use the health check timeout here.
×
2021
                                cfg.HealthChecks.RemoteSigner.Timeout,
×
2022
                        ),
×
2023
                        cfg.HealthChecks.RemoteSigner.Interval,
×
2024
                        cfg.HealthChecks.RemoteSigner.Timeout+overhead,
×
2025
                        cfg.HealthChecks.RemoteSigner.Backoff,
×
2026
                        cfg.HealthChecks.RemoteSigner.Attempts,
×
2027
                )
×
2028
                checks = append(checks, remoteSignerConnectionCheck)
×
2029
        }
×
2030

2031
        // If we have a leader elector, we add a health check to ensure we are
2032
        // still the leader. During normal operation, we should always be the
2033
        // leader, but there are circumstances where this may change, such as
2034
        // when we lose network connectivity for long enough expiring out lease.
2035
        if leaderElector != nil {
×
2036
                leaderCheck := healthcheck.NewObservation(
×
2037
                        "leader status",
×
2038
                        func() error {
×
2039
                                // Check if we are still the leader. Note that
×
2040
                                // we don't need to use a timeout context here
×
2041
                                // as the healthcheck observer will handle the
×
2042
                                // timeout case for us.
×
2043
                                timeoutCtx, cancel := context.WithTimeout(
×
2044
                                        context.Background(),
×
2045
                                        cfg.HealthChecks.LeaderCheck.Timeout,
×
2046
                                )
×
2047
                                defer cancel()
×
2048

×
2049
                                leader, err := leaderElector.IsLeader(
×
2050
                                        timeoutCtx,
×
2051
                                )
×
2052
                                if err != nil {
×
2053
                                        return fmt.Errorf("unable to check if "+
×
2054
                                                "still leader: %v", err)
×
2055
                                }
×
2056

2057
                                if !leader {
×
2058
                                        srvrLog.Debug("Not the current leader")
×
2059
                                        return fmt.Errorf("not the current " +
×
2060
                                                "leader")
×
2061
                                }
×
2062

2063
                                return nil
×
2064
                        },
2065
                        cfg.HealthChecks.LeaderCheck.Interval,
2066
                        cfg.HealthChecks.LeaderCheck.Timeout,
2067
                        cfg.HealthChecks.LeaderCheck.Backoff,
2068
                        cfg.HealthChecks.LeaderCheck.Attempts,
2069
                )
2070

2071
                checks = append(checks, leaderCheck)
×
2072
        }
2073

2074
        // If we have not disabled all of our health checks, we create a
2075
        // liveness monitor with our configured checks.
2076
        s.livenessMonitor = healthcheck.NewMonitor(
×
2077
                &healthcheck.Config{
×
2078
                        Checks:   checks,
×
2079
                        Shutdown: srvrLog.Criticalf,
×
2080
                },
×
2081
        )
×
2082
}
2083

2084
// Started returns true if the server has been started, and false otherwise.
2085
// NOTE: This function is safe for concurrent access.
2086
func (s *server) Started() bool {
×
2087
        return atomic.LoadInt32(&s.active) != 0
×
2088
}
×
2089

2090
// cleaner is used to aggregate "cleanup" functions during an operation that
2091
// starts several subsystems. In case one of the subsystem fails to start
2092
// and a proper resource cleanup is required, the "run" method achieves this
2093
// by running all these added "cleanup" functions.
2094
type cleaner []func() error
2095

2096
// add is used to add a cleanup function to be called when
2097
// the run function is executed.
2098
func (c cleaner) add(cleanup func() error) cleaner {
×
2099
        return append(c, cleanup)
×
2100
}
×
2101

2102
// run is used to run all the previousely added cleanup functions.
2103
func (c cleaner) run() {
×
2104
        for i := len(c) - 1; i >= 0; i-- {
×
2105
                if err := c[i](); err != nil {
×
2106
                        srvrLog.Infof("Cleanup failed: %v", err)
×
2107
                }
×
2108
        }
2109
}
2110

2111
// startLowLevelServices starts the low-level services of the server. These
2112
// services must be started successfully before running the main server. The
2113
// services are,
2114
// 1. the chain notifier.
2115
//
2116
// TODO(yy): identify and add more low-level services here.
2117
func (s *server) startLowLevelServices() error {
×
2118
        var startErr error
×
2119

×
2120
        cleanup := cleaner{}
×
2121

×
2122
        cleanup = cleanup.add(s.cc.ChainNotifier.Stop)
×
2123
        if err := s.cc.ChainNotifier.Start(); err != nil {
×
2124
                startErr = err
×
2125
        }
×
2126

2127
        if startErr != nil {
×
2128
                cleanup.run()
×
2129
        }
×
2130

2131
        return startErr
×
2132
}
2133

2134
// Start starts the main daemon server, all requested listeners, and any helper
2135
// goroutines.
2136
// NOTE: This function is safe for concurrent access.
2137
//
2138
//nolint:funlen
2139
func (s *server) Start() error {
×
2140
        // Get the current blockbeat.
×
2141
        beat, err := s.getStartingBeat()
×
2142
        if err != nil {
×
2143
                return err
×
2144
        }
×
2145

2146
        var startErr error
×
2147

×
2148
        // If one sub system fails to start, the following code ensures that the
×
2149
        // previous started ones are stopped. It also ensures a proper wallet
×
2150
        // shutdown which is important for releasing its resources (boltdb, etc...)
×
2151
        cleanup := cleaner{}
×
2152

×
2153
        s.start.Do(func() {
×
2154
                cleanup = cleanup.add(s.customMessageServer.Stop)
×
2155
                if err := s.customMessageServer.Start(); err != nil {
×
2156
                        startErr = err
×
2157
                        return
×
2158
                }
×
2159

2160
                if s.hostAnn != nil {
×
2161
                        cleanup = cleanup.add(s.hostAnn.Stop)
×
2162
                        if err := s.hostAnn.Start(); err != nil {
×
2163
                                startErr = err
×
2164
                                return
×
2165
                        }
×
2166
                }
2167

2168
                if s.livenessMonitor != nil {
×
2169
                        cleanup = cleanup.add(s.livenessMonitor.Stop)
×
2170
                        if err := s.livenessMonitor.Start(); err != nil {
×
2171
                                startErr = err
×
2172
                                return
×
2173
                        }
×
2174
                }
2175

2176
                // Start the notification server. This is used so channel
2177
                // management goroutines can be notified when a funding
2178
                // transaction reaches a sufficient number of confirmations, or
2179
                // when the input for the funding transaction is spent in an
2180
                // attempt at an uncooperative close by the counterparty.
2181
                cleanup = cleanup.add(s.sigPool.Stop)
×
2182
                if err := s.sigPool.Start(); err != nil {
×
2183
                        startErr = err
×
2184
                        return
×
2185
                }
×
2186

2187
                cleanup = cleanup.add(s.writePool.Stop)
×
2188
                if err := s.writePool.Start(); err != nil {
×
2189
                        startErr = err
×
2190
                        return
×
2191
                }
×
2192

2193
                cleanup = cleanup.add(s.readPool.Stop)
×
2194
                if err := s.readPool.Start(); err != nil {
×
2195
                        startErr = err
×
2196
                        return
×
2197
                }
×
2198

2199
                cleanup = cleanup.add(s.cc.BestBlockTracker.Stop)
                if err := s.cc.BestBlockTracker.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.channelNotifier.Stop)
                if err := s.channelNotifier.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(func() error {
                        return s.peerNotifier.Stop()
                })
                if err := s.peerNotifier.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.htlcNotifier.Stop)
                if err := s.htlcNotifier.Start(); err != nil {
                        startErr = err
                        return
                }

                if s.towerClientMgr != nil {
                        cleanup = cleanup.add(s.towerClientMgr.Stop)
                        if err := s.towerClientMgr.Start(); err != nil {
                                startErr = err
                                return
                        }
                }

                cleanup = cleanup.add(s.txPublisher.Stop)
                if err := s.txPublisher.Start(beat); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.sweeper.Stop)
                if err := s.sweeper.Start(beat); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.utxoNursery.Stop)
                if err := s.utxoNursery.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.breachArbitrator.Stop)
                if err := s.breachArbitrator.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.fundingMgr.Stop)
                if err := s.fundingMgr.Start(); err != nil {
                        startErr = err
                        return
                }

                // htlcSwitch must be started before chainArb since the latter
                // relies on htlcSwitch to deliver resolution message upon
                // start.
                cleanup = cleanup.add(s.htlcSwitch.Stop)
                if err := s.htlcSwitch.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.interceptableSwitch.Stop)
                if err := s.interceptableSwitch.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.invoiceHtlcModifier.Stop)
                if err := s.invoiceHtlcModifier.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.chainArb.Stop)
                if err := s.chainArb.Start(beat); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.graphBuilder.Stop)
                if err := s.graphBuilder.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.chanRouter.Stop)
                if err := s.chanRouter.Start(); err != nil {
                        startErr = err
                        return
                }
                // The authGossiper depends on the chanRouter and therefore
                // should be started after it.
                cleanup = cleanup.add(s.authGossiper.Stop)
                if err := s.authGossiper.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.invoices.Stop)
                if err := s.invoices.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.sphinx.Stop)
                if err := s.sphinx.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.chanStatusMgr.Stop)
                if err := s.chanStatusMgr.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup = cleanup.add(s.chanEventStore.Stop)
                if err := s.chanEventStore.Start(); err != nil {
                        startErr = err
                        return
                }

                cleanup.add(func() error {
                        s.missionController.StopStoreTickers()
                        return nil
                })
                s.missionController.RunStoreTickers()

                // Before we start the connMgr, we'll check to see if we have
                // any backups to recover. We do this now as we want to ensure
                // that we have all the information we need to handle channel
                // recovery _before_ we even accept connections from any peers.
                chanRestorer := &chanDBRestorer{
                        db:         s.chanStateDB,
                        secretKeys: s.cc.KeyRing,
                        chainArb:   s.chainArb,
                }
                if len(s.chansToRestore.PackedSingleChanBackups) != 0 {
                        _, err := chanbackup.UnpackAndRecoverSingles(
                                s.chansToRestore.PackedSingleChanBackups,
                                s.cc.KeyRing, chanRestorer, s,
                        )
                        if err != nil {
                                startErr = fmt.Errorf("unable to unpack single "+
                                        "backups: %v", err)
                                return
                        }
                }
                if len(s.chansToRestore.PackedMultiChanBackup) != 0 {
                        _, err := chanbackup.UnpackAndRecoverMulti(
                                s.chansToRestore.PackedMultiChanBackup,
                                s.cc.KeyRing, chanRestorer, s,
                        )
                        if err != nil {
                                startErr = fmt.Errorf("unable to unpack chan "+
                                        "backup: %v", err)
                                return
                        }
                }

                // chanSubSwapper must be started after the `channelNotifier`
                // because it depends on channel events as a synchronization
                // point.
                cleanup = cleanup.add(s.chanSubSwapper.Stop)
                if err := s.chanSubSwapper.Start(); err != nil {
                        startErr = err
                        return
                }

                if s.torController != nil {
                        cleanup = cleanup.add(s.torController.Stop)
                        if err := s.createNewHiddenService(); err != nil {
                                startErr = err
                                return
                        }
                }

                if s.natTraversal != nil {
                        s.wg.Add(1)
                        go s.watchExternalIP()
                }

                // Start connmgr last to prevent connections before init.
                cleanup = cleanup.add(func() error {
                        s.connMgr.Stop()
                        return nil
                })
                s.connMgr.Start()

                // If peers are specified as a config option, we'll add those
                // peers first.
                for _, peerAddrCfg := range s.cfg.AddPeers {
                        parsedPubkey, parsedHost, err := lncfg.ParseLNAddressPubkey(
                                peerAddrCfg,
                        )
                        if err != nil {
                                startErr = fmt.Errorf("unable to parse peer "+
                                        "pubkey from config: %v", err)
                                return
                        }
                        addr, err := parseAddr(parsedHost, s.cfg.net)
                        if err != nil {
                                startErr = fmt.Errorf("unable to parse peer "+
                                        "address provided as a config option: "+
                                        "%v", err)
                                return
                        }

                        peerAddr := &lnwire.NetAddress{
                                IdentityKey: parsedPubkey,
                                Address:     addr,
                                ChainNet:    s.cfg.ActiveNetParams.Net,
                        }

                        err = s.ConnectToPeer(
                                peerAddr, true,
                                s.cfg.ConnectionTimeout,
                        )
                        if err != nil {
                                startErr = fmt.Errorf("unable to connect to "+
                                        "peer address provided as a config "+
                                        "option: %v", err)
                                return
                        }
                }

                // Subscribe to NodeAnnouncements that advertise new addresses
                // for our persistent peers.
                if err := s.updatePersistentPeerAddrs(); err != nil {
                        startErr = err
                        return
                }

                // With all the relevant sub-systems started, we'll now attempt
                // to establish persistent connections to our direct channel
                // collaborators within the network. Before doing so however,
                // we'll prune our set of link nodes found within the database
                // to ensure we don't reconnect to any nodes we no longer have
                // open channels with.
                if err := s.chanStateDB.PruneLinkNodes(); err != nil {
                        startErr = err
                        return
                }
                if err := s.establishPersistentConnections(); err != nil {
                        startErr = err
                        return
                }

                // setSeedList is a helper function that turns multiple DNS seed
                // server tuples from the command line or config file into the
                // data structure we need and does a basic formal sanity check
                // in the process.
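                // Each tuple is expected to be of the form
                // "primarySeed[,soaTarget]", e.g.
                // "seed.example.org,soa.seed.example.org"; a malformed tuple
                // causes the override list to be ignored.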
                setSeedList := func(tuples []string, genesisHash chainhash.Hash) {
                        if len(tuples) == 0 {
                                return
                        }

                        result := make([][2]string, len(tuples))
                        for idx, tuple := range tuples {
                                tuple = strings.TrimSpace(tuple)
                                if len(tuple) == 0 {
                                        return
                                }

                                servers := strings.Split(tuple, ",")
                                if len(servers) > 2 || len(servers) == 0 {
                                        srvrLog.Warnf("Ignoring invalid DNS "+
                                                "seed tuple: %v", servers)
                                        return
                                }

                                copy(result[idx][:], servers)
                        }

                        chainreg.ChainDNSSeeds[genesisHash] = result
                }

                // Let users overwrite the DNS seed nodes. We only allow them
                // for bitcoin mainnet/testnet/signet.
                if s.cfg.Bitcoin.MainNet {
                        setSeedList(
                                s.cfg.Bitcoin.DNSSeeds,
                                chainreg.BitcoinMainnetGenesis,
                        )
                }
                if s.cfg.Bitcoin.TestNet3 {
                        setSeedList(
                                s.cfg.Bitcoin.DNSSeeds,
                                chainreg.BitcoinTestnetGenesis,
                        )
                }
                if s.cfg.Bitcoin.SigNet {
                        setSeedList(
                                s.cfg.Bitcoin.DNSSeeds,
                                chainreg.BitcoinSignetGenesis,
                        )
                }

                // If network bootstrapping hasn't been disabled, then we'll
                // configure the set of active bootstrappers, and launch a
                // dedicated goroutine to maintain a set of persistent
                // connections.
                if shouldPeerBootstrap(s.cfg) {
                        bootstrappers, err := initNetworkBootstrappers(s)
                        if err != nil {
                                startErr = err
                                return
                        }

                        s.wg.Add(1)
                        go s.peerBootstrapper(defaultMinPeers, bootstrappers)
                } else {
                        srvrLog.Infof("Auto peer bootstrapping is disabled")
                }

                // Start the blockbeat after all other subsystems have been
                // started so they are ready to receive new blocks.
                cleanup = cleanup.add(func() error {
                        s.blockbeatDispatcher.Stop()
                        return nil
                })
                if err := s.blockbeatDispatcher.Start(); err != nil {
                        startErr = err
                        return
                }

                // Set the active flag now that we've completed the full
                // startup.
                atomic.StoreInt32(&s.active, 1)
        })

        if startErr != nil {
                cleanup.run()
        }
        return startErr
}

// Stop gracefully shuts down the main daemon server. This function will signal
// any active goroutines, or helper objects to exit, then blocks until they've
// all successfully exited. Additionally, any/all listeners are closed.
// NOTE: This function is safe for concurrent access.
func (s *server) Stop() error {
        s.stop.Do(func() {
                atomic.StoreInt32(&s.stopping, 1)

                close(s.quit)

                // Shutdown connMgr first to prevent conns during shutdown.
                s.connMgr.Stop()

                // Stop dispatching blocks to other systems immediately.
                s.blockbeatDispatcher.Stop()

                // Shutdown the wallet, funding manager, and the rpc server.
                if err := s.chanStatusMgr.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop chanStatusMgr: %v", err)
                }
                if err := s.htlcSwitch.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop htlcSwitch: %v", err)
                }
                if err := s.sphinx.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop sphinx: %v", err)
                }
                if err := s.invoices.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop invoices: %v", err)
                }
                if err := s.interceptableSwitch.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop interceptable "+
                                "switch: %v", err)
                }
                if err := s.invoiceHtlcModifier.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop htlc invoices "+
                                "modifier: %v", err)
                }
                if err := s.chanRouter.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop chanRouter: %v", err)
                }
                if err := s.graphBuilder.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop graphBuilder %v", err)
                }
                if err := s.chainArb.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop chainArb: %v", err)
                }
                if err := s.fundingMgr.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop fundingMgr: %v", err)
                }
                if err := s.breachArbitrator.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop breachArbitrator: %v",
                                err)
                }
                if err := s.utxoNursery.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop utxoNursery: %v", err)
                }
                if err := s.authGossiper.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop authGossiper: %v", err)
                }
                if err := s.sweeper.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop sweeper: %v", err)
                }
                if err := s.txPublisher.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop txPublisher: %v", err)
                }
                if err := s.channelNotifier.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop channelNotifier: %v", err)
                }
                if err := s.peerNotifier.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop peerNotifier: %v", err)
                }
                if err := s.htlcNotifier.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop htlcNotifier: %v", err)
                }

                // Update channel.backup file. Make sure to do it before
                // stopping chanSubSwapper.
                singles, err := chanbackup.FetchStaticChanBackups(
                        s.chanStateDB, s.addrSource,
                )
                if err != nil {
                        srvrLog.Warnf("failed to fetch channel states: %v",
                                err)
                } else {
                        err := s.chanSubSwapper.ManualUpdate(singles)
                        if err != nil {
                                srvrLog.Warnf("Manual update of channel "+
                                        "backup failed: %v", err)
                        }
                }

                if err := s.chanSubSwapper.Stop(); err != nil {
                        srvrLog.Warnf("failed to stop chanSubSwapper: %v", err)
                }
                if err := s.cc.ChainNotifier.Stop(); err != nil {
                        srvrLog.Warnf("Unable to stop ChainNotifier: %v", err)
                }
                if err := s.cc.BestBlockTracker.Stop(); err != nil {
                        srvrLog.Warnf("Unable to stop BestBlockTracker: %v",
                                err)
                }
                if err := s.chanEventStore.Stop(); err != nil {
                        srvrLog.Warnf("Unable to stop ChannelEventStore: %v",
                                err)
                }
                s.missionController.StopStoreTickers()

                // Disconnect from each active peer to ensure that
                // peerTerminationWatchers signal completion to each peer.
                for _, peer := range s.Peers() {
                        err := s.DisconnectPeer(peer.IdentityKey())
                        if err != nil {
                                srvrLog.Warnf("could not disconnect peer: %v, "+
                                        "received error: %v", peer.IdentityKey(),
                                        err,
                                )
                        }
                }

                // Now that all connections have been torn down, stop the tower
                // client which will reliably flush all queued states to the
                // tower. If this is halted for any reason, the force quit timer
                // will kick in and abort to allow this method to return.
                if s.towerClientMgr != nil {
                        if err := s.towerClientMgr.Stop(); err != nil {
                                srvrLog.Warnf("Unable to shut down tower "+
                                        "client manager: %v", err)
                        }
                }

                if s.hostAnn != nil {
                        if err := s.hostAnn.Stop(); err != nil {
                                srvrLog.Warnf("unable to shut down host "+
                                        "announcer: %v", err)
                        }
                }

                if s.livenessMonitor != nil {
                        if err := s.livenessMonitor.Stop(); err != nil {
                                srvrLog.Warnf("unable to shutdown liveness "+
                                        "monitor: %v", err)
                        }
                }

                // Wait for all lingering goroutines to quit.
                srvrLog.Debug("Waiting for server to shutdown...")
                s.wg.Wait()

                srvrLog.Debug("Stopping buffer pools...")
                s.sigPool.Stop()
                s.writePool.Stop()
                s.readPool.Stop()
        })

        return nil
}

// Stopped returns true if the server has been instructed to shutdown.
// NOTE: This function is safe for concurrent access.
func (s *server) Stopped() bool {
        return atomic.LoadInt32(&s.stopping) != 0
}

// configurePortForwarding attempts to set up port forwarding for the different
// ports that the server will be listening on.
//
// NOTE: This should only be used when using some kind of NAT traversal to
// automatically set up forwarding rules.
func (s *server) configurePortForwarding(ports ...uint16) ([]string, error) {
        ip, err := s.natTraversal.ExternalIP()
        if err != nil {
                return nil, err
        }
        s.lastDetectedIP = ip

        externalIPs := make([]string, 0, len(ports))
        for _, port := range ports {
                if err := s.natTraversal.AddPortMapping(port); err != nil {
                        srvrLog.Debugf("Unable to forward port %d: %v", port, err)
                        continue
                }

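                // Advertise the detected external IP together with each port
                // that was successfully forwarded.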
                hostIP := fmt.Sprintf("%v:%d", ip, port)
                externalIPs = append(externalIPs, hostIP)
        }

        return externalIPs, nil
}

// removePortForwarding attempts to clear the forwarding rules for the different
// ports the server is currently listening on.
//
// NOTE: This should only be used when using some kind of NAT traversal to
// automatically set up forwarding rules.
func (s *server) removePortForwarding() {
        forwardedPorts := s.natTraversal.ForwardedPorts()
        for _, port := range forwardedPorts {
                if err := s.natTraversal.DeletePortMapping(port); err != nil {
                        srvrLog.Errorf("Unable to remove forwarding rules for "+
                                "port %d: %v", port, err)
                }
        }
}

// watchExternalIP continuously checks for an updated external IP address every
// 15 minutes. Once a new IP address has been detected, it will automatically
// handle port forwarding rules and send updated node announcements to the
// currently connected peers.
//
// NOTE: This MUST be run as a goroutine.
func (s *server) watchExternalIP() {
        defer s.wg.Done()

        // Before exiting, we'll make sure to remove the forwarding rules set
        // up by the server.
        defer s.removePortForwarding()

        // Keep track of the external IPs set by the user to avoid replacing
        // them when detecting a new IP.
        ipsSetByUser := make(map[string]struct{})
        for _, ip := range s.cfg.ExternalIPs {
                ipsSetByUser[ip.String()] = struct{}{}
        }

        forwardedPorts := s.natTraversal.ForwardedPorts()

        ticker := time.NewTicker(15 * time.Minute)
        defer ticker.Stop()
out:
        for {
                select {
                case <-ticker.C:
                        // We'll start off by making sure a new IP address has
                        // been detected.
                        ip, err := s.natTraversal.ExternalIP()
                        if err != nil {
                                srvrLog.Debugf("Unable to retrieve the "+
                                        "external IP address: %v", err)
                                continue
                        }

                        // Periodically renew the NAT port forwarding.
                        for _, port := range forwardedPorts {
                                err := s.natTraversal.AddPortMapping(port)
                                if err != nil {
                                        srvrLog.Warnf("Unable to automatically "+
                                                "re-create port forwarding using %s: %v",
                                                s.natTraversal.Name(), err)
                                } else {
                                        srvrLog.Debugf("Automatically re-created "+
                                                "forwarding for port %d using %s to "+
                                                "advertise external IP",
                                                port, s.natTraversal.Name())
                                }
                        }

                        if ip.Equal(s.lastDetectedIP) {
                                continue
                        }

                        srvrLog.Infof("Detected new external IP address %s", ip)

                        // Next, we'll craft the new addresses that will be
                        // included in the new node announcement and advertised
                        // to the network. Each address will consist of the new
                        // IP detected and one of the currently advertised
                        // ports.
                        var newAddrs []net.Addr
                        for _, port := range forwardedPorts {
                                hostIP := fmt.Sprintf("%v:%d", ip, port)
                                addr, err := net.ResolveTCPAddr("tcp", hostIP)
                                if err != nil {
                                        srvrLog.Debugf("Unable to resolve "+
                                                "host %v: %v", addr, err)
                                        continue
                                }

                                newAddrs = append(newAddrs, addr)
                        }

                        // Skip the update if we weren't able to resolve any of
                        // the new addresses.
                        if len(newAddrs) == 0 {
                                srvrLog.Debug("Skipping node announcement " +
                                        "update due to not being able to " +
                                        "resolve any new addresses")
                                continue
                        }

                        // Now, we'll need to update the addresses in our node's
                        // announcement in order to propagate the update
                        // throughout the network. We'll only include addresses
                        // that have a different IP from the previous one, as
                        // the previous IP is no longer valid.
                        currentNodeAnn := s.getNodeAnnouncement()

                        for _, addr := range currentNodeAnn.Addresses {
                                host, _, err := net.SplitHostPort(addr.String())
                                if err != nil {
                                        srvrLog.Debugf("Unable to determine "+
                                                "host from address %v: %v",
                                                addr, err)
                                        continue
                                }

                                // We'll also make sure to include external IPs
                                // set manually by the user.
                                _, setByUser := ipsSetByUser[addr.String()]
                                if setByUser || host != s.lastDetectedIP.String() {
                                        newAddrs = append(newAddrs, addr)
                                }
                        }

                        // Then, we'll generate a new timestamped node
                        // announcement with the updated addresses and broadcast
                        // it to our peers.
                        newNodeAnn, err := s.genNodeAnnouncement(
                                nil, netann.NodeAnnSetAddrs(newAddrs),
                        )
                        if err != nil {
                                srvrLog.Debugf("Unable to generate new node "+
                                        "announcement: %v", err)
                                continue
                        }

                        err = s.BroadcastMessage(nil, &newNodeAnn)
                        if err != nil {
                                srvrLog.Debugf("Unable to broadcast new node "+
                                        "announcement to peers: %v", err)
                                continue
                        }

                        // Finally, update the last IP seen to the current one.
                        s.lastDetectedIP = ip
                case <-s.quit:
                        break out
                }
        }
}

// initNetworkBootstrappers initializes a set of network peer bootstrappers
// based on the server, and currently active bootstrap mechanisms as defined
// within the current configuration.
func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, error) {
        srvrLog.Infof("Initializing peer network bootstrappers!")

        var bootStrappers []discovery.NetworkPeerBootstrapper

        // First, we'll create an instance of the ChannelGraphBootstrapper as
        // this can be used by default if we've already partially seeded the
        // network.
        chanGraph := autopilot.ChannelGraphFromDatabase(s.graphDB)
        graphBootstrapper, err := discovery.NewGraphBootstrapper(chanGraph)
        if err != nil {
                return nil, err
        }
        bootStrappers = append(bootStrappers, graphBootstrapper)

        // If this isn't simnet mode, then one of our additional bootstrapping
        // sources will be the set of running DNS seeds.
        if !s.cfg.Bitcoin.SimNet {
                dnsSeeds, ok := chainreg.ChainDNSSeeds[*s.cfg.ActiveNetParams.GenesisHash]

                // If we have a set of DNS seeds for this chain, then we'll add
                // it as an additional bootstrapping source.
                if ok {
                        srvrLog.Infof("Creating DNS peer bootstrapper with "+
                                "seeds: %v", dnsSeeds)

                        dnsBootStrapper := discovery.NewDNSSeedBootstrapper(
                                dnsSeeds, s.cfg.net, s.cfg.ConnectionTimeout,
                        )
                        bootStrappers = append(bootStrappers, dnsBootStrapper)
                }
        }

        return bootStrappers, nil
}

// createBootstrapIgnorePeers creates a map of peers that the bootstrap process
// needs to ignore, which is made of three parts,
//   - the node itself needs to be skipped as it doesn't make sense to connect
//     to itself.
//   - the peers that we already have connections with, as in s.peersByPub.
//   - the peers that we are attempting to connect to, as in s.persistentPeers.
func (s *server) createBootstrapIgnorePeers() map[autopilot.NodeID]struct{} {
        s.mu.RLock()
        defer s.mu.RUnlock()

        ignore := make(map[autopilot.NodeID]struct{})

        // We should ignore ourselves from bootstrapping.
        selfKey := autopilot.NewNodeID(s.identityECDH.PubKey())
        ignore[selfKey] = struct{}{}

        // Ignore all connected peers.
        for _, peer := range s.peersByPub {
                nID := autopilot.NewNodeID(peer.IdentityKey())
                ignore[nID] = struct{}{}
        }

        // Ignore all persistent peers as they have a dedicated reconnecting
        // process.
        for pubKeyStr := range s.persistentPeers {
                var nID autopilot.NodeID
                copy(nID[:], []byte(pubKeyStr))
                ignore[nID] = struct{}{}
        }

        return ignore
}

// peerBootstrapper is a goroutine which is tasked with attempting to establish
// and maintain a target minimum number of outbound connections. With this
// invariant, we ensure that our node is connected to a diverse set of peers
// and that nodes newly joining the network receive an up to date network view
// as soon as possible.
func (s *server) peerBootstrapper(numTargetPeers uint32,
        bootstrappers []discovery.NetworkPeerBootstrapper) {

        defer s.wg.Done()

        // Before we continue, init the ignore peers map.
        ignoreList := s.createBootstrapIgnorePeers()

        // We'll start off by aggressively attempting connections to peers in
        // order to be a part of the network as soon as possible.
        s.initialPeerBootstrap(ignoreList, numTargetPeers, bootstrappers)

        // Once done, we'll attempt to maintain our target minimum number of
        // peers.
        //
        // We'll use a 15 second backoff, and double the time every time an
        // epoch fails up to a ceiling.
        backOff := time.Second * 15

        // We'll create a new ticker to wake us up every 15 seconds so we can
        // see if we've reached our minimum number of peers.
        sampleTicker := time.NewTicker(backOff)
        defer sampleTicker.Stop()

        // We'll use the number of attempts and errors to determine if we need
        // to increase the time between discovery epochs.
        var epochErrors uint32 // To be used atomically.
        var epochAttempts uint32

        for {
                select {
                // The ticker has just woken us up, so we'll need to check if
                // we need to attempt to connect out to any more peers.
                case <-sampleTicker.C:
                        // Obtain the current number of peers, so we can gauge
                        // if we need to sample more peers or not.
                        s.mu.RLock()
                        numActivePeers := uint32(len(s.peersByPub))
                        s.mu.RUnlock()

                        // If we have enough peers, then we can loop back
                        // around to the next round as we're done here.
                        if numActivePeers >= numTargetPeers {
                                continue
                        }

                        // If all of our attempts failed during this last back
                        // off period, then we'll increase our backoff to the 5
                        // minute ceiling to avoid an excessive number of
                        // queries.
                        //
                        // TODO(roasbeef): add reverse policy too?

                        if epochAttempts > 0 &&
                                atomic.LoadUint32(&epochErrors) >= epochAttempts {

                                sampleTicker.Stop()

                                backOff *= 2
                                if backOff > bootstrapBackOffCeiling {
                                        backOff = bootstrapBackOffCeiling
                                }

                                srvrLog.Debugf("Backing off peer bootstrapper to "+
                                        "%v", backOff)
                                sampleTicker = time.NewTicker(backOff)
                                continue
                        }

                        atomic.StoreUint32(&epochErrors, 0)
                        epochAttempts = 0

                        // Since we now need more peers, we'll compute the
                        // exact number we need to reach our threshold.
                        numNeeded := numTargetPeers - numActivePeers

                        srvrLog.Debugf("Attempting to obtain %v more network "+
                                "peers", numNeeded)

                        // With the number of peers we need calculated, we'll
                        // query the network bootstrappers to sample a set of
                        // random addrs for us.
                        //
                        // Before we continue, get a copy of the ignore peers
                        // map.
                        ignoreList = s.createBootstrapIgnorePeers()

                        peerAddrs, err := discovery.MultiSourceBootstrap(
                                ignoreList, numNeeded*2, bootstrappers...,
                        )
                        if err != nil {
                                srvrLog.Errorf("Unable to retrieve bootstrap "+
                                        "peers: %v", err)
                                continue
                        }

                        // Finally, we'll launch a new goroutine for each
                        // prospective peer candidate.
                        for _, addr := range peerAddrs {
                                epochAttempts++

                                go func(a *lnwire.NetAddress) {
                                        // TODO(roasbeef): can do AS, subnet,
                                        // country diversity, etc
                                        errChan := make(chan error, 1)
                                        s.connectToPeer(
                                                a, errChan,
                                                s.cfg.ConnectionTimeout,
                                        )
                                        select {
                                        case err := <-errChan:
                                                if err == nil {
                                                        return
                                                }

                                                srvrLog.Errorf("Unable to "+
                                                        "connect to %v: %v",
                                                        a, err)
                                                atomic.AddUint32(&epochErrors, 1)
                                        case <-s.quit:
                                        }
                                }(addr)
                        }
                case <-s.quit:
                        return
                }
        }
}

// bootstrapBackOffCeiling is the maximum amount of time we'll wait between
// failed attempts to locate a set of bootstrap peers. We'll slowly double our
// query back off each time we encounter a failure.
const bootstrapBackOffCeiling = time.Minute * 5

// initialPeerBootstrap attempts to continuously connect to peers on startup
// until the target number of peers has been reached. This ensures that nodes
// receive an up to date network view as soon as possible.
func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
        numTargetPeers uint32,
        bootstrappers []discovery.NetworkPeerBootstrapper) {

        srvrLog.Debugf("Init bootstrap with targetPeers=%v, bootstrappers=%v, "+
                "ignore=%v", numTargetPeers, len(bootstrappers), len(ignore))

        // We'll start off by waiting 2 seconds between failed attempts, then
        // double each time we fail until we hit the bootstrapBackOffCeiling.
        var delaySignal <-chan time.Time
        delayTime := time.Second * 2

        // As we want to be more aggressive, we'll use a lower back off ceiling
        // than the main peer bootstrap logic.
        backOffCeiling := bootstrapBackOffCeiling / 5

        for attempts := 0; ; attempts++ {
                // Check if the server has been requested to shut down in order
                // to prevent blocking.
                if s.Stopped() {
                        return
                }

                // We can exit our aggressive initial peer bootstrapping stage
                // if we've reached our target number of peers.
                s.mu.RLock()
                numActivePeers := uint32(len(s.peersByPub))
                s.mu.RUnlock()

                if numActivePeers >= numTargetPeers {
                        return
                }

                if attempts > 0 {
                        srvrLog.Debugf("Waiting %v before trying to locate "+
                                "bootstrap peers (attempt #%v)", delayTime,
                                attempts)

                        // We've completed at least one iteration and haven't
                        // finished, so we'll start to insert a delay period
                        // between each attempt.
                        delaySignal = time.After(delayTime)
                        select {
                        case <-delaySignal:
                        case <-s.quit:
                                return
                        }

                        // After our delay, we'll double the time we wait up to
                        // the max back off period.
                        delayTime *= 2
                        if delayTime > backOffCeiling {
                                delayTime = backOffCeiling
                        }
                }

                // Otherwise, we'll request the remaining number of peers
                // in order to reach our target.
                peersNeeded := numTargetPeers - numActivePeers
                bootstrapAddrs, err := discovery.MultiSourceBootstrap(
                        ignore, peersNeeded, bootstrappers...,
                )
                if err != nil {
                        srvrLog.Errorf("Unable to retrieve initial bootstrap "+
                                "peers: %v", err)
                        continue
                }

                // Then, we'll attempt to establish a connection to the
                // different peer addresses retrieved by our bootstrappers.
                var wg sync.WaitGroup
                for _, bootstrapAddr := range bootstrapAddrs {
                        wg.Add(1)
                        go func(addr *lnwire.NetAddress) {
                                defer wg.Done()

                                errChan := make(chan error, 1)
                                go s.connectToPeer(
                                        addr, errChan, s.cfg.ConnectionTimeout,
                                )

                                // We'll only allow this connection attempt to
                                // take up to 3 seconds. This allows us to move
                                // quickly by discarding peers that are slowing
                                // us down.
                                select {
                                case err := <-errChan:
                                        if err == nil {
                                                return
                                        }
                                        srvrLog.Errorf("Unable to connect to "+
                                                "%v: %v", addr, err)
                                // TODO: tune timeout? 3 seconds might be *too*
                                // aggressive but works well.
                                case <-time.After(3 * time.Second):
                                        srvrLog.Tracef("Skipping peer %v due "+
                                                "to not establishing a "+
                                                "connection within 3 seconds",
                                                addr)
                                case <-s.quit:
                                }
                        }(bootstrapAddr)
                }

                wg.Wait()
        }
}

// createNewHiddenService automatically sets up a v2 or v3 onion service in
// order to listen for inbound connections over Tor.
func (s *server) createNewHiddenService() error {
        // Determine the different ports the server is listening on. The onion
        // service's virtual port will map to these ports and one will be picked
        // at random when the onion service is being accessed.
        listenPorts := make([]int, 0, len(s.listenAddrs))
        for _, listenAddr := range s.listenAddrs {
                port := listenAddr.(*net.TCPAddr).Port
                listenPorts = append(listenPorts, port)
        }

        encrypter, err := lnencrypt.KeyRingEncrypter(s.cc.KeyRing)
        if err != nil {
                return err
        }

        // Once the port mapping has been set, we can go ahead and automatically
        // create our onion service. The service's private key will be saved to
        // disk in order to regain access to this service when restarting `lnd`.
        onionCfg := tor.AddOnionConfig{
                VirtualPort: defaultPeerPort,
                TargetPorts: listenPorts,
                Store: tor.NewOnionFile(
                        s.cfg.Tor.PrivateKeyPath, 0600, s.cfg.Tor.EncryptKey,
                        encrypter,
                ),
        }

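        // Select the onion service version to create based on which Tor
        // service type is enabled in the config.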
        switch {
        case s.cfg.Tor.V2:
                onionCfg.Type = tor.V2
        case s.cfg.Tor.V3:
                onionCfg.Type = tor.V3
        }

        addr, err := s.torController.AddOnion(onionCfg)
        if err != nil {
                return err
        }

        // Now that the onion service has been created, we'll add the onion
        // address it can be reached at to our list of advertised addresses.
        newNodeAnn, err := s.genNodeAnnouncement(
                nil, func(currentAnn *lnwire.NodeAnnouncement) {
                        currentAnn.Addresses = append(currentAnn.Addresses, addr)
                },
        )
        if err != nil {
                return fmt.Errorf("unable to generate new node "+
                        "announcement: %v", err)
        }

        // Finally, we'll update the on-disk version of our announcement so it
        // will eventually propagate to nodes in the network.
        selfNode := &models.LightningNode{
                HaveNodeAnnouncement: true,
                LastUpdate:           time.Unix(int64(newNodeAnn.Timestamp), 0),
                Addresses:            newNodeAnn.Addresses,
                Alias:                newNodeAnn.Alias.String(),
                Features: lnwire.NewFeatureVector(
                        newNodeAnn.Features, lnwire.Features,
                ),
                Color:        newNodeAnn.RGBColor,
                AuthSigBytes: newNodeAnn.Signature.ToSignatureBytes(),
        }
        copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed())
        if err := s.graphDB.SetSourceNode(selfNode); err != nil {
                return fmt.Errorf("can't set self node: %w", err)
        }

        return nil
}

// findChannel finds a channel given a public key and ChannelID. It is an
// optimization that is quicker than seeking for a channel given only the
// ChannelID.
func (s *server) findChannel(node *btcec.PublicKey, chanID lnwire.ChannelID) (
        *channeldb.OpenChannel, error) {

        nodeChans, err := s.chanStateDB.FetchOpenChannels(node)
        if err != nil {
                return nil, err
        }

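        // Scan this peer's open channels for the one whose funding outpoint
        // matches the given channel ID.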
        for _, channel := range nodeChans {
                if chanID.IsChanPoint(&channel.FundingOutpoint) {
                        return channel, nil
                }
        }

        return nil, fmt.Errorf("unable to find channel")
}

// getNodeAnnouncement fetches the current, fully signed node announcement.
func (s *server) getNodeAnnouncement() lnwire.NodeAnnouncement {
        s.mu.Lock()
        defer s.mu.Unlock()

        return *s.currentNodeAnn
}

// genNodeAnnouncement generates and returns the current fully signed node
// announcement. The time stamp of the announcement will be updated in order
// to ensure it propagates through the network.
func (s *server) genNodeAnnouncement(features *lnwire.RawFeatureVector,
        modifiers ...netann.NodeAnnModifier) (lnwire.NodeAnnouncement, error) {

        s.mu.Lock()
        defer s.mu.Unlock()

        // First, try to update our feature manager with the updated set of
        // features.
        if features != nil {
                proposedFeatures := map[feature.Set]*lnwire.RawFeatureVector{
                        feature.SetNodeAnn: features,
                }
                err := s.featureMgr.UpdateFeatureSets(proposedFeatures)
                if err != nil {
                        return lnwire.NodeAnnouncement{}, err
                }

                // If we could successfully update our feature manager, add
                // an update modifier to include these new features to our
                // set.
                modifiers = append(
                        modifiers, netann.NodeAnnSetFeatures(features),
                )
        }

        // Always update the timestamp when refreshing to ensure the update
        // propagates.
        modifiers = append(modifiers, netann.NodeAnnSetTimestamp)

        // Apply the requested changes to the node announcement.
        for _, modifier := range modifiers {
                modifier(s.currentNodeAnn)
        }

        // Sign a new update after applying all of the passed modifiers.
        err := netann.SignNodeAnnouncement(
                s.nodeSigner, s.identityKeyLoc, s.currentNodeAnn,
        )
        if err != nil {
                return lnwire.NodeAnnouncement{}, err
        }

        return *s.currentNodeAnn, nil
}

// updateAndBroadcastSelfNode generates a new node announcement
// applying the given modifiers and updating the time stamp
// to ensure it propagates through the network. Then it broadcasts
// it to the network.
func (s *server) updateAndBroadcastSelfNode(features *lnwire.RawFeatureVector,
        modifiers ...netann.NodeAnnModifier) error {

        newNodeAnn, err := s.genNodeAnnouncement(features, modifiers...)
        if err != nil {
                return fmt.Errorf("unable to generate new node "+
                        "announcement: %v", err)
        }

        // Update the on-disk version of our announcement.
        // Load and modify self node instead of creating a new instance so we
        // don't risk overwriting any existing values.
        selfNode, err := s.graphDB.SourceNode()
        if err != nil {
                return fmt.Errorf("unable to get current source node: %w", err)
        }

        selfNode.HaveNodeAnnouncement = true
        selfNode.LastUpdate = time.Unix(int64(newNodeAnn.Timestamp), 0)
        selfNode.Addresses = newNodeAnn.Addresses
        selfNode.Alias = newNodeAnn.Alias.String()
        selfNode.Features = s.featureMgr.Get(feature.SetNodeAnn)
        selfNode.Color = newNodeAnn.RGBColor
        selfNode.AuthSigBytes = newNodeAnn.Signature.ToSignatureBytes()

        copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed())

        if err := s.graphDB.SetSourceNode(selfNode); err != nil {
                return fmt.Errorf("can't set self node: %w", err)
        }

        // Finally, propagate it to the nodes in the network.
        err = s.BroadcastMessage(nil, &newNodeAnn)
        if err != nil {
                rpcsLog.Debugf("Unable to broadcast new node "+
                        "announcement to peers: %v", err)
                return err
        }

        return nil
}

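// nodeAddresses pairs a peer's identity public key with the set of network
// addresses we know for it.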
type nodeAddresses struct {
        pubKey    *btcec.PublicKey
        addresses []net.Addr
}

// establishPersistentConnections attempts to establish persistent connections
// to all our direct channel collaborators. In order to promote liveness of our
// active channels, we instruct the connection manager to attempt to establish
// and maintain persistent connections to all our direct channel counterparties.
func (s *server) establishPersistentConnections() error {
        // nodeAddrsMap stores the combination of node public keys and addresses
        // that we'll attempt to reconnect to. PubKey strings are used as keys
        // since other PubKey forms can't be compared.
        nodeAddrsMap := map[string]*nodeAddresses{}

        // Iterate through the list of LinkNodes to find addresses we should
        // attempt to connect to based on our set of previous connections. Set
        // the reconnection port to the default peer port.
        linkNodes, err := s.chanStateDB.LinkNodeDB().FetchAllLinkNodes()
        if err != nil && err != channeldb.ErrLinkNodesNotFound {
                return err
        }
        for _, node := range linkNodes {
                pubStr := string(node.IdentityPub.SerializeCompressed())
                nodeAddrs := &nodeAddresses{
                        pubKey:    node.IdentityPub,
                        addresses: node.Addresses,
                }
                nodeAddrsMap[pubStr] = nodeAddrs
        }

        // After checking our previous connections for addresses to connect to,
        // iterate through the nodes in our channel graph to find addresses
        // that have been added via NodeAnnouncement messages.
        sourceNode, err := s.graphDB.SourceNode()
        if err != nil {
                return err
        }

        // TODO(roasbeef): instead iterate over link nodes and query graph for
        // each of the nodes.
        selfPub := s.identityECDH.PubKey().SerializeCompressed()
        err = s.graphDB.ForEachNodeChannel(sourceNode.PubKeyBytes, func(
                tx kvdb.RTx,
                chanInfo *models.ChannelEdgeInfo,
                policy, _ *models.ChannelEdgePolicy) error {

                // If the remote party has announced the channel to us, but we
                // haven't yet, then we won't have a policy. However, we don't
                // need this to connect to the peer, so we'll log it and move on.
                if policy == nil {
                        srvrLog.Warnf("No channel policy found for "+
                                "ChannelPoint(%v): ", chanInfo.ChannelPoint)
                }

                // We'll now fetch the peer opposite from us within this
                // channel so we can queue up a direct connection to them.
                channelPeer, err := s.graphDB.FetchOtherNode(
                        tx, chanInfo, selfPub,
                )
                if err != nil {
                        return fmt.Errorf("unable to fetch channel peer for "+
                                "ChannelPoint(%v): %v", chanInfo.ChannelPoint,
                                err)
                }

                pubStr := string(channelPeer.PubKeyBytes[:])

                // Add all unique addresses from channel
                // graph/NodeAnnouncements to the list of addresses we'll
                // connect to for this peer.
                addrSet := make(map[string]net.Addr)
                for _, addr := range channelPeer.Addresses {
×
3479
                        switch addr.(type) {
×
3480
                        case *net.TCPAddr:
×
3481
                                addrSet[addr.String()] = addr
×
3482

3483
                        // We'll only attempt to connect to Tor addresses if Tor
3484
                        // outbound support is enabled.
3485
                        case *tor.OnionAddr:
×
3486
                                if s.cfg.Tor.Active {
×
3487
                                        addrSet[addr.String()] = addr
×
3488
                                }
×
3489
                        }
3490
                }
3491

3492
                // If this peer is also recorded as a link node, we'll add any
3493
                // additional addresses that have not already been selected.
3494
                linkNodeAddrs, ok := nodeAddrsMap[pubStr]
×
3495
                if ok {
×
3496
                        for _, lnAddress := range linkNodeAddrs.addresses {
×
3497
                                switch lnAddress.(type) {
×
3498
                                case *net.TCPAddr:
×
3499
                                        addrSet[lnAddress.String()] = lnAddress
×
3500

3501
                                // We'll only attempt to connect to Tor
3502
                                // addresses if Tor outbound support is enabled.
3503
                                case *tor.OnionAddr:
×
3504
                                        if s.cfg.Tor.Active {
×
3505
                                                addrSet[lnAddress.String()] = lnAddress
×
3506
                                        }
×
3507
                                }
3508
                        }
3509
                }
3510

3511
                // Construct a slice of the deduped addresses.
3512
                var addrs []net.Addr
×
3513
                for _, addr := range addrSet {
×
3514
                        addrs = append(addrs, addr)
×
3515
                }
×
3516

3517
                n := &nodeAddresses{
×
3518
                        addresses: addrs,
×
3519
                }
×
3520
                n.pubKey, err = channelPeer.PubKey()
×
3521
                if err != nil {
×
3522
                        return err
×
3523
                }
×
3524

3525
                nodeAddrsMap[pubStr] = n
×
3526
                return nil
×
3527
        })
3528
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
×
3529
                return err
×
3530
        }
×
3531

3532
        srvrLog.Debugf("Establishing %v persistent connections on start",
×
3533
                len(nodeAddrsMap))
×
3534

×
3535
        // Acquire and hold server lock until all persistent connection requests
×
3536
        // have been recorded and sent to the connection manager.
×
3537
        s.mu.Lock()
×
3538
        defer s.mu.Unlock()
×
3539

×
3540
        // Iterate through the combined list of addresses from prior links and
×
3541
        // node announcements and attempt to reconnect to each node.
×
3542
        var numOutboundConns int
×
3543
        for pubStr, nodeAddr := range nodeAddrsMap {
×
3544
                // Add this peer to the set of peers we should maintain a
×
3545
                // persistent connection with. We set the value to false to
×
3546
                // indicate that we should not continue to reconnect if the
×
3547
                // number of channels returns to zero, since this peer has not
×
3548
                // been requested as perm by the user.
×
3549
                s.persistentPeers[pubStr] = false
×
3550
                if _, ok := s.persistentPeersBackoff[pubStr]; !ok {
×
3551
                        s.persistentPeersBackoff[pubStr] = s.cfg.MinBackoff
×
3552
                }
×
3553

3554
                for _, address := range nodeAddr.addresses {
×
3555
                        // Create a wrapper address which couples the IP and
×
3556
                        // the pubkey so the brontide authenticated connection
×
3557
                        // can be established.
×
3558
                        lnAddr := &lnwire.NetAddress{
×
3559
                                IdentityKey: nodeAddr.pubKey,
×
3560
                                Address:     address,
×
3561
                        }
×
3562

×
3563
                        s.persistentPeerAddrs[pubStr] = append(
×
3564
                                s.persistentPeerAddrs[pubStr], lnAddr)
×
3565
                }
×
3566

3567
                // We'll connect to the first 10 peers immediately, then
3568
                // randomly stagger any remaining connections if the
3569
                // stagger initial reconnect flag is set. This ensures
3570
                // that mobile nodes or nodes with a small number of
3571
                // channels obtain connectivity quickly, but larger
3572
                // nodes are able to disperse the costs of connecting to
3573
                // all peers at once.
3574
                if numOutboundConns < numInstantInitReconnect ||
×
3575
                        !s.cfg.StaggerInitialReconnect {
×
3576

×
3577
                        go s.connectToPersistentPeer(pubStr)
×
3578
                } else {
×
3579
                        go s.delayInitialReconnect(pubStr)
×
3580
                }
×
3581

3582
                numOutboundConns++
×
3583
        }
3584

3585
        return nil
×
3586
}
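
// Editor's note: the helper below is an illustrative sketch and is not part
// of server.go. It isolates the address de-duplication pattern used above:
// addresses collected from link nodes and from NodeAnnouncements in the graph
// are merged by their String() form so that each unique address is only
// dialed once per peer.
func dedupAddrs(sources ...[]net.Addr) []net.Addr {
        addrSet := make(map[string]net.Addr)
        for _, addrs := range sources {
                for _, addr := range addrs {
                        addrSet[addr.String()] = addr
                }
        }

        // Flatten the set back into a slice for the connection manager.
        deduped := make([]net.Addr, 0, len(addrSet))
        for _, addr := range addrSet {
                deduped = append(deduped, addr)
        }

        return deduped
}
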
3587

3588
// delayInitialReconnect will attempt a reconnection to the given peer after
3589
// sampling a value for the delay between 0s and the maxInitReconnectDelay.
3590
//
3591
// NOTE: This method MUST be run as a goroutine.
3592
func (s *server) delayInitialReconnect(pubStr string) {
×
3593
        delay := time.Duration(prand.Intn(maxInitReconnectDelay)) * time.Second
×
3594
        select {
×
3595
        case <-time.After(delay):
×
3596
                s.connectToPersistentPeer(pubStr)
×
3597
        case <-s.quit:
×
3598
        }
3599
}
3600

3601
// prunePersistentPeerConnection removes all internal state related to
3602
// persistent connections to a peer within the server. This is used to avoid
3603
// persistent connection retries to peers we do not have any open channels with.
3604
func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) {
×
3605
        pubKeyStr := string(compressedPubKey[:])
×
3606

×
3607
        s.mu.Lock()
×
3608
        if perm, ok := s.persistentPeers[pubKeyStr]; ok && !perm {
×
3609
                delete(s.persistentPeers, pubKeyStr)
×
3610
                delete(s.persistentPeersBackoff, pubKeyStr)
×
3611
                delete(s.persistentPeerAddrs, pubKeyStr)
×
3612
                s.cancelConnReqs(pubKeyStr, nil)
×
3613
                s.mu.Unlock()
×
3614

×
3615
                srvrLog.Infof("Pruned peer %x from persistent connections, "+
×
3616
                        "peer has no open channels", compressedPubKey)
×
3617

×
3618
                return
×
3619
        }
×
3620
        s.mu.Unlock()
×
3621
}
3622

3623
// BroadcastMessage sends a request to the server to broadcast a set of
3624
// messages to all peers other than the one specified by the `skips` parameter.
3625
// All messages sent via BroadcastMessage will be queued for lazy delivery to
3626
// the target peers.
3627
//
3628
// NOTE: This function is safe for concurrent access.
3629
func (s *server) BroadcastMessage(skips map[route.Vertex]struct{},
3630
        msgs ...lnwire.Message) error {
×
3631

×
3632
        // Filter out peers found in the skips map. We synchronize access to
×
3633
        // peersByPub throughout this process to ensure we deliver messages to
×
3634
        // the exact set of peers present at the time of invocation.
×
3635
        s.mu.RLock()
×
3636
        peers := make([]*peer.Brontide, 0, len(s.peersByPub))
×
3637
        for pubStr, sPeer := range s.peersByPub {
×
3638
                if skips != nil {
×
3639
                        if _, ok := skips[sPeer.PubKey()]; ok {
×
3640
                                srvrLog.Tracef("Skipping %x in broadcast with "+
×
3641
                                        "pubStr=%x", sPeer.PubKey(), pubStr)
×
3642
                                continue
×
3643
                        }
3644
                }
3645

3646
                peers = append(peers, sPeer)
×
3647
        }
3648
        s.mu.RUnlock()
×
3649

×
3650
        // Iterate over all known peers, dispatching a go routine to enqueue
×
3651
        // all messages to each of the peers.
×
3652
        var wg sync.WaitGroup
×
3653
        for _, sPeer := range peers {
×
3654
                srvrLog.Debugf("Sending %v messages to peer %x", len(msgs),
×
3655
                        sPeer.PubKey())
×
3656

×
3657
                // Dispatch a go routine to enqueue all messages to this peer.
×
3658
                wg.Add(1)
×
3659
                s.wg.Add(1)
×
3660
                go func(p lnpeer.Peer) {
×
3661
                        defer s.wg.Done()
×
3662
                        defer wg.Done()
×
3663

×
3664
                        p.SendMessageLazy(false, msgs...)
×
3665
                }(sPeer)
×
3666
        }
3667

3668
        // Wait for all messages to have been dispatched before returning to
3669
        // caller.
3670
        wg.Wait()
×
3671

×
3672
        return nil
×
3673
}
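
// Editor's note: the snippet below is an illustrative sketch and is not part
// of server.go. It shows a typical BroadcastMessage call: the originating
// peer is placed in the skips map (keyed by route.Vertex) so the message is
// lazily queued to every other connected peer.
func exampleBroadcast(s *server, origin *btcec.PublicKey,
        msg lnwire.Message) error {

        skips := map[route.Vertex]struct{}{
                route.NewVertex(origin): {},
        }

        return s.BroadcastMessage(skips, msg)
}
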
3674

3675
// NotifyWhenOnline can be called by other subsystems to get notified when a
3676
// particular peer comes online. The peer itself is sent across the peerChan.
3677
//
3678
// NOTE: This function is safe for concurrent access.
3679
func (s *server) NotifyWhenOnline(peerKey [33]byte,
3680
        peerChan chan<- lnpeer.Peer) {
×
3681

×
3682
        s.mu.Lock()
×
3683

×
3684
        // Compute the target peer's identifier.
×
3685
        pubStr := string(peerKey[:])
×
3686

×
3687
        // Check if peer is connected.
×
3688
        peer, ok := s.peersByPub[pubStr]
×
3689
        if ok {
×
3690
                // Unlock here so that the mutex isn't held while we are
×
3691
                // waiting for the peer to become active.
×
3692
                s.mu.Unlock()
×
3693

×
3694
                // Wait until the peer signals that it is actually active
×
3695
                // rather than only in the server's maps.
×
3696
                select {
×
3697
                case <-peer.ActiveSignal():
×
3698
                case <-peer.QuitSignal():
×
3699
                        // The peer quit, so we'll add the channel to the slice
×
3700
                        // and return.
×
3701
                        s.mu.Lock()
×
3702
                        s.peerConnectedListeners[pubStr] = append(
×
3703
                                s.peerConnectedListeners[pubStr], peerChan,
×
3704
                        )
×
3705
                        s.mu.Unlock()
×
3706
                        return
×
3707
                }
3708

3709
                // Connected, can return early.
3710
                srvrLog.Debugf("Notifying that peer %x is online", peerKey)
×
3711

×
3712
                select {
×
3713
                case peerChan <- peer:
×
3714
                case <-s.quit:
×
3715
                }
3716

3717
                return
×
3718
        }
3719

3720
        // Not connected, store this listener such that it can be notified when
3721
        // the peer comes online.
3722
        s.peerConnectedListeners[pubStr] = append(
×
3723
                s.peerConnectedListeners[pubStr], peerChan,
×
3724
        )
×
3725
        s.mu.Unlock()
×
3726
}
3727

3728
// NotifyWhenOffline delivers a notification to the caller of when the peer with
3729
// the given public key has been disconnected. The notification is signaled by
3730
// closing the channel returned.
3731
func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} {
×
3732
        s.mu.Lock()
×
3733
        defer s.mu.Unlock()
×
3734

×
3735
        c := make(chan struct{})
×
3736

×
3737
        // If the peer is already offline, we can immediately trigger the
×
3738
        // notification.
×
3739
        peerPubKeyStr := string(peerPubKey[:])
×
3740
        if _, ok := s.peersByPub[peerPubKeyStr]; !ok {
×
3741
                srvrLog.Debugf("Notifying that peer %x is offline", peerPubKey)
×
3742
                close(c)
×
3743
                return c
×
3744
        }
×
3745

3746
        // Otherwise, the peer is online, so we'll keep track of the channel to
3747
        // trigger the notification once the server detects the peer
3748
        // disconnects.
3749
        s.peerDisconnectedListeners[peerPubKeyStr] = append(
×
3750
                s.peerDisconnectedListeners[peerPubKeyStr], c,
×
3751
        )
×
3752

×
3753
        return c
×
3754
}
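
// Editor's note: the snippet below is an illustrative sketch and is not part
// of server.go. It shows how a subsystem typically combines the two
// notification primitives above: wait for the peer to become active, then
// watch for it going offline again, always guarding against server shutdown.
func exampleWatchPeer(s *server, peerKey [33]byte) {
        peerChan := make(chan lnpeer.Peer, 1)
        s.NotifyWhenOnline(peerKey, peerChan)

        select {
        case p := <-peerChan:
                // The peer is connected and active. The offline channel is
                // closed once the server sees the peer disconnect.
                select {
                case <-s.NotifyWhenOffline(peerKey):
                        srvrLog.Debugf("Peer %v went offline", p)
                case <-s.quit:
                }

        case <-s.quit:
        }
}
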
3755

3756
// FindPeer will return the peer that corresponds to the passed in public key.
3757
// This function is used by the funding manager, allowing it to update the
3758
// daemon's local representation of the remote peer.
3759
//
3760
// NOTE: This function is safe for concurrent access.
3761
func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer.Brontide, error) {
×
3762
        s.mu.RLock()
×
3763
        defer s.mu.RUnlock()
×
3764

×
3765
        pubStr := string(peerKey.SerializeCompressed())
×
3766

×
3767
        return s.findPeerByPubStr(pubStr)
×
3768
}
×
3769

3770
// FindPeerByPubStr will return the peer that corresponds to the passed peerID,
3771
// which should be a string representation of the peer's serialized, compressed
3772
// public key.
3773
//
3774
// NOTE: This function is safe for concurrent access.
3775
func (s *server) FindPeerByPubStr(pubStr string) (*peer.Brontide, error) {
×
3776
        s.mu.RLock()
×
3777
        defer s.mu.RUnlock()
×
3778

×
3779
        return s.findPeerByPubStr(pubStr)
×
3780
}
×
3781

3782
// findPeerByPubStr is an internal method that retrieves the specified peer from
3783
// the server's internal state using the peer's serialized, compressed public
// key string.
3784
func (s *server) findPeerByPubStr(pubStr string) (*peer.Brontide, error) {
×
3785
        peer, ok := s.peersByPub[pubStr]
×
3786
        if !ok {
×
3787
                return nil, ErrPeerNotConnected
×
3788
        }
×
3789

3790
        return peer, nil
×
3791
}
3792

3793
// nextPeerBackoff computes the next backoff duration for a peer's pubkey using
3794
// exponential backoff. If no previous backoff was known, the default is
3795
// returned.
3796
func (s *server) nextPeerBackoff(pubStr string,
3797
        startTime time.Time) time.Duration {
×
3798

×
3799
        // Now, determine the appropriate backoff to use for the retry.
×
3800
        backoff, ok := s.persistentPeersBackoff[pubStr]
×
3801
        if !ok {
×
3802
                // If an existing backoff was unknown, use the default.
×
3803
                return s.cfg.MinBackoff
×
3804
        }
×
3805

3806
        // If the peer failed to start properly, we'll just use the previous
3807
        // backoff to compute the subsequent randomized exponential backoff
3808
        // duration. This will roughly double on average.
3809
        if startTime.IsZero() {
×
3810
                return computeNextBackoff(backoff, s.cfg.MaxBackoff)
×
3811
        }
×
3812

3813
        // The peer succeeded in starting. If the connection didn't last long
3814
        // enough to be considered stable, we'll continue to back off retries
3815
        // with this peer.
3816
        connDuration := time.Since(startTime)
×
3817
        if connDuration < defaultStableConnDuration {
×
3818
                return computeNextBackoff(backoff, s.cfg.MaxBackoff)
×
3819
        }
×
3820

3821
        // The peer succeeded in starting and this was a stable connection, so we'll
3822
        // reduce the timeout duration by the length of the connection after
3823
        // applying randomized exponential backoff. We'll only apply this in the
3824
        // case that:
3825
        //   reb(curBackoff) - connDuration > cfg.MinBackoff
3826
        relaxedBackoff := computeNextBackoff(backoff, s.cfg.MaxBackoff) - connDuration
×
3827
        if relaxedBackoff > s.cfg.MinBackoff {
×
3828
                return relaxedBackoff
×
3829
        }
×
3830

3831
        // Lastly, if reb(currBackoff) - connDuration <= cfg.MinBackoff, the
3832
        // stable connection lasted much longer than our previous backoff. To
3833
        // reward such good behavior, we'll reconnect after the default
3834
        // timeout.
3835
        return s.cfg.MinBackoff
×
3836
}
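
// Editor's note: computeNextBackoff is defined elsewhere in this file; the
// stand-in below is an illustrative sketch (not part of server.go) of the
// behaviour nextPeerBackoff relies on: roughly doubling the previous backoff,
// capping it at the configured maximum, and adding jitter so that many peers
// do not retry in lockstep.
func computeNextBackoffSketch(currBackoff,
        maxBackoff time.Duration) time.Duration {

        // Double the previous backoff, clamping at the configured maximum.
        nextBackoff := 2 * currBackoff
        if nextBackoff > maxBackoff {
                nextBackoff = maxBackoff
        }

        // Apply up to +/-10% of jitter around the doubled value.
        margin := nextBackoff / 10
        if margin == 0 {
                return nextBackoff
        }
        jitter := time.Duration(prand.Int63n(2*int64(margin))) - margin

        return nextBackoff + jitter
}
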
3837

3838
// shouldDropLocalConnection determines if our local connection to a remote peer
3839
// should be dropped in the case of concurrent connection establishment. In
3840
// order to deterministically decide which connection should be dropped, we'll
3841
// utilize the ordering of the local and remote public key. If we didn't use
3842
// such a tie breaker, then we risk _both_ connections erroneously being
3843
// dropped.
3844
func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool {
×
3845
        localPubBytes := local.SerializeCompressed()
×
3846
        remotePubBytes := remote.SerializeCompressed()
×
3847

×
3848
        // The connection that comes from the node with a "smaller" pubkey
×
3849
        // should be kept. Therefore, if our pubkey is "greater" than theirs, we
×
3850
        // should drop our established connection.
×
3851
        return bytes.Compare(localPubBytes, remotePubBytes) > 0
×
3852
}
×
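
// Editor's note: the snippet below is an illustrative sketch and is not part
// of server.go. Because both nodes evaluate the same byte-wise comparison of
// the two public keys, exactly one side (the one with the lexicographically
// larger key) drops its locally initiated connection, so the duplicate
// connections are never both torn down.
func exampleTieBreak(nodeA, nodeB *btcec.PublicKey) {
        aDrops := shouldDropLocalConnection(nodeA, nodeB)
        bDrops := shouldDropLocalConnection(nodeB, nodeA)

        // Unless the two keys are identical, exactly one of these is true.
        srvrLog.Debugf("A drops its connection: %v, B drops its "+
                "connection: %v", aDrops, bDrops)
}
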
3853

3854
// InboundPeerConnected initializes a new peer in response to a new inbound
3855
// connection.
3856
//
3857
// NOTE: This function is safe for concurrent access.
3858
func (s *server) InboundPeerConnected(conn net.Conn) {
×
3859
        // Exit early if we have already been instructed to shutdown, this
×
3860
        // prevents any delayed callbacks from accidentally registering peers.
×
3861
        if s.Stopped() {
×
3862
                return
×
3863
        }
×
3864

3865
        nodePub := conn.(*brontide.Conn).RemotePub()
×
3866
        pubSer := nodePub.SerializeCompressed()
×
3867
        pubStr := string(pubSer)
×
3868

×
3869
        var pubBytes [33]byte
×
3870
        copy(pubBytes[:], pubSer)
×
3871

×
3872
        s.mu.Lock()
×
3873
        defer s.mu.Unlock()
×
3874

×
3875
        // If the remote node's public key is banned, drop the connection.
×
3876
        shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub)
×
3877
        if dcErr != nil {
×
3878
                srvrLog.Errorf("Unable to check if we should disconnect "+
×
3879
                        "peer: %v", dcErr)
×
3880
                conn.Close()
×
3881

×
3882
                return
×
3883
        }
×
3884

3885
        if shouldDc {
×
3886
                srvrLog.Debugf("Dropping connection for %v since they are "+
×
3887
                        "banned.", pubSer)
×
3888

×
3889
                conn.Close()
×
3890

×
3891
                return
×
3892
        }
×
3893

3894
        // If we already have an outbound connection to this peer, then ignore
3895
        // this new connection.
3896
        if p, ok := s.outboundPeers[pubStr]; ok {
×
3897
                srvrLog.Debugf("Already have outbound connection for %v, "+
×
3898
                        "ignoring inbound connection from local=%v, remote=%v",
×
3899
                        p, conn.LocalAddr(), conn.RemoteAddr())
×
3900

×
3901
                conn.Close()
×
3902
                return
×
3903
        }
×
3904

3905
        // If we already have a valid connection that is scheduled to take
3906
        // precedence once the prior peer has finished disconnecting, we'll
3907
        // ignore this connection.
3908
        if p, ok := s.scheduledPeerConnection[pubStr]; ok {
×
3909
                srvrLog.Debugf("Ignoring connection from %v, peer %v already "+
×
3910
                        "scheduled", conn.RemoteAddr(), p)
×
3911
                conn.Close()
×
3912
                return
×
3913
        }
×
3914

3915
        srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())
×
3916

×
3917
        // Check to see if we already have a connection with this peer. If so,
×
3918
        // we may need to drop our existing connection. This prevents us from
×
3919
        // having duplicate connections to the same peer. We forgo adding a
×
3920
        // default case as we expect these to be the only error values returned
×
3921
        // from findPeerByPubStr.
×
3922
        connectedPeer, err := s.findPeerByPubStr(pubStr)
×
3923
        switch err {
×
3924
        case ErrPeerNotConnected:
×
3925
                // We were unable to locate an existing connection with the
×
3926
                // target peer, proceed to connect.
×
3927
                s.cancelConnReqs(pubStr, nil)
×
3928
                s.peerConnected(conn, nil, true)
×
3929

3930
        case nil:
×
3931
                // We already have a connection with the incoming peer. If the
×
3932
                // connection we've already established should be kept and is
×
3933
                // not of the same type of the new connection (inbound), then
×
3934
                // we'll close out the new connection s.t there's only a single
×
3935
                // connection between us.
×
3936
                localPub := s.identityECDH.PubKey()
×
3937
                if !connectedPeer.Inbound() &&
×
3938
                        !shouldDropLocalConnection(localPub, nodePub) {
×
3939

×
3940
                        srvrLog.Warnf("Received inbound connection from "+
×
3941
                                "peer %v, but already have outbound "+
×
3942
                                "connection, dropping conn", connectedPeer)
×
3943
                        conn.Close()
×
3944
                        return
×
3945
                }
×
3946

3947
                // Otherwise, if we should drop the connection, then we'll
3948
                // disconnect our already connected peer.
3949
                srvrLog.Debugf("Disconnecting stale connection to %v",
×
3950
                        connectedPeer)
×
3951

×
3952
                s.cancelConnReqs(pubStr, nil)
×
3953

×
3954
                // Remove the current peer from the server's internal state and
×
3955
                // signal that the peer termination watcher does not need to
×
3956
                // execute for this peer.
×
3957
                s.removePeer(connectedPeer)
×
3958
                s.ignorePeerTermination[connectedPeer] = struct{}{}
×
3959
                s.scheduledPeerConnection[pubStr] = func() {
×
3960
                        s.peerConnected(conn, nil, true)
×
3961
                }
×
3962
        }
3963
}
3964

3965
// OutboundPeerConnected initializes a new peer in response to a new outbound
3966
// connection.
3967
// NOTE: This function is safe for concurrent access.
3968
func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) {
×
3969
        // Exit early if we have already been instructed to shutdown, this
×
3970
        // prevents any delayed callbacks from accidentally registering peers.
×
3971
        if s.Stopped() {
×
3972
                return
×
3973
        }
×
3974

3975
        nodePub := conn.(*brontide.Conn).RemotePub()
×
3976
        pubSer := nodePub.SerializeCompressed()
×
3977
        pubStr := string(pubSer)
×
3978

×
3979
        var pubBytes [33]byte
×
3980
        copy(pubBytes[:], pubSer)
×
3981

×
3982
        s.mu.Lock()
×
3983
        defer s.mu.Unlock()
×
3984

×
3985
        // If the remote node's public key is banned, drop the connection.
×
3986
        shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub)
×
3987
        if dcErr != nil {
×
3988
                srvrLog.Errorf("Unable to check if we should disconnect "+
×
3989
                        "peer: %v", dcErr)
×
3990
                conn.Close()
×
3991

×
3992
                return
×
3993
        }
×
3994

3995
        if shouldDc {
×
3996
                srvrLog.Debugf("Dropping connection for %v since they are "+
×
3997
                        "banned.", pubSer)
×
3998

×
3999
                if connReq != nil {
×
4000
                        s.connMgr.Remove(connReq.ID())
×
4001
                }
×
4002

4003
                conn.Close()
×
4004

×
4005
                return
×
4006
        }
4007

4008
        // If we already have an inbound connection to this peer, then ignore
4009
        // this new connection.
4010
        if p, ok := s.inboundPeers[pubStr]; ok {
×
4011
                srvrLog.Debugf("Already have inbound connection for %v, "+
×
4012
                        "ignoring outbound connection from local=%v, remote=%v",
×
4013
                        p, conn.LocalAddr(), conn.RemoteAddr())
×
4014

×
4015
                if connReq != nil {
×
4016
                        s.connMgr.Remove(connReq.ID())
×
4017
                }
×
4018
                conn.Close()
×
4019
                return
×
4020
        }
4021
        if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil {
×
4022
                srvrLog.Debugf("Ignoring canceled outbound connection")
×
4023
                s.connMgr.Remove(connReq.ID())
×
4024
                conn.Close()
×
4025
                return
×
4026
        }
×
4027

4028
        // If we already have a valid connection that is scheduled to take
4029
        // precedence once the prior peer has finished disconnecting, we'll
4030
        // ignore this connection.
4031
        if _, ok := s.scheduledPeerConnection[pubStr]; ok {
×
4032
                srvrLog.Debugf("Ignoring connection, peer already scheduled")
×
4033

×
4034
                if connReq != nil {
×
4035
                        s.connMgr.Remove(connReq.ID())
×
4036
                }
×
4037

4038
                conn.Close()
×
4039
                return
×
4040
        }
4041

4042
        srvrLog.Infof("Established connection to: %x@%v", pubStr,
×
4043
                conn.RemoteAddr())
×
4044

×
4045
        if connReq != nil {
×
4046
                // A successful connection was returned by the connmgr.
×
4047
                // Immediately cancel all pending requests, excluding the
×
4048
                // outbound connection we just established.
×
4049
                ignore := connReq.ID()
×
4050
                s.cancelConnReqs(pubStr, &ignore)
×
4051
        } else {
×
4052
                // This was a successful connection made by some other
×
4053
                // subsystem. Remove all requests being managed by the connmgr.
×
4054
                s.cancelConnReqs(pubStr, nil)
×
4055
        }
×
4056

4057
        // If we already have a connection with this peer, decide whether or not
4058
        // we need to drop the stale connection. We forgo adding a default case
4059
        // as we expect these to be the only error values returned from
4060
        // findPeerByPubStr.
4061
        connectedPeer, err := s.findPeerByPubStr(pubStr)
×
4062
        switch err {
×
4063
        case ErrPeerNotConnected:
×
4064
                // We were unable to locate an existing connection with the
×
4065
                // target peer, proceed to connect.
×
4066
                s.peerConnected(conn, connReq, false)
×
4067

4068
        case nil:
×
4069
                // We already have a connection with the incoming peer. If the
×
4070
                // connection we've already established should be kept and is
×
4071
                // not of the same type of the new connection (outbound), then
×
4072
                // we'll close out the new connection s.t there's only a single
×
4073
                // connection between us.
×
4074
                localPub := s.identityECDH.PubKey()
×
4075
                if connectedPeer.Inbound() &&
×
4076
                        shouldDropLocalConnection(localPub, nodePub) {
×
4077

×
4078
                        srvrLog.Warnf("Established outbound connection to "+
×
4079
                                "peer %v, but already have inbound "+
×
4080
                                "connection, dropping conn", connectedPeer)
×
4081
                        if connReq != nil {
×
4082
                                s.connMgr.Remove(connReq.ID())
×
4083
                        }
×
4084
                        conn.Close()
×
4085
                        return
×
4086
                }
4087

4088
                // Otherwise, _their_ connection should be dropped. So we'll
4089
                // disconnect the peer and send the now obsolete peer to the
4090
                // server for garbage collection.
4091
                srvrLog.Debugf("Disconnecting stale connection to %v",
×
4092
                        connectedPeer)
×
4093

×
4094
                // Remove the current peer from the server's internal state and
×
4095
                // signal that the peer termination watcher does not need to
×
4096
                // execute for this peer.
×
4097
                s.removePeer(connectedPeer)
×
4098
                s.ignorePeerTermination[connectedPeer] = struct{}{}
×
4099
                s.scheduledPeerConnection[pubStr] = func() {
×
4100
                        s.peerConnected(conn, connReq, false)
×
4101
                }
×
4102
        }
4103
}
4104

4105
// UnassignedConnID is the default connection ID that a request can have before
4106
// it actually is submitted to the connmgr.
4107
// TODO(conner): move into connmgr package, or better, add connmgr method for
4108
// generating atomic IDs
4109
const UnassignedConnID uint64 = 0
4110

4111
// cancelConnReqs stops all persistent connection requests for a given pubkey.
4112
// Any attempts initiated by the peerTerminationWatcher are canceled first.
4113
// Afterwards, each connection request is removed from the connmgr. The caller can
4114
// optionally specify a connection ID to ignore, which prevents us from
4115
// canceling a successful request. All persistent connreqs for the provided
4116
// pubkey are discarded after the operation.
4117
func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
×
4118
        // First, cancel any lingering persistent retry attempts, which will
×
4119
        // prevent retries for any with backoffs that are still maturing.
×
4120
        if cancelChan, ok := s.persistentRetryCancels[pubStr]; ok {
×
4121
                close(cancelChan)
×
4122
                delete(s.persistentRetryCancels, pubStr)
×
4123
        }
×
4124

4125
        // Next, check to see if we have any outstanding persistent connection
4126
        // requests to this peer. If so, then we'll remove all of these
4127
        // connection requests, and also delete the entry from the map.
4128
        connReqs, ok := s.persistentConnReqs[pubStr]
×
4129
        if !ok {
×
4130
                return
×
4131
        }
×
4132

4133
        for _, connReq := range connReqs {
×
4134
                srvrLog.Tracef("Canceling %s:", connReqs)
×
4135

×
4136
                // Atomically capture the current request identifier.
×
4137
                connID := connReq.ID()
×
4138

×
4139
                // Skip any zero IDs, this indicates the request has not
×
4140
                // yet been scheduled.
×
4141
                if connID == UnassignedConnID {
×
4142
                        continue
×
4143
                }
4144

4145
                // Skip a particular connection ID if instructed.
4146
                if skip != nil && connID == *skip {
×
4147
                        continue
×
4148
                }
4149

4150
                s.connMgr.Remove(connID)
×
4151
        }
4152

4153
        delete(s.persistentConnReqs, pubStr)
×
4154
}
4155

4156
// handleCustomMessage dispatches an incoming custom peer message to
4157
// subscribers.
4158
func (s *server) handleCustomMessage(peer [33]byte, msg *lnwire.Custom) error {
×
4159
        srvrLog.Debugf("Custom message received: peer=%x, type=%d",
×
4160
                peer, msg.Type)
×
4161

×
4162
        return s.customMessageServer.SendUpdate(&CustomMessage{
×
4163
                Peer: peer,
×
4164
                Msg:  msg,
×
4165
        })
×
4166
}
×
4167

4168
// SubscribeCustomMessages subscribes to a stream of incoming custom peer
4169
// messages.
4170
func (s *server) SubscribeCustomMessages() (*subscribe.Client, error) {
×
4171
        return s.customMessageServer.Subscribe()
×
4172
}
×
4173

4174
// peerConnected is a function that handles the initialization of a newly connected
4175
// peer by adding it to the server's global list of all active peers, and
4176
// starting all the goroutines the peer needs to function properly. The inbound
4177
// boolean should be true if the peer initiated the connection to us.
4178
func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
4179
        inbound bool) {
×
4180

×
4181
        brontideConn := conn.(*brontide.Conn)
×
4182
        addr := conn.RemoteAddr()
×
4183
        pubKey := brontideConn.RemotePub()
×
4184

×
4185
        srvrLog.Infof("Finalizing connection to %x@%s, inbound=%v",
×
4186
                pubKey.SerializeCompressed(), addr, inbound)
×
4187

×
4188
        peerAddr := &lnwire.NetAddress{
×
4189
                IdentityKey: pubKey,
×
4190
                Address:     addr,
×
4191
                ChainNet:    s.cfg.ActiveNetParams.Net,
×
4192
        }
×
4193

×
4194
        // With the brontide connection established, we'll now craft the feature
×
4195
        // vectors to advertise to the remote node.
×
4196
        initFeatures := s.featureMgr.Get(feature.SetInit)
×
4197
        legacyFeatures := s.featureMgr.Get(feature.SetLegacyGlobal)
×
4198

×
4199
        // Lookup past error caches for the peer in the server. If no buffer is
×
4200
        // found, create a fresh buffer.
×
4201
        pkStr := string(peerAddr.IdentityKey.SerializeCompressed())
×
4202
        errBuffer, ok := s.peerErrors[pkStr]
×
4203
        if !ok {
×
4204
                var err error
×
4205
                errBuffer, err = queue.NewCircularBuffer(peer.ErrorBufferSize)
×
4206
                if err != nil {
×
4207
                        srvrLog.Errorf("unable to create peer %v", err)
×
4208
                        return
×
4209
                }
×
4210
        }
4211

4212
        // If we directly set the peer.Config TowerClient member to the
4213
        // s.towerClientMgr then in the case that the s.towerClientMgr is nil,
4214
        // the peer.Config's TowerClient member will not evaluate to nil even
4215
        // though the underlying value is nil. To avoid this gotcha which can
4216
        // cause a panic, we need to explicitly pass nil to the peer.Config's
4217
        // TowerClient if needed.
4218
        var towerClient wtclient.ClientManager
×
4219
        if s.towerClientMgr != nil {
×
4220
                towerClient = s.towerClientMgr
×
4221
        }
×
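
        // Editor's note: the commented snippet below is an illustrative
        // sketch (not part of server.go) of the typed-nil interface gotcha
        // described above: a nil concrete pointer stored in an interface
        // yields an interface value that is NOT nil, because the interface
        // value still carries the pointer's type information. The impl type
        // is hypothetical.
        //
        //      type impl struct{}
        //      func (*impl) Do() {}
        //
        //      var concrete *impl                   // nil pointer
        //      var iface interface{ Do() } = concrete
        //      fmt.Println(concrete == nil)         // true
        //      fmt.Println(iface == nil)            // false
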
4222

4223
        thresholdSats := btcutil.Amount(s.cfg.MaxFeeExposure)
×
4224
        thresholdMSats := lnwire.NewMSatFromSatoshis(thresholdSats)
×
4225

×
4226
        // Now that we've established a connection, create a peer, and add it to the
×
4227
        // set of currently active peers. Configure the peer with the incoming
×
4228
        // and outgoing broadcast deltas to prevent htlcs from being accepted or
×
4229
        // offered that would trigger channel closure. In case of outgoing
×
4230
        // htlcs, an extra block is added to prevent the channel from being
×
4231
        // closed when the htlc is outstanding and a new block comes in.
×
4232
        pCfg := peer.Config{
×
4233
                Conn:                    brontideConn,
×
4234
                ConnReq:                 connReq,
×
4235
                Addr:                    peerAddr,
×
4236
                Inbound:                 inbound,
×
4237
                Features:                initFeatures,
×
4238
                LegacyFeatures:          legacyFeatures,
×
4239
                OutgoingCltvRejectDelta: lncfg.DefaultOutgoingCltvRejectDelta,
×
4240
                ChanActiveTimeout:       s.cfg.ChanEnableTimeout,
×
4241
                ErrorBuffer:             errBuffer,
×
4242
                WritePool:               s.writePool,
×
4243
                ReadPool:                s.readPool,
×
4244
                Switch:                  s.htlcSwitch,
×
4245
                InterceptSwitch:         s.interceptableSwitch,
×
4246
                ChannelDB:               s.chanStateDB,
×
4247
                ChannelGraph:            s.graphDB,
×
4248
                ChainArb:                s.chainArb,
×
4249
                AuthGossiper:            s.authGossiper,
×
4250
                ChanStatusMgr:           s.chanStatusMgr,
×
4251
                ChainIO:                 s.cc.ChainIO,
×
4252
                FeeEstimator:            s.cc.FeeEstimator,
×
4253
                Signer:                  s.cc.Wallet.Cfg.Signer,
×
4254
                SigPool:                 s.sigPool,
×
4255
                Wallet:                  s.cc.Wallet,
×
4256
                ChainNotifier:           s.cc.ChainNotifier,
×
4257
                BestBlockView:           s.cc.BestBlockTracker,
×
4258
                RoutingPolicy:           s.cc.RoutingPolicy,
×
4259
                Sphinx:                  s.sphinx,
×
4260
                WitnessBeacon:           s.witnessBeacon,
×
4261
                Invoices:                s.invoices,
×
4262
                ChannelNotifier:         s.channelNotifier,
×
4263
                HtlcNotifier:            s.htlcNotifier,
×
4264
                TowerClient:             towerClient,
×
4265
                DisconnectPeer:          s.DisconnectPeer,
×
4266
                GenNodeAnnouncement: func(...netann.NodeAnnModifier) (
×
4267
                        lnwire.NodeAnnouncement, error) {
×
4268

×
4269
                        return s.genNodeAnnouncement(nil)
×
4270
                },
×
4271

4272
                PongBuf: s.pongBuf,
4273

4274
                PrunePersistentPeerConnection: s.prunePersistentPeerConnection,
4275

4276
                FetchLastChanUpdate: s.fetchLastChanUpdate(),
4277

4278
                FundingManager: s.fundingMgr,
4279

4280
                Hodl:                    s.cfg.Hodl,
4281
                UnsafeReplay:            s.cfg.UnsafeReplay,
4282
                MaxOutgoingCltvExpiry:   s.cfg.MaxOutgoingCltvExpiry,
4283
                MaxChannelFeeAllocation: s.cfg.MaxChannelFeeAllocation,
4284
                CoopCloseTargetConfs:    s.cfg.CoopCloseTargetConfs,
4285
                MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte(
4286
                        s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
4287
                ChannelCommitInterval:  s.cfg.ChannelCommitInterval,
4288
                PendingCommitInterval:  s.cfg.PendingCommitInterval,
4289
                ChannelCommitBatchSize: s.cfg.ChannelCommitBatchSize,
4290
                HandleCustomMessage:    s.handleCustomMessage,
4291
                GetAliases:             s.aliasMgr.GetAliases,
4292
                RequestAlias:           s.aliasMgr.RequestAlias,
4293
                AddLocalAlias:          s.aliasMgr.AddLocalAlias,
4294
                DisallowRouteBlinding:  s.cfg.ProtocolOptions.NoRouteBlinding(),
4295
                DisallowQuiescence:     s.cfg.ProtocolOptions.NoQuiescence(),
4296
                MaxFeeExposure:         thresholdMSats,
4297
                Quit:                   s.quit,
4298
                AuxLeafStore:           s.implCfg.AuxLeafStore,
4299
                AuxSigner:              s.implCfg.AuxSigner,
4300
                MsgRouter:              s.implCfg.MsgRouter,
4301
                AuxChanCloser:          s.implCfg.AuxChanCloser,
4302
                AuxResolver:            s.implCfg.AuxContractResolver,
4303
                AuxTrafficShaper:       s.implCfg.TrafficShaper,
4304
                ShouldFwdExpEndorsement: func() bool {
×
4305
                        if s.cfg.ProtocolOptions.NoExperimentalEndorsement() {
×
4306
                                return false
×
4307
                        }
×
4308

4309
                        return clock.NewDefaultClock().Now().Before(
×
4310
                                EndorsementExperimentEnd,
×
4311
                        )
×
4312
                },
4313
        }
4314

4315
        copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed())
×
4316
        copy(pCfg.ServerPubKey[:], s.identityECDH.PubKey().SerializeCompressed())
×
4317

×
4318
        p := peer.NewBrontide(pCfg)
×
4319

×
4320
        // TODO(roasbeef): update IP address for link-node
×
4321
        //  * also mark last-seen, do it one single transaction?
×
4322

×
4323
        s.addPeer(p)
×
4324

×
4325
        // Once we have successfully added the peer to the server, we can
×
4326
        // delete the previous error buffer from the server's map of error
×
4327
        // buffers.
×
4328
        delete(s.peerErrors, pkStr)
×
4329

×
4330
        // Dispatch a goroutine to asynchronously start the peer. This process
×
4331
        // includes sending and receiving Init messages, which would be a DOS
×
4332
        // vector if we held the server's mutex throughout the procedure.
×
4333
        s.wg.Add(1)
×
4334
        go s.peerInitializer(p)
×
4335
}
4336

4337
// addPeer adds the passed peer to the server's global state of all active
4338
// peers.
4339
func (s *server) addPeer(p *peer.Brontide) {
×
4340
        if p == nil {
×
4341
                return
×
4342
        }
×
4343

4344
        pubBytes := p.IdentityKey().SerializeCompressed()
×
4345

×
4346
        // Ignore new peers if we're shutting down.
×
4347
        if s.Stopped() {
×
4348
                srvrLog.Infof("Server stopped, skipped adding peer=%x",
×
4349
                        pubBytes)
×
4350
                p.Disconnect(ErrServerShuttingDown)
×
4351

×
4352
                return
×
4353
        }
×
4354

4355
        // Track the new peer in our indexes so we can quickly look it up either
4356
        // according to its public key, or its peer ID.
4357
        // TODO(roasbeef): pipe all requests through to the
4358
        // queryHandler/peerManager
4359

4360
        // NOTE: This pubStr is a raw bytes to string conversion and will NOT
4361
        // be human-readable.
4362
        pubStr := string(pubBytes)
×
4363

×
4364
        s.peersByPub[pubStr] = p
×
4365

×
4366
        if p.Inbound() {
×
4367
                s.inboundPeers[pubStr] = p
×
4368
        } else {
×
4369
                s.outboundPeers[pubStr] = p
×
4370
        }
×
4371

4372
        // Inform the peer notifier of a peer online event so that it can be reported
4373
        // to clients listening for peer events.
4374
        var pubKey [33]byte
×
4375
        copy(pubKey[:], pubBytes)
×
4376

×
4377
        s.peerNotifier.NotifyPeerOnline(pubKey)
×
4378
}
4379

4380
// peerInitializer asynchronously starts a newly connected peer after it has
4381
// been added to the server's peer map. This method sets up a
4382
// peerTerminationWatcher for the given peer, and ensures that it executes even
4383
// if the peer failed to start. In the event of a successful connection, this
4384
// method reads the negotiated, local feature-bits and spawns the appropriate
4385
// graph synchronization method. Any registered clients of NotifyWhenOnline will
4386
// be signaled of the new peer once the method returns.
4387
//
4388
// NOTE: This MUST be launched as a goroutine.
4389
func (s *server) peerInitializer(p *peer.Brontide) {
×
4390
        defer s.wg.Done()
×
4391

×
4392
        pubBytes := p.IdentityKey().SerializeCompressed()
×
4393

×
4394
        // Avoid initializing peers while the server is exiting.
×
4395
        if s.Stopped() {
×
4396
                srvrLog.Infof("Server stopped, skipped initializing peer=%x",
×
4397
                        pubBytes)
×
4398
                return
×
4399
        }
×
4400

4401
        // Create a channel that will be used to signal a successful start of
4402
        // the link. This prevents the peer termination watcher from beginning
4403
        // its duty too early.
4404
        ready := make(chan struct{})
×
4405

×
4406
        // Before starting the peer, launch a goroutine to watch for the
×
4407
        // unexpected termination of this peer, which will ensure all resources
×
4408
        // are properly cleaned up, and re-establish persistent connections when
×
4409
        // necessary. The peer termination watcher will be short circuited if
×
4410
        // the peer is ever added to the ignorePeerTermination map, indicating
×
4411
        // that the server has already handled the removal of this peer.
×
4412
        s.wg.Add(1)
×
4413
        go s.peerTerminationWatcher(p, ready)
×
4414

×
4415
        // Start the peer! If an error occurs, we Disconnect the peer, which
×
4416
        // will unblock the peerTerminationWatcher.
×
4417
        if err := p.Start(); err != nil {
×
4418
                srvrLog.Warnf("Starting peer=%x got error: %v", pubBytes, err)
×
4419

×
4420
                p.Disconnect(fmt.Errorf("unable to start peer: %w", err))
×
4421
                return
×
4422
        }
×
4423

4424
        // Otherwise, signal to the peerTerminationWatcher that the peer startup
4425
        // was successful, and to begin watching the peer's wait group.
4426
        close(ready)
×
4427

×
4428
        s.mu.Lock()
×
4429
        defer s.mu.Unlock()
×
4430

×
4431
        // Check if there are listeners waiting for this peer to come online.
×
4432
        srvrLog.Debugf("Notifying that peer %v is online", p)
×
4433

×
4434
        // TODO(guggero): Do a proper conversion to a string everywhere, or use
×
4435
        // route.Vertex as the key type of peerConnectedListeners.
×
4436
        pubStr := string(pubBytes)
×
4437
        for _, peerChan := range s.peerConnectedListeners[pubStr] {
×
4438
                select {
×
4439
                case peerChan <- p:
×
4440
                case <-s.quit:
×
4441
                        return
×
4442
                }
4443
        }
4444
        delete(s.peerConnectedListeners, pubStr)
×
4445
}
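
// Editor's note: the helper below is an illustrative sketch and is not part
// of server.go. It distills the ready-channel handshake used by
// peerInitializer and peerTerminationWatcher: the watcher is launched before
// the peer is started so that cleanup always runs, but it only begins waiting
// on the peer's waitgroup once a successful start has been signalled.
func exampleReadyHandshake(start func() error,
        watch func(ready chan struct{})) error {

        ready := make(chan struct{})

        // Launch the watcher first; it blocks until ready is closed or the
        // peer disconnects.
        go watch(ready)

        if err := start(); err != nil {
                // Leave ready open; the watcher is unblocked by the
                // disconnect that the caller triggers on failure.
                return err
        }

        // Signal the watcher that start-up succeeded.
        close(ready)

        return nil
}
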
4446

4447
// peerTerminationWatcher waits until a peer has been disconnected unexpectedly,
4448
// and then cleans up all resources allocated to the peer, notifies relevant
4449
// sub-systems of its demise, and finally handles re-connecting to the peer if
4450
// it's persistent. If the server intentionally disconnects a peer, it should
4451
// have a corresponding entry in the ignorePeerTermination map which will cause
4452
// the cleanup routine to exit early. The passed `ready` chan is used to
4453
// synchronize when WaitForDisconnect should begin watching on the peer's
4454
// waitgroup. The ready chan should only be signaled if the peer starts
4455
// successfully, otherwise the peer should be disconnected instead.
4456
//
4457
// NOTE: This MUST be launched as a goroutine.
4458
func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
×
4459
        defer s.wg.Done()
×
4460

×
4461
        p.WaitForDisconnect(ready)
×
4462

×
4463
        srvrLog.Debugf("Peer %v has been disconnected", p)
×
4464

×
4465
        // If the server is exiting then we can bail out early ourselves as all
×
4466
        // the other sub-systems will already be shutting down.
×
4467
        if s.Stopped() {
×
4468
                srvrLog.Debugf("Server quitting, exit early for peer %v", p)
×
4469
                return
×
4470
        }
×
4471

4472
        // Next, we'll cancel all pending funding reservations with this node.
4473
        // If we tried to initiate any funding flows that haven't yet finished,
4474
        // then we need to unlock those committed outputs so they're still
4475
        // available for use.
4476
        s.fundingMgr.CancelPeerReservations(p.PubKey())
×
4477

×
4478
        pubKey := p.IdentityKey()
×
4479

×
4480
        // We'll also inform the gossiper that this peer is no longer active,
×
4481
        // so we don't need to maintain sync state for it any longer.
×
4482
        s.authGossiper.PruneSyncState(p.PubKey())
×
4483

×
4484
        // Tell the switch to remove all links associated with this peer.
×
4485
        // Passing nil as the target link indicates that all links associated
×
4486
        // with this interface should be closed.
×
4487
        //
×
4488
        // TODO(roasbeef): instead add a PurgeInterfaceLinks function?
×
4489
        links, err := s.htlcSwitch.GetLinksByInterface(p.PubKey())
×
4490
        if err != nil && err != htlcswitch.ErrNoLinksFound {
×
4491
                srvrLog.Errorf("Unable to get channel links for %v: %v", p, err)
×
4492
        }
×
4493

4494
        for _, link := range links {
×
4495
                s.htlcSwitch.RemoveLink(link.ChanID())
×
4496
        }
×
4497

4498
        s.mu.Lock()
×
4499
        defer s.mu.Unlock()
×
4500

×
4501
        // If there were any notification requests for when this peer
×
4502
        // disconnected, we can trigger them now.
×
4503
        srvrLog.Debugf("Notifying that peer %v is offline", p)
×
4504
        pubStr := string(pubKey.SerializeCompressed())
×
4505
        for _, offlineChan := range s.peerDisconnectedListeners[pubStr] {
×
4506
                close(offlineChan)
×
4507
        }
×
4508
        delete(s.peerDisconnectedListeners, pubStr)
×
4509

×
4510
        // If the server has already removed this peer, we can short circuit the
×
4511
        // peer termination watcher and skip cleanup.
×
4512
        if _, ok := s.ignorePeerTermination[p]; ok {
×
4513
                delete(s.ignorePeerTermination, p)
×
4514

×
4515
                pubKey := p.PubKey()
×
4516
                pubStr := string(pubKey[:])
×
4517

×
4518
                // If a connection callback is present, we'll go ahead and
×
4519
                // execute it now that previous peer has fully disconnected. If
×
4520
                // the callback is not present, this likely implies the peer was
×
4521
                // purposefully disconnected via RPC, and that no reconnect
×
4522
                // should be attempted.
×
4523
                connCallback, ok := s.scheduledPeerConnection[pubStr]
×
4524
                if ok {
×
4525
                        delete(s.scheduledPeerConnection, pubStr)
×
4526
                        connCallback()
×
4527
                }
×
4528
                return
×
4529
        }
4530

4531
        // First, cleanup any remaining state the server has regarding the peer
4532
        // in question.
4533
        s.removePeer(p)
×
4534

×
4535
        // Next, check to see if this is a persistent peer or not.
×
4536
        if _, ok := s.persistentPeers[pubStr]; !ok {
×
4537
                return
×
4538
        }
×
4539

4540
        // Get the last address that we used to connect to the peer.
4541
        addrs := []net.Addr{
×
4542
                p.NetAddress().Address,
×
4543
        }
×
4544

×
4545
        // We'll ensure that we locate all the peers advertised addresses for
×
4546
        // reconnection purposes.
×
4547
        advertisedAddrs, err := s.fetchNodeAdvertisedAddrs(pubKey)
×
4548
        switch {
×
4549
        // We found advertised addresses, so use them.
4550
        case err == nil:
×
4551
                addrs = advertisedAddrs
×
4552

4553
        // The peer doesn't have an advertised address.
4554
        case err == errNoAdvertisedAddr:
×
4555
                // If it is an outbound peer then we fall back to the existing
×
4556
                // peer address.
×
4557
                if !p.Inbound() {
×
4558
                        break
×
4559
                }
4560

4561
                // Fall back to the existing peer address if
4562
                // we're not accepting connections over Tor.
4563
                if s.torController == nil {
×
4564
                        break
×
4565
                }
4566

4567
                // If we are, the peer's address won't be known
4568
                // to us (we'll see a private address, which is
4569
                // the address used by our onion service to dial
4570
                // to lnd), so we don't have enough information
4571
                // to attempt a reconnect.
4572
                srvrLog.Debugf("Ignoring reconnection attempt "+
×
4573
                        "to inbound peer %v without "+
×
4574
                        "advertised address", p)
×
4575
                return
×
4576

4577
        // We came across an error retrieving an advertised
4578
        // address, log it, and fall back to the existing peer
4579
        // address.
4580
        default:
×
4581
                srvrLog.Errorf("Unable to retrieve advertised "+
×
4582
                        "address for node %x: %v", p.PubKey(),
×
4583
                        err)
×
4584
        }
4585

4586
        // Make an easy lookup map so that we can check if an address
4587
        // is already in the address list that we have stored for this peer.
4588
        existingAddrs := make(map[string]bool)
×
4589
        for _, addr := range s.persistentPeerAddrs[pubStr] {
×
4590
                existingAddrs[addr.String()] = true
×
4591
        }
×
4592

4593
        // Add any missing addresses for this peer to persistentPeerAddrs.
4594
        for _, addr := range addrs {
×
4595
                if existingAddrs[addr.String()] {
×
4596
                        continue
×
4597
                }
4598

4599
                s.persistentPeerAddrs[pubStr] = append(
×
4600
                        s.persistentPeerAddrs[pubStr],
×
4601
                        &lnwire.NetAddress{
×
4602
                                IdentityKey: p.IdentityKey(),
×
4603
                                Address:     addr,
×
4604
                                ChainNet:    p.NetAddress().ChainNet,
×
4605
                        },
×
4606
                )
×
4607
        }
4608

4609
        // Record the computed backoff in the backoff map.
4610
        backoff := s.nextPeerBackoff(pubStr, p.StartTime())
×
4611
        s.persistentPeersBackoff[pubStr] = backoff
×
4612

×
4613
        // Initialize a retry canceller for this peer if one does not
×
4614
        // exist.
×
4615
        cancelChan, ok := s.persistentRetryCancels[pubStr]
×
4616
        if !ok {
×
4617
                cancelChan = make(chan struct{})
×
4618
                s.persistentRetryCancels[pubStr] = cancelChan
×
4619
        }
×
4620

4621
        // We choose not to wait group this go routine since the Connect
4622
        // call can stall for arbitrarily long if we shutdown while an
4623
        // outbound connection attempt is being made.
4624
        go func() {
×
4625
                srvrLog.Debugf("Scheduling connection re-establishment to "+
×
4626
                        "persistent peer %x in %s",
×
4627
                        p.IdentityKey().SerializeCompressed(), backoff)
×
4628

×
4629
                select {
×
4630
                case <-time.After(backoff):
×
4631
                case <-cancelChan:
×
4632
                        return
×
4633
                case <-s.quit:
×
4634
                        return
×
4635
                }
4636

4637
                srvrLog.Debugf("Attempting to re-establish persistent "+
×
4638
                        "connection to peer %x",
×
4639
                        p.IdentityKey().SerializeCompressed())
×
4640

×
4641
                s.connectToPersistentPeer(pubStr)
×
4642
        }()
4643
}
4644

// connectToPersistentPeer uses all the stored addresses for a peer to attempt
// to connect to the peer. It creates connection requests if there are
// currently none for a given address and it removes old connection requests
// if the associated address is no longer in the latest address list for the
// peer.
func (s *server) connectToPersistentPeer(pubKeyStr string) {
        s.mu.Lock()
        defer s.mu.Unlock()

        // Create an easy lookup map of the addresses we have stored for the
        // peer. We will remove entries from this map if we have existing
        // connection requests for the associated address and then any leftover
        // entries will indicate which addresses we should create new
        // connection requests for.
        addrMap := make(map[string]*lnwire.NetAddress)
        for _, addr := range s.persistentPeerAddrs[pubKeyStr] {
                addrMap[addr.String()] = addr
        }

        // Go through each of the existing connection requests and
        // check if they correspond to the latest set of addresses. If
        // there is a connection request that does not use one of the latest
        // advertised addresses then remove that connection request.
        var updatedConnReqs []*connmgr.ConnReq
        for _, connReq := range s.persistentConnReqs[pubKeyStr] {
                lnAddr := connReq.Addr.(*lnwire.NetAddress).Address.String()

                switch _, ok := addrMap[lnAddr]; ok {
                // If the existing connection request is using one of the
                // latest advertised addresses for the peer then we add it to
                // updatedConnReqs and remove the associated address from
                // addrMap so that we don't recreate this connReq later on.
                case true:
                        updatedConnReqs = append(
                                updatedConnReqs, connReq,
                        )
                        delete(addrMap, lnAddr)

                // If the existing connection request is using an address that
                // is not one of the latest advertised addresses for the peer
                // then we remove the connection request from the connection
                // manager.
                case false:
                        srvrLog.Info(
                                "Removing conn req:", connReq.Addr.String(),
                        )
                        s.connMgr.Remove(connReq.ID())
                }
        }

        s.persistentConnReqs[pubKeyStr] = updatedConnReqs

        cancelChan, ok := s.persistentRetryCancels[pubKeyStr]
        if !ok {
                cancelChan = make(chan struct{})
                s.persistentRetryCancels[pubKeyStr] = cancelChan
        }

        // Any addresses left in addrMap are new ones that we have not made
        // connection requests for. So create new connection requests for those.
        // If there is more than one address in the address map, stagger the
        // creation of the connection requests for those.
        go func() {
                ticker := time.NewTicker(multiAddrConnectionStagger)
                defer ticker.Stop()

                for _, addr := range addrMap {
                        // Send the persistent connection request to the
                        // connection manager, saving the request itself so we
                        // can cancel/restart the process as needed.
                        connReq := &connmgr.ConnReq{
                                Addr:      addr,
                                Permanent: true,
                        }

                        s.mu.Lock()
                        s.persistentConnReqs[pubKeyStr] = append(
                                s.persistentConnReqs[pubKeyStr], connReq,
                        )
                        s.mu.Unlock()

                        srvrLog.Debugf("Attempting persistent connection to "+
                                "channel peer %v", addr)

                        go s.connMgr.Connect(connReq)

                        select {
                        case <-s.quit:
                                return
                        case <-cancelChan:
                                return
                        case <-ticker.C:
                        }
                }
        }()
}

// removePeer removes the passed peer from the server's state of all active
// peers.
func (s *server) removePeer(p *peer.Brontide) {
        if p == nil {
                return
        }

        srvrLog.Debugf("removing peer %v", p)

        // As the peer is now finished, ensure that the TCP connection is
        // closed and all of its related goroutines have exited.
        p.Disconnect(fmt.Errorf("server: disconnecting peer %v", p))

        // If this peer had an active persistent connection request, remove it.
        if p.ConnReq() != nil {
                s.connMgr.Remove(p.ConnReq().ID())
        }

        // Ignore deleting peers if we're shutting down.
        if s.Stopped() {
                return
        }

        pKey := p.PubKey()
        pubSer := pKey[:]
        pubStr := string(pubSer)

        delete(s.peersByPub, pubStr)

        if p.Inbound() {
                delete(s.inboundPeers, pubStr)
        } else {
                delete(s.outboundPeers, pubStr)
        }

        // Copy the peer's error buffer across to the server if it has any items
        // in it so that we can restore peer errors across connections.
        if p.ErrorBuffer().Total() > 0 {
                s.peerErrors[pubStr] = p.ErrorBuffer()
        }

        // Inform the peer notifier of a peer offline event so that it can be
        // reported to clients listening for peer events.
        var pubKey [33]byte
        copy(pubKey[:], pubSer)

        s.peerNotifier.NotifyPeerOffline(pubKey)
}

// ConnectToPeer requests that the server connect to a Lightning Network peer
// at the specified address. This function will *block* until either a
// connection is established, or the initial handshake process fails.
//
// NOTE: This function is safe for concurrent access.
func (s *server) ConnectToPeer(addr *lnwire.NetAddress,
        perm bool, timeout time.Duration) error {

        targetPub := string(addr.IdentityKey.SerializeCompressed())

        // Acquire mutex, but use explicit unlocking instead of defer for
        // better granularity. In certain conditions, this method requires
        // making an outbound connection to a remote peer, which requires the
        // lock to be released, and subsequently reacquired.
        s.mu.Lock()

        // Ensure we're not already connected to this peer.
        peer, err := s.findPeerByPubStr(targetPub)
        if err == nil {
                s.mu.Unlock()
                return &errPeerAlreadyConnected{peer: peer}
        }

        // Peer was not found, continue to pursue connection with peer.

        // If there are already pending connection requests for this pubkey,
        // log a warning, as this new attempt may end up being redundant.
        if reqs, ok := s.persistentConnReqs[targetPub]; ok {
                srvrLog.Warnf("Already have %d persistent connection "+
                        "requests for %v, connecting anyway.", len(reqs), addr)
        }

        // If there's not already a pending or active connection to this node,
        // then instruct the connection manager to attempt to establish a
        // persistent connection to the peer.
        srvrLog.Debugf("Connecting to %v", addr)
        if perm {
                connReq := &connmgr.ConnReq{
                        Addr:      addr,
                        Permanent: true,
                }

                // Since the user requested a permanent connection, we'll set
                // the entry to true which will tell the server to continue
                // reconnecting even if the number of channels with this peer is
                // zero.
                s.persistentPeers[targetPub] = true
                if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
                        s.persistentPeersBackoff[targetPub] = s.cfg.MinBackoff
                }
                s.persistentConnReqs[targetPub] = append(
                        s.persistentConnReqs[targetPub], connReq,
                )
                s.mu.Unlock()

                go s.connMgr.Connect(connReq)

                return nil
        }
        s.mu.Unlock()

        // If we're not making a persistent connection, then we'll attempt to
        // connect to the target peer. If we can't make the connection, or
        // the crypto negotiation breaks down, then return an error to the
        // caller.
        errChan := make(chan error, 1)
        s.connectToPeer(addr, errChan, timeout)

        select {
        case err := <-errChan:
                return err
        case <-s.quit:
                return ErrServerShuttingDown
        }
}

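// exampleConnectToPeer is a hypothetical usage sketch (not part of server.go)
// showing how a caller might request a non-persistent connection to a peer.
// The pubkey hex and host:port parameters are placeholders supplied by the
// caller.
func exampleConnectToPeer(s *server, pubKeyHex, hostPort string) error {
        // Parse the peer's compressed identity key.
        pubKeyBytes, err := hex.DecodeString(pubKeyHex)
        if err != nil {
                return err
        }
        identityKey, err := btcec.ParsePubKey(pubKeyBytes)
        if err != nil {
                return err
        }

        // Resolve the peer's network address.
        host, err := net.ResolveTCPAddr("tcp", hostPort)
        if err != nil {
                return err
        }

        addr := &lnwire.NetAddress{
                IdentityKey: identityKey,
                Address:     host,
                ChainNet:    wire.MainNet,
        }

        // perm=false requests a one-shot connection attempt; the call blocks
        // until the handshake succeeds, fails, or the timeout elapses.
        return s.ConnectToPeer(addr, false, 30*time.Second)
}
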
// connectToPeer establishes a connection to a remote peer. errChan is used to
// notify the caller if the connection attempt has failed. Otherwise, it will be
// closed.
func (s *server) connectToPeer(addr *lnwire.NetAddress,
        errChan chan<- error, timeout time.Duration) {

        conn, err := brontide.Dial(
                s.identityECDH, addr, timeout, s.cfg.net.Dial,
        )
        if err != nil {
                srvrLog.Errorf("Unable to connect to %v: %v", addr, err)
                select {
                case errChan <- err:
                case <-s.quit:
                }
                return
        }

        close(errChan)

        srvrLog.Tracef("Brontide dialer made local=%v, remote=%v",
                conn.LocalAddr(), conn.RemoteAddr())

        s.OutboundPeerConnected(nil, conn)
}

// DisconnectPeer sends a request to the server to close the connection with
// the peer identified by the given public key.
//
// NOTE: This function is safe for concurrent access.
func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
        pubBytes := pubKey.SerializeCompressed()
        pubStr := string(pubBytes)

        s.mu.Lock()
        defer s.mu.Unlock()

        // Check that we're actually connected to this peer. If not, then we'll
        // exit with an error as we can't disconnect from a peer that we're not
        // currently connected to.
        peer, err := s.findPeerByPubStr(pubStr)
        if err == ErrPeerNotConnected {
                return fmt.Errorf("peer %x is not connected", pubBytes)
        }

        srvrLog.Infof("Disconnecting from %v", peer)

        s.cancelConnReqs(pubStr, nil)

        // If this peer was formerly a persistent connection, then we'll remove
        // them from this map so we don't attempt to re-connect after we
        // disconnect.
        delete(s.persistentPeers, pubStr)
        delete(s.persistentPeersBackoff, pubStr)

        // Remove the peer by calling Disconnect. Previously this was done with
        // removePeer, which bypassed the peerTerminationWatcher.
        peer.Disconnect(fmt.Errorf("server: DisconnectPeer called"))

        return nil
}

// OpenChannel sends a request to the server to open a channel to the specified
// peer identified by nodeKey with the passed channel funding parameters.
//
// NOTE: This function is safe for concurrent access.
func (s *server) OpenChannel(
        req *funding.InitFundingMsg) (chan *lnrpc.OpenStatusUpdate, chan error) {

        // The updateChan will have a buffer of 2, since we expect a ChanPending
        // + a ChanOpen update, and we want to make sure the funding process is
        // not blocked if the caller is not reading the updates.
        req.Updates = make(chan *lnrpc.OpenStatusUpdate, 2)
        req.Err = make(chan error, 1)

        // First, attempt to locate the target peer to open a channel with. If
        // we're unable to locate the peer then this request will fail.
        pubKeyBytes := req.TargetPubkey.SerializeCompressed()
        s.mu.RLock()
        peer, ok := s.peersByPub[string(pubKeyBytes)]
        if !ok {
                s.mu.RUnlock()

                req.Err <- fmt.Errorf("peer %x is not online", pubKeyBytes)
                return req.Updates, req.Err
        }
        req.Peer = peer
        s.mu.RUnlock()

        // We'll wait until the peer is active before beginning the channel
        // opening process.
        select {
        case <-peer.ActiveSignal():
        case <-peer.QuitSignal():
                req.Err <- fmt.Errorf("peer %x disconnected", pubKeyBytes)
                return req.Updates, req.Err
        case <-s.quit:
                req.Err <- ErrServerShuttingDown
                return req.Updates, req.Err
        }

        // If the fee rate wasn't specified at this point we fail the funding
        // because of the missing fee rate information. The caller of the
        // `OpenChannel` method needs to make sure that default values for the
        // fee rate are set beforehand.
        if req.FundingFeePerKw == 0 {
                req.Err <- fmt.Errorf("no FundingFeePerKw specified for " +
                        "the channel opening transaction")

                return req.Updates, req.Err
        }

        // Spawn a goroutine to send the funding workflow request to the funding
        // manager. This allows the server to continue handling queries instead
        // of blocking on this request which is exported as a synchronous
        // request to the outside world.
        go s.fundingMgr.InitFundingWorkflow(req)

        return req.Updates, req.Err
}

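// exampleOpenChannel is a hypothetical usage sketch (not part of server.go).
// Callers must drain both returned channels: progress arrives on Updates,
// failures on Err.
func exampleOpenChannel(s *server, req *funding.InitFundingMsg) error {
        updates, errChan := s.OpenChannel(req)

        for {
                select {
                case update := <-updates:
                        // A ChanOpen update means the channel is fully open.
                        if update.GetChanOpen() != nil {
                                return nil
                        }

                        srvrLog.Infof("Funding update: %v", update)

                case err := <-errChan:
                        return err
                }
        }
}
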
// Peers returns a slice of all active peers.
//
// NOTE: This function is safe for concurrent access.
func (s *server) Peers() []*peer.Brontide {
        s.mu.RLock()
        defer s.mu.RUnlock()

        peers := make([]*peer.Brontide, 0, len(s.peersByPub))
        for _, peer := range s.peersByPub {
                peers = append(peers, peer)
        }

        return peers
}

// computeNextBackoff uses a truncated exponential backoff to compute the next
// backoff using the value of the existing backoff. The returned duration is
// randomized in either direction by 1/20 to prevent tight loops from
// stabilizing.
func computeNextBackoff(currBackoff, maxBackoff time.Duration) time.Duration {
        // Double the current backoff, truncating if it exceeds our maximum.
        nextBackoff := 2 * currBackoff
        if nextBackoff > maxBackoff {
                nextBackoff = maxBackoff
        }

        // Using 1/10 of our duration as a margin, compute a random offset to
        // avoid the nodes entering connection cycles.
        margin := nextBackoff / 10

        var wiggle big.Int
        wiggle.SetUint64(uint64(margin))
        if _, err := rand.Int(rand.Reader, &wiggle); err != nil {
                // Randomizing is not mission critical, so we'll just return the
                // current backoff.
                return nextBackoff
        }

        // Otherwise add in our wiggle, but subtract out half of the margin so
        // that the backoff can be tweaked by 1/20 in either direction.
        return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2)
}

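// exampleBackoffSchedule is a hypothetical illustration (not part of
// server.go): starting from a 1s backoff capped at 1h, repeated failures
// yield roughly 2s, 4s, 8s, 16s, 32s, each jittered by up to 1/20 in either
// direction, until the cap is hit.
func exampleBackoffSchedule() []time.Duration {
        schedule := make([]time.Duration, 0, 5)

        backoff := time.Second
        for i := 0; i < 5; i++ {
                backoff = computeNextBackoff(backoff, time.Hour)
                schedule = append(schedule, backoff)
        }

        return schedule
}
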
// errNoAdvertisedAddr is an error returned when we attempt to retrieve the
// advertised address of a node, but they don't have one.
var errNoAdvertisedAddr = errors.New("no advertised address found")

// fetchNodeAdvertisedAddrs attempts to fetch the advertised addresses of a node.
func (s *server) fetchNodeAdvertisedAddrs(pub *btcec.PublicKey) ([]net.Addr, error) {
        vertex, err := route.NewVertexFromBytes(pub.SerializeCompressed())
        if err != nil {
                return nil, err
        }

        node, err := s.graphDB.FetchLightningNode(vertex)
        if err != nil {
                return nil, err
        }

        if len(node.Addresses) == 0 {
                return nil, errNoAdvertisedAddr
        }

        return node.Addresses, nil
}

// fetchLastChanUpdate returns a function which is able to retrieve our latest
// channel update for a target channel.
func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) (
        *lnwire.ChannelUpdate1, error) {

        ourPubKey := s.identityECDH.PubKey().SerializeCompressed()
        return func(cid lnwire.ShortChannelID) (*lnwire.ChannelUpdate1, error) {
                info, edge1, edge2, err := s.graphBuilder.GetChannelByID(cid)
                if err != nil {
                        return nil, err
                }

                return netann.ExtractChannelUpdate(
                        ourPubKey[:], info, edge1, edge2,
                )
        }
}

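// exampleFetchUpdate is a hypothetical sketch (not part of server.go) showing
// how the returned closure can be used to look up our latest update for a
// channel identified by its raw SCID.
func exampleFetchUpdate(s *server, rawSCID uint64) (*lnwire.ChannelUpdate1,
        error) {

        fetchUpdate := s.fetchLastChanUpdate()
        return fetchUpdate(lnwire.NewShortChanIDFromInt(rawSCID))
}
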
// applyChannelUpdate applies the channel update to the different sub-systems of
// the server. The useAlias boolean denotes whether or not to send an alias in
// place of the real SCID.
func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate1,
        op *wire.OutPoint, useAlias bool) error {

        var (
                peerAlias    *lnwire.ShortChannelID
                defaultAlias lnwire.ShortChannelID
        )

        chanID := lnwire.NewChanIDFromOutPoint(*op)

        // Fetch the peer's alias from the lnwire.ChannelID so it can be used
        // in the ChannelUpdate if it hasn't been announced yet.
        if useAlias {
                foundAlias, _ := s.aliasMgr.GetPeerAlias(chanID)
                if foundAlias != defaultAlias {
                        peerAlias = &foundAlias
                }
        }

        errChan := s.authGossiper.ProcessLocalAnnouncement(
                update, discovery.RemoteAlias(peerAlias),
        )
        select {
        case err := <-errChan:
                return err
        case <-s.quit:
                return ErrServerShuttingDown
        }
}

// SendCustomMessage sends a custom message to the peer with the specified
// pubkey.
func (s *server) SendCustomMessage(peerPub [33]byte, msgType lnwire.MessageType,
        data []byte) error {

        peer, err := s.FindPeerByPubStr(string(peerPub[:]))
        if err != nil {
                return err
        }

        // We'll wait until the peer is active.
        select {
        case <-peer.ActiveSignal():
        case <-peer.QuitSignal():
                return fmt.Errorf("peer %x disconnected", peerPub)
        case <-s.quit:
                return ErrServerShuttingDown
        }

        msg, err := lnwire.NewCustom(msgType, data)
        if err != nil {
                return err
        }

        // Send the message as low-priority. For now we assume that all
        // application-defined messages are low priority.
        return peer.SendMessageLazy(true, msg)
}

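// exampleSendCustomMessage is a hypothetical sketch (not part of server.go).
// The message type must be one accepted by lnwire.NewCustom (i.e. fall within
// the custom message range), otherwise an error is returned before anything
// is sent to the peer.
func exampleSendCustomMessage(s *server, peerPub [33]byte) error {
        // 48000 is an arbitrary placeholder type used for illustration only.
        customType := lnwire.MessageType(48000)
        payload := []byte("hello from lnd")

        return s.SendCustomMessage(peerPub, customType, payload)
}
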
// newSweepPkScriptGen creates a closure that generates a new public key script
// which should be used to sweep any funds into the on-chain wallet.
// Specifically, the script generated pays to a Taproot (p2tr) address drawn
// from the wallet's default account.
func newSweepPkScriptGen(
        wallet lnwallet.WalletController,
        netParams *chaincfg.Params) func() fn.Result[lnwallet.AddrWithKey] {

        return func() fn.Result[lnwallet.AddrWithKey] {
                sweepAddr, err := wallet.NewAddress(
                        lnwallet.TaprootPubkey, false,
                        lnwallet.DefaultAccountName,
                )
                if err != nil {
                        return fn.Err[lnwallet.AddrWithKey](err)
                }

                addr, err := txscript.PayToAddrScript(sweepAddr)
                if err != nil {
                        return fn.Err[lnwallet.AddrWithKey](err)
                }

                internalKeyDesc, err := lnwallet.InternalKeyForAddr(
                        wallet, netParams, addr,
                )
                if err != nil {
                        return fn.Err[lnwallet.AddrWithKey](err)
                }

                return fn.Ok(lnwallet.AddrWithKey{
                        DeliveryAddress: addr,
                        InternalKey:     internalKeyDesc,
                })
        }
}

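// exampleSweepScript is a hypothetical sketch (not part of server.go) showing
// how the generator closure is typically consumed: invoke it and unpack the
// fn.Result to obtain the delivery script and its internal key descriptor.
func exampleSweepScript(wallet lnwallet.WalletController,
        netParams *chaincfg.Params) (lnwallet.AddrWithKey, error) {

        genSweepScript := newSweepPkScriptGen(wallet, netParams)
        return genSweepScript().Unpack()
}
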
// shouldPeerBootstrap returns true if we should attempt to perform peer
// bootstrapping to actively seek our peers using the set of active network
// bootstrappers.
func shouldPeerBootstrap(cfg *Config) bool {
        isSimnet := cfg.Bitcoin.SimNet
        isSignet := cfg.Bitcoin.SigNet
        isRegtest := cfg.Bitcoin.RegTest
        isDevNetwork := isSimnet || isSignet || isRegtest

        // TODO(yy): remove the check on simnet/regtest such that the itest is
        // covering the bootstrapping process.
        return !cfg.NoNetBootstrap && !isDevNetwork
}

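// exampleShouldBootstrap is a hypothetical illustration (not part of
// server.go), assuming Config.Bitcoin is an *lncfg.Chain whose dev-network
// flags default to false: with bootstrapping enabled and no dev-network flag
// set, the node will actively seek peers via the network bootstrappers.
func exampleShouldBootstrap() bool {
        cfg := &Config{
                NoNetBootstrap: false,
                Bitcoin:        &lncfg.Chain{}, // all dev-network flags false
        }

        // Returns true under these assumptions.
        return shouldPeerBootstrap(cfg)
}
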
// fetchClosedChannelSCIDs returns a set of SCIDs that have their force closing
// finished.
func (s *server) fetchClosedChannelSCIDs() map[lnwire.ShortChannelID]struct{} {
        // Get a list of closed channels.
        channels, err := s.chanStateDB.FetchClosedChannels(false)
        if err != nil {
                srvrLog.Errorf("Failed to fetch closed channels: %v", err)
                return nil
        }

        // Save the SCIDs in a map.
        closedSCIDs := make(map[lnwire.ShortChannelID]struct{}, len(channels))
        for _, c := range channels {
                // If the channel is not pending, its FC has been finalized.
                if !c.IsPending {
                        closedSCIDs[c.ShortChanID] = struct{}{}
                }
        }

        // Double check whether the reported closed channel has indeed finished
        // closing.
        //
        // NOTE: There are misalignments regarding when a channel's FC is
        // marked as finalized. We double check the pending channels to make
        // sure the returned SCIDs are indeed terminated.
        //
        // TODO(yy): fix the misalignments in `FetchClosedChannels`.
        pendings, err := s.chanStateDB.FetchPendingChannels()
        if err != nil {
                srvrLog.Errorf("Failed to fetch pending channels: %v", err)
                return nil
        }

        for _, c := range pendings {
                if _, ok := closedSCIDs[c.ShortChannelID]; !ok {
                        continue
                }

                // If the channel is still reported as pending, remove it from
                // the map.
                delete(closedSCIDs, c.ShortChannelID)

                srvrLog.Warnf("Channel=%v is prematurely marked as finalized",
                        c.ShortChannelID)
        }

        return closedSCIDs
}

// getStartingBeat returns the current beat. This is used during the startup to
// initialize blockbeat consumers.
func (s *server) getStartingBeat() (*chainio.Beat, error) {
        // beat is the current blockbeat.
        var beat *chainio.Beat

        // We should get a notification with the current best block immediately
        // by passing a nil block.
        blockEpochs, err := s.cc.ChainNotifier.RegisterBlockEpochNtfn(nil)
        if err != nil {
                return beat, fmt.Errorf("register block epoch ntfn: %w", err)
        }
        defer blockEpochs.Cancel()

        // We registered for the block epochs with a nil request. The notifier
        // should send us the current best block immediately. So we need to
        // wait for it here because we need to know the current best height.
        select {
        case bestBlock := <-blockEpochs.Epochs:
                srvrLog.Infof("Received initial block %v at height %d",
                        bestBlock.Hash, bestBlock.Height)

                // Update the current blockbeat.
                beat = chainio.NewBeat(*bestBlock)

        case <-s.quit:
                srvrLog.Debug("LND shutting down")
        }

        return beat, nil
}