
lightningnetwork / lnd / 12312390362

13 Dec 2024 08:44AM UTC coverage: 57.458% (+8.5%) from 48.92%

Pull #9343

github

ellemouton
fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is reworked such that the
context provided by its new main WithCtx method is cancelled in sync
with the parent context being cancelled or with the guard's quit
channel being closed. Tests are added to assert this behaviour. In
order for the close of the quit channel to be consistent with the
cancelling of the derived contexts, the quit channel _must_ be kept
internal to the ContextGuard so that callers can only close it via the
exposed Quit method, which first cancels any derived contexts that
depend on the quit channel before returning.
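The following is a minimal sketch of the behaviour described above, not the
actual fn.ContextGuard implementation: only the WithCtx and Quit method names
are taken from the commit message, while the type name, fields, and signatures
are assumed for illustration.

package main

import (
        "context"
        "fmt"
        "sync"
)

// ctxGuard is a hypothetical stand-in for the reworked guard: it owns an
// internal quit channel and hands out contexts that are cancelled when
// either the parent context is cancelled or Quit is called.
type ctxGuard struct {
        mu      sync.Mutex
        quit    chan struct{}
        cancels []context.CancelFunc
}

func newCtxGuard() *ctxGuard {
        return &ctxGuard{quit: make(chan struct{})}
}

// WithCtx derives a child context whose cancellation is kept in sync with
// both the parent context and the guard's internal quit channel.
func (g *ctxGuard) WithCtx(parent context.Context) (context.Context,
        context.CancelFunc) {

        ctx, cancel := context.WithCancel(parent)

        g.mu.Lock()
        defer g.mu.Unlock()

        select {
        case <-g.quit:
                // The guard has already quit, so hand back an
                // already-cancelled context.
                cancel()
        default:
                g.cancels = append(g.cancels, cancel)
        }

        return ctx, cancel
}

// Quit cancels every context derived via WithCtx before closing the internal
// quit channel, keeping the two shutdown signals consistent.
func (g *ctxGuard) Quit() {
        g.mu.Lock()
        defer g.mu.Unlock()

        select {
        case <-g.quit:
                return
        default:
        }

        for _, cancel := range g.cancels {
                cancel()
        }
        g.cancels = nil
        close(g.quit)
}

func main() {
        g := newCtxGuard()

        ctx, cancel := g.WithCtx(context.Background())
        defer cancel()

        g.Quit()

        <-ctx.Done() // Returns because Quit cancelled the derived context.
        fmt.Println("derived context cancelled:", ctx.Err())
}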
Pull Request #9343: fn: expand the ContextGuard and add tests

101853 of 177264 relevant lines covered (57.46%)

24972.93 hits per line

Source File

/server.go
1
package lnd
2

3
import (
4
        "bytes"
5
        "context"
6
        "crypto/rand"
7
        "encoding/hex"
8
        "fmt"
9
        "math/big"
10
        prand "math/rand"
11
        "net"
12
        "strconv"
13
        "strings"
14
        "sync"
15
        "sync/atomic"
16
        "time"
17

18
        "github.com/btcsuite/btcd/btcec/v2"
19
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
20
        "github.com/btcsuite/btcd/btcutil"
21
        "github.com/btcsuite/btcd/chaincfg"
22
        "github.com/btcsuite/btcd/chaincfg/chainhash"
23
        "github.com/btcsuite/btcd/connmgr"
24
        "github.com/btcsuite/btcd/txscript"
25
        "github.com/btcsuite/btcd/wire"
26
        "github.com/go-errors/errors"
27
        sphinx "github.com/lightningnetwork/lightning-onion"
28
        "github.com/lightningnetwork/lnd/aliasmgr"
29
        "github.com/lightningnetwork/lnd/autopilot"
30
        "github.com/lightningnetwork/lnd/brontide"
31
        "github.com/lightningnetwork/lnd/chainreg"
32
        "github.com/lightningnetwork/lnd/chanacceptor"
33
        "github.com/lightningnetwork/lnd/chanbackup"
34
        "github.com/lightningnetwork/lnd/chanfitness"
35
        "github.com/lightningnetwork/lnd/channeldb"
36
        "github.com/lightningnetwork/lnd/channelnotifier"
37
        "github.com/lightningnetwork/lnd/clock"
38
        "github.com/lightningnetwork/lnd/cluster"
39
        "github.com/lightningnetwork/lnd/contractcourt"
40
        "github.com/lightningnetwork/lnd/discovery"
41
        "github.com/lightningnetwork/lnd/feature"
42
        "github.com/lightningnetwork/lnd/fn/v2"
43
        "github.com/lightningnetwork/lnd/funding"
44
        "github.com/lightningnetwork/lnd/graph"
45
        graphdb "github.com/lightningnetwork/lnd/graph/db"
46
        "github.com/lightningnetwork/lnd/graph/db/models"
47
        "github.com/lightningnetwork/lnd/graph/graphsession"
48
        "github.com/lightningnetwork/lnd/healthcheck"
49
        "github.com/lightningnetwork/lnd/htlcswitch"
50
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
51
        "github.com/lightningnetwork/lnd/input"
52
        "github.com/lightningnetwork/lnd/invoices"
53
        "github.com/lightningnetwork/lnd/keychain"
54
        "github.com/lightningnetwork/lnd/kvdb"
55
        "github.com/lightningnetwork/lnd/lncfg"
56
        "github.com/lightningnetwork/lnd/lnencrypt"
57
        "github.com/lightningnetwork/lnd/lnpeer"
58
        "github.com/lightningnetwork/lnd/lnrpc"
59
        "github.com/lightningnetwork/lnd/lnrpc/routerrpc"
60
        "github.com/lightningnetwork/lnd/lnwallet"
61
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
62
        "github.com/lightningnetwork/lnd/lnwallet/chanfunding"
63
        "github.com/lightningnetwork/lnd/lnwallet/rpcwallet"
64
        "github.com/lightningnetwork/lnd/lnwire"
65
        "github.com/lightningnetwork/lnd/nat"
66
        "github.com/lightningnetwork/lnd/netann"
67
        "github.com/lightningnetwork/lnd/peer"
68
        "github.com/lightningnetwork/lnd/peernotifier"
69
        "github.com/lightningnetwork/lnd/pool"
70
        "github.com/lightningnetwork/lnd/queue"
71
        "github.com/lightningnetwork/lnd/routing"
72
        "github.com/lightningnetwork/lnd/routing/localchans"
73
        "github.com/lightningnetwork/lnd/routing/route"
74
        "github.com/lightningnetwork/lnd/subscribe"
75
        "github.com/lightningnetwork/lnd/sweep"
76
        "github.com/lightningnetwork/lnd/ticker"
77
        "github.com/lightningnetwork/lnd/tor"
78
        "github.com/lightningnetwork/lnd/walletunlocker"
79
        "github.com/lightningnetwork/lnd/watchtower/blob"
80
        "github.com/lightningnetwork/lnd/watchtower/wtclient"
81
        "github.com/lightningnetwork/lnd/watchtower/wtpolicy"
82
        "github.com/lightningnetwork/lnd/watchtower/wtserver"
83
)
84

85
const (
86
        // defaultMinPeers is the minimum number of peers nodes should always be
87
        // connected to.
88
        defaultMinPeers = 3
89

90
        // defaultStableConnDuration is a floor under which all reconnection
91
        // attempts will apply exponential randomized backoff. Connection
92
        // durations exceeding this value will be eligible to have their
93
        // backoffs reduced.
94
        defaultStableConnDuration = 10 * time.Minute
95

96
        // numInstantInitReconnect specifies how many persistent peers we should
97
        // always attempt outbound connections to immediately. After this value
98
        // is surpassed, the remaining peers will be randomly delayed using
99
        // maxInitReconnectDelay.
100
        numInstantInitReconnect = 10
101

102
        // maxInitReconnectDelay specifies the maximum delay in seconds we will
103
        // apply in attempting to reconnect to persistent peers on startup. The
104
        // value used for a particular peer will be chosen between 0s and this
105
        // value.
106
        maxInitReconnectDelay = 30
107

108
        // multiAddrConnectionStagger is the number of seconds to wait between
109
        // attempting to connect to a peer with each of its advertised addresses.
110
        multiAddrConnectionStagger = 10 * time.Second
111
)
112

113
var (
114
        // ErrPeerNotConnected signals that the server has no connection to the
115
        // given peer.
116
        ErrPeerNotConnected = errors.New("peer is not connected")
117

118
        // ErrServerNotActive indicates that the server has started but hasn't
119
        // fully finished the startup process.
120
        ErrServerNotActive = errors.New("server is still in the process of " +
121
                "starting")
122

123
        // ErrServerShuttingDown indicates that the server is in the process of
124
        // gracefully exiting.
125
        ErrServerShuttingDown = errors.New("server is shutting down")
126

127
        // MaxFundingAmount is a soft-limit of the maximum channel size
128
        // currently accepted within the Lightning Protocol. This is
129
        // defined in BOLT-0002, and serves as an initial precautionary limit
130
        // while implementations are battle tested in the real world.
131
        //
132
        // At the moment, this value depends on which chain is active. It is set
133
        // to the value under the Bitcoin chain as default.
134
        //
135
        // TODO(roasbeef): add command line param to modify.
136
        MaxFundingAmount = funding.MaxBtcFundingAmount
137

138
        // EndorsementExperimentEnd is the time after which nodes should stop
139
        // propagating experimental endorsement signals.
140
        //
141
        // Per blip04: January 1, 2026 12:00:00 AM UTC in unix seconds.
142
        EndorsementExperimentEnd = time.Unix(1767225600, 0)
143
)
144

145
// errPeerAlreadyConnected is an error returned by the server when we're
146
// commanded to connect to a peer, but they're already connected.
147
type errPeerAlreadyConnected struct {
148
        peer *peer.Brontide
149
}
150

151
// Error returns the human readable version of this error type.
152
//
153
// NOTE: Part of the error interface.
154
func (e *errPeerAlreadyConnected) Error() string {
×
155
        return fmt.Sprintf("already connected to peer: %v", e.peer)
×
156
}
×
157

158
// server is the main server of the Lightning Network Daemon. The server houses
159
// global state pertaining to the wallet, database, and the rpcserver.
160
// Additionally, the server is also used as a central messaging bus to interact
161
// with any of its companion objects.
162
type server struct {
163
        active   int32 // atomic
164
        stopping int32 // atomic
165

166
        start sync.Once
167
        stop  sync.Once
168

169
        cfg *Config
170

171
        implCfg *ImplementationCfg
172

173
        // identityECDH is an ECDH capable wrapper for the private key used
174
        // to authenticate any incoming connections.
175
        identityECDH keychain.SingleKeyECDH
176

177
        // identityKeyLoc is the key locator for the above wrapped identity key.
178
        identityKeyLoc keychain.KeyLocator
179

180
        // nodeSigner is an implementation of the MessageSigner interface
181
        // that's backed by the identity private key of the running lnd node.
182
        nodeSigner *netann.NodeSigner
183

184
        chanStatusMgr *netann.ChanStatusManager
185

186
        // listenAddrs is the list of addresses the server is currently
187
        // listening on.
188
        listenAddrs []net.Addr
189

190
        // torController is a client that will communicate with a locally
191
        // running Tor server. This client will handle initiating and
192
        // authenticating the connection to the Tor server, automatically
193
        // creating and setting up onion services, etc.
194
        torController *tor.Controller
195

196
        // natTraversal is the specific NAT traversal technique used to
197
        // automatically set up port forwarding rules in order to advertise to
198
        // the network that the node is accepting inbound connections.
199
        natTraversal nat.Traversal
200

201
        // lastDetectedIP is the last IP detected by the NAT traversal technique
202
        // above. This IP will be watched periodically in a goroutine in order
203
        // to handle dynamic IP changes.
204
        lastDetectedIP net.IP
205

206
        mu sync.RWMutex
207

208
        // peersByPub is a map of the active peers.
209
        //
210
        // NOTE: The key used here is the raw bytes of the peer's public key to
211
        // string conversion, which means it cannot be printed using `%s` as it
212
        // will just print the binary.
213
        //
214
        // TODO(yy): Use the hex string instead.
215
        peersByPub map[string]*peer.Brontide
216

217
        inboundPeers  map[string]*peer.Brontide
218
        outboundPeers map[string]*peer.Brontide
219

220
        peerConnectedListeners    map[string][]chan<- lnpeer.Peer
221
        peerDisconnectedListeners map[string][]chan<- struct{}
222

223
        // TODO(yy): the Brontide.Start doesn't know this value, which means it
224
        // will continue to send messages even if there are no active channels
225
        // and the value below is false. Once it's pruned, all its connections
226
        // will be closed, thus the Brontide.Start will return an error.
227
        persistentPeers        map[string]bool
228
        persistentPeersBackoff map[string]time.Duration
229
        persistentPeerAddrs    map[string][]*lnwire.NetAddress
230
        persistentConnReqs     map[string][]*connmgr.ConnReq
231
        persistentRetryCancels map[string]chan struct{}
232

233
        // peerErrors keeps a set of peer error buffers for peers that have
234
        // disconnected from us. This allows us to track historic peer errors
235
        // over connections. The string of the peer's compressed pubkey is used
236
        // as a key for this map.
237
        peerErrors map[string]*queue.CircularBuffer
238

239
        // ignorePeerTermination tracks peers for which the server has initiated
240
        // a disconnect. Adding a peer to this map causes the peer termination
241
        // watcher to short circuit in the event that peers are purposefully
242
        // disconnected.
243
        ignorePeerTermination map[*peer.Brontide]struct{}
244

245
        // scheduledPeerConnection maps a pubkey string to a callback that
246
        // should be executed in the peerTerminationWatcher once the prior peer with
247
        // the same pubkey exits.  This allows the server to wait until the
248
        // prior peer has cleaned up successfully, before adding the new peer
249
        // intended to replace it.
250
        scheduledPeerConnection map[string]func()
251

252
        // pongBuf is a shared pong reply buffer we'll use across all active
253
        // peer goroutines. We know the max size of a pong message
254
        // (lnwire.MaxPongBytes), so we can allocate this ahead of time, and
255
        // avoid allocations each time we need to send a pong message.
256
        pongBuf []byte
257

258
        cc *chainreg.ChainControl
259

260
        fundingMgr *funding.Manager
261

262
        graphDB *graphdb.ChannelGraph
263

264
        chanStateDB *channeldb.ChannelStateDB
265

266
        addrSource channeldb.AddrSource
267

268
        // miscDB is the DB that contains all "other" databases within the main
269
        // channel DB that haven't been separated out yet.
270
        miscDB *channeldb.DB
271

272
        invoicesDB invoices.InvoiceDB
273

274
        aliasMgr *aliasmgr.Manager
275

276
        htlcSwitch *htlcswitch.Switch
277

278
        interceptableSwitch *htlcswitch.InterceptableSwitch
279

280
        invoices *invoices.InvoiceRegistry
281

282
        invoiceHtlcModifier *invoices.HtlcModificationInterceptor
283

284
        channelNotifier *channelnotifier.ChannelNotifier
285

286
        peerNotifier *peernotifier.PeerNotifier
287

288
        htlcNotifier *htlcswitch.HtlcNotifier
289

290
        witnessBeacon contractcourt.WitnessBeacon
291

292
        breachArbitrator *contractcourt.BreachArbitrator
293

294
        missionController *routing.MissionController
295
        defaultMC         *routing.MissionControl
296

297
        graphBuilder *graph.Builder
298

299
        chanRouter *routing.ChannelRouter
300

301
        controlTower routing.ControlTower
302

303
        authGossiper *discovery.AuthenticatedGossiper
304

305
        localChanMgr *localchans.Manager
306

307
        utxoNursery *contractcourt.UtxoNursery
308

309
        sweeper *sweep.UtxoSweeper
310

311
        chainArb *contractcourt.ChainArbitrator
312

313
        sphinx *hop.OnionProcessor
314

315
        towerClientMgr *wtclient.Manager
316

317
        connMgr *connmgr.ConnManager
318

319
        sigPool *lnwallet.SigPool
320

321
        writePool *pool.Write
322

323
        readPool *pool.Read
324

325
        tlsManager *TLSManager
326

327
        // featureMgr dispatches feature vectors for various contexts within the
328
        // daemon.
329
        featureMgr *feature.Manager
330

331
        // currentNodeAnn is the node announcement that has been broadcast to
332
        // the network upon startup, if the attributes of the node (us) have
333
        // changed since last start.
334
        currentNodeAnn *lnwire.NodeAnnouncement
335

336
        // chansToRestore is the set of channels that upon starting, the server
337
        // should attempt to restore/recover.
338
        chansToRestore walletunlocker.ChannelsToRecover
339

340
        // chanSubSwapper is a sub-system that will ensure our on-disk channel
341
        // backups are consistent at all times. It interacts with the
342
        // channelNotifier to be notified of newly opened and closed channels.
343
        chanSubSwapper *chanbackup.SubSwapper
344

345
        // chanEventStore tracks the behaviour of channels and their remote peers to
346
        // provide insights into their health and performance.
347
        chanEventStore *chanfitness.ChannelEventStore
348

349
        hostAnn *netann.HostAnnouncer
350

351
        // livenessMonitor monitors that lnd has access to critical resources.
352
        livenessMonitor *healthcheck.Monitor
353

354
        customMessageServer *subscribe.Server
355

356
        // txPublisher is a publisher with fee-bumping capability.
357
        txPublisher *sweep.TxPublisher
358

359
        quit chan struct{}
360

361
        wg sync.WaitGroup
362
}
363

364
// updatePersistentPeerAddrs subscribes to topology changes and stores
365
// advertised addresses for any NodeAnnouncements from our persisted peers.
366
func (s *server) updatePersistentPeerAddrs() error {
×
367
        graphSub, err := s.graphBuilder.SubscribeTopology()
×
368
        if err != nil {
×
369
                return err
×
370
        }
×
371

372
        s.wg.Add(1)
×
373
        go func() {
×
374
                defer func() {
×
375
                        graphSub.Cancel()
×
376
                        s.wg.Done()
×
377
                }()
×
378

379
                for {
×
380
                        select {
×
381
                        case <-s.quit:
×
382
                                return
×
383

384
                        case topChange, ok := <-graphSub.TopologyChanges:
×
385
                                // If the router is shutting down, then we will
×
386
                                // as well.
×
387
                                if !ok {
×
388
                                        return
×
389
                                }
×
390

391
                                for _, update := range topChange.NodeUpdates {
×
392
                                        pubKeyStr := string(
×
393
                                                update.IdentityKey.
×
394
                                                        SerializeCompressed(),
×
395
                                        )
×
396

×
397
                                        // We only care about updates from
×
398
                                        // our persistentPeers.
×
399
                                        s.mu.RLock()
×
400
                                        _, ok := s.persistentPeers[pubKeyStr]
×
401
                                        s.mu.RUnlock()
×
402
                                        if !ok {
×
403
                                                continue
×
404
                                        }
405

406
                                        addrs := make([]*lnwire.NetAddress, 0,
×
407
                                                len(update.Addresses))
×
408

×
409
                                        for _, addr := range update.Addresses {
×
410
                                                addrs = append(addrs,
×
411
                                                        &lnwire.NetAddress{
×
412
                                                                IdentityKey: update.IdentityKey,
×
413
                                                                Address:     addr,
×
414
                                                                ChainNet:    s.cfg.ActiveNetParams.Net,
×
415
                                                        },
×
416
                                                )
×
417
                                        }
×
418

419
                                        s.mu.Lock()
×
420

×
421
                                        // Update the stored addresses for this
×
422
                                        // to peer to reflect the new set.
×
423
                                        s.persistentPeerAddrs[pubKeyStr] = addrs
×
424

×
425
                                        // If there are no outstanding
×
426
                                        // connection requests for this peer
×
427
                                        // then our work is done since we are
×
428
                                        // not currently trying to connect to
×
429
                                        // them.
×
430
                                        if len(s.persistentConnReqs[pubKeyStr]) == 0 {
×
431
                                                s.mu.Unlock()
×
432
                                                continue
×
433
                                        }
434

435
                                        s.mu.Unlock()
×
436

×
437
                                        s.connectToPersistentPeer(pubKeyStr)
×
438
                                }
439
                        }
440
                }
441
        }()
442

443
        return nil
×
444
}
445

446
// CustomMessage is a custom message that is received from a peer.
447
type CustomMessage struct {
448
        // Peer is the peer pubkey
449
        Peer [33]byte
450

451
        // Msg is the custom wire message.
452
        Msg *lnwire.Custom
453
}
454

455
// parseAddr parses an address from its string format to a net.Addr.
456
func parseAddr(address string, netCfg tor.Net) (net.Addr, error) {
×
457
        var (
×
458
                host string
×
459
                port int
×
460
        )
×
461

×
462
        // Split the address into its host and port components.
×
463
        h, p, err := net.SplitHostPort(address)
×
464
        if err != nil {
×
465
                // If a port wasn't specified, we'll assume the address only
×
466
                // contains the host so we'll use the default port.
×
467
                host = address
×
468
                port = defaultPeerPort
×
469
        } else {
×
470
                // Otherwise, we'll note both the host and ports.
×
471
                host = h
×
472
                portNum, err := strconv.Atoi(p)
×
473
                if err != nil {
×
474
                        return nil, err
×
475
                }
×
476
                port = portNum
×
477
        }
478

479
        if tor.IsOnionHost(host) {
×
480
                return &tor.OnionAddr{OnionService: host, Port: port}, nil
×
481
        }
×
482

483
        // If the host is part of a TCP address, we'll use the network
484
        // specific ResolveTCPAddr function in order to resolve these
485
        // addresses over Tor in order to prevent leaking your real IP
486
        // address.
487
        hostPort := net.JoinHostPort(host, strconv.Itoa(port))
×
488
        return netCfg.ResolveTCPAddr("tcp", hostPort)
×
489
}
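// Illustrative usage only (not part of server.go): per the fallback above, an
// address without an explicit port is resolved with defaultPeerPort, and
// onion hosts are returned as *tor.OnionAddr. The cfg.net value stands in for
// whichever tor.Net implementation the caller has configured.
//
//      peerAddr, err := parseAddr("203.0.113.7", cfg.net)      // default port
//      onionAddr, err := parseAddr("3g2upl4pq6kufc4m.onion:9735", cfg.net)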
490

491
// noiseDial is a factory function which creates a connmgr compliant dialing
492
// function by returning a closure which includes the server's identity key.
493
func noiseDial(idKey keychain.SingleKeyECDH,
494
        netCfg tor.Net, timeout time.Duration) func(net.Addr) (net.Conn, error) {
×
495

×
496
        return func(a net.Addr) (net.Conn, error) {
×
497
                lnAddr := a.(*lnwire.NetAddress)
×
498
                return brontide.Dial(idKey, lnAddr, timeout, netCfg.Dial)
×
499
        }
×
500
}
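// Illustrative wiring only (not part of server.go): the closure returned by
// noiseDial can be plugged into btcd's connmgr as its Dial callback. Other
// connmgr.Config fields are omitted here and the actual wiring elsewhere in
// lnd may differ.
//
//      cmgr, err := connmgr.New(&connmgr.Config{
//              Dial: noiseDial(nodeKeyECDH, cfg.net, cfg.ConnectionTimeout),
//      })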
501

502
// newServer creates a new instance of the server which is to listen using the
503
// passed listener address.
504
func newServer(cfg *Config, listenAddrs []net.Addr,
505
        dbs *DatabaseInstances, cc *chainreg.ChainControl,
506
        nodeKeyDesc *keychain.KeyDescriptor,
507
        chansToRestore walletunlocker.ChannelsToRecover,
508
        chanPredicate chanacceptor.ChannelAcceptor,
509
        torController *tor.Controller, tlsManager *TLSManager,
510
        leaderElector cluster.LeaderElector,
511
        implCfg *ImplementationCfg) (*server, error) {
×
512

×
513
        var (
×
514
                err         error
×
515
                nodeKeyECDH = keychain.NewPubKeyECDH(*nodeKeyDesc, cc.KeyRing)
×
516

×
517
                // We just derived the full descriptor, so we know the public
×
518
                // key is set on it.
×
519
                nodeKeySigner = keychain.NewPubKeyMessageSigner(
×
520
                        nodeKeyDesc.PubKey, nodeKeyDesc.KeyLocator, cc.KeyRing,
×
521
                )
×
522
        )
×
523

×
524
        listeners := make([]net.Listener, len(listenAddrs))
×
525
        for i, listenAddr := range listenAddrs {
×
526
                // Note: though brontide.NewListener uses ResolveTCPAddr, it
×
527
                // doesn't need to call the general lndResolveTCP function
×
528
                // since we are resolving a local address.
×
529
                listeners[i], err = brontide.NewListener(
×
530
                        nodeKeyECDH, listenAddr.String(),
×
531
                )
×
532
                if err != nil {
×
533
                        return nil, err
×
534
                }
×
535
        }
536

537
        var serializedPubKey [33]byte
×
538
        copy(serializedPubKey[:], nodeKeyDesc.PubKey.SerializeCompressed())
×
539

×
540
        netParams := cfg.ActiveNetParams.Params
×
541

×
542
        // Initialize the sphinx router.
×
543
        replayLog := htlcswitch.NewDecayedLog(
×
544
                dbs.DecayedLogDB, cc.ChainNotifier,
×
545
        )
×
546
        sphinxRouter := sphinx.NewRouter(nodeKeyECDH, replayLog)
×
547

×
548
        writeBufferPool := pool.NewWriteBuffer(
×
549
                pool.DefaultWriteBufferGCInterval,
×
550
                pool.DefaultWriteBufferExpiryInterval,
×
551
        )
×
552

×
553
        writePool := pool.NewWrite(
×
554
                writeBufferPool, cfg.Workers.Write, pool.DefaultWorkerTimeout,
×
555
        )
×
556

×
557
        readBufferPool := pool.NewReadBuffer(
×
558
                pool.DefaultReadBufferGCInterval,
×
559
                pool.DefaultReadBufferExpiryInterval,
×
560
        )
×
561

×
562
        readPool := pool.NewRead(
×
563
                readBufferPool, cfg.Workers.Read, pool.DefaultWorkerTimeout,
×
564
        )
×
565

×
566
        // If the taproot overlay flag is set, but we don't have an aux funding
×
567
        // controller, then we'll exit as this is incompatible.
×
568
        if cfg.ProtocolOptions.TaprootOverlayChans &&
×
569
                implCfg.AuxFundingController.IsNone() {
×
570

×
571
                return nil, fmt.Errorf("taproot overlay flag set, but not " +
×
572
                        "aux controllers")
×
573
        }
×
574

575
        //nolint:ll
576
        featureMgr, err := feature.NewManager(feature.Config{
×
577
                NoTLVOnion:                cfg.ProtocolOptions.LegacyOnion(),
×
578
                NoStaticRemoteKey:         cfg.ProtocolOptions.NoStaticRemoteKey(),
×
579
                NoAnchors:                 cfg.ProtocolOptions.NoAnchorCommitments(),
×
580
                NoWumbo:                   !cfg.ProtocolOptions.Wumbo(),
×
581
                NoScriptEnforcementLease:  cfg.ProtocolOptions.NoScriptEnforcementLease(),
×
582
                NoKeysend:                 !cfg.AcceptKeySend,
×
583
                NoOptionScidAlias:         !cfg.ProtocolOptions.ScidAlias(),
×
584
                NoZeroConf:                !cfg.ProtocolOptions.ZeroConf(),
×
585
                NoAnySegwit:               cfg.ProtocolOptions.NoAnySegwit(),
×
586
                CustomFeatures:            cfg.ProtocolOptions.CustomFeatures(),
×
587
                NoTaprootChans:            !cfg.ProtocolOptions.TaprootChans,
×
588
                NoTaprootOverlay:          !cfg.ProtocolOptions.TaprootOverlayChans,
×
589
                NoRouteBlinding:           cfg.ProtocolOptions.NoRouteBlinding(),
×
590
                NoExperimentalEndorsement: cfg.ProtocolOptions.NoExperimentalEndorsement(),
×
591
                NoQuiescence:              cfg.ProtocolOptions.NoQuiescence(),
×
592
        })
×
593
        if err != nil {
×
594
                return nil, err
×
595
        }
×
596

597
        invoiceHtlcModifier := invoices.NewHtlcModificationInterceptor()
×
598
        registryConfig := invoices.RegistryConfig{
×
599
                FinalCltvRejectDelta:        lncfg.DefaultFinalCltvRejectDelta,
×
600
                HtlcHoldDuration:            invoices.DefaultHtlcHoldDuration,
×
601
                Clock:                       clock.NewDefaultClock(),
×
602
                AcceptKeySend:               cfg.AcceptKeySend,
×
603
                AcceptAMP:                   cfg.AcceptAMP,
×
604
                GcCanceledInvoicesOnStartup: cfg.GcCanceledInvoicesOnStartup,
×
605
                GcCanceledInvoicesOnTheFly:  cfg.GcCanceledInvoicesOnTheFly,
×
606
                KeysendHoldTime:             cfg.KeysendHoldTime,
×
607
                HtlcInterceptor:             invoiceHtlcModifier,
×
608
        }
×
609

×
610
        addrSource := channeldb.NewMultiAddrSource(dbs.ChanStateDB, dbs.GraphDB)
×
611

×
612
        s := &server{
×
613
                cfg:            cfg,
×
614
                implCfg:        implCfg,
×
615
                graphDB:        dbs.GraphDB,
×
616
                chanStateDB:    dbs.ChanStateDB.ChannelStateDB(),
×
617
                addrSource:     addrSource,
×
618
                miscDB:         dbs.ChanStateDB,
×
619
                invoicesDB:     dbs.InvoiceDB,
×
620
                cc:             cc,
×
621
                sigPool:        lnwallet.NewSigPool(cfg.Workers.Sig, cc.Signer),
×
622
                writePool:      writePool,
×
623
                readPool:       readPool,
×
624
                chansToRestore: chansToRestore,
×
625

×
626
                channelNotifier: channelnotifier.New(
×
627
                        dbs.ChanStateDB.ChannelStateDB(),
×
628
                ),
×
629

×
630
                identityECDH:   nodeKeyECDH,
×
631
                identityKeyLoc: nodeKeyDesc.KeyLocator,
×
632
                nodeSigner:     netann.NewNodeSigner(nodeKeySigner),
×
633

×
634
                listenAddrs: listenAddrs,
×
635

×
636
                // TODO(roasbeef): derive proper onion key based on rotation
×
637
                // schedule
×
638
                sphinx: hop.NewOnionProcessor(sphinxRouter),
×
639

×
640
                torController: torController,
×
641

×
642
                persistentPeers:         make(map[string]bool),
×
643
                persistentPeersBackoff:  make(map[string]time.Duration),
×
644
                persistentConnReqs:      make(map[string][]*connmgr.ConnReq),
×
645
                persistentPeerAddrs:     make(map[string][]*lnwire.NetAddress),
×
646
                persistentRetryCancels:  make(map[string]chan struct{}),
×
647
                peerErrors:              make(map[string]*queue.CircularBuffer),
×
648
                ignorePeerTermination:   make(map[*peer.Brontide]struct{}),
×
649
                scheduledPeerConnection: make(map[string]func()),
×
650
                pongBuf:                 make([]byte, lnwire.MaxPongBytes),
×
651

×
652
                peersByPub:                make(map[string]*peer.Brontide),
×
653
                inboundPeers:              make(map[string]*peer.Brontide),
×
654
                outboundPeers:             make(map[string]*peer.Brontide),
×
655
                peerConnectedListeners:    make(map[string][]chan<- lnpeer.Peer),
×
656
                peerDisconnectedListeners: make(map[string][]chan<- struct{}),
×
657

×
658
                invoiceHtlcModifier: invoiceHtlcModifier,
×
659

×
660
                customMessageServer: subscribe.NewServer(),
×
661

×
662
                tlsManager: tlsManager,
×
663

×
664
                featureMgr: featureMgr,
×
665
                quit:       make(chan struct{}),
×
666
        }
×
667

×
668
        currentHash, currentHeight, err := s.cc.ChainIO.GetBestBlock()
×
669
        if err != nil {
×
670
                return nil, err
×
671
        }
×
672

673
        expiryWatcher := invoices.NewInvoiceExpiryWatcher(
×
674
                clock.NewDefaultClock(), cfg.Invoices.HoldExpiryDelta,
×
675
                uint32(currentHeight), currentHash, cc.ChainNotifier,
×
676
        )
×
677
        s.invoices = invoices.NewRegistry(
×
678
                dbs.InvoiceDB, expiryWatcher, &registryConfig,
×
679
        )
×
680

×
681
        s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now)
×
682

×
683
        thresholdSats := btcutil.Amount(cfg.MaxFeeExposure)
×
684
        thresholdMSats := lnwire.NewMSatFromSatoshis(thresholdSats)
×
685

×
686
        linkUpdater := func(shortID lnwire.ShortChannelID) error {
×
687
                link, err := s.htlcSwitch.GetLinkByShortID(shortID)
×
688
                if err != nil {
×
689
                        return err
×
690
                }
×
691

692
                s.htlcSwitch.UpdateLinkAliases(link)
×
693

×
694
                return nil
×
695
        }
696

697
        s.aliasMgr, err = aliasmgr.NewManager(dbs.ChanStateDB, linkUpdater)
×
698
        if err != nil {
×
699
                return nil, err
×
700
        }
×
701

702
        s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{
×
703
                DB:                   dbs.ChanStateDB,
×
704
                FetchAllOpenChannels: s.chanStateDB.FetchAllOpenChannels,
×
705
                FetchAllChannels:     s.chanStateDB.FetchAllChannels,
×
706
                FetchClosedChannels:  s.chanStateDB.FetchClosedChannels,
×
707
                LocalChannelClose: func(pubKey []byte,
×
708
                        request *htlcswitch.ChanClose) {
×
709

×
710
                        peer, err := s.FindPeerByPubStr(string(pubKey))
×
711
                        if err != nil {
×
712
                                srvrLog.Errorf("unable to close channel, peer"+
×
713
                                        " with %v id can't be found: %v",
×
714
                                        pubKey, err,
×
715
                                )
×
716
                                return
×
717
                        }
×
718

719
                        peer.HandleLocalCloseChanReqs(request)
×
720
                },
721
                FwdingLog:              dbs.ChanStateDB.ForwardingLog(),
722
                SwitchPackager:         channeldb.NewSwitchPackager(),
723
                ExtractErrorEncrypter:  s.sphinx.ExtractErrorEncrypter,
724
                FetchLastChannelUpdate: s.fetchLastChanUpdate(),
725
                Notifier:               s.cc.ChainNotifier,
726
                HtlcNotifier:           s.htlcNotifier,
727
                FwdEventTicker:         ticker.New(htlcswitch.DefaultFwdEventInterval),
728
                LogEventTicker:         ticker.New(htlcswitch.DefaultLogInterval),
729
                AckEventTicker:         ticker.New(htlcswitch.DefaultAckInterval),
730
                AllowCircularRoute:     cfg.AllowCircularRoute,
731
                RejectHTLC:             cfg.RejectHTLC,
732
                Clock:                  clock.NewDefaultClock(),
733
                MailboxDeliveryTimeout: cfg.Htlcswitch.MailboxDeliveryTimeout,
734
                MaxFeeExposure:         thresholdMSats,
735
                SignAliasUpdate:        s.signAliasUpdate,
736
                IsAlias:                aliasmgr.IsAlias,
737
        }, uint32(currentHeight))
738
        if err != nil {
×
739
                return nil, err
×
740
        }
×
741
        s.interceptableSwitch, err = htlcswitch.NewInterceptableSwitch(
×
742
                &htlcswitch.InterceptableSwitchConfig{
×
743
                        Switch:             s.htlcSwitch,
×
744
                        CltvRejectDelta:    lncfg.DefaultFinalCltvRejectDelta,
×
745
                        CltvInterceptDelta: lncfg.DefaultCltvInterceptDelta,
×
746
                        RequireInterceptor: s.cfg.RequireInterceptor,
×
747
                        Notifier:           s.cc.ChainNotifier,
×
748
                },
×
749
        )
×
750
        if err != nil {
×
751
                return nil, err
×
752
        }
×
753

754
        s.witnessBeacon = newPreimageBeacon(
×
755
                dbs.ChanStateDB.NewWitnessCache(),
×
756
                s.interceptableSwitch.ForwardPacket,
×
757
        )
×
758

×
759
        chanStatusMgrCfg := &netann.ChanStatusConfig{
×
760
                ChanStatusSampleInterval: cfg.ChanStatusSampleInterval,
×
761
                ChanEnableTimeout:        cfg.ChanEnableTimeout,
×
762
                ChanDisableTimeout:       cfg.ChanDisableTimeout,
×
763
                OurPubKey:                nodeKeyDesc.PubKey,
×
764
                OurKeyLoc:                nodeKeyDesc.KeyLocator,
×
765
                MessageSigner:            s.nodeSigner,
×
766
                IsChannelActive:          s.htlcSwitch.HasActiveLink,
×
767
                ApplyChannelUpdate:       s.applyChannelUpdate,
×
768
                DB:                       s.chanStateDB,
×
769
                Graph:                    dbs.GraphDB,
×
770
        }
×
771

×
772
        chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg)
×
773
        if err != nil {
×
774
                return nil, err
×
775
        }
×
776
        s.chanStatusMgr = chanStatusMgr
×
777

×
778
        // If enabled, use either UPnP or NAT-PMP to automatically configure
×
779
        // port forwarding for users behind a NAT.
×
780
        if cfg.NAT {
×
781
                srvrLog.Info("Scanning local network for a UPnP enabled device")
×
782

×
783
                discoveryTimeout := time.Duration(10 * time.Second)
×
784

×
785
                ctx, cancel := context.WithTimeout(
×
786
                        context.Background(), discoveryTimeout,
×
787
                )
×
788
                defer cancel()
×
789
                upnp, err := nat.DiscoverUPnP(ctx)
×
790
                if err == nil {
×
791
                        s.natTraversal = upnp
×
792
                } else {
×
793
                        // If we were not able to discover a UPnP enabled device
×
794
                        // on the local network, we'll fall back to attempting
×
795
                        // to discover a NAT-PMP enabled device.
×
796
                        srvrLog.Errorf("Unable to discover a UPnP enabled "+
×
797
                                "device on the local network: %v", err)
×
798

×
799
                        srvrLog.Info("Scanning local network for a NAT-PMP " +
×
800
                                "enabled device")
×
801

×
802
                        pmp, err := nat.DiscoverPMP(discoveryTimeout)
×
803
                        if err != nil {
×
804
                                err := fmt.Errorf("unable to discover a "+
×
805
                                        "NAT-PMP enabled device on the local "+
×
806
                                        "network: %v", err)
×
807
                                srvrLog.Error(err)
×
808
                                return nil, err
×
809
                        }
×
810

811
                        s.natTraversal = pmp
×
812
                }
813
        }
814

815
        // If we were requested to automatically configure port forwarding,
816
        // we'll use the ports that the server will be listening on.
817
        externalIPStrings := make([]string, len(cfg.ExternalIPs))
×
818
        for idx, ip := range cfg.ExternalIPs {
×
819
                externalIPStrings[idx] = ip.String()
×
820
        }
×
821
        if s.natTraversal != nil {
×
822
                listenPorts := make([]uint16, 0, len(listenAddrs))
×
823
                for _, listenAddr := range listenAddrs {
×
824
                        // At this point, the listen addresses should have
×
825
                        // already been normalized, so it's safe to ignore the
×
826
                        // errors.
×
827
                        _, portStr, _ := net.SplitHostPort(listenAddr.String())
×
828
                        port, _ := strconv.Atoi(portStr)
×
829

×
830
                        listenPorts = append(listenPorts, uint16(port))
×
831
                }
×
832

833
                ips, err := s.configurePortForwarding(listenPorts...)
×
834
                if err != nil {
×
835
                        srvrLog.Errorf("Unable to automatically set up port "+
×
836
                                "forwarding using %s: %v",
×
837
                                s.natTraversal.Name(), err)
×
838
                } else {
×
839
                        srvrLog.Infof("Automatically set up port forwarding "+
×
840
                                "using %s to advertise external IP",
×
841
                                s.natTraversal.Name())
×
842
                        externalIPStrings = append(externalIPStrings, ips...)
×
843
                }
×
844
        }
845

846
        // If external IP addresses have been specified, add those to the list
847
        // of this server's addresses.
848
        externalIPs, err := lncfg.NormalizeAddresses(
×
849
                externalIPStrings, strconv.Itoa(defaultPeerPort),
×
850
                cfg.net.ResolveTCPAddr,
×
851
        )
×
852
        if err != nil {
×
853
                return nil, err
×
854
        }
×
855

856
        selfAddrs := make([]net.Addr, 0, len(externalIPs))
×
857
        selfAddrs = append(selfAddrs, externalIPs...)
×
858

×
859
        // We'll now reconstruct a node announcement based on our current
×
860
        // configuration so we can send it out as a sort of heart beat within
×
861
        // the network.
×
862
        //
×
863
        // We'll start by parsing the node color from configuration.
×
864
        color, err := lncfg.ParseHexColor(cfg.Color)
×
865
        if err != nil {
×
866
                srvrLog.Errorf("unable to parse color: %v\n", err)
×
867
                return nil, err
×
868
        }
×
869

870
        // If no alias is provided, default to the first 10 bytes of the public
871
        // key.
872
        alias := cfg.Alias
×
873
        if alias == "" {
×
874
                alias = hex.EncodeToString(serializedPubKey[:10])
×
875
        }
×
876
        nodeAlias, err := lnwire.NewNodeAlias(alias)
×
877
        if err != nil {
×
878
                return nil, err
×
879
        }
×
880
        selfNode := &models.LightningNode{
×
881
                HaveNodeAnnouncement: true,
×
882
                LastUpdate:           time.Now(),
×
883
                Addresses:            selfAddrs,
×
884
                Alias:                nodeAlias.String(),
×
885
                Features:             s.featureMgr.Get(feature.SetNodeAnn),
×
886
                Color:                color,
×
887
        }
×
888
        copy(selfNode.PubKeyBytes[:], nodeKeyDesc.PubKey.SerializeCompressed())
×
889

×
890
        // Based on the disk representation of the node announcement generated
×
891
        // above, we'll generate a node announcement that can go out on the
×
892
        // network so we can properly sign it.
×
893
        nodeAnn, err := selfNode.NodeAnnouncement(false)
×
894
        if err != nil {
×
895
                return nil, fmt.Errorf("unable to gen self node ann: %w", err)
×
896
        }
×
897

898
        // With the announcement generated, we'll sign it to properly
899
        // authenticate the message on the network.
900
        authSig, err := netann.SignAnnouncement(
×
901
                s.nodeSigner, nodeKeyDesc.KeyLocator, nodeAnn,
×
902
        )
×
903
        if err != nil {
×
904
                return nil, fmt.Errorf("unable to generate signature for "+
×
905
                        "self node announcement: %v", err)
×
906
        }
×
907
        selfNode.AuthSigBytes = authSig.Serialize()
×
908
        nodeAnn.Signature, err = lnwire.NewSigFromECDSARawSignature(
×
909
                selfNode.AuthSigBytes,
×
910
        )
×
911
        if err != nil {
×
912
                return nil, err
×
913
        }
×
914

915
        // Finally, we'll update the representation on disk, and update our
916
        // cached in-memory version as well.
917
        if err := dbs.GraphDB.SetSourceNode(selfNode); err != nil {
×
918
                return nil, fmt.Errorf("can't set self node: %w", err)
×
919
        }
×
920
        s.currentNodeAnn = nodeAnn
×
921

×
922
        // The router will get access to the payment ID sequencer, such that it
×
923
        // can generate unique payment IDs.
×
924
        sequencer, err := htlcswitch.NewPersistentSequencer(dbs.ChanStateDB)
×
925
        if err != nil {
×
926
                return nil, err
×
927
        }
×
928

929
        // Instantiate mission control with config from the sub server.
930
        //
931
        // TODO(joostjager): When we are further in the process of moving to sub
932
        // servers, the mission control instance itself can be moved there too.
933
        routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC)
×
934

×
935
        // We only initialize a probability estimator if there's no custom one.
×
936
        var estimator routing.Estimator
×
937
        if cfg.Estimator != nil {
×
938
                estimator = cfg.Estimator
×
939
        } else {
×
940
                switch routingConfig.ProbabilityEstimatorType {
×
941
                case routing.AprioriEstimatorName:
×
942
                        aCfg := routingConfig.AprioriConfig
×
943
                        aprioriConfig := routing.AprioriConfig{
×
944
                                AprioriHopProbability: aCfg.HopProbability,
×
945
                                PenaltyHalfLife:       aCfg.PenaltyHalfLife,
×
946
                                AprioriWeight:         aCfg.Weight,
×
947
                                CapacityFraction:      aCfg.CapacityFraction,
×
948
                        }
×
949

×
950
                        estimator, err = routing.NewAprioriEstimator(
×
951
                                aprioriConfig,
×
952
                        )
×
953
                        if err != nil {
×
954
                                return nil, err
×
955
                        }
×
956

957
                case routing.BimodalEstimatorName:
×
958
                        bCfg := routingConfig.BimodalConfig
×
959
                        bimodalConfig := routing.BimodalConfig{
×
960
                                BimodalNodeWeight: bCfg.NodeWeight,
×
961
                                BimodalScaleMsat: lnwire.MilliSatoshi(
×
962
                                        bCfg.Scale,
×
963
                                ),
×
964
                                BimodalDecayTime: bCfg.DecayTime,
×
965
                        }
×
966

×
967
                        estimator, err = routing.NewBimodalEstimator(
×
968
                                bimodalConfig,
×
969
                        )
×
970
                        if err != nil {
×
971
                                return nil, err
×
972
                        }
×
973

974
                default:
×
975
                        return nil, fmt.Errorf("unknown estimator type %v",
×
976
                                routingConfig.ProbabilityEstimatorType)
×
977
                }
978
        }
979

980
        mcCfg := &routing.MissionControlConfig{
×
981
                OnConfigUpdate:          fn.Some(s.UpdateRoutingConfig),
×
982
                Estimator:               estimator,
×
983
                MaxMcHistory:            routingConfig.MaxMcHistory,
×
984
                McFlushInterval:         routingConfig.McFlushInterval,
×
985
                MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval,
×
986
        }
×
987

×
988
        s.missionController, err = routing.NewMissionController(
×
989
                dbs.ChanStateDB, selfNode.PubKeyBytes, mcCfg,
×
990
        )
×
991
        if err != nil {
×
992
                return nil, fmt.Errorf("can't create mission control "+
×
993
                        "manager: %w", err)
×
994
        }
×
995
        s.defaultMC, err = s.missionController.GetNamespacedStore(
×
996
                routing.DefaultMissionControlNamespace,
×
997
        )
×
998
        if err != nil {
×
999
                return nil, fmt.Errorf("can't create mission control in the "+
×
1000
                        "default namespace: %w", err)
×
1001
        }
×
1002

1003
        srvrLog.Debugf("Instantiating payment session source with config: "+
×
1004
                "AttemptCost=%v + %v%%, MinRouteProbability=%v",
×
1005
                int64(routingConfig.AttemptCost),
×
1006
                float64(routingConfig.AttemptCostPPM)/10000,
×
1007
                routingConfig.MinRouteProbability)
×
1008

×
1009
        pathFindingConfig := routing.PathFindingConfig{
×
1010
                AttemptCost: lnwire.NewMSatFromSatoshis(
×
1011
                        routingConfig.AttemptCost,
×
1012
                ),
×
1013
                AttemptCostPPM: routingConfig.AttemptCostPPM,
×
1014
                MinProbability: routingConfig.MinRouteProbability,
×
1015
        }
×
1016

×
1017
        sourceNode, err := dbs.GraphDB.SourceNode()
×
1018
        if err != nil {
×
1019
                return nil, fmt.Errorf("error getting source node: %w", err)
×
1020
        }
×
1021
        paymentSessionSource := &routing.SessionSource{
×
1022
                GraphSessionFactory: graphsession.NewGraphSessionFactory(
×
1023
                        dbs.GraphDB,
×
1024
                ),
×
1025
                SourceNode:        sourceNode,
×
1026
                MissionControl:    s.defaultMC,
×
1027
                GetLink:           s.htlcSwitch.GetLinkByShortID,
×
1028
                PathFindingConfig: pathFindingConfig,
×
1029
        }
×
1030

×
1031
        paymentControl := channeldb.NewPaymentControl(dbs.ChanStateDB)
×
1032

×
1033
        s.controlTower = routing.NewControlTower(paymentControl)
×
1034

×
1035
        strictPruning := cfg.Bitcoin.Node == "neutrino" ||
×
1036
                cfg.Routing.StrictZombiePruning
×
1037

×
1038
        s.graphBuilder, err = graph.NewBuilder(&graph.Config{
×
1039
                SelfNode:            selfNode.PubKeyBytes,
×
1040
                Graph:               dbs.GraphDB,
×
1041
                Chain:               cc.ChainIO,
×
1042
                ChainView:           cc.ChainView,
×
1043
                Notifier:            cc.ChainNotifier,
×
1044
                ChannelPruneExpiry:  graph.DefaultChannelPruneExpiry,
×
1045
                GraphPruneInterval:  time.Hour,
×
1046
                FirstTimePruneDelay: graph.DefaultFirstTimePruneDelay,
×
1047
                AssumeChannelValid:  cfg.Routing.AssumeChannelValid,
×
1048
                StrictZombiePruning: strictPruning,
×
1049
                IsAlias:             aliasmgr.IsAlias,
×
1050
        })
×
1051
        if err != nil {
×
1052
                return nil, fmt.Errorf("can't create graph builder: %w", err)
×
1053
        }
×
1054

1055
        s.chanRouter, err = routing.New(routing.Config{
×
1056
                SelfNode:           selfNode.PubKeyBytes,
×
1057
                RoutingGraph:       graphsession.NewRoutingGraph(dbs.GraphDB),
×
1058
                Chain:              cc.ChainIO,
×
1059
                Payer:              s.htlcSwitch,
×
1060
                Control:            s.controlTower,
×
1061
                MissionControl:     s.defaultMC,
×
1062
                SessionSource:      paymentSessionSource,
×
1063
                GetLink:            s.htlcSwitch.GetLinkByShortID,
×
1064
                NextPaymentID:      sequencer.NextID,
×
1065
                PathFindingConfig:  pathFindingConfig,
×
1066
                Clock:              clock.NewDefaultClock(),
×
1067
                ApplyChannelUpdate: s.graphBuilder.ApplyChannelUpdate,
×
1068
                ClosedSCIDs:        s.fetchClosedChannelSCIDs(),
×
1069
                TrafficShaper:      implCfg.TrafficShaper,
×
1070
        })
×
1071
        if err != nil {
×
1072
                return nil, fmt.Errorf("can't create router: %w", err)
×
1073
        }
×
1074

1075
        chanSeries := discovery.NewChanSeries(s.graphDB)
×
1076
        gossipMessageStore, err := discovery.NewMessageStore(dbs.ChanStateDB)
×
1077
        if err != nil {
×
1078
                return nil, err
×
1079
        }
×
1080
        waitingProofStore, err := channeldb.NewWaitingProofStore(dbs.ChanStateDB)
×
1081
        if err != nil {
×
1082
                return nil, err
×
1083
        }
×
1084

1085
        scidCloserMan := discovery.NewScidCloserMan(s.graphDB, s.chanStateDB)
×
1086

×
1087
        s.authGossiper = discovery.New(discovery.Config{
×
1088
                Graph:                 s.graphBuilder,
×
1089
                ChainIO:               s.cc.ChainIO,
×
1090
                Notifier:              s.cc.ChainNotifier,
×
1091
                ChainHash:             *s.cfg.ActiveNetParams.GenesisHash,
×
1092
                Broadcast:             s.BroadcastMessage,
×
1093
                ChanSeries:            chanSeries,
×
1094
                NotifyWhenOnline:      s.NotifyWhenOnline,
×
1095
                NotifyWhenOffline:     s.NotifyWhenOffline,
×
1096
                FetchSelfAnnouncement: s.getNodeAnnouncement,
×
1097
                UpdateSelfAnnouncement: func() (lnwire.NodeAnnouncement,
×
1098
                        error) {
×
1099

×
1100
                        return s.genNodeAnnouncement(nil)
×
1101
                },
×
1102
                ProofMatureDelta:        0,
1103
                TrickleDelay:            time.Millisecond * time.Duration(cfg.TrickleDelay),
1104
                RetransmitTicker:        ticker.New(time.Minute * 30),
1105
                RebroadcastInterval:     time.Hour * 24,
1106
                WaitingProofStore:       waitingProofStore,
1107
                MessageStore:            gossipMessageStore,
1108
                AnnSigner:               s.nodeSigner,
1109
                RotateTicker:            ticker.New(discovery.DefaultSyncerRotationInterval),
1110
                HistoricalSyncTicker:    ticker.New(cfg.HistoricalSyncInterval),
1111
                NumActiveSyncers:        cfg.NumGraphSyncPeers,
1112
                NoTimestampQueries:      cfg.ProtocolOptions.NoTimestampQueryOption, //nolint:ll
1113
                MinimumBatchSize:        10,
1114
                SubBatchDelay:           cfg.Gossip.SubBatchDelay,
1115
                IgnoreHistoricalFilters: cfg.IgnoreHistoricalGossipFilters,
1116
                PinnedSyncers:           cfg.Gossip.PinnedSyncers,
1117
                MaxChannelUpdateBurst:   cfg.Gossip.MaxChannelUpdateBurst,
1118
                ChannelUpdateInterval:   cfg.Gossip.ChannelUpdateInterval,
1119
                IsAlias:                 aliasmgr.IsAlias,
1120
                SignAliasUpdate:         s.signAliasUpdate,
1121
                FindBaseByAlias:         s.aliasMgr.FindBaseSCID,
1122
                GetAlias:                s.aliasMgr.GetPeerAlias,
1123
                FindChannel:             s.findChannel,
1124
                IsStillZombieChannel:    s.graphBuilder.IsZombieChannel,
1125
                ScidCloser:              scidCloserMan,
1126
        }, nodeKeyDesc)
1127

1128
        selfVertex := route.Vertex(nodeKeyDesc.PubKey.SerializeCompressed())
×
1129
        //nolint:ll
×
1130
        s.localChanMgr = &localchans.Manager{
×
1131
                SelfPub:              nodeKeyDesc.PubKey,
×
1132
                DefaultRoutingPolicy: cc.RoutingPolicy,
×
1133
                ForAllOutgoingChannels: func(cb func(*models.ChannelEdgeInfo,
×
1134
                        *models.ChannelEdgePolicy) error) error {
×
1135

×
1136
                        return s.graphDB.ForEachNodeChannel(selfVertex,
×
1137
                                func(_ kvdb.RTx, c *models.ChannelEdgeInfo,
×
1138
                                        e *models.ChannelEdgePolicy,
×
1139
                                        _ *models.ChannelEdgePolicy) error {
×
1140

×
1141
                                        // NOTE: The invoked callback here may
×
1142
                                        // receive a nil channel policy.
×
1143
                                        return cb(c, e)
×
1144
                                },
×
1145
                        )
1146
                },
1147
                PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate,
1148
                UpdateForwardingPolicies:  s.htlcSwitch.UpdateForwardingPolicies,
1149
                FetchChannel:              s.chanStateDB.FetchChannel,
1150
                AddEdge: func(edge *models.ChannelEdgeInfo) error {
×
1151
                        return s.graphBuilder.AddEdge(edge)
×
1152
                },
×
1153
        }
1154

1155
        utxnStore, err := contractcourt.NewNurseryStore(
×
1156
                s.cfg.ActiveNetParams.GenesisHash, dbs.ChanStateDB,
×
1157
        )
×
1158
        if err != nil {
×
1159
                srvrLog.Errorf("unable to create nursery store: %v", err)
×
1160
                return nil, err
×
1161
        }
×
1162

1163
        sweeperStore, err := sweep.NewSweeperStore(
×
1164
                dbs.ChanStateDB, s.cfg.ActiveNetParams.GenesisHash,
×
1165
        )
×
1166
        if err != nil {
×
1167
                srvrLog.Errorf("unable to create sweeper store: %v", err)
×
1168
                return nil, err
×
1169
        }
×
1170

1171
        aggregator := sweep.NewBudgetAggregator(
×
1172
                cc.FeeEstimator, sweep.DefaultMaxInputsPerTx,
×
1173
                s.implCfg.AuxSweeper,
×
1174
        )
×
1175

×
1176
        s.txPublisher = sweep.NewTxPublisher(sweep.TxPublisherConfig{
×
1177
                Signer:     cc.Wallet.Cfg.Signer,
×
1178
                Wallet:     cc.Wallet,
×
1179
                Estimator:  cc.FeeEstimator,
×
1180
                Notifier:   cc.ChainNotifier,
×
1181
                AuxSweeper: s.implCfg.AuxSweeper,
×
1182
        })
×
1183

×
1184
        s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{
×
1185
                FeeEstimator: cc.FeeEstimator,
×
1186
                GenSweepScript: newSweepPkScriptGen(
×
1187
                        cc.Wallet, s.cfg.ActiveNetParams.Params,
×
1188
                ),
×
1189
                Signer:               cc.Wallet.Cfg.Signer,
×
1190
                Wallet:               newSweeperWallet(cc.Wallet),
×
1191
                Mempool:              cc.MempoolNotifier,
×
1192
                Notifier:             cc.ChainNotifier,
×
1193
                Store:                sweeperStore,
×
1194
                MaxInputsPerTx:       sweep.DefaultMaxInputsPerTx,
×
1195
                MaxFeeRate:           cfg.Sweeper.MaxFeeRate,
×
1196
                Aggregator:           aggregator,
×
1197
                Publisher:            s.txPublisher,
×
1198
                NoDeadlineConfTarget: cfg.Sweeper.NoDeadlineConfTarget,
×
1199
        })
×
1200

×
1201
        s.utxoNursery = contractcourt.NewUtxoNursery(&contractcourt.NurseryConfig{
×
1202
                ChainIO:             cc.ChainIO,
×
1203
                ConfDepth:           1,
×
1204
                FetchClosedChannels: s.chanStateDB.FetchClosedChannels,
×
1205
                FetchClosedChannel:  s.chanStateDB.FetchClosedChannel,
×
1206
                Notifier:            cc.ChainNotifier,
×
1207
                PublishTransaction:  cc.Wallet.PublishTransaction,
×
1208
                Store:               utxnStore,
×
1209
                SweepInput:          s.sweeper.SweepInput,
×
1210
                Budget:              s.cfg.Sweeper.Budget,
×
1211
        })
×
1212

×
1213
        // Construct a closure that wraps the htlcswitch's CloseLink method.
×
1214
        closeLink := func(chanPoint *wire.OutPoint,
×
1215
                closureType contractcourt.ChannelCloseType) {
×
1216
                // TODO(conner): Properly respect the update and error channels
×
1217
                // returned by CloseLink.
×
1218

×
1219
                // Instruct the switch to close the channel.  Provide no close out
×
1220
                // delivery script or target fee per kw because user input is not
×
1221
                // available when the remote peer closes the channel.
×
1222
                s.htlcSwitch.CloseLink(chanPoint, closureType, 0, 0, nil)
×
1223
        }
×
1224

1225
        // We will use the following channel to reliably hand off contract
1226
        // breach events from the ChannelArbitrator to the BreachArbitrator.
1227
        contractBreaches := make(chan *contractcourt.ContractBreachEvent, 1)
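        // The channel carries a buffer of one so a breach event can be
        // queued without immediately blocking the sender; each event also
        // carries a ProcessACK closure (see the ContractBreach callback
        // below) that the BreachArbitrator uses to confirm the handoff.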
×
1228

×
1229
        s.breachArbitrator = contractcourt.NewBreachArbitrator(
×
1230
                &contractcourt.BreachConfig{
×
1231
                        CloseLink: closeLink,
×
1232
                        DB:        s.chanStateDB,
×
1233
                        Estimator: s.cc.FeeEstimator,
×
1234
                        GenSweepScript: newSweepPkScriptGen(
×
1235
                                cc.Wallet, s.cfg.ActiveNetParams.Params,
×
1236
                        ),
×
1237
                        Notifier:           cc.ChainNotifier,
×
1238
                        PublishTransaction: cc.Wallet.PublishTransaction,
×
1239
                        ContractBreaches:   contractBreaches,
×
1240
                        Signer:             cc.Wallet.Cfg.Signer,
×
1241
                        Store: contractcourt.NewRetributionStore(
×
1242
                                dbs.ChanStateDB,
×
1243
                        ),
×
1244
                        AuxSweeper: s.implCfg.AuxSweeper,
×
1245
                },
×
1246
        )
×
1247

×
1248
        //nolint:ll
×
1249
        s.chainArb = contractcourt.NewChainArbitrator(contractcourt.ChainArbitratorConfig{
×
1250
                ChainHash:              *s.cfg.ActiveNetParams.GenesisHash,
×
1251
                IncomingBroadcastDelta: lncfg.DefaultIncomingBroadcastDelta,
×
1252
                OutgoingBroadcastDelta: lncfg.DefaultOutgoingBroadcastDelta,
×
1253
                NewSweepAddr: func() ([]byte, error) {
×
1254
                        addr, err := newSweepPkScriptGen(
×
1255
                                cc.Wallet, netParams,
×
1256
                        )().Unpack()
×
1257
                        if err != nil {
×
1258
                                return nil, err
×
1259
                        }
×
1260

1261
                        return addr.DeliveryAddress, nil
×
1262
                },
1263
                PublishTx: cc.Wallet.PublishTransaction,
1264
                DeliverResolutionMsg: func(msgs ...contractcourt.ResolutionMsg) error {
×
1265
                        for _, msg := range msgs {
×
1266
                                err := s.htlcSwitch.ProcessContractResolution(msg)
×
1267
                                if err != nil {
×
1268
                                        return err
×
1269
                                }
×
1270
                        }
1271
                        return nil
×
1272
                },
1273
                IncubateOutputs: func(chanPoint wire.OutPoint,
1274
                        outHtlcRes fn.Option[lnwallet.OutgoingHtlcResolution],
1275
                        inHtlcRes fn.Option[lnwallet.IncomingHtlcResolution],
1276
                        broadcastHeight uint32,
1277
                        deadlineHeight fn.Option[int32]) error {
×
1278

×
1279
                        return s.utxoNursery.IncubateOutputs(
×
1280
                                chanPoint, outHtlcRes, inHtlcRes,
×
1281
                                broadcastHeight, deadlineHeight,
×
1282
                        )
×
1283
                },
×
1284
                PreimageDB:   s.witnessBeacon,
1285
                Notifier:     cc.ChainNotifier,
1286
                Mempool:      cc.MempoolNotifier,
1287
                Signer:       cc.Wallet.Cfg.Signer,
1288
                FeeEstimator: cc.FeeEstimator,
1289
                ChainIO:      cc.ChainIO,
1290
                MarkLinkInactive: func(chanPoint wire.OutPoint) error {
×
1291
                        chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
×
1292
                        s.htlcSwitch.RemoveLink(chanID)
×
1293
                        return nil
×
1294
                },
×
1295
                IsOurAddress: cc.Wallet.IsOurAddress,
1296
                ContractBreach: func(chanPoint wire.OutPoint,
1297
                        breachRet *lnwallet.BreachRetribution) error {
×
1298

×
1299
                        // processACK will handle the BreachArbitrator ACKing
×
1300
                        // the event.
×
1301
                        finalErr := make(chan error, 1)
×
1302
                        processACK := func(brarErr error) {
×
1303
                                if brarErr != nil {
×
1304
                                        finalErr <- brarErr
×
1305
                                        return
×
1306
                                }
×
1307

1308
                                // If the BreachArbitrator successfully handled
1309
                                // the event, we can signal that the handoff
1310
                                // was successful.
1311
                                finalErr <- nil
×
1312
                        }
1313

1314
                        event := &contractcourt.ContractBreachEvent{
×
1315
                                ChanPoint:         chanPoint,
×
1316
                                ProcessACK:        processACK,
×
1317
                                BreachRetribution: breachRet,
×
1318
                        }
×
1319

×
1320
                        // Send the contract breach event to the
×
1321
                        // BreachArbitrator.
×
1322
                        select {
×
1323
                        case contractBreaches <- event:
×
1324
                        case <-s.quit:
×
1325
                                return ErrServerShuttingDown
×
1326
                        }
1327

1328
                        // We'll wait for a final error to be available from
1329
                        // the BreachArbitrator.
1330
                        select {
×
1331
                        case err := <-finalErr:
×
1332
                                return err
×
1333
                        case <-s.quit:
×
1334
                                return ErrServerShuttingDown
×
1335
                        }
1336
                },
1337
                DisableChannel: func(chanPoint wire.OutPoint) error {
×
1338
                        return s.chanStatusMgr.RequestDisable(chanPoint, false)
×
1339
                },
×
1340
                Sweeper:                       s.sweeper,
1341
                Registry:                      s.invoices,
1342
                NotifyClosedChannel:           s.channelNotifier.NotifyClosedChannelEvent,
1343
                NotifyFullyResolvedChannel:    s.channelNotifier.NotifyFullyResolvedChannelEvent,
1344
                OnionProcessor:                s.sphinx,
1345
                PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod,
1346
                IsForwardedHTLC:               s.htlcSwitch.IsForwardedHTLC,
1347
                Clock:                         clock.NewDefaultClock(),
1348
                SubscribeBreachComplete:       s.breachArbitrator.SubscribeBreachComplete,
1349
                PutFinalHtlcOutcome:           s.chanStateDB.PutOnchainFinalHtlcOutcome,
1350
                HtlcNotifier:                  s.htlcNotifier,
1351
                Budget:                        *s.cfg.Sweeper.Budget,
1352

1353
                // TODO(yy): remove this hack once PaymentCircuit is interfaced.
1354
                QueryIncomingCircuit: func(
1355
                        circuit models.CircuitKey) *models.CircuitKey {
×
1356

×
1357
                        // Get the circuit map.
×
1358
                        circuits := s.htlcSwitch.CircuitLookup()
×
1359

×
1360
                        // Lookup the outgoing circuit.
×
1361
                        pc := circuits.LookupOpenCircuit(circuit)
×
1362
                        if pc == nil {
×
1363
                                return nil
×
1364
                        }
×
1365

1366
                        return &pc.Incoming
×
1367
                },
1368
                AuxLeafStore: implCfg.AuxLeafStore,
1369
                AuxSigner:    implCfg.AuxSigner,
1370
                AuxResolver:  implCfg.AuxContractResolver,
1371
        }, dbs.ChanStateDB)
1372

1373
        // Select the configuration and funding parameters for Bitcoin.
1374
        chainCfg := cfg.Bitcoin
×
1375
        minRemoteDelay := funding.MinBtcRemoteDelay
×
1376
        maxRemoteDelay := funding.MaxBtcRemoteDelay
×
1377

×
1378
        var chanIDSeed [32]byte
×
1379
        if _, err := rand.Read(chanIDSeed[:]); err != nil {
×
1380
                return nil, err
×
1381
        }
×
1382

1383
        // Wrap the DeleteChannelEdges method so that the funding manager can
1384
        // use it without depending on several layers of indirection.
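        // Besides deleting the edge, the closure returns our own channel
        // policy, presumably so the caller can re-create it once the
        // confirmed SCID for the channel is known.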
1385
        deleteAliasEdge := func(scid lnwire.ShortChannelID) (
×
1386
                *models.ChannelEdgePolicy, error) {
×
1387

×
1388
                info, e1, e2, err := s.graphDB.FetchChannelEdgesByID(
×
1389
                        scid.ToUint64(),
×
1390
                )
×
1391
                if errors.Is(err, graphdb.ErrEdgeNotFound) {
×
1392
                        // This is unlikely but there is a slim chance of this
×
1393
                        // being hit if lnd was killed via SIGKILL and the
×
1394
                        // funding manager was stepping through the delete
×
1395
                        // alias edge logic.
×
1396
                        return nil, nil
×
1397
                } else if err != nil {
×
1398
                        return nil, err
×
1399
                }
×
1400

1401
                // Grab our key to find our policy.
1402
                var ourKey [33]byte
×
1403
                copy(ourKey[:], nodeKeyDesc.PubKey.SerializeCompressed())
×
1404

×
1405
                var ourPolicy *models.ChannelEdgePolicy
×
1406
                if info != nil && info.NodeKey1Bytes == ourKey {
×
1407
                        ourPolicy = e1
×
1408
                } else {
×
1409
                        ourPolicy = e2
×
1410
                }
×
1411

1412
                if ourPolicy == nil {
×
1413
                        // Something is wrong, so return an error.
×
1414
                        return nil, fmt.Errorf("we don't have an edge")
×
1415
                }
×
1416

1417
                err = s.graphDB.DeleteChannelEdges(
×
1418
                        false, false, scid.ToUint64(),
×
1419
                )
×
1420
                return ourPolicy, err
×
1421
        }
1422

1423
        // For the reservationTimeout and the zombieSweeperInterval, different
1424
        // values are set in case we are in a dev environment to enhance test
1425
        // capabilities.
1426
        reservationTimeout := chanfunding.DefaultReservationTimeout
×
1427
        zombieSweeperInterval := lncfg.DefaultZombieSweeperInterval
×
1428

×
1429
        // Get the development config for funding manager. If we are not in
×
1430
        // development mode, this would be nil.
×
1431
        var devCfg *funding.DevConfig
×
1432
        if lncfg.IsDevBuild() {
×
1433
                devCfg = &funding.DevConfig{
×
1434
                        ProcessChannelReadyWait: cfg.Dev.ChannelReadyWait(),
×
1435
                }
×
1436

×
1437
                reservationTimeout = cfg.Dev.GetReservationTimeout()
×
1438
                zombieSweeperInterval = cfg.Dev.GetZombieSweeperInterval()
×
1439

×
1440
                srvrLog.Debugf("Using the dev config for the fundingMgr: %v, "+
×
1441
                        "reservationTimeout=%v, zombieSweeperInterval=%v",
×
1442
                        devCfg, reservationTimeout, zombieSweeperInterval)
×
1443
        }
×
1444

1445
        //nolint:ll
1446
        s.fundingMgr, err = funding.NewFundingManager(funding.Config{
×
1447
                Dev:                devCfg,
×
1448
                NoWumboChans:       !cfg.ProtocolOptions.Wumbo(),
×
1449
                IDKey:              nodeKeyDesc.PubKey,
×
1450
                IDKeyLoc:           nodeKeyDesc.KeyLocator,
×
1451
                Wallet:             cc.Wallet,
×
1452
                PublishTransaction: cc.Wallet.PublishTransaction,
×
1453
                UpdateLabel: func(hash chainhash.Hash, label string) error {
×
1454
                        return cc.Wallet.LabelTransaction(hash, label, true)
×
1455
                },
×
1456
                Notifier:     cc.ChainNotifier,
1457
                ChannelDB:    s.chanStateDB,
1458
                FeeEstimator: cc.FeeEstimator,
1459
                SignMessage:  cc.MsgSigner.SignMessage,
1460
                CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement,
1461
                        error) {
×
1462

×
1463
                        return s.genNodeAnnouncement(nil)
×
1464
                },
×
1465
                SendAnnouncement:     s.authGossiper.ProcessLocalAnnouncement,
1466
                NotifyWhenOnline:     s.NotifyWhenOnline,
1467
                TempChanIDSeed:       chanIDSeed,
1468
                FindChannel:          s.findChannel,
1469
                DefaultRoutingPolicy: cc.RoutingPolicy,
1470
                DefaultMinHtlcIn:     cc.MinHtlcIn,
1471
                NumRequiredConfs: func(chanAmt btcutil.Amount,
1472
                        pushAmt lnwire.MilliSatoshi) uint16 {
×
1473
                        // For large channels we increase the number
×
1474
                        // of confirmations we require for the
×
1475
                        // channel to be considered open. As it is
×
1476
                        // always the responder that gets to choose
×
1477
                        // value, the pushAmt is value being pushed
×
1478
                        // to us. This means we have more to lose
×
1479
                        // in the case this gets re-orged out, and
×
1480
                        // we will require more confirmations before
×
1481
                        // we consider it open.
×
1482

×
1483
                        // In case the user has explicitly specified
×
1484
                        // a default value for the number of
×
1485
                        // confirmations, we use it.
×
1486
                        defaultConf := uint16(chainCfg.DefaultNumChanConfs)
×
1487
                        if defaultConf != 0 {
×
1488
                                return defaultConf
×
1489
                        }
×
1490

1491
                        minConf := uint64(3)
×
1492
                        maxConf := uint64(6)
×
1493

×
1494
                        // If this is a wumbo channel, then we'll require the
×
1495
                        // max amount of confirmations.
×
1496
                        if chanAmt > MaxFundingAmount {
×
1497
                                return uint16(maxConf)
×
1498
                        }
×
1499

1500
                        // If not, we return a value scaled linearly
1501
                        // between 3 and 6, depending on channel size.
1502
                        // TODO(halseth): Use 1 as minimum?
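                        // As a rough example (assuming the non-wumbo
                        // funding cap of 2^24-1 satoshis): a channel of
                        // ~8,388,000 sats with nothing pushed sits at
                        // roughly half the cap, so it ends up requiring
                        // about 3 confirmations, while a channel at the
                        // cap requires the full 6.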
1503
                        maxChannelSize := uint64(
×
1504
                                lnwire.NewMSatFromSatoshis(MaxFundingAmount))
×
1505
                        stake := lnwire.NewMSatFromSatoshis(chanAmt) + pushAmt
×
1506
                        conf := maxConf * uint64(stake) / maxChannelSize
×
1507
                        if conf < minConf {
×
1508
                                conf = minConf
×
1509
                        }
×
1510
                        if conf > maxConf {
×
1511
                                conf = maxConf
×
1512
                        }
×
1513
                        return uint16(conf)
×
1514
                },
1515
                RequiredRemoteDelay: func(chanAmt btcutil.Amount) uint16 {
×
1516
                        // We scale the remote CSV delay (the time the
×
1517
                        // remote party has to claim funds in case of a unilateral
×
1518
                        // close) linearly from minRemoteDelay blocks
×
1519
                        // for small channels, to maxRemoteDelay blocks
×
1520
                        // for channels of size MaxFundingAmount.
×
1521

×
1522
                        // In case the user has explicitly specified
×
1523
                        // a default value for the remote delay, we
×
1524
                        // use it.
×
1525
                        defaultDelay := uint16(chainCfg.DefaultRemoteDelay)
×
1526
                        if defaultDelay > 0 {
×
1527
                                return defaultDelay
×
1528
                        }
×
1529

1530
                        // If this is a wumbo channel, then we'll require the
1531
                        // max value.
1532
                        if chanAmt > MaxFundingAmount {
×
1533
                                return maxRemoteDelay
×
1534
                        }
×
1535

1536
                        // If not, we scale according to channel size.
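                        // As an example, with the default Bitcoin bounds
                        // (assumed to be 144 and 2016 blocks here), a
                        // channel at half of MaxFundingAmount would get a
                        // delay of roughly 1008 blocks, i.e. about a week.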
1537
                        delay := uint16(btcutil.Amount(maxRemoteDelay) *
×
1538
                                chanAmt / MaxFundingAmount)
×
1539
                        if delay < minRemoteDelay {
×
1540
                                delay = minRemoteDelay
×
1541
                        }
×
1542
                        if delay > maxRemoteDelay {
×
1543
                                delay = maxRemoteDelay
×
1544
                        }
×
1545
                        return delay
×
1546
                },
1547
                WatchNewChannel: func(channel *channeldb.OpenChannel,
1548
                        peerKey *btcec.PublicKey) error {
×
1549

×
1550
                        // First, we'll mark this new peer as a persistent peer
×
1551
                        // for re-connection purposes. If the peer is not yet
×
1552
                        // tracked or the user hasn't requested it to be permanent,
×
1553
                        // we'll set false to prevent the server from continuing
×
1554
                        // to connect to this peer even if the number of
×
1555
                        // channels with this peer is zero.
×
1556
                        s.mu.Lock()
×
1557
                        pubStr := string(peerKey.SerializeCompressed())
×
1558
                        if _, ok := s.persistentPeers[pubStr]; !ok {
×
1559
                                s.persistentPeers[pubStr] = false
×
1560
                        }
×
1561
                        s.mu.Unlock()
×
1562

×
1563
                        // With that taken care of, we'll send this channel to
×
1564
                        // the chain arb so it can react to on-chain events.
×
1565
                        return s.chainArb.WatchNewChannel(channel)
×
1566
                },
1567
                ReportShortChanID: func(chanPoint wire.OutPoint) error {
×
1568
                        cid := lnwire.NewChanIDFromOutPoint(chanPoint)
×
1569
                        return s.htlcSwitch.UpdateShortChanID(cid)
×
1570
                },
×
1571
                RequiredRemoteChanReserve: func(chanAmt,
1572
                        dustLimit btcutil.Amount) btcutil.Amount {
×
1573

×
1574
                        // By default, we'll require the remote peer to maintain
×
1575
                        // at least 1% of the total channel capacity at all
×
1576
                        // times. If this value ends up dipping below the dust
×
1577
                        // limit, then we'll use the dust limit itself as the
×
1578
                        // reserve as required by BOLT #2.
×
1579
                        reserve := chanAmt / 100
×
1580
                        if reserve < dustLimit {
×
1581
                                reserve = dustLimit
×
1582
                        }
×
1583

1584
                        return reserve
×
1585
                },
1586
                RequiredRemoteMaxValue: func(chanAmt btcutil.Amount) lnwire.MilliSatoshi {
×
1587
                        // By default, we'll allow the remote peer to fully
×
1588
                        // utilize the full bandwidth of the channel, minus our
×
1589
                        // required reserve.
×
1590
                        reserve := lnwire.NewMSatFromSatoshis(chanAmt / 100)
×
1591
                        return lnwire.NewMSatFromSatoshis(chanAmt) - reserve
×
1592
                },
×
1593
                RequiredRemoteMaxHTLCs: func(chanAmt btcutil.Amount) uint16 {
×
1594
                        if cfg.DefaultRemoteMaxHtlcs > 0 {
×
1595
                                return cfg.DefaultRemoteMaxHtlcs
×
1596
                        }
×
1597

1598
                        // By default, we'll permit them to utilize the full
1599
                        // channel bandwidth.
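                        // input.MaxHTLCNumber is the combined limit for
                        // both directions (966 at the time of writing), so
                        // half of it matches the per-party cap of 483 from
                        // BOLT #2.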
1600
                        return uint16(input.MaxHTLCNumber / 2)
×
1601
                },
1602
                ZombieSweeperInterval:         zombieSweeperInterval,
1603
                ReservationTimeout:            reservationTimeout,
1604
                MinChanSize:                   btcutil.Amount(cfg.MinChanSize),
1605
                MaxChanSize:                   btcutil.Amount(cfg.MaxChanSize),
1606
                MaxPendingChannels:            cfg.MaxPendingChannels,
1607
                RejectPush:                    cfg.RejectPush,
1608
                MaxLocalCSVDelay:              chainCfg.MaxLocalDelay,
1609
                NotifyOpenChannelEvent:        s.channelNotifier.NotifyOpenChannelEvent,
1610
                OpenChannelPredicate:          chanPredicate,
1611
                NotifyPendingOpenChannelEvent: s.channelNotifier.NotifyPendingOpenChannelEvent,
1612
                EnableUpfrontShutdown:         cfg.EnableUpfrontShutdown,
1613
                MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte(
1614
                        s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
1615
                DeleteAliasEdge:      deleteAliasEdge,
1616
                AliasManager:         s.aliasMgr,
1617
                IsSweeperOutpoint:    s.sweeper.IsSweeperOutpoint,
1618
                AuxFundingController: implCfg.AuxFundingController,
1619
                AuxSigner:            implCfg.AuxSigner,
1620
                AuxResolver:          implCfg.AuxContractResolver,
1621
        })
1622
        if err != nil {
×
1623
                return nil, err
×
1624
        }
×
1625

1626
        // Next, we'll assemble the sub-system that will maintain an on-disk
1627
        // static backup of the latest channel state.
1628
        chanNotifier := &channelNotifier{
×
1629
                chanNotifier: s.channelNotifier,
×
1630
                addrs:        s.addrSource,
×
1631
        }
×
1632
        backupFile := chanbackup.NewMultiFile(cfg.BackupFilePath)
×
1633
        startingChans, err := chanbackup.FetchStaticChanBackups(
×
1634
                s.chanStateDB, s.addrSource,
×
1635
        )
×
1636
        if err != nil {
×
1637
                return nil, err
×
1638
        }
×
1639
        s.chanSubSwapper, err = chanbackup.NewSubSwapper(
×
1640
                startingChans, chanNotifier, s.cc.KeyRing, backupFile,
×
1641
        )
×
1642
        if err != nil {
×
1643
                return nil, err
×
1644
        }
×
1645

1646
        // Assemble a peer notifier which will provide clients with subscriptions
1647
        // to peer online and offline events.
1648
        s.peerNotifier = peernotifier.New()
×
1649

×
1650
        // Create a channel event store which monitors all open channels.
×
1651
        s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{
×
1652
                SubscribeChannelEvents: func() (subscribe.Subscription, error) {
×
1653
                        return s.channelNotifier.SubscribeChannelEvents()
×
1654
                },
×
1655
                SubscribePeerEvents: func() (subscribe.Subscription, error) {
×
1656
                        return s.peerNotifier.SubscribePeerEvents()
×
1657
                },
×
1658
                GetOpenChannels: s.chanStateDB.FetchAllOpenChannels,
1659
                Clock:           clock.NewDefaultClock(),
1660
                ReadFlapCount:   s.miscDB.ReadFlapCount,
1661
                WriteFlapCount:  s.miscDB.WriteFlapCounts,
1662
                FlapCountTicker: ticker.New(chanfitness.FlapCountFlushRate),
1663
        })
1664

1665
        if cfg.WtClient.Active {
×
1666
                policy := wtpolicy.DefaultPolicy()
×
1667
                policy.MaxUpdates = cfg.WtClient.MaxUpdates
×
1668

×
1669
                // We expose the sweep fee rate in sat/vbyte, but the tower
×
1670
                // protocol operates on sat/kw.
×
1671
                sweepRateSatPerVByte := chainfee.SatPerKVByte(
×
1672
                        1000 * cfg.WtClient.SweepFeeRate,
×
1673
                )
×
1674

×
1675
                policy.SweepFeeRate = sweepRateSatPerVByte.FeePerKWeight()
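                // For example, a configured rate of 10 sat/vbyte becomes
                // 10,000 sat/kvB, which converts to 2,500 sat/kw.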
×
1676

×
1677
                if err := policy.Validate(); err != nil {
×
1678
                        return nil, err
×
1679
                }
×
1680

1681
                // authDial is the wrapper around brontide.Dial for the
1682
                // watchtower.
1683
                authDial := func(localKey keychain.SingleKeyECDH,
×
1684
                        netAddr *lnwire.NetAddress,
×
1685
                        dialer tor.DialFunc) (wtserver.Peer, error) {
×
1686

×
1687
                        return brontide.Dial(
×
1688
                                localKey, netAddr, cfg.ConnectionTimeout, dialer,
×
1689
                        )
×
1690
                }
×
1691

1692
                // buildBreachRetribution is a call-back that can be used to
1693
                // query the BreachRetribution info and channel type given a
1694
                // channel ID and commitment height.
1695
                buildBreachRetribution := func(chanID lnwire.ChannelID,
×
1696
                        commitHeight uint64) (*lnwallet.BreachRetribution,
×
1697
                        channeldb.ChannelType, error) {
×
1698

×
1699
                        channel, err := s.chanStateDB.FetchChannelByID(
×
1700
                                nil, chanID,
×
1701
                        )
×
1702
                        if err != nil {
×
1703
                                return nil, 0, err
×
1704
                        }
×
1705

1706
                        br, err := lnwallet.NewBreachRetribution(
×
1707
                                channel, commitHeight, 0, nil,
×
1708
                                implCfg.AuxLeafStore,
×
1709
                                implCfg.AuxContractResolver,
×
1710
                        )
×
1711
                        if err != nil {
×
1712
                                return nil, 0, err
×
1713
                        }
×
1714

1715
                        return br, channel.ChanType, nil
×
1716
                }
1717

1718
                fetchClosedChannel := s.chanStateDB.FetchClosedChannelForID
×
1719

×
1720
                // Copy the policy for legacy channels and set the blob flag
×
1721
                // signalling support for anchor channels.
×
1722
                anchorPolicy := policy
×
1723
                anchorPolicy.BlobType |= blob.Type(blob.FlagAnchorChannel)
×
1724

×
1725
                // Copy the policy for legacy channels and set the blob flag
×
1726
                // signalling support for taproot channels.
×
1727
                taprootPolicy := policy
×
1728
                taprootPolicy.TxPolicy.BlobType |= blob.Type(
×
1729
                        blob.FlagTaprootChannel,
×
1730
                )
×
1731

×
1732
                s.towerClientMgr, err = wtclient.NewManager(&wtclient.Config{
×
1733
                        FetchClosedChannel:     fetchClosedChannel,
×
1734
                        BuildBreachRetribution: buildBreachRetribution,
×
1735
                        SessionCloseRange:      cfg.WtClient.SessionCloseRange,
×
1736
                        ChainNotifier:          s.cc.ChainNotifier,
×
1737
                        SubscribeChannelEvents: func() (subscribe.Subscription,
×
1738
                                error) {
×
1739

×
1740
                                return s.channelNotifier.
×
1741
                                        SubscribeChannelEvents()
×
1742
                        },
×
1743
                        Signer: cc.Wallet.Cfg.Signer,
1744
                        NewAddress: func() ([]byte, error) {
×
1745
                                addr, err := newSweepPkScriptGen(
×
1746
                                        cc.Wallet, netParams,
×
1747
                                )().Unpack()
×
1748
                                if err != nil {
×
1749
                                        return nil, err
×
1750
                                }
×
1751

1752
                                return addr.DeliveryAddress, nil
×
1753
                        },
1754
                        SecretKeyRing:      s.cc.KeyRing,
1755
                        Dial:               cfg.net.Dial,
1756
                        AuthDial:           authDial,
1757
                        DB:                 dbs.TowerClientDB,
1758
                        ChainHash:          *s.cfg.ActiveNetParams.GenesisHash,
1759
                        MinBackoff:         10 * time.Second,
1760
                        MaxBackoff:         5 * time.Minute,
1761
                        MaxTasksInMemQueue: cfg.WtClient.MaxTasksInMemQueue,
1762
                }, policy, anchorPolicy, taprootPolicy)
1763
                if err != nil {
×
1764
                        return nil, err
×
1765
                }
×
1766
        }
1767

1768
        if len(cfg.ExternalHosts) != 0 {
×
1769
                advertisedIPs := make(map[string]struct{})
×
1770
                for _, addr := range s.currentNodeAnn.Addresses {
×
1771
                        advertisedIPs[addr.String()] = struct{}{}
×
1772
                }
×
1773

1774
                s.hostAnn = netann.NewHostAnnouncer(netann.HostAnnouncerConfig{
×
1775
                        Hosts:         cfg.ExternalHosts,
×
1776
                        RefreshTicker: ticker.New(defaultHostSampleInterval),
×
1777
                        LookupHost: func(host string) (net.Addr, error) {
×
1778
                                return lncfg.ParseAddressString(
×
1779
                                        host, strconv.Itoa(defaultPeerPort),
×
1780
                                        cfg.net.ResolveTCPAddr,
×
1781
                                )
×
1782
                        },
×
1783
                        AdvertisedIPs: advertisedIPs,
1784
                        AnnounceNewIPs: netann.IPAnnouncer(
1785
                                func(modifier ...netann.NodeAnnModifier) (
1786
                                        lnwire.NodeAnnouncement, error) {
×
1787

×
1788
                                        return s.genNodeAnnouncement(
×
1789
                                                nil, modifier...,
×
1790
                                        )
×
1791
                                }),
×
1792
                })
1793
        }
1794

1795
        // Create liveness monitor.
1796
        s.createLivenessMonitor(cfg, cc, leaderElector)
×
1797

×
1798
        // Create the connection manager which will be responsible for
×
1799
        // maintaining persistent outbound connections and also accepting new
×
1800
        // incoming connections.
×
1801
        cmgr, err := connmgr.New(&connmgr.Config{
×
1802
                Listeners:      listeners,
×
1803
                OnAccept:       s.InboundPeerConnected,
×
1804
                RetryDuration:  time.Second * 5,
×
1805
                TargetOutbound: 100,
×
1806
                Dial: noiseDial(
×
1807
                        nodeKeyECDH, s.cfg.net, s.cfg.ConnectionTimeout,
×
1808
                ),
×
1809
                OnConnection: s.OutboundPeerConnected,
×
1810
        })
×
1811
        if err != nil {
×
1812
                return nil, err
×
1813
        }
×
1814
        s.connMgr = cmgr
×
1815

×
1816
        return s, nil
×
1817
}
1818

1819
// UpdateRoutingConfig is a callback function to update the routing config
1820
// values in the main cfg.
1821
func (s *server) UpdateRoutingConfig(cfg *routing.MissionControlConfig) {
×
1822
        routerCfg := s.cfg.SubRPCServers.RouterRPC
×
1823

×
1824
        switch c := cfg.Estimator.Config().(type) {
×
1825
        case routing.AprioriConfig:
×
1826
                routerCfg.ProbabilityEstimatorType =
×
1827
                        routing.AprioriEstimatorName
×
1828

×
1829
                targetCfg := routerCfg.AprioriConfig
×
1830
                targetCfg.PenaltyHalfLife = c.PenaltyHalfLife
×
1831
                targetCfg.Weight = c.AprioriWeight
×
1832
                targetCfg.CapacityFraction = c.CapacityFraction
×
1833
                targetCfg.HopProbability = c.AprioriHopProbability
×
1834

1835
        case routing.BimodalConfig:
×
1836
                routerCfg.ProbabilityEstimatorType =
×
1837
                        routing.BimodalEstimatorName
×
1838

×
1839
                targetCfg := routerCfg.BimodalConfig
×
1840
                targetCfg.Scale = int64(c.BimodalScaleMsat)
×
1841
                targetCfg.NodeWeight = c.BimodalNodeWeight
×
1842
                targetCfg.DecayTime = c.BimodalDecayTime
×
1843
        }
1844

1845
        routerCfg.MaxMcHistory = cfg.MaxMcHistory
×
1846
}
1847

1848
// signAliasUpdate takes a ChannelUpdate and returns the signature. This is
1849
// used for option_scid_alias channels where the ChannelUpdate to be sent back
1850
// may differ from what is on disk.
1851
func (s *server) signAliasUpdate(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
1852
        error) {
×
1853

×
1854
        data, err := u.DataToSign()
×
1855
        if err != nil {
×
1856
                return nil, err
×
1857
        }
×
1858

1859
        return s.cc.MsgSigner.SignMessage(s.identityKeyLoc, data, true)
×
1860
}
1861

1862
// createLivenessMonitor creates a set of health checks using our configured
1863
// values and uses these checks to create a liveness monitor. Available
1864
// health checks,
1865
//   - chainHealthCheck (will be disabled for --nochainbackend mode)
1866
//   - diskCheck
1867
//   - tlsHealthCheck
1868
//   - torController, only created when tor is enabled.
1869
//
1870
// If a health check has been disabled by setting attempts to 0, our monitor
1871
// will not run it.
1872
func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl,
1873
        leaderElector cluster.LeaderElector) {
×
1874

×
1875
        chainBackendAttempts := cfg.HealthChecks.ChainCheck.Attempts
×
1876
        if cfg.Bitcoin.Node == "nochainbackend" {
×
1877
                srvrLog.Info("Disabling chain backend checks for " +
×
1878
                        "nochainbackend mode")
×
1879

×
1880
                chainBackendAttempts = 0
×
1881
        }
×
1882

1883
        chainHealthCheck := healthcheck.NewObservation(
×
1884
                "chain backend",
×
1885
                cc.HealthCheck,
×
1886
                cfg.HealthChecks.ChainCheck.Interval,
×
1887
                cfg.HealthChecks.ChainCheck.Timeout,
×
1888
                cfg.HealthChecks.ChainCheck.Backoff,
×
1889
                chainBackendAttempts,
×
1890
        )
×
1891

×
1892
        diskCheck := healthcheck.NewObservation(
×
1893
                "disk space",
×
1894
                func() error {
×
1895
                        free, err := healthcheck.AvailableDiskSpaceRatio(
×
1896
                                cfg.LndDir,
×
1897
                        )
×
1898
                        if err != nil {
×
1899
                                return err
×
1900
                        }
×
1901

1902
                        // If we have more free space than we require,
1903
                        // we return a nil error.
1904
                        if free > cfg.HealthChecks.DiskCheck.RequiredRemaining {
×
1905
                                return nil
×
1906
                        }
×
1907

1908
                        return fmt.Errorf("require: %v free space, got: %v",
×
1909
                                cfg.HealthChecks.DiskCheck.RequiredRemaining,
×
1910
                                free)
×
1911
                },
1912
                cfg.HealthChecks.DiskCheck.Interval,
1913
                cfg.HealthChecks.DiskCheck.Timeout,
1914
                cfg.HealthChecks.DiskCheck.Backoff,
1915
                cfg.HealthChecks.DiskCheck.Attempts,
1916
        )
1917

1918
        tlsHealthCheck := healthcheck.NewObservation(
×
1919
                "tls",
×
1920
                func() error {
×
1921
                        expired, expTime, err := s.tlsManager.IsCertExpired(
×
1922
                                s.cc.KeyRing,
×
1923
                        )
×
1924
                        if err != nil {
×
1925
                                return err
×
1926
                        }
×
1927
                        if expired {
×
1928
                                return fmt.Errorf("TLS certificate is "+
×
1929
                                        "expired as of %v", expTime)
×
1930
                        }
×
1931

1932
                        // If the certificate is not outdated, no error needs
1933
                        // to be returned
1934
                        return nil
×
1935
                },
1936
                cfg.HealthChecks.TLSCheck.Interval,
1937
                cfg.HealthChecks.TLSCheck.Timeout,
1938
                cfg.HealthChecks.TLSCheck.Backoff,
1939
                cfg.HealthChecks.TLSCheck.Attempts,
1940
        )
1941

1942
        checks := []*healthcheck.Observation{
×
1943
                chainHealthCheck, diskCheck, tlsHealthCheck,
×
1944
        }
×
1945

×
1946
        // If Tor is enabled, add the healthcheck for tor connection.
×
1947
        if s.torController != nil {
×
1948
                torConnectionCheck := healthcheck.NewObservation(
×
1949
                        "tor connection",
×
1950
                        func() error {
×
1951
                                return healthcheck.CheckTorServiceStatus(
×
1952
                                        s.torController,
×
1953
                                        s.createNewHiddenService,
×
1954
                                )
×
1955
                        },
×
1956
                        cfg.HealthChecks.TorConnection.Interval,
1957
                        cfg.HealthChecks.TorConnection.Timeout,
1958
                        cfg.HealthChecks.TorConnection.Backoff,
1959
                        cfg.HealthChecks.TorConnection.Attempts,
1960
                )
1961
                checks = append(checks, torConnectionCheck)
×
1962
        }
1963

1964
        // If remote signing is enabled, add the healthcheck for the remote
1965
        // signing RPC interface.
1966
        if s.cfg.RemoteSigner != nil && s.cfg.RemoteSigner.Enable {
×
1967
                // Because we have two cascading timeouts here, we need to add
×
1968
                // some slack to the "outer" one of them in case the "inner"
×
1969
                // returns exactly on time.
×
1970
                overhead := time.Millisecond * 10
×
1971

×
1972
                remoteSignerConnectionCheck := healthcheck.NewObservation(
×
1973
                        "remote signer connection",
×
1974
                        rpcwallet.HealthCheck(
×
1975
                                s.cfg.RemoteSigner,
×
1976

×
1977
                                // For the health check we might want to be even
×
1978
                                // stricter than the initial/normal connect, so
×
1979
                                // we use the health check timeout here.
×
1980
                                cfg.HealthChecks.RemoteSigner.Timeout,
×
1981
                        ),
×
1982
                        cfg.HealthChecks.RemoteSigner.Interval,
×
1983
                        cfg.HealthChecks.RemoteSigner.Timeout+overhead,
×
1984
                        cfg.HealthChecks.RemoteSigner.Backoff,
×
1985
                        cfg.HealthChecks.RemoteSigner.Attempts,
×
1986
                )
×
1987
                checks = append(checks, remoteSignerConnectionCheck)
×
1988
        }
×
1989

1990
        // If we have a leader elector, we add a health check to ensure we are
1991
        // still the leader. During normal operation, we should always be the
1992
        // leader, but there are circumstances where this may change, such as
1993
        // when we lose network connectivity for long enough that our lease expires.
1994
        if leaderElector != nil {
×
1995
                leaderCheck := healthcheck.NewObservation(
×
1996
                        "leader status",
×
1997
                        func() error {
×
1998
                                // Check if we are still the leader. Note that
×
1999
                                // we don't need to use a timeout context here
×
2000
                                // as the healthcheck observer will handle the
×
2001
                                // timeout case for us.
×
2002
                                timeoutCtx, cancel := context.WithTimeout(
×
2003
                                        context.Background(),
×
2004
                                        cfg.HealthChecks.LeaderCheck.Timeout,
×
2005
                                )
×
2006
                                defer cancel()
×
2007

×
2008
                                leader, err := leaderElector.IsLeader(
×
2009
                                        timeoutCtx,
×
2010
                                )
×
2011
                                if err != nil {
×
2012
                                        return fmt.Errorf("unable to check if "+
×
2013
                                                "still leader: %v", err)
×
2014
                                }
×
2015

2016
                                if !leader {
×
2017
                                        srvrLog.Debug("Not the current leader")
×
2018
                                        return fmt.Errorf("not the current " +
×
2019
                                                "leader")
×
2020
                                }
×
2021

2022
                                return nil
×
2023
                        },
2024
                        cfg.HealthChecks.LeaderCheck.Interval,
2025
                        cfg.HealthChecks.LeaderCheck.Timeout,
2026
                        cfg.HealthChecks.LeaderCheck.Backoff,
2027
                        cfg.HealthChecks.LeaderCheck.Attempts,
2028
                )
2029

2030
                checks = append(checks, leaderCheck)
×
2031
        }
2032

2033
        // If we have not disabled all of our health checks, we create a
2034
        // liveness monitor with our configured checks.
2035
        s.livenessMonitor = healthcheck.NewMonitor(
×
2036
                &healthcheck.Config{
×
2037
                        Checks:   checks,
×
2038
                        Shutdown: srvrLog.Criticalf,
×
2039
                },
×
2040
        )
×
2041
}
2042

2043
// Started returns true if the server has been started, and false otherwise.
2044
// NOTE: This function is safe for concurrent access.
2045
func (s *server) Started() bool {
×
2046
        return atomic.LoadInt32(&s.active) != 0
×
2047
}
×
2048

2049
// cleaner is used to aggregate "cleanup" functions during an operation that
2050
// starts several subsystems. In case one of the subsystem fails to start
2051
// and a proper resource cleanup is required, the "run" method achieves this
2052
// by running all these added "cleanup" functions.
2053
type cleaner []func() error
2054

2055
// add is used to add a cleanup function to be called when
2056
// the run function is executed.
2057
func (c cleaner) add(cleanup func() error) cleaner {
×
2058
        return append(c, cleanup)
×
2059
}
×
2060

2061
// run is used to run all the previously added cleanup functions.
2062
func (c cleaner) run() {
×
2063
        for i := len(c) - 1; i >= 0; i-- {
×
2064
                if err := c[i](); err != nil {
×
2065
                        srvrLog.Infof("Cleanup failed: %v", err)
×
2066
                }
×
2067
        }
2068
}
2069

2070
// Start starts the main daemon server, all requested listeners, and any helper
2071
// goroutines.
2072
// NOTE: This function is safe for concurrent access.
2073
//
2074
//nolint:funlen
2075
func (s *server) Start() error {
×
2076
        var startErr error
×
2077

×
2078
        // If one subsystem fails to start, the following code ensures that the
×
2079
        // previously started ones are stopped. It also ensures a proper wallet
×
2080
        // shutdown which is important for releasing its resources (boltdb, etc...)
×
2081
        cleanup := cleaner{}
×
2082

×
2083
        s.start.Do(func() {
×
2084
                cleanup = cleanup.add(s.customMessageServer.Stop)
×
2085
                if err := s.customMessageServer.Start(); err != nil {
×
2086
                        startErr = err
×
2087
                        return
×
2088
                }
×
2089

2090
                if s.hostAnn != nil {
×
2091
                        cleanup = cleanup.add(s.hostAnn.Stop)
×
2092
                        if err := s.hostAnn.Start(); err != nil {
×
2093
                                startErr = err
×
2094
                                return
×
2095
                        }
×
2096
                }
2097

2098
                if s.livenessMonitor != nil {
×
2099
                        cleanup = cleanup.add(s.livenessMonitor.Stop)
×
2100
                        if err := s.livenessMonitor.Start(); err != nil {
×
2101
                                startErr = err
×
2102
                                return
×
2103
                        }
×
2104
                }
2105

2106
                // Start the notification server. This is used so channel
2107
                // management goroutines can be notified when a funding
2108
                // transaction reaches a sufficient number of confirmations, or
2109
                // when the input for the funding transaction is spent in an
2110
                // attempt at an uncooperative close by the counterparty.
2111
                cleanup = cleanup.add(s.sigPool.Stop)
×
2112
                if err := s.sigPool.Start(); err != nil {
×
2113
                        startErr = err
×
2114
                        return
×
2115
                }
×
2116

2117
                cleanup = cleanup.add(s.writePool.Stop)
×
2118
                if err := s.writePool.Start(); err != nil {
×
2119
                        startErr = err
×
2120
                        return
×
2121
                }
×
2122

2123
                cleanup = cleanup.add(s.readPool.Stop)
×
2124
                if err := s.readPool.Start(); err != nil {
×
2125
                        startErr = err
×
2126
                        return
×
2127
                }
×
2128

2129
                cleanup = cleanup.add(s.cc.ChainNotifier.Stop)
×
2130
                if err := s.cc.ChainNotifier.Start(); err != nil {
×
2131
                        startErr = err
×
2132
                        return
×
2133
                }
×
2134

2135
                cleanup = cleanup.add(s.cc.BestBlockTracker.Stop)
×
2136
                if err := s.cc.BestBlockTracker.Start(); err != nil {
×
2137
                        startErr = err
×
2138
                        return
×
2139
                }
×
2140

2141
                cleanup = cleanup.add(s.channelNotifier.Stop)
×
2142
                if err := s.channelNotifier.Start(); err != nil {
×
2143
                        startErr = err
×
2144
                        return
×
2145
                }
×
2146

2147
                cleanup = cleanup.add(func() error {
×
2148
                        return s.peerNotifier.Stop()
×
2149
                })
×
2150
                if err := s.peerNotifier.Start(); err != nil {
×
2151
                        startErr = err
×
2152
                        return
×
2153
                }
×
2154

2155
                cleanup = cleanup.add(s.htlcNotifier.Stop)
×
2156
                if err := s.htlcNotifier.Start(); err != nil {
×
2157
                        startErr = err
×
2158
                        return
×
2159
                }
×
2160

2161
                if s.towerClientMgr != nil {
×
2162
                        cleanup = cleanup.add(s.towerClientMgr.Stop)
×
2163
                        if err := s.towerClientMgr.Start(); err != nil {
×
2164
                                startErr = err
×
2165
                                return
×
2166
                        }
×
2167
                }
2168

2169
                cleanup = cleanup.add(s.txPublisher.Stop)
×
2170
                if err := s.txPublisher.Start(); err != nil {
×
2171
                        startErr = err
×
2172
                        return
×
2173
                }
×
2174

2175
                cleanup = cleanup.add(s.sweeper.Stop)
×
2176
                if err := s.sweeper.Start(); err != nil {
×
2177
                        startErr = err
×
2178
                        return
×
2179
                }
×
2180

2181
                cleanup = cleanup.add(s.utxoNursery.Stop)
×
2182
                if err := s.utxoNursery.Start(); err != nil {
×
2183
                        startErr = err
×
2184
                        return
×
2185
                }
×
2186

2187
                cleanup = cleanup.add(s.breachArbitrator.Stop)
×
2188
                if err := s.breachArbitrator.Start(); err != nil {
×
2189
                        startErr = err
×
2190
                        return
×
2191
                }
×
2192

2193
		cleanup = cleanup.add(s.fundingMgr.Stop)
		if err := s.fundingMgr.Start(); err != nil {
			startErr = err
			return
		}

		// htlcSwitch must be started before chainArb since the latter
		// relies on htlcSwitch to deliver resolution message upon
		// start.
		cleanup = cleanup.add(s.htlcSwitch.Stop)
		if err := s.htlcSwitch.Start(); err != nil {
			startErr = err
			return
		}

		cleanup = cleanup.add(s.interceptableSwitch.Stop)
		if err := s.interceptableSwitch.Start(); err != nil {
			startErr = err
			return
		}

		cleanup = cleanup.add(s.invoiceHtlcModifier.Stop)
		if err := s.invoiceHtlcModifier.Start(); err != nil {
			startErr = err
			return
		}

		cleanup = cleanup.add(s.chainArb.Stop)
		if err := s.chainArb.Start(); err != nil {
			startErr = err
			return
		}

		cleanup = cleanup.add(s.graphBuilder.Stop)
		if err := s.graphBuilder.Start(); err != nil {
			startErr = err
			return
		}

		cleanup = cleanup.add(s.chanRouter.Stop)
		if err := s.chanRouter.Start(); err != nil {
			startErr = err
			return
		}
		// The authGossiper depends on the chanRouter and therefore
		// should be started after it.
		cleanup = cleanup.add(s.authGossiper.Stop)
		if err := s.authGossiper.Start(); err != nil {
			startErr = err
			return
		}

		cleanup = cleanup.add(s.invoices.Stop)
		if err := s.invoices.Start(); err != nil {
			startErr = err
			return
		}

		cleanup = cleanup.add(s.sphinx.Stop)
		if err := s.sphinx.Start(); err != nil {
			startErr = err
			return
		}

		cleanup = cleanup.add(s.chanStatusMgr.Stop)
		if err := s.chanStatusMgr.Start(); err != nil {
			startErr = err
			return
		}

		cleanup = cleanup.add(s.chanEventStore.Stop)
		if err := s.chanEventStore.Start(); err != nil {
			startErr = err
			return
		}

		cleanup.add(func() error {
			s.missionController.StopStoreTickers()
			return nil
		})
		s.missionController.RunStoreTickers()

		// Before we start the connMgr, we'll check to see if we have
		// any backups to recover. We do this now as we want to ensure
		// that we have all the information we need to handle channel
		// recovery _before_ we even accept connections from any peers.
		chanRestorer := &chanDBRestorer{
			db:         s.chanStateDB,
			secretKeys: s.cc.KeyRing,
			chainArb:   s.chainArb,
		}
		if len(s.chansToRestore.PackedSingleChanBackups) != 0 {
			_, err := chanbackup.UnpackAndRecoverSingles(
				s.chansToRestore.PackedSingleChanBackups,
				s.cc.KeyRing, chanRestorer, s,
			)
			if err != nil {
				startErr = fmt.Errorf("unable to unpack single "+
					"backups: %v", err)
				return
			}
		}
		if len(s.chansToRestore.PackedMultiChanBackup) != 0 {
			_, err := chanbackup.UnpackAndRecoverMulti(
				s.chansToRestore.PackedMultiChanBackup,
				s.cc.KeyRing, chanRestorer, s,
			)
			if err != nil {
				startErr = fmt.Errorf("unable to unpack chan "+
					"backup: %v", err)
				return
			}
		}

		// chanSubSwapper must be started after the `channelNotifier`
		// because it depends on channel events as a synchronization
		// point.
		cleanup = cleanup.add(s.chanSubSwapper.Stop)
		if err := s.chanSubSwapper.Start(); err != nil {
			startErr = err
			return
		}

		if s.torController != nil {
			cleanup = cleanup.add(s.torController.Stop)
			if err := s.createNewHiddenService(); err != nil {
				startErr = err
				return
			}
		}

		if s.natTraversal != nil {
			s.wg.Add(1)
			go s.watchExternalIP()
		}

		// Start connmgr last to prevent connections before init.
		cleanup = cleanup.add(func() error {
			s.connMgr.Stop()
			return nil
		})
		s.connMgr.Start()

		// If peers are specified as a config option, we'll add those
		// peers first.
		for _, peerAddrCfg := range s.cfg.AddPeers {
			parsedPubkey, parsedHost, err := lncfg.ParseLNAddressPubkey(
				peerAddrCfg,
			)
			if err != nil {
				startErr = fmt.Errorf("unable to parse peer "+
					"pubkey from config: %v", err)
				return
			}
			addr, err := parseAddr(parsedHost, s.cfg.net)
			if err != nil {
				startErr = fmt.Errorf("unable to parse peer "+
					"address provided as a config option: "+
					"%v", err)
				return
			}

			peerAddr := &lnwire.NetAddress{
				IdentityKey: parsedPubkey,
				Address:     addr,
				ChainNet:    s.cfg.ActiveNetParams.Net,
			}

			err = s.ConnectToPeer(
				peerAddr, true,
				s.cfg.ConnectionTimeout,
			)
			if err != nil {
				startErr = fmt.Errorf("unable to connect to "+
					"peer address provided as a config "+
					"option: %v", err)
				return
			}
		}

		// Subscribe to NodeAnnouncements that advertise new addresses
		// for our persistent peers.
		if err := s.updatePersistentPeerAddrs(); err != nil {
			startErr = err
			return
		}

		// With all the relevant sub-systems started, we'll now attempt
		// to establish persistent connections to our direct channel
		// collaborators within the network. Before doing so however,
		// we'll prune our set of link nodes found within the database
		// to ensure we don't reconnect to any nodes we no longer have
		// open channels with.
		if err := s.chanStateDB.PruneLinkNodes(); err != nil {
			startErr = err
			return
		}
		if err := s.establishPersistentConnections(); err != nil {
			startErr = err
			return
		}

		// setSeedList is a helper function that turns multiple DNS seed
		// server tuples from the command line or config file into the
		// data structure we need and does a basic formal sanity check
		// in the process.
		setSeedList := func(tuples []string, genesisHash chainhash.Hash) {
			if len(tuples) == 0 {
				return
			}

			result := make([][2]string, len(tuples))
			for idx, tuple := range tuples {
				tuple = strings.TrimSpace(tuple)
				if len(tuple) == 0 {
					return
				}

				servers := strings.Split(tuple, ",")
				if len(servers) > 2 || len(servers) == 0 {
					srvrLog.Warnf("Ignoring invalid DNS "+
						"seed tuple: %v", servers)
					return
				}

				copy(result[idx][:], servers)
			}

			chainreg.ChainDNSSeeds[genesisHash] = result
		}

		// Let users overwrite the DNS seed nodes. We only allow them
		// for bitcoin mainnet/testnet/signet.
		if s.cfg.Bitcoin.MainNet {
			setSeedList(
				s.cfg.Bitcoin.DNSSeeds,
				chainreg.BitcoinMainnetGenesis,
			)
		}
		if s.cfg.Bitcoin.TestNet3 {
			setSeedList(
				s.cfg.Bitcoin.DNSSeeds,
				chainreg.BitcoinTestnetGenesis,
			)
		}
		if s.cfg.Bitcoin.SigNet {
			setSeedList(
				s.cfg.Bitcoin.DNSSeeds,
				chainreg.BitcoinSignetGenesis,
			)
		}

		// If network bootstrapping hasn't been disabled, then we'll
		// configure the set of active bootstrappers, and launch a
		// dedicated goroutine to maintain a set of persistent
		// connections.
		if shouldPeerBootstrap(s.cfg) {
			bootstrappers, err := initNetworkBootstrappers(s)
			if err != nil {
				startErr = err
				return
			}

			s.wg.Add(1)
			go s.peerBootstrapper(defaultMinPeers, bootstrappers)
		} else {
			srvrLog.Infof("Auto peer bootstrapping is disabled")
		}

		// Set the active flag now that we've completed the full
		// startup.
		atomic.StoreInt32(&s.active, 1)
	})

	if startErr != nil {
		cleanup.run()
	}
	return startErr
}

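The start sequence above registers each subsystem's Stop method on a cleanup list before calling its Start, so a failure part-way through unwinds everything already running. Below is a minimal, standalone sketch of that idiom; the cleaner type, its methods, and the example subsystems are illustrative stand-ins rather than lnd's actual implementation.

package main

import "fmt"

// cleaner accumulates stop callbacks and runs them in reverse order,
// mirroring the "cleanup = cleanup.add(x.Stop)" pattern above.
type cleaner []func() error

func (c cleaner) add(fn func() error) cleaner {
	return append(c, fn)
}

func (c cleaner) run() {
	// Unwind in reverse: the last subsystem started is stopped first.
	for i := len(c) - 1; i >= 0; i-- {
		if err := c[i](); err != nil {
			fmt.Printf("cleanup %d failed: %v\n", i, err)
		}
	}
}

type subsystem struct{ name string }

func (s *subsystem) Start() error { fmt.Println("start", s.name); return nil }
func (s *subsystem) Stop() error  { fmt.Println("stop", s.name); return nil }

func main() {
	var cleanup cleaner

	subs := []*subsystem{{"funding"}, {"switch"}, {"chainArb"}}
	for _, sub := range subs {
		// Register the stop hook before starting, as the server start
		// sequence does, so a subsystem whose Start fails still has
		// its Stop invoked during the unwind.
		cleanup = cleanup.add(sub.Stop)
		if err := sub.Start(); err != nil {
			cleanup.run()
			return
		}
	}
}
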
// Stop gracefully shuts down the main daemon server. This function will signal
// any active goroutines, or helper objects to exit, then blocks until they've
// all successfully exited. Additionally, any/all listeners are closed.
// NOTE: This function is safe for concurrent access.
func (s *server) Stop() error {
	s.stop.Do(func() {
		atomic.StoreInt32(&s.stopping, 1)

		close(s.quit)

		// Shutdown connMgr first to prevent conns during shutdown.
		s.connMgr.Stop()

		// Shutdown the wallet, funding manager, and the rpc server.
		if err := s.chanStatusMgr.Stop(); err != nil {
			srvrLog.Warnf("failed to stop chanStatusMgr: %v", err)
		}
		if err := s.htlcSwitch.Stop(); err != nil {
			srvrLog.Warnf("failed to stop htlcSwitch: %v", err)
		}
		if err := s.sphinx.Stop(); err != nil {
			srvrLog.Warnf("failed to stop sphinx: %v", err)
		}
		if err := s.invoices.Stop(); err != nil {
			srvrLog.Warnf("failed to stop invoices: %v", err)
		}
		if err := s.interceptableSwitch.Stop(); err != nil {
			srvrLog.Warnf("failed to stop interceptable "+
				"switch: %v", err)
		}
		if err := s.invoiceHtlcModifier.Stop(); err != nil {
			srvrLog.Warnf("failed to stop htlc invoices "+
				"modifier: %v", err)
		}
		if err := s.chanRouter.Stop(); err != nil {
			srvrLog.Warnf("failed to stop chanRouter: %v", err)
		}
		if err := s.graphBuilder.Stop(); err != nil {
			srvrLog.Warnf("failed to stop graphBuilder: %v", err)
		}
		if err := s.chainArb.Stop(); err != nil {
			srvrLog.Warnf("failed to stop chainArb: %v", err)
		}
		if err := s.fundingMgr.Stop(); err != nil {
			srvrLog.Warnf("failed to stop fundingMgr: %v", err)
		}
		if err := s.breachArbitrator.Stop(); err != nil {
			srvrLog.Warnf("failed to stop breachArbitrator: %v",
				err)
		}
		if err := s.utxoNursery.Stop(); err != nil {
			srvrLog.Warnf("failed to stop utxoNursery: %v", err)
		}
		if err := s.authGossiper.Stop(); err != nil {
			srvrLog.Warnf("failed to stop authGossiper: %v", err)
		}
		if err := s.sweeper.Stop(); err != nil {
			srvrLog.Warnf("failed to stop sweeper: %v", err)
		}
		if err := s.txPublisher.Stop(); err != nil {
			srvrLog.Warnf("failed to stop txPublisher: %v", err)
		}
		if err := s.channelNotifier.Stop(); err != nil {
			srvrLog.Warnf("failed to stop channelNotifier: %v", err)
		}
		if err := s.peerNotifier.Stop(); err != nil {
			srvrLog.Warnf("failed to stop peerNotifier: %v", err)
		}
		if err := s.htlcNotifier.Stop(); err != nil {
			srvrLog.Warnf("failed to stop htlcNotifier: %v", err)
		}

		// Update channel.backup file. Make sure to do it before
		// stopping chanSubSwapper.
		singles, err := chanbackup.FetchStaticChanBackups(
			s.chanStateDB, s.addrSource,
		)
		if err != nil {
			srvrLog.Warnf("failed to fetch channel states: %v",
				err)
		} else {
			err := s.chanSubSwapper.ManualUpdate(singles)
			if err != nil {
				srvrLog.Warnf("Manual update of channel "+
					"backup failed: %v", err)
			}
		}

		if err := s.chanSubSwapper.Stop(); err != nil {
			srvrLog.Warnf("failed to stop chanSubSwapper: %v", err)
		}
		if err := s.cc.ChainNotifier.Stop(); err != nil {
			srvrLog.Warnf("Unable to stop ChainNotifier: %v", err)
		}
		if err := s.cc.BestBlockTracker.Stop(); err != nil {
			srvrLog.Warnf("Unable to stop BestBlockTracker: %v",
				err)
		}
		if err := s.chanEventStore.Stop(); err != nil {
			srvrLog.Warnf("Unable to stop ChannelEventStore: %v",
				err)
		}
		s.missionController.StopStoreTickers()

		// Disconnect from each active peer to ensure that
		// peerTerminationWatchers signal completion to each peer.
		for _, peer := range s.Peers() {
			err := s.DisconnectPeer(peer.IdentityKey())
			if err != nil {
				srvrLog.Warnf("could not disconnect peer: %v, "+
					"received error: %v", peer.IdentityKey(),
					err,
				)
			}
		}

		// Now that all connections have been torn down, stop the tower
		// client which will reliably flush all queued states to the
		// tower. If this is halted for any reason, the force quit timer
		// will kick in and abort to allow this method to return.
		if s.towerClientMgr != nil {
			if err := s.towerClientMgr.Stop(); err != nil {
				srvrLog.Warnf("Unable to shut down tower "+
					"client manager: %v", err)
			}
		}

		if s.hostAnn != nil {
			if err := s.hostAnn.Stop(); err != nil {
				srvrLog.Warnf("unable to shut down host "+
					"announcer: %v", err)
			}
		}

		if s.livenessMonitor != nil {
			if err := s.livenessMonitor.Stop(); err != nil {
				srvrLog.Warnf("unable to shutdown liveness "+
					"monitor: %v", err)
			}
		}

		// Wait for all lingering goroutines to quit.
		srvrLog.Debug("Waiting for server to shutdown...")
		s.wg.Wait()

		srvrLog.Debug("Stopping buffer pools...")
		s.sigPool.Stop()
		s.writePool.Stop()
		s.readPool.Stop()
	})

	return nil
}

// Stopped returns true if the server has been instructed to shut down.
// NOTE: This function is safe for concurrent access.
func (s *server) Stopped() bool {
	return atomic.LoadInt32(&s.stopping) != 0
}

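Stop pairs a sync.Once with an atomic stopping flag and a quit channel: the shutdown logic runs exactly once, goroutines observe the closed channel, and the wait group blocks until they exit. A compact sketch of that shape follows, using hypothetical names rather than lnd's real fields.

package main

import (
	"sync"
	"sync/atomic"
)

type server struct {
	active   int32 // set to 1 once Start completes
	stopping int32 // set to 1 once Stop begins

	start sync.Once
	stop  sync.Once

	quit chan struct{}
	wg   sync.WaitGroup
}

func newServer() *server {
	return &server{quit: make(chan struct{})}
}

func (s *server) Start() error {
	s.start.Do(func() {
		// ... start subsystems here ...
		atomic.StoreInt32(&s.active, 1)
	})
	return nil
}

func (s *server) Stop() error {
	s.stop.Do(func() {
		atomic.StoreInt32(&s.stopping, 1)
		close(s.quit) // signal every goroutine selecting on quit
		s.wg.Wait()   // block until they have all exited
	})
	return nil
}

// Stopped mirrors the accessor above: safe for concurrent use.
func (s *server) Stopped() bool {
	return atomic.LoadInt32(&s.stopping) != 0
}

func main() {
	s := newServer()
	_ = s.Start()
	_ = s.Stop()
}
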
// configurePortForwarding attempts to set up port forwarding for the different
// ports that the server will be listening on.
//
// NOTE: This should only be used when using some kind of NAT traversal to
// automatically set up forwarding rules.
func (s *server) configurePortForwarding(ports ...uint16) ([]string, error) {
	ip, err := s.natTraversal.ExternalIP()
	if err != nil {
		return nil, err
	}
	s.lastDetectedIP = ip

	externalIPs := make([]string, 0, len(ports))
	for _, port := range ports {
		if err := s.natTraversal.AddPortMapping(port); err != nil {
			srvrLog.Debugf("Unable to forward port %d: %v", port, err)
			continue
		}

		hostIP := fmt.Sprintf("%v:%d", ip, port)
		externalIPs = append(externalIPs, hostIP)
	}

	return externalIPs, nil
}

// removePortForwarding attempts to clear the forwarding rules for the different
// ports the server is currently listening on.
//
// NOTE: This should only be used when using some kind of NAT traversal to
// automatically set up forwarding rules.
func (s *server) removePortForwarding() {
	forwardedPorts := s.natTraversal.ForwardedPorts()
	for _, port := range forwardedPorts {
		if err := s.natTraversal.DeletePortMapping(port); err != nil {
			srvrLog.Errorf("Unable to remove forwarding rules for "+
				"port %d: %v", port, err)
		}
	}
}

// watchExternalIP continuously checks for an updated external IP address every
// 15 minutes. Once a new IP address has been detected, it will automatically
// handle port forwarding rules and send updated node announcements to the
// currently connected peers.
//
// NOTE: This MUST be run as a goroutine.
func (s *server) watchExternalIP() {
	defer s.wg.Done()

	// Before exiting, we'll make sure to remove the forwarding rules set
	// up by the server.
	defer s.removePortForwarding()

	// Keep track of the external IPs set by the user to avoid replacing
	// them when detecting a new IP.
	ipsSetByUser := make(map[string]struct{})
	for _, ip := range s.cfg.ExternalIPs {
		ipsSetByUser[ip.String()] = struct{}{}
	}

	forwardedPorts := s.natTraversal.ForwardedPorts()

	ticker := time.NewTicker(15 * time.Minute)
	defer ticker.Stop()
out:
	for {
		select {
		case <-ticker.C:
			// We'll start off by making sure a new IP address has
			// been detected.
			ip, err := s.natTraversal.ExternalIP()
			if err != nil {
				srvrLog.Debugf("Unable to retrieve the "+
					"external IP address: %v", err)
				continue
			}

			// Periodically renew the NAT port forwarding.
			for _, port := range forwardedPorts {
				err := s.natTraversal.AddPortMapping(port)
				if err != nil {
					srvrLog.Warnf("Unable to automatically "+
						"re-create port forwarding using %s: %v",
						s.natTraversal.Name(), err)
				} else {
					srvrLog.Debugf("Automatically re-created "+
						"forwarding for port %d using %s to "+
						"advertise external IP",
						port, s.natTraversal.Name())
				}
			}

			if ip.Equal(s.lastDetectedIP) {
				continue
			}

			srvrLog.Infof("Detected new external IP address %s", ip)

			// Next, we'll craft the new addresses that will be
			// included in the new node announcement and advertised
			// to the network. Each address will consist of the new
			// IP detected and one of the currently advertised
			// ports.
			var newAddrs []net.Addr
			for _, port := range forwardedPorts {
				hostIP := fmt.Sprintf("%v:%d", ip, port)
				addr, err := net.ResolveTCPAddr("tcp", hostIP)
				if err != nil {
					srvrLog.Debugf("Unable to resolve "+
						"host %v: %v", addr, err)
					continue
				}

				newAddrs = append(newAddrs, addr)
			}

			// Skip the update if we weren't able to resolve any of
			// the new addresses.
			if len(newAddrs) == 0 {
				srvrLog.Debug("Skipping node announcement " +
					"update due to not being able to " +
					"resolve any new addresses")
				continue
			}

			// Now, we'll need to update the addresses in our node's
			// announcement in order to propagate the update
			// throughout the network. We'll only include addresses
			// that have a different IP from the previous one, as
			// the previous IP is no longer valid.
			currentNodeAnn := s.getNodeAnnouncement()

			for _, addr := range currentNodeAnn.Addresses {
				host, _, err := net.SplitHostPort(addr.String())
				if err != nil {
					srvrLog.Debugf("Unable to determine "+
						"host from address %v: %v",
						addr, err)
					continue
				}

				// We'll also make sure to include external IPs
				// set manually by the user.
				_, setByUser := ipsSetByUser[addr.String()]
				if setByUser || host != s.lastDetectedIP.String() {
					newAddrs = append(newAddrs, addr)
				}
			}

			// Then, we'll generate a new timestamped node
			// announcement with the updated addresses and broadcast
			// it to our peers.
			newNodeAnn, err := s.genNodeAnnouncement(
				nil, netann.NodeAnnSetAddrs(newAddrs),
			)
			if err != nil {
				srvrLog.Debugf("Unable to generate new node "+
					"announcement: %v", err)
				continue
			}

			err = s.BroadcastMessage(nil, &newNodeAnn)
			if err != nil {
				srvrLog.Debugf("Unable to broadcast new node "+
					"announcement to peers: %v", err)
				continue
			}

			// Finally, update the last IP seen to the current one.
			s.lastDetectedIP = ip
		case <-s.quit:
			break out
		}
	}
}

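watchExternalIP is a standard ticker-plus-quit select loop: do the periodic work on each tick, and bail out as soon as the quit channel closes. The following self-contained sketch shows the same control flow; only the 15 minute interval is taken from the code above, everything else is illustrative.

package main

import (
	"fmt"
	"time"
)

// pollExternalIP runs checkIP on every tick until quit is closed,
// mirroring the loop structure of watchExternalIP above.
func pollExternalIP(interval time.Duration, quit <-chan struct{},
	checkIP func() (string, error)) {

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	var lastIP string
	for {
		select {
		case <-ticker.C:
			ip, err := checkIP()
			if err != nil {
				// A failed lookup is logged and retried on the
				// next tick rather than aborting the loop.
				fmt.Println("lookup failed:", err)
				continue
			}
			if ip == lastIP {
				continue
			}
			fmt.Println("detected new external IP:", ip)
			lastIP = ip

		case <-quit:
			return
		}
	}
}

func main() {
	quit := make(chan struct{})
	go pollExternalIP(15*time.Minute, quit, func() (string, error) {
		return "203.0.113.7", nil // placeholder resolver
	})
	time.Sleep(10 * time.Millisecond)
	close(quit)
}
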
// initNetworkBootstrappers initializes a set of network peer bootstrappers
// based on the server, and currently active bootstrap mechanisms as defined
// within the current configuration.
func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, error) {
	srvrLog.Infof("Initializing peer network bootstrappers!")

	var bootStrappers []discovery.NetworkPeerBootstrapper

	// First, we'll create an instance of the ChannelGraphBootstrapper as
	// this can be used by default if we've already partially seeded the
	// network.
	chanGraph := autopilot.ChannelGraphFromDatabase(s.graphDB)
	graphBootstrapper, err := discovery.NewGraphBootstrapper(chanGraph)
	if err != nil {
		return nil, err
	}
	bootStrappers = append(bootStrappers, graphBootstrapper)

	// If this isn't simnet mode, then one of our additional bootstrapping
	// sources will be the set of running DNS seeds.
	if !s.cfg.Bitcoin.SimNet {
		dnsSeeds, ok := chainreg.ChainDNSSeeds[*s.cfg.ActiveNetParams.GenesisHash]

		// If we have a set of DNS seeds for this chain, then we'll add
		// it as an additional bootstrapping source.
		if ok {
			srvrLog.Infof("Creating DNS peer bootstrapper with "+
				"seeds: %v", dnsSeeds)

			dnsBootStrapper := discovery.NewDNSSeedBootstrapper(
				dnsSeeds, s.cfg.net, s.cfg.ConnectionTimeout,
			)
			bootStrappers = append(bootStrappers, dnsBootStrapper)
		}
	}

	return bootStrappers, nil
}

// createBootstrapIgnorePeers creates a map of peers that the bootstrap process
// needs to ignore, which is made of three parts,
//   - the node itself needs to be skipped as it doesn't make sense to connect
//     to itself.
//   - the peers that we already have connections with, as in s.peersByPub.
//   - the peers that we are attempting to connect to, as in s.persistentPeers.
func (s *server) createBootstrapIgnorePeers() map[autopilot.NodeID]struct{} {
	s.mu.RLock()
	defer s.mu.RUnlock()

	ignore := make(map[autopilot.NodeID]struct{})

	// We should ignore ourselves from bootstrapping.
	selfKey := autopilot.NewNodeID(s.identityECDH.PubKey())
	ignore[selfKey] = struct{}{}

	// Ignore all connected peers.
	for _, peer := range s.peersByPub {
		nID := autopilot.NewNodeID(peer.IdentityKey())
		ignore[nID] = struct{}{}
	}

	// Ignore all persistent peers as they have a dedicated reconnecting
	// process.
	for pubKeyStr := range s.persistentPeers {
		var nID autopilot.NodeID
		copy(nID[:], []byte(pubKeyStr))
		ignore[nID] = struct{}{}
	}

	return ignore
}

// peerBootstrapper is a goroutine which is tasked with attempting to establish
// and maintain a target minimum number of outbound connections. With this
// invariant, we ensure that our node is connected to a diverse set of peers
// and that nodes newly joining the network receive an up to date network view
// as soon as possible.
func (s *server) peerBootstrapper(numTargetPeers uint32,
	bootstrappers []discovery.NetworkPeerBootstrapper) {

	defer s.wg.Done()

	// Before we continue, init the ignore peers map.
	ignoreList := s.createBootstrapIgnorePeers()

	// We'll start off by aggressively attempting connections to peers in
	// order to be a part of the network as soon as possible.
	s.initialPeerBootstrap(ignoreList, numTargetPeers, bootstrappers)

	// Once done, we'll attempt to maintain our target minimum number of
	// peers.
	//
	// We'll use a 15 second backoff, and double the time every time an
	// epoch fails up to a ceiling.
	backOff := time.Second * 15

	// We'll create a new ticker to wake us up every 15 seconds so we can
	// see if we've reached our minimum number of peers.
	sampleTicker := time.NewTicker(backOff)
	defer sampleTicker.Stop()

	// We'll use the number of attempts and errors to determine if we need
	// to increase the time between discovery epochs.
	var epochErrors uint32 // To be used atomically.
	var epochAttempts uint32

	for {
		select {
		// The ticker has just woken us up, so we'll need to check if
		// we need to attempt to connect out to any more peers.
		case <-sampleTicker.C:
			// Obtain the current number of peers, so we can gauge
			// if we need to sample more peers or not.
			s.mu.RLock()
			numActivePeers := uint32(len(s.peersByPub))
			s.mu.RUnlock()

			// If we have enough peers, then we can loop back
			// around to the next round as we're done here.
			if numActivePeers >= numTargetPeers {
				continue
			}

			// If all of our attempts failed during this last back
			// off period, then we'll increase our backoff up to the
			// 5 minute ceiling to avoid an excessive number of
			// queries.
			//
			// TODO(roasbeef): add reverse policy too?

			if epochAttempts > 0 &&
				atomic.LoadUint32(&epochErrors) >= epochAttempts {

				sampleTicker.Stop()

				backOff *= 2
				if backOff > bootstrapBackOffCeiling {
					backOff = bootstrapBackOffCeiling
				}

				srvrLog.Debugf("Backing off peer bootstrapper to "+
					"%v", backOff)
				sampleTicker = time.NewTicker(backOff)
				continue
			}

			atomic.StoreUint32(&epochErrors, 0)
			epochAttempts = 0

			// Since we know we need more peers, we'll compute the
			// exact number we need to reach our threshold.
			numNeeded := numTargetPeers - numActivePeers

			srvrLog.Debugf("Attempting to obtain %v more network "+
				"peers", numNeeded)

			// With the number of peers we need calculated, we'll
			// query the network bootstrappers to sample a set of
			// random addrs for us.
			//
			// Before we continue, get a copy of the ignore peers
			// map.
			ignoreList = s.createBootstrapIgnorePeers()

			peerAddrs, err := discovery.MultiSourceBootstrap(
				ignoreList, numNeeded*2, bootstrappers...,
			)
			if err != nil {
				srvrLog.Errorf("Unable to retrieve bootstrap "+
					"peers: %v", err)
				continue
			}

			// Finally, we'll launch a new goroutine for each
			// prospective peer candidate.
			for _, addr := range peerAddrs {
				epochAttempts++

				go func(a *lnwire.NetAddress) {
					// TODO(roasbeef): can do AS, subnet,
					// country diversity, etc
					errChan := make(chan error, 1)
					s.connectToPeer(
						a, errChan,
						s.cfg.ConnectionTimeout,
					)
					select {
					case err := <-errChan:
						if err == nil {
							return
						}

						srvrLog.Errorf("Unable to "+
							"connect to %v: %v",
							a, err)
						atomic.AddUint32(&epochErrors, 1)
					case <-s.quit:
					}
				}(addr)
			}
		case <-s.quit:
			return
		}
	}
}

// bootstrapBackOffCeiling is the maximum amount of time we'll wait between
// failed attempts to locate a set of bootstrap peers. We'll slowly double our
// query back off each time we encounter a failure.
const bootstrapBackOffCeiling = time.Minute * 5

// initialPeerBootstrap attempts to continuously connect to peers on startup
// until the target number of peers has been reached. This ensures that nodes
// receive an up to date network view as soon as possible.
func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
	numTargetPeers uint32,
	bootstrappers []discovery.NetworkPeerBootstrapper) {

	srvrLog.Debugf("Init bootstrap with targetPeers=%v, bootstrappers=%v, "+
		"ignore=%v", numTargetPeers, len(bootstrappers), len(ignore))

	// We'll start off by waiting 2 seconds between failed attempts, then
	// double each time we fail until we hit the bootstrapBackOffCeiling.
	var delaySignal <-chan time.Time
	delayTime := time.Second * 2

	// As we want to be more aggressive, we'll use a lower back off ceiling
	// than the main peer bootstrap logic.
	backOffCeiling := bootstrapBackOffCeiling / 5

	for attempts := 0; ; attempts++ {
		// Check if the server has been requested to shut down in order
		// to prevent blocking.
		if s.Stopped() {
			return
		}

		// We can exit our aggressive initial peer bootstrapping stage
		// if we've reached our target number of peers.
		s.mu.RLock()
		numActivePeers := uint32(len(s.peersByPub))
		s.mu.RUnlock()

		if numActivePeers >= numTargetPeers {
			return
		}

		if attempts > 0 {
			srvrLog.Debugf("Waiting %v before trying to locate "+
				"bootstrap peers (attempt #%v)", delayTime,
				attempts)

			// We've completed at least one iteration and haven't
			// finished, so we'll start to insert a delay period
			// between each attempt.
			delaySignal = time.After(delayTime)
			select {
			case <-delaySignal:
			case <-s.quit:
				return
			}

			// After our delay, we'll double the time we wait up to
			// the max back off period.
			delayTime *= 2
			if delayTime > backOffCeiling {
				delayTime = backOffCeiling
			}
		}

		// Otherwise, we'll request the remaining number of peers
		// in order to reach our target.
		peersNeeded := numTargetPeers - numActivePeers
		bootstrapAddrs, err := discovery.MultiSourceBootstrap(
			ignore, peersNeeded, bootstrappers...,
		)
		if err != nil {
			srvrLog.Errorf("Unable to retrieve initial bootstrap "+
				"peers: %v", err)
			continue
		}

		// Then, we'll attempt to establish a connection to the
		// different peer addresses retrieved by our bootstrappers.
		var wg sync.WaitGroup
		for _, bootstrapAddr := range bootstrapAddrs {
			wg.Add(1)
			go func(addr *lnwire.NetAddress) {
				defer wg.Done()

				errChan := make(chan error, 1)
				go s.connectToPeer(
					addr, errChan, s.cfg.ConnectionTimeout,
				)

				// We'll only allow this connection attempt to
				// take up to 3 seconds. This allows us to move
				// quickly by discarding peers that are slowing
				// us down.
				select {
				case err := <-errChan:
					if err == nil {
						return
					}
					srvrLog.Errorf("Unable to connect to "+
						"%v: %v", addr, err)
				// TODO: tune timeout? 3 seconds might be *too*
				// aggressive but works well.
				case <-time.After(3 * time.Second):
					srvrLog.Tracef("Skipping peer %v due "+
						"to not establishing a "+
						"connection within 3 seconds",
						addr)
				case <-s.quit:
				}
			}(bootstrapAddr)
		}

		wg.Wait()
	}
}

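Both bootstrap loops above double their delay after a failed epoch and clamp it at a ceiling: 5 minutes for the steady-state bootstrapper and one fifth of that for the initial stage, which starts at 2 seconds. A small sketch of that backoff schedule, using only those constants from the code above (the helper name is illustrative):

package main

import (
	"fmt"
	"time"
)

const bootstrapBackOffCeiling = 5 * time.Minute

// nextBackOff doubles the current delay and clamps it at the ceiling,
// the same schedule the bootstrapping loops above follow.
func nextBackOff(cur, ceiling time.Duration) time.Duration {
	cur *= 2
	if cur > ceiling {
		cur = ceiling
	}
	return cur
}

func main() {
	// Initial bootstrap: start at 2s with the lower ceiling.
	delay := 2 * time.Second
	ceiling := bootstrapBackOffCeiling / 5
	for i := 0; i < 8; i++ {
		fmt.Printf("attempt %d waits %v\n", i+1, delay)
		delay = nextBackOff(delay, ceiling)
	}
}
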
// createNewHiddenService automatically sets up a v2 or v3 onion service in
// order to listen for inbound connections over Tor.
func (s *server) createNewHiddenService() error {
	// Determine the different ports the server is listening on. The onion
	// service's virtual port will map to these ports and one will be picked
	// at random when the onion service is being accessed.
	listenPorts := make([]int, 0, len(s.listenAddrs))
	for _, listenAddr := range s.listenAddrs {
		port := listenAddr.(*net.TCPAddr).Port
		listenPorts = append(listenPorts, port)
	}

	encrypter, err := lnencrypt.KeyRingEncrypter(s.cc.KeyRing)
	if err != nil {
		return err
	}

	// Once the port mapping has been set, we can go ahead and automatically
	// create our onion service. The service's private key will be saved to
	// disk in order to regain access to this service when restarting `lnd`.
	onionCfg := tor.AddOnionConfig{
		VirtualPort: defaultPeerPort,
		TargetPorts: listenPorts,
		Store: tor.NewOnionFile(
			s.cfg.Tor.PrivateKeyPath, 0600, s.cfg.Tor.EncryptKey,
			encrypter,
		),
	}

	switch {
	case s.cfg.Tor.V2:
		onionCfg.Type = tor.V2
	case s.cfg.Tor.V3:
		onionCfg.Type = tor.V3
	}

	addr, err := s.torController.AddOnion(onionCfg)
	if err != nil {
		return err
	}

	// Now that the onion service has been created, we'll add the onion
	// address it can be reached at to our list of advertised addresses.
	newNodeAnn, err := s.genNodeAnnouncement(
		nil, func(currentAnn *lnwire.NodeAnnouncement) {
			currentAnn.Addresses = append(currentAnn.Addresses, addr)
		},
	)
	if err != nil {
		return fmt.Errorf("unable to generate new node "+
			"announcement: %v", err)
	}

	// Finally, we'll update the on-disk version of our announcement so it
	// will eventually propagate to nodes in the network.
	selfNode := &models.LightningNode{
		HaveNodeAnnouncement: true,
		LastUpdate:           time.Unix(int64(newNodeAnn.Timestamp), 0),
		Addresses:            newNodeAnn.Addresses,
		Alias:                newNodeAnn.Alias.String(),
		Features: lnwire.NewFeatureVector(
			newNodeAnn.Features, lnwire.Features,
		),
		Color:        newNodeAnn.RGBColor,
		AuthSigBytes: newNodeAnn.Signature.ToSignatureBytes(),
	}
	copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed())
	if err := s.graphDB.SetSourceNode(selfNode); err != nil {
		return fmt.Errorf("can't set self node: %w", err)
	}

	return nil
}

// findChannel finds a channel given a public key and ChannelID. It is an
// optimization that is quicker than seeking for a channel given only the
// ChannelID.
func (s *server) findChannel(node *btcec.PublicKey, chanID lnwire.ChannelID) (
	*channeldb.OpenChannel, error) {

	nodeChans, err := s.chanStateDB.FetchOpenChannels(node)
	if err != nil {
		return nil, err
	}

	for _, channel := range nodeChans {
		if chanID.IsChanPoint(&channel.FundingOutpoint) {
			return channel, nil
		}
	}

	return nil, fmt.Errorf("unable to find channel")
}

// getNodeAnnouncement fetches the current, fully signed node announcement.
func (s *server) getNodeAnnouncement() lnwire.NodeAnnouncement {
	s.mu.Lock()
	defer s.mu.Unlock()

	return *s.currentNodeAnn
}

// genNodeAnnouncement generates and returns the current fully signed node
// announcement. The time stamp of the announcement will be updated in order
// to ensure it propagates through the network.
func (s *server) genNodeAnnouncement(features *lnwire.RawFeatureVector,
	modifiers ...netann.NodeAnnModifier) (lnwire.NodeAnnouncement, error) {

	s.mu.Lock()
	defer s.mu.Unlock()

	// First, try to update our feature manager with the updated set of
	// features.
	if features != nil {
		proposedFeatures := map[feature.Set]*lnwire.RawFeatureVector{
			feature.SetNodeAnn: features,
		}
		err := s.featureMgr.UpdateFeatureSets(proposedFeatures)
		if err != nil {
			return lnwire.NodeAnnouncement{}, err
		}

		// If we could successfully update our feature manager, add
		// an update modifier to include these new features to our
		// set.
		modifiers = append(
			modifiers, netann.NodeAnnSetFeatures(features),
		)
	}

	// Always update the timestamp when refreshing to ensure the update
	// propagates.
	modifiers = append(modifiers, netann.NodeAnnSetTimestamp)

	// Apply the requested changes to the node announcement.
	for _, modifier := range modifiers {
		modifier(s.currentNodeAnn)
	}

	// Sign a new update after applying all of the passed modifiers.
	err := netann.SignNodeAnnouncement(
		s.nodeSigner, s.identityKeyLoc, s.currentNodeAnn,
	)
	if err != nil {
		return lnwire.NodeAnnouncement{}, err
	}

	return *s.currentNodeAnn, nil
}

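genNodeAnnouncement applies a list of modifier functions (netann.NodeAnnSetAddrs, netann.NodeAnnSetTimestamp, and so on) to the cached announcement before re-signing it. Below is a stripped-down sketch of that functional-modifier pattern on a stand-in type; the Announcement struct and modifiers here are hypothetical, not lnd's.

package main

import (
	"fmt"
	"time"
)

// Announcement is a stand-in for lnwire.NodeAnnouncement.
type Announcement struct {
	Timestamp uint32
	Addresses []string
}

// Modifier mutates an announcement in place, like netann.NodeAnnModifier.
type Modifier func(*Announcement)

// SetAddrs returns a modifier that replaces the advertised addresses.
func SetAddrs(addrs []string) Modifier {
	return func(a *Announcement) { a.Addresses = addrs }
}

// SetTimestamp refreshes the timestamp so the update propagates.
func SetTimestamp(a *Announcement) {
	a.Timestamp = uint32(time.Now().Unix())
}

// refresh applies every modifier; a real implementation would then re-sign.
func refresh(a *Announcement, mods ...Modifier) Announcement {
	for _, mod := range mods {
		mod(a)
	}
	return *a
}

func main() {
	ann := &Announcement{}
	out := refresh(ann, SetAddrs([]string{"203.0.113.7:9735"}), SetTimestamp)
	fmt.Printf("%+v\n", out)
}
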
// updateAndBroadcastSelfNode generates a new node announcement
// applying the given modifiers and updating the time stamp
// to ensure it propagates through the network. Then it broadcasts
// it to the network.
func (s *server) updateAndBroadcastSelfNode(features *lnwire.RawFeatureVector,
	modifiers ...netann.NodeAnnModifier) error {

	newNodeAnn, err := s.genNodeAnnouncement(features, modifiers...)
	if err != nil {
		return fmt.Errorf("unable to generate new node "+
			"announcement: %v", err)
	}

	// Update the on-disk version of our announcement.
	// Load and modify self node instead of creating a new instance so we
	// don't risk overwriting any existing values.
	selfNode, err := s.graphDB.SourceNode()
	if err != nil {
		return fmt.Errorf("unable to get current source node: %w", err)
	}

	selfNode.HaveNodeAnnouncement = true
	selfNode.LastUpdate = time.Unix(int64(newNodeAnn.Timestamp), 0)
	selfNode.Addresses = newNodeAnn.Addresses
	selfNode.Alias = newNodeAnn.Alias.String()
	selfNode.Features = s.featureMgr.Get(feature.SetNodeAnn)
	selfNode.Color = newNodeAnn.RGBColor
	selfNode.AuthSigBytes = newNodeAnn.Signature.ToSignatureBytes()

	copy(selfNode.PubKeyBytes[:], s.identityECDH.PubKey().SerializeCompressed())

	if err := s.graphDB.SetSourceNode(selfNode); err != nil {
		return fmt.Errorf("can't set self node: %w", err)
	}

	// Finally, propagate it to the nodes in the network.
	err = s.BroadcastMessage(nil, &newNodeAnn)
	if err != nil {
		rpcsLog.Debugf("Unable to broadcast new node "+
			"announcement to peers: %v", err)
		return err
	}

	return nil
}

type nodeAddresses struct {
	pubKey    *btcec.PublicKey
	addresses []net.Addr
}

// establishPersistentConnections attempts to establish persistent connections
// to all our direct channel collaborators. In order to promote liveness of our
// active channels, we instruct the connection manager to attempt to establish
// and maintain persistent connections to all our direct channel counterparties.
func (s *server) establishPersistentConnections() error {
	// nodeAddrsMap stores the combination of node public keys and addresses
	// that we'll attempt to reconnect to. PubKey strings are used as keys
	// since other PubKey forms can't be compared.
	nodeAddrsMap := map[string]*nodeAddresses{}

	// Iterate through the list of LinkNodes to find addresses we should
	// attempt to connect to based on our set of previous connections. Set
	// the reconnection port to the default peer port.
	linkNodes, err := s.chanStateDB.LinkNodeDB().FetchAllLinkNodes()
	if err != nil && err != channeldb.ErrLinkNodesNotFound {
		return err
	}
	for _, node := range linkNodes {
		pubStr := string(node.IdentityPub.SerializeCompressed())
		nodeAddrs := &nodeAddresses{
			pubKey:    node.IdentityPub,
			addresses: node.Addresses,
		}
		nodeAddrsMap[pubStr] = nodeAddrs
	}

	// After checking our previous connections for addresses to connect to,
	// iterate through the nodes in our channel graph to find addresses
	// that have been added via NodeAnnouncement messages.
	sourceNode, err := s.graphDB.SourceNode()
	if err != nil {
		return err
	}

	// TODO(roasbeef): instead iterate over link nodes and query graph for
	// each of the nodes.
	selfPub := s.identityECDH.PubKey().SerializeCompressed()
	err = s.graphDB.ForEachNodeChannel(sourceNode.PubKeyBytes, func(
		tx kvdb.RTx,
		chanInfo *models.ChannelEdgeInfo,
		policy, _ *models.ChannelEdgePolicy) error {

		// If the remote party has announced the channel to us, but we
		// haven't yet, then we won't have a policy. However, we don't
		// need this to connect to the peer, so we'll log it and move on.
		if policy == nil {
			srvrLog.Warnf("No channel policy found for "+
				"ChannelPoint(%v): ", chanInfo.ChannelPoint)
		}

		// We'll now fetch the peer opposite from us within this
		// channel so we can queue up a direct connection to them.
		channelPeer, err := s.graphDB.FetchOtherNode(
			tx, chanInfo, selfPub,
		)
		if err != nil {
			return fmt.Errorf("unable to fetch channel peer for "+
				"ChannelPoint(%v): %v", chanInfo.ChannelPoint,
				err)
		}

		pubStr := string(channelPeer.PubKeyBytes[:])

		// Add all unique addresses from channel
		// graph/NodeAnnouncements to the list of addresses we'll
		// connect to for this peer.
		addrSet := make(map[string]net.Addr)
		for _, addr := range channelPeer.Addresses {
			switch addr.(type) {
			case *net.TCPAddr:
				addrSet[addr.String()] = addr

			// We'll only attempt to connect to Tor addresses if Tor
			// outbound support is enabled.
			case *tor.OnionAddr:
				if s.cfg.Tor.Active {
					addrSet[addr.String()] = addr
				}
			}
		}

		// If this peer is also recorded as a link node, we'll add any
		// additional addresses that have not already been selected.
		linkNodeAddrs, ok := nodeAddrsMap[pubStr]
		if ok {
			for _, lnAddress := range linkNodeAddrs.addresses {
				switch lnAddress.(type) {
				case *net.TCPAddr:
					addrSet[lnAddress.String()] = lnAddress

				// We'll only attempt to connect to Tor
				// addresses if Tor outbound support is enabled.
				case *tor.OnionAddr:
					if s.cfg.Tor.Active {
						addrSet[lnAddress.String()] = lnAddress
					}
				}
			}
		}

		// Construct a slice of the deduped addresses.
		var addrs []net.Addr
		for _, addr := range addrSet {
			addrs = append(addrs, addr)
		}

		n := &nodeAddresses{
			addresses: addrs,
		}
		n.pubKey, err = channelPeer.PubKey()
		if err != nil {
			return err
		}

		nodeAddrsMap[pubStr] = n
		return nil
	})
	if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
		return err
	}

	srvrLog.Debugf("Establishing %v persistent connections on start",
		len(nodeAddrsMap))

	// Acquire and hold server lock until all persistent connection requests
	// have been recorded and sent to the connection manager.
	s.mu.Lock()
	defer s.mu.Unlock()

	// Iterate through the combined list of addresses from prior links and
	// node announcements and attempt to reconnect to each node.
	var numOutboundConns int
	for pubStr, nodeAddr := range nodeAddrsMap {
×
3466
                // Add this peer to the set of peers we should maintain a
×
3467
                // persistent connection with. We set the value to false to
×
3468
                // indicate that we should not continue to reconnect if the
×
3469
                // number of channels returns to zero, since this peer has not
×
3470
                // been requested as perm by the user.
×
3471
                s.persistentPeers[pubStr] = false
×
3472
                if _, ok := s.persistentPeersBackoff[pubStr]; !ok {
×
3473
                        s.persistentPeersBackoff[pubStr] = s.cfg.MinBackoff
×
3474
                }
×
3475

3476
                for _, address := range nodeAddr.addresses {
×
3477
                        // Create a wrapper address which couples the IP and
×
3478
                        // the pubkey so the brontide authenticated connection
×
3479
                        // can be established.
×
3480
                        lnAddr := &lnwire.NetAddress{
×
3481
                                IdentityKey: nodeAddr.pubKey,
×
3482
                                Address:     address,
×
3483
                        }
×
3484

×
3485
                        s.persistentPeerAddrs[pubStr] = append(
×
3486
                                s.persistentPeerAddrs[pubStr], lnAddr)
×
3487
                }
×
3488

3489
                // We'll connect to the first 10 peers immediately, then
3490
                // randomly stagger any remaining connections if the
3491
                // stagger initial reconnect flag is set. This ensures
3492
                // that mobile nodes or nodes with a small number of
3493
                // channels obtain connectivity quickly, but larger
3494
                // nodes are able to disperse the costs of connecting to
3495
                // all peers at once.
3496
                if numOutboundConns < numInstantInitReconnect ||
×
3497
                        !s.cfg.StaggerInitialReconnect {
×
3498

×
3499
                        go s.connectToPersistentPeer(pubStr)
×
3500
                } else {
×
3501
                        go s.delayInitialReconnect(pubStr)
×
3502
                }
×
3503

3504
                numOutboundConns++
×
3505
        }
3506

3507
        return nil
×
3508
}
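// The loop above gathers candidate reconnect addresses per peer by keying a
// map on addr.String(), which dedupes net.Addr values that render to the same
// host:port, and only admits onion addresses when Tor outbound support is on.
// The standalone sketch below illustrates just the dedup step; it is not part
// of server.go and the sample addresses are placeholders.
package main

import (
        "fmt"
        "net"
)

// dedupAddrs keeps one entry per distinct address string, mirroring the
// addrSet construction in establishPersistentConnections.
func dedupAddrs(addrs []net.Addr) []net.Addr {
        set := make(map[string]net.Addr)
        for _, a := range addrs {
                set[a.String()] = a
        }

        out := make([]net.Addr, 0, len(set))
        for _, a := range set {
                out = append(out, a)
        }

        return out
}

func main() {
        a1 := &net.TCPAddr{IP: net.ParseIP("10.0.0.1"), Port: 9735}
        a2 := &net.TCPAddr{IP: net.ParseIP("10.0.0.1"), Port: 9735} // duplicate
        a3 := &net.TCPAddr{IP: net.ParseIP("10.0.0.2"), Port: 9735}

        fmt.Println(dedupAddrs([]net.Addr{a1, a2, a3}))
}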
3509

3510
// delayInitialReconnect will attempt a reconnection to the given peer after
3511
// sampling a value for the delay between 0s and the maxInitReconnectDelay.
3512
//
3513
// NOTE: This method MUST be run as a goroutine.
3514
func (s *server) delayInitialReconnect(pubStr string) {
×
3515
        delay := time.Duration(prand.Intn(maxInitReconnectDelay)) * time.Second
×
3516
        select {
×
3517
        case <-time.After(delay):
×
3518
                s.connectToPersistentPeer(pubStr)
×
3519
        case <-s.quit:
×
3520
        }
3521
}
3522

3523
// prunePersistentPeerConnection removes all internal state related to
3524
// persistent connections to a peer within the server. This is used to avoid
3525
// persistent connection retries to peers we do not have any open channels with.
3526
func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) {
×
3527
        pubKeyStr := string(compressedPubKey[:])
×
3528

×
3529
        s.mu.Lock()
×
3530
        if perm, ok := s.persistentPeers[pubKeyStr]; ok && !perm {
×
3531
                delete(s.persistentPeers, pubKeyStr)
×
3532
                delete(s.persistentPeersBackoff, pubKeyStr)
×
3533
                delete(s.persistentPeerAddrs, pubKeyStr)
×
3534
                s.cancelConnReqs(pubKeyStr, nil)
×
3535
                s.mu.Unlock()
×
3536

×
3537
                srvrLog.Infof("Pruned peer %x from persistent connections, "+
×
3538
                        "peer has no open channels", compressedPubKey)
×
3539

×
3540
                return
×
3541
        }
×
3542
        s.mu.Unlock()
×
3543
}
3544

3545
// BroadcastMessage sends a request to the server to broadcast a set of
3546
// messages to all peers other than those specified by the `skips` parameter.
3547
// All messages sent via BroadcastMessage will be queued for lazy delivery to
3548
// the target peers.
3549
//
3550
// NOTE: This function is safe for concurrent access.
3551
func (s *server) BroadcastMessage(skips map[route.Vertex]struct{},
3552
        msgs ...lnwire.Message) error {
×
3553

×
3554
        // Filter out peers found in the skips map. We synchronize access to
×
3555
        // peersByPub throughout this process to ensure we deliver messages to
×
3556
        // the exact set of peers present at the time of invocation.
×
3557
        s.mu.RLock()
×
3558
        peers := make([]*peer.Brontide, 0, len(s.peersByPub))
×
3559
        for pubStr, sPeer := range s.peersByPub {
×
3560
                if skips != nil {
×
3561
                        if _, ok := skips[sPeer.PubKey()]; ok {
×
3562
                                srvrLog.Tracef("Skipping %x in broadcast with "+
×
3563
                                        "pubStr=%x", sPeer.PubKey(), pubStr)
×
3564
                                continue
×
3565
                        }
3566
                }
3567

3568
                peers = append(peers, sPeer)
×
3569
        }
3570
        s.mu.RUnlock()
×
3571

×
3572
        // Iterate over all known peers, dispatching a go routine to enqueue
×
3573
        // all messages to each of the peers.
×
3574
        var wg sync.WaitGroup
×
3575
        for _, sPeer := range peers {
×
3576
                srvrLog.Debugf("Sending %v messages to peer %x", len(msgs),
×
3577
                        sPeer.PubKey())
×
3578

×
3579
                // Dispatch a go routine to enqueue all messages to this peer.
×
3580
                wg.Add(1)
×
3581
                s.wg.Add(1)
×
3582
                go func(p lnpeer.Peer) {
×
3583
                        defer s.wg.Done()
×
3584
                        defer wg.Done()
×
3585

×
3586
                        p.SendMessageLazy(false, msgs...)
×
3587
                }(sPeer)
×
3588
        }
3589

3590
        // Wait for all messages to have been dispatched before returning to
3591
        // caller.
3592
        wg.Wait()
×
3593

×
3594
        return nil
×
3595
}
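// A caller-side sketch of the method above: broadcast a gossip message to
// every connected peer except the one it came from. This helper is
// illustrative only; it assumes it lives in package lnd with access to a
// *server, and the message value is whatever the caller wants to relay.
func broadcastExceptOrigin(s *server, origin route.Vertex,
        msg lnwire.Message) error {

        // Skip the originating peer so the message is not echoed back to it.
        skips := map[route.Vertex]struct{}{
                origin: {},
        }

        return s.BroadcastMessage(skips, msg)
}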
3596

3597
// NotifyWhenOnline can be called by other subsystems to get notified when a
3598
// particular peer comes online. The peer itself is sent across the peerChan.
3599
//
3600
// NOTE: This function is safe for concurrent access.
3601
func (s *server) NotifyWhenOnline(peerKey [33]byte,
3602
        peerChan chan<- lnpeer.Peer) {
×
3603

×
3604
        s.mu.Lock()
×
3605

×
3606
        // Compute the target peer's identifier.
×
3607
        pubStr := string(peerKey[:])
×
3608

×
3609
        // Check if peer is connected.
×
3610
        peer, ok := s.peersByPub[pubStr]
×
3611
        if ok {
×
3612
                // Unlock here so that the mutex isn't held while we are
×
3613
                // waiting for the peer to become active.
×
3614
                s.mu.Unlock()
×
3615

×
3616
                // Wait until the peer signals that it is actually active
×
3617
                // rather than only in the server's maps.
×
3618
                select {
×
3619
                case <-peer.ActiveSignal():
×
3620
                case <-peer.QuitSignal():
×
3621
                        // The peer quit, so we'll add the channel to the slice
×
3622
                        // and return.
×
3623
                        s.mu.Lock()
×
3624
                        s.peerConnectedListeners[pubStr] = append(
×
3625
                                s.peerConnectedListeners[pubStr], peerChan,
×
3626
                        )
×
3627
                        s.mu.Unlock()
×
3628
                        return
×
3629
                }
3630

3631
                // Connected, can return early.
3632
                srvrLog.Debugf("Notifying that peer %x is online", peerKey)
×
3633

×
3634
                select {
×
3635
                case peerChan <- peer:
×
3636
                case <-s.quit:
×
3637
                }
3638

3639
                return
×
3640
        }
3641

3642
        // Not connected, store this listener such that it can be notified when
3643
        // the peer comes online.
3644
        s.peerConnectedListeners[pubStr] = append(
×
3645
                s.peerConnectedListeners[pubStr], peerChan,
×
3646
        )
×
3647
        s.mu.Unlock()
×
3648
}
3649

3650
// NotifyWhenOffline delivers a notification to the caller of when the peer with
3651
// the given public key has been disconnected. The notification is signaled by
3652
// closing the channel returned.
3653
func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} {
×
3654
        s.mu.Lock()
×
3655
        defer s.mu.Unlock()
×
3656

×
3657
        c := make(chan struct{})
×
3658

×
3659
        // If the peer is already offline, we can immediately trigger the
×
3660
        // notification.
×
3661
        peerPubKeyStr := string(peerPubKey[:])
×
3662
        if _, ok := s.peersByPub[peerPubKeyStr]; !ok {
×
3663
                srvrLog.Debugf("Notifying that peer %x is offline", peerPubKey)
×
3664
                close(c)
×
3665
                return c
×
3666
        }
×
3667

3668
        // Otherwise, the peer is online, so we'll keep track of the channel to
3669
        // trigger the notification once the server detects the peer
3670
        // disconnects.
3671
        s.peerDisconnectedListeners[peerPubKeyStr] = append(
×
3672
                s.peerDisconnectedListeners[peerPubKeyStr], c,
×
3673
        )
×
3674

×
3675
        return c
×
3676
}
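// A sketch of how a subsystem typically consumes the two notifications above:
// wait for a peer to become fully active, act on it, then block until the
// same peer goes offline. The helper is hypothetical and assumed to live in
// package lnd; onOnline is a placeholder callback.
func watchPeer(s *server, peerKey [33]byte, quit <-chan struct{},
        onOnline func(lnpeer.Peer)) {

        // Buffer the channel so the server never blocks when handing us the
        // peer.
        peerChan := make(chan lnpeer.Peer, 1)
        s.NotifyWhenOnline(peerKey, peerChan)

        select {
        case p := <-peerChan:
                onOnline(p)

                // The returned channel is closed by the server once the peer
                // disconnects.
                select {
                case <-s.NotifyWhenOffline(peerKey):
                case <-quit:
                }

        case <-quit:
        }
}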
3677

3678
// FindPeer will return the peer that corresponds to the passed in public key.
3679
// This function is used by the funding manager, allowing it to update the
3680
// daemon's local representation of the remote peer.
3681
//
3682
// NOTE: This function is safe for concurrent access.
3683
func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer.Brontide, error) {
×
3684
        s.mu.RLock()
×
3685
        defer s.mu.RUnlock()
×
3686

×
3687
        pubStr := string(peerKey.SerializeCompressed())
×
3688

×
3689
        return s.findPeerByPubStr(pubStr)
×
3690
}
×
3691

3692
// FindPeerByPubStr will return the peer that corresponds to the passed peerID,
3693
// which should be a string representation of the peer's serialized, compressed
3694
// public key.
3695
//
3696
// NOTE: This function is safe for concurrent access.
3697
func (s *server) FindPeerByPubStr(pubStr string) (*peer.Brontide, error) {
×
3698
        s.mu.RLock()
×
3699
        defer s.mu.RUnlock()
×
3700

×
3701
        return s.findPeerByPubStr(pubStr)
×
3702
}
×
3703

3704
// findPeerByPubStr is an internal method that retrieves the specified peer from
3705
// the server's internal state using the given public key string.
3706
func (s *server) findPeerByPubStr(pubStr string) (*peer.Brontide, error) {
×
3707
        peer, ok := s.peersByPub[pubStr]
×
3708
        if !ok {
×
3709
                return nil, ErrPeerNotConnected
×
3710
        }
×
3711

3712
        return peer, nil
×
3713
}
3714

3715
// nextPeerBackoff computes the next backoff duration for a peer's pubkey using
3716
// exponential backoff. If no previous backoff was known, the default is
3717
// returned.
3718
func (s *server) nextPeerBackoff(pubStr string,
3719
        startTime time.Time) time.Duration {
×
3720

×
3721
        // Now, determine the appropriate backoff to use for the retry.
×
3722
        backoff, ok := s.persistentPeersBackoff[pubStr]
×
3723
        if !ok {
×
3724
                // If an existing backoff was unknown, use the default.
×
3725
                return s.cfg.MinBackoff
×
3726
        }
×
3727

3728
        // If the peer failed to start properly, we'll just use the previous
3729
        // backoff to compute the subsequent randomized exponential backoff
3730
        // duration. This will roughly double on average.
3731
        if startTime.IsZero() {
×
3732
                return computeNextBackoff(backoff, s.cfg.MaxBackoff)
×
3733
        }
×
3734

3735
        // The peer succeeded in starting. If the connection didn't last long
3736
        // enough to be considered stable, we'll continue to back off retries
3737
        // with this peer.
3738
        connDuration := time.Since(startTime)
×
3739
        if connDuration < defaultStableConnDuration {
×
3740
                return computeNextBackoff(backoff, s.cfg.MaxBackoff)
×
3741
        }
×
3742

3743
        // The peer succeeded in starting and this was a stable peer, so we'll
3744
        // reduce the timeout duration by the length of the connection after
3745
        // applying randomized exponential backoff. We'll only apply this in the
3746
        // case that:
3747
        //   reb(curBackoff) - connDuration > cfg.MinBackoff
3748
        relaxedBackoff := computeNextBackoff(backoff, s.cfg.MaxBackoff) - connDuration
×
3749
        if relaxedBackoff > s.cfg.MinBackoff {
×
3750
                return relaxedBackoff
×
3751
        }
×
3752

3753
        // Lastly, if reb(curBackoff) - connDuration <= cfg.MinBackoff, then
3754
        // the stable connection lasted much longer than our previous backoff.
3755
        // To reward such good behavior, we'll reconnect after the default
3756
        // timeout.
3757
        return s.cfg.MinBackoff
×
3758
}
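// The relaxation rule above reads as: take the next randomized exponential
// backoff, credit the time the previous connection stayed up, and never drop
// below MinBackoff. computeNextBackoff itself is defined elsewhere in this
// package; the standalone sketch below only assumes, per the comments above,
// that it roughly doubles the previous value with jitter and clamps to the
// maximum. The constants are placeholders, not lnd's defaults.
package main

import (
        "fmt"
        "math/rand"
        "time"
)

const (
        minBackoff = 5 * time.Second
        maxBackoff = time.Hour
)

// computeNextBackoff doubles the previous backoff, applies +/-50% jitter and
// clamps the result (an assumed implementation for illustration).
func computeNextBackoff(prev, max time.Duration) time.Duration {
        next := 2 * prev
        jitter := time.Duration(rand.Int63n(int64(next))) - next/2
        next += jitter
        if next > max {
                next = max
        }

        return next
}

// backoffAfterStableConn mirrors the tail of nextPeerBackoff: reward a
// connection that stayed up by subtracting its duration, floored at the
// minimum backoff.
func backoffAfterStableConn(prev, connDuration time.Duration) time.Duration {
        relaxed := computeNextBackoff(prev, maxBackoff) - connDuration
        if relaxed > minBackoff {
                return relaxed
        }

        return minBackoff
}

func main() {
        fmt.Println(backoffAfterStableConn(time.Minute, 10*time.Minute))
}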
3759

3760
// shouldDropLocalConnection determines if our local connection to a remote peer
3761
// should be dropped in the case of concurrent connection establishment. In
3762
// order to deterministically decide which connection should be dropped, we'll
3763
// utilize the ordering of the local and remote public key. If we didn't use
3764
// such a tie breaker, then we risk _both_ connections erroneously being
3765
// dropped.
3766
func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool {
×
3767
        localPubBytes := local.SerializeCompressed()
×
3768
        remotePubPbytes := remote.SerializeCompressed()
×
3769

×
3770
        // The connection that comes from the node with a "smaller" pubkey
×
3771
        // should be kept. Therefore, if our pubkey is "greater" than theirs, we
×
3772
        // should drop our established connection.
×
3773
        return bytes.Compare(localPubBytes, remotePubPbytes) > 0
×
3774
}
×
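// Because both ends compare the same pair of serialized keys, the predicate
// above is complementary: the side with the larger pubkey drops its locally
// initiated connection while the other side keeps its own, so exactly one of
// the duplicate connections survives. A small standalone check of that
// property with freshly generated keys (illustrative only):
package main

import (
        "bytes"
        "fmt"

        "github.com/btcsuite/btcd/btcec/v2"
)

func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool {
        return bytes.Compare(
                local.SerializeCompressed(), remote.SerializeCompressed(),
        ) > 0
}

func main() {
        privA, _ := btcec.NewPrivateKey()
        privB, _ := btcec.NewPrivateKey()
        a, b := privA.PubKey(), privB.PubKey()

        // Exactly one of the two directions reports that it should drop.
        fmt.Println(shouldDropLocalConnection(a, b), shouldDropLocalConnection(b, a))
}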
3775

3776
// InboundPeerConnected initializes a new peer in response to a new inbound
3777
// connection.
3778
//
3779
// NOTE: This function is safe for concurrent access.
3780
func (s *server) InboundPeerConnected(conn net.Conn) {
×
3781
        // Exit early if we have already been instructed to shut down, as this
×
3782
        // prevents any delayed callbacks from accidentally registering peers.
×
3783
        if s.Stopped() {
×
3784
                return
×
3785
        }
×
3786

3787
        nodePub := conn.(*brontide.Conn).RemotePub()
×
3788
        pubSer := nodePub.SerializeCompressed()
×
3789
        pubStr := string(pubSer)
×
3790

×
3791
        var pubBytes [33]byte
×
3792
        copy(pubBytes[:], pubSer)
×
3793

×
3794
        s.mu.Lock()
×
3795
        defer s.mu.Unlock()
×
3796

×
3797
        // If the remote node's public key is banned, drop the connection.
×
3798
        shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub)
×
3799
        if dcErr != nil {
×
3800
                srvrLog.Errorf("Unable to check if we should disconnect "+
×
3801
                        "peer: %v", dcErr)
×
3802
                conn.Close()
×
3803

×
3804
                return
×
3805
        }
×
3806

3807
        if shouldDc {
×
3808
                srvrLog.Debugf("Dropping connection for %v since they are "+
×
3809
                        "banned.", pubSer)
×
3810

×
3811
                conn.Close()
×
3812

×
3813
                return
×
3814
        }
×
3815

3816
        // If we already have an outbound connection to this peer, then ignore
3817
        // this new connection.
3818
        if p, ok := s.outboundPeers[pubStr]; ok {
×
3819
                srvrLog.Debugf("Already have outbound connection for %v, "+
×
3820
                        "ignoring inbound connection from local=%v, remote=%v",
×
3821
                        p, conn.LocalAddr(), conn.RemoteAddr())
×
3822

×
3823
                conn.Close()
×
3824
                return
×
3825
        }
×
3826

3827
        // If we already have a valid connection that is scheduled to take
3828
        // precedence once the prior peer has finished disconnecting, we'll
3829
        // ignore this connection.
3830
        if p, ok := s.scheduledPeerConnection[pubStr]; ok {
×
3831
                srvrLog.Debugf("Ignoring connection from %v, peer %v already "+
×
3832
                        "scheduled", conn.RemoteAddr(), p)
×
3833
                conn.Close()
×
3834
                return
×
3835
        }
×
3836

3837
        srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())
×
3838

×
3839
        // Check to see if we already have a connection with this peer. If so,
×
3840
        // we may need to drop our existing connection. This prevents us from
×
3841
        // having duplicate connections to the same peer. We forgo adding a
×
3842
        // default case as we expect these to be the only error values returned
×
3843
        // from findPeerByPubStr.
×
3844
        connectedPeer, err := s.findPeerByPubStr(pubStr)
×
3845
        switch err {
×
3846
        case ErrPeerNotConnected:
×
3847
                // We were unable to locate an existing connection with the
×
3848
                // target peer, proceed to connect.
×
3849
                s.cancelConnReqs(pubStr, nil)
×
3850
                s.peerConnected(conn, nil, true)
×
3851

3852
        case nil:
×
3853
                // We already have a connection with the incoming peer. If the
×
3854
                // connection we've already established should be kept and is
×
3855
                // not of the same type of the new connection (inbound), then
×
3856
                // we'll close out the new connection s.t there's only a single
×
3857
                // connection between us.
×
3858
                localPub := s.identityECDH.PubKey()
×
3859
                if !connectedPeer.Inbound() &&
×
3860
                        !shouldDropLocalConnection(localPub, nodePub) {
×
3861

×
3862
                        srvrLog.Warnf("Received inbound connection from "+
×
3863
                                "peer %v, but already have outbound "+
×
3864
                                "connection, dropping conn", connectedPeer)
×
3865
                        conn.Close()
×
3866
                        return
×
3867
                }
×
3868

3869
                // Otherwise, if we should drop the connection, then we'll
3870
                // disconnect our already connected peer.
3871
                srvrLog.Debugf("Disconnecting stale connection to %v",
×
3872
                        connectedPeer)
×
3873

×
3874
                s.cancelConnReqs(pubStr, nil)
×
3875

×
3876
                // Remove the current peer from the server's internal state and
×
3877
                // signal that the peer termination watcher does not need to
×
3878
                // execute for this peer.
×
3879
                s.removePeer(connectedPeer)
×
3880
                s.ignorePeerTermination[connectedPeer] = struct{}{}
×
3881
                s.scheduledPeerConnection[pubStr] = func() {
×
3882
                        s.peerConnected(conn, nil, true)
×
3883
                }
×
3884
        }
3885
}
3886

3887
// OutboundPeerConnected initializes a new peer in response to a new outbound
3888
// connection.
3889
// NOTE: This function is safe for concurrent access.
3890
func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) {
×
3891
        // Exit early if we have already been instructed to shut down, as this
×
3892
        // prevents any delayed callbacks from accidentally registering peers.
×
3893
        if s.Stopped() {
×
3894
                return
×
3895
        }
×
3896

3897
        nodePub := conn.(*brontide.Conn).RemotePub()
×
3898
        pubSer := nodePub.SerializeCompressed()
×
3899
        pubStr := string(pubSer)
×
3900

×
3901
        var pubBytes [33]byte
×
3902
        copy(pubBytes[:], pubSer)
×
3903

×
3904
        s.mu.Lock()
×
3905
        defer s.mu.Unlock()
×
3906

×
3907
        // If the remote node's public key is banned, drop the connection.
×
3908
        shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub)
×
3909
        if dcErr != nil {
×
3910
                srvrLog.Errorf("Unable to check if we should disconnect "+
×
3911
                        "peer: %v", dcErr)
×
3912
                conn.Close()
×
3913

×
3914
                return
×
3915
        }
×
3916

3917
        if shouldDc {
×
3918
                srvrLog.Debugf("Dropping connection for %v since they are "+
×
3919
                        "banned.", pubSer)
×
3920

×
3921
                if connReq != nil {
×
3922
                        s.connMgr.Remove(connReq.ID())
×
3923
                }
×
3924

3925
                conn.Close()
×
3926

×
3927
                return
×
3928
        }
3929

3930
        // If we already have an inbound connection to this peer, then ignore
3931
        // this new connection.
3932
        if p, ok := s.inboundPeers[pubStr]; ok {
×
3933
                srvrLog.Debugf("Already have inbound connection for %v, "+
×
3934
                        "ignoring outbound connection from local=%v, remote=%v",
×
3935
                        p, conn.LocalAddr(), conn.RemoteAddr())
×
3936

×
3937
                if connReq != nil {
×
3938
                        s.connMgr.Remove(connReq.ID())
×
3939
                }
×
3940
                conn.Close()
×
3941
                return
×
3942
        }
3943
        if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil {
×
3944
                srvrLog.Debugf("Ignoring canceled outbound connection")
×
3945
                s.connMgr.Remove(connReq.ID())
×
3946
                conn.Close()
×
3947
                return
×
3948
        }
×
3949

3950
        // If we already have a valid connection that is scheduled to take
3951
        // precedence once the prior peer has finished disconnecting, we'll
3952
        // ignore this connection.
3953
        if _, ok := s.scheduledPeerConnection[pubStr]; ok {
×
3954
                srvrLog.Debugf("Ignoring connection, peer already scheduled")
×
3955

×
3956
                if connReq != nil {
×
3957
                        s.connMgr.Remove(connReq.ID())
×
3958
                }
×
3959

3960
                conn.Close()
×
3961
                return
×
3962
        }
3963

3964
        srvrLog.Infof("Established connection to: %x@%v", pubStr,
×
3965
                conn.RemoteAddr())
×
3966

×
3967
        if connReq != nil {
×
3968
                // A successful connection was returned by the connmgr.
×
3969
                // Immediately cancel all pending requests, excluding the
×
3970
                // outbound connection we just established.
×
3971
                ignore := connReq.ID()
×
3972
                s.cancelConnReqs(pubStr, &ignore)
×
3973
        } else {
×
3974
                // This was a successful connection made by some other
×
3975
                // subsystem. Remove all requests being managed by the connmgr.
×
3976
                s.cancelConnReqs(pubStr, nil)
×
3977
        }
×
3978

3979
        // If we already have a connection with this peer, decide whether or not
3980
        // we need to drop the stale connection. We forgo adding a default case
3981
        // as we expect these to be the only error values returned from
3982
        // findPeerByPubStr.
3983
        connectedPeer, err := s.findPeerByPubStr(pubStr)
×
3984
        switch err {
×
3985
        case ErrPeerNotConnected:
×
3986
                // We were unable to locate an existing connection with the
×
3987
                // target peer, proceed to connect.
×
3988
                s.peerConnected(conn, connReq, false)
×
3989

3990
        case nil:
×
3991
                // We already have a connection with the incoming peer. If the
×
3992
                // connection we've already established should be kept and is
×
3993
                // not of the same type of the new connection (outbound), then
×
3994
                // we'll close out the new connection s.t there's only a single
×
3995
                // connection between us.
×
3996
                localPub := s.identityECDH.PubKey()
×
3997
                if connectedPeer.Inbound() &&
×
3998
                        shouldDropLocalConnection(localPub, nodePub) {
×
3999

×
4000
                        srvrLog.Warnf("Established outbound connection to "+
×
4001
                                "peer %v, but already have inbound "+
×
4002
                                "connection, dropping conn", connectedPeer)
×
4003
                        if connReq != nil {
×
4004
                                s.connMgr.Remove(connReq.ID())
×
4005
                        }
×
4006
                        conn.Close()
×
4007
                        return
×
4008
                }
4009

4010
                // Otherwise, _their_ connection should be dropped. So we'll
4011
                // disconnect the peer and send the now obsolete peer to the
4012
                // server for garbage collection.
4013
                srvrLog.Debugf("Disconnecting stale connection to %v",
×
4014
                        connectedPeer)
×
4015

×
4016
                // Remove the current peer from the server's internal state and
×
4017
                // signal that the peer termination watcher does not need to
×
4018
                // execute for this peer.
×
4019
                s.removePeer(connectedPeer)
×
4020
                s.ignorePeerTermination[connectedPeer] = struct{}{}
×
4021
                s.scheduledPeerConnection[pubStr] = func() {
×
4022
                        s.peerConnected(conn, connReq, false)
×
4023
                }
×
4024
        }
4025
}
4026

4027
// UnassignedConnID is the default connection ID that a request can have before
4028
// it actually is submitted to the connmgr.
4029
// TODO(conner): move into connmgr package, or better, add connmgr method for
4030
// generating atomic IDs
4031
const UnassignedConnID uint64 = 0
4032

4033
// cancelConnReqs stops all persistent connection requests for a given pubkey.
4034
// Any attempts initiated by the peerTerminationWatcher are canceled first.
4035
// Afterwards, each connection request is removed from the connmgr. The caller can
4036
// optionally specify a connection ID to ignore, which prevents us from
4037
// canceling a successful request. All persistent connreqs for the provided
4038
// pubkey are discarded after the operation.
4039
func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
×
4040
        // First, cancel any lingering persistent retry attempts, which will
×
4041
        // prevent retries for any with backoffs that are still maturing.
×
4042
        if cancelChan, ok := s.persistentRetryCancels[pubStr]; ok {
×
4043
                close(cancelChan)
×
4044
                delete(s.persistentRetryCancels, pubStr)
×
4045
        }
×
4046

4047
        // Next, check to see if we have any outstanding persistent connection
4048
        // requests to this peer. If so, then we'll remove all of these
4049
        // connection requests, and also delete the entry from the map.
4050
        connReqs, ok := s.persistentConnReqs[pubStr]
×
4051
        if !ok {
×
4052
                return
×
4053
        }
×
4054

4055
        for _, connReq := range connReqs {
×
4056
                srvrLog.Tracef("Canceling %s:", connReqs)
×
4057

×
4058
                // Atomically capture the current request identifier.
×
4059
                connID := connReq.ID()
×
4060

×
4061
                // Skip any zero IDs, as this indicates the request has not
×
4062
                // yet been scheduled.
×
4063
                if connID == UnassignedConnID {
×
4064
                        continue
×
4065
                }
4066

4067
                // Skip a particular connection ID if instructed.
4068
                if skip != nil && connID == *skip {
×
4069
                        continue
×
4070
                }
4071

4072
                s.connMgr.Remove(connID)
×
4073
        }
4074

4075
        delete(s.persistentConnReqs, pubStr)
×
4076
}
4077

4078
// handleCustomMessage dispatches an incoming custom peer message to
4079
// subscribers.
4080
func (s *server) handleCustomMessage(peer [33]byte, msg *lnwire.Custom) error {
×
4081
        srvrLog.Debugf("Custom message received: peer=%x, type=%d",
×
4082
                peer, msg.Type)
×
4083

×
4084
        return s.customMessageServer.SendUpdate(&CustomMessage{
×
4085
                Peer: peer,
×
4086
                Msg:  msg,
×
4087
        })
×
4088
}
×
4089

4090
// SubscribeCustomMessages subscribes to a stream of incoming custom peer
4091
// messages.
4092
func (s *server) SubscribeCustomMessages() (*subscribe.Client, error) {
×
4093
        return s.customMessageServer.Subscribe()
×
4094
}
×
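// A caller-side sketch of the subscription above: receive raw updates, narrow
// them back to *CustomMessage (the concrete type sent by handleCustomMessage)
// and hand them to a callback. Assumed to live in package lnd, relying on
// subscribe.Client exposing Updates and Cancel as it does elsewhere in this
// codebase.
func consumeCustomMessages(s *server, quit <-chan struct{},
        handle func(*CustomMessage)) error {

        client, err := s.SubscribeCustomMessages()
        if err != nil {
                return err
        }
        defer client.Cancel()

        for {
                select {
                case update, ok := <-client.Updates():
                        if !ok {
                                return nil
                        }

                        // Updates arrive as interface{}, so type-assert back
                        // to the concrete message wrapper.
                        if msg, isCustom := update.(*CustomMessage); isCustom {
                                handle(msg)
                        }

                case <-quit:
                        return nil
                }
        }
}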
4095

4096
// peerConnected is a function that handles initializing a newly connected
4097
// peer by adding it to the server's global list of all active peers, and
4098
// starting all the goroutines the peer needs to function properly. The inbound
4099
// boolean should be true if the peer initiated the connection to us.
4100
func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
4101
        inbound bool) {
×
4102

×
4103
        brontideConn := conn.(*brontide.Conn)
×
4104
        addr := conn.RemoteAddr()
×
4105
        pubKey := brontideConn.RemotePub()
×
4106

×
4107
        srvrLog.Infof("Finalizing connection to %x@%s, inbound=%v",
×
4108
                pubKey.SerializeCompressed(), addr, inbound)
×
4109

×
4110
        peerAddr := &lnwire.NetAddress{
×
4111
                IdentityKey: pubKey,
×
4112
                Address:     addr,
×
4113
                ChainNet:    s.cfg.ActiveNetParams.Net,
×
4114
        }
×
4115

×
4116
        // With the brontide connection established, we'll now craft the feature
×
4117
        // vectors to advertise to the remote node.
×
4118
        initFeatures := s.featureMgr.Get(feature.SetInit)
×
4119
        legacyFeatures := s.featureMgr.Get(feature.SetLegacyGlobal)
×
4120

×
4121
        // Lookup past error caches for the peer in the server. If no buffer is
×
4122
        // found, create a fresh buffer.
×
4123
        pkStr := string(peerAddr.IdentityKey.SerializeCompressed())
×
4124
        errBuffer, ok := s.peerErrors[pkStr]
×
4125
        if !ok {
×
4126
                var err error
×
4127
                errBuffer, err = queue.NewCircularBuffer(peer.ErrorBufferSize)
×
4128
                if err != nil {
×
4129
                        srvrLog.Errorf("unable to create peer %v", err)
×
4130
                        return
×
4131
                }
×
4132
        }
4133

4134
        // If we directly set the peer.Config TowerClient member to the
4135
        // s.towerClientMgr then in the case that the s.towerClientMgr is nil,
4136
        // the peer.Config's TowerClient member will not evaluate to nil even
4137
        // though the underlying value is nil. To avoid this gotcha which can
4138
        // cause a panic, we need to explicitly pass nil to the peer.Config's
4139
        // TowerClient if needed.
4140
        var towerClient wtclient.ClientManager
×
4141
        if s.towerClientMgr != nil {
×
4142
                towerClient = s.towerClientMgr
×
4143
        }
×
4144

4145
        thresholdSats := btcutil.Amount(s.cfg.MaxFeeExposure)
×
4146
        thresholdMSats := lnwire.NewMSatFromSatoshis(thresholdSats)
×
4147

×
4148
        // Now that we've established a connection, create a peer, and add it to the
×
4149
        // set of currently active peers. Configure the peer with the incoming
×
4150
        // and outgoing broadcast deltas to prevent htlcs from being accepted or
×
4151
        // offered that would trigger channel closure. In case of outgoing
×
4152
        // htlcs, an extra block is added to prevent the channel from being
×
4153
        // closed when the htlc is outstanding and a new block comes in.
×
4154
        pCfg := peer.Config{
×
4155
                Conn:                    brontideConn,
×
4156
                ConnReq:                 connReq,
×
4157
                Addr:                    peerAddr,
×
4158
                Inbound:                 inbound,
×
4159
                Features:                initFeatures,
×
4160
                LegacyFeatures:          legacyFeatures,
×
4161
                OutgoingCltvRejectDelta: lncfg.DefaultOutgoingCltvRejectDelta,
×
4162
                ChanActiveTimeout:       s.cfg.ChanEnableTimeout,
×
4163
                ErrorBuffer:             errBuffer,
×
4164
                WritePool:               s.writePool,
×
4165
                ReadPool:                s.readPool,
×
4166
                Switch:                  s.htlcSwitch,
×
4167
                InterceptSwitch:         s.interceptableSwitch,
×
4168
                ChannelDB:               s.chanStateDB,
×
4169
                ChannelGraph:            s.graphDB,
×
4170
                ChainArb:                s.chainArb,
×
4171
                AuthGossiper:            s.authGossiper,
×
4172
                ChanStatusMgr:           s.chanStatusMgr,
×
4173
                ChainIO:                 s.cc.ChainIO,
×
4174
                FeeEstimator:            s.cc.FeeEstimator,
×
4175
                Signer:                  s.cc.Wallet.Cfg.Signer,
×
4176
                SigPool:                 s.sigPool,
×
4177
                Wallet:                  s.cc.Wallet,
×
4178
                ChainNotifier:           s.cc.ChainNotifier,
×
4179
                BestBlockView:           s.cc.BestBlockTracker,
×
4180
                RoutingPolicy:           s.cc.RoutingPolicy,
×
4181
                Sphinx:                  s.sphinx,
×
4182
                WitnessBeacon:           s.witnessBeacon,
×
4183
                Invoices:                s.invoices,
×
4184
                ChannelNotifier:         s.channelNotifier,
×
4185
                HtlcNotifier:            s.htlcNotifier,
×
4186
                TowerClient:             towerClient,
×
4187
                DisconnectPeer:          s.DisconnectPeer,
×
4188
                GenNodeAnnouncement: func(...netann.NodeAnnModifier) (
×
4189
                        lnwire.NodeAnnouncement, error) {
×
4190

×
4191
                        return s.genNodeAnnouncement(nil)
×
4192
                },
×
4193

4194
                PongBuf: s.pongBuf,
4195

4196
                PrunePersistentPeerConnection: s.prunePersistentPeerConnection,
4197

4198
                FetchLastChanUpdate: s.fetchLastChanUpdate(),
4199

4200
                FundingManager: s.fundingMgr,
4201

4202
                Hodl:                    s.cfg.Hodl,
4203
                UnsafeReplay:            s.cfg.UnsafeReplay,
4204
                MaxOutgoingCltvExpiry:   s.cfg.MaxOutgoingCltvExpiry,
4205
                MaxChannelFeeAllocation: s.cfg.MaxChannelFeeAllocation,
4206
                CoopCloseTargetConfs:    s.cfg.CoopCloseTargetConfs,
4207
                MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte(
4208
                        s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
4209
                ChannelCommitInterval:  s.cfg.ChannelCommitInterval,
4210
                PendingCommitInterval:  s.cfg.PendingCommitInterval,
4211
                ChannelCommitBatchSize: s.cfg.ChannelCommitBatchSize,
4212
                HandleCustomMessage:    s.handleCustomMessage,
4213
                GetAliases:             s.aliasMgr.GetAliases,
4214
                RequestAlias:           s.aliasMgr.RequestAlias,
4215
                AddLocalAlias:          s.aliasMgr.AddLocalAlias,
4216
                DisallowRouteBlinding:  s.cfg.ProtocolOptions.NoRouteBlinding(),
4217
                DisallowQuiescence:     s.cfg.ProtocolOptions.NoQuiescence(),
4218
                MaxFeeExposure:         thresholdMSats,
4219
                Quit:                   s.quit,
4220
                AuxLeafStore:           s.implCfg.AuxLeafStore,
4221
                AuxSigner:              s.implCfg.AuxSigner,
4222
                MsgRouter:              s.implCfg.MsgRouter,
4223
                AuxChanCloser:          s.implCfg.AuxChanCloser,
4224
                AuxResolver:            s.implCfg.AuxContractResolver,
4225
                AuxTrafficShaper:       s.implCfg.TrafficShaper,
4226
                ShouldFwdExpEndorsement: func() bool {
×
4227
                        if s.cfg.ProtocolOptions.NoExperimentalEndorsement() {
×
4228
                                return false
×
4229
                        }
×
4230

4231
                        return clock.NewDefaultClock().Now().Before(
×
4232
                                EndorsementExperimentEnd,
×
4233
                        )
×
4234
                },
4235
        }
4236

4237
        copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed())
×
4238
        copy(pCfg.ServerPubKey[:], s.identityECDH.PubKey().SerializeCompressed())
×
4239

×
4240
        p := peer.NewBrontide(pCfg)
×
4241

×
4242
        // TODO(roasbeef): update IP address for link-node
×
4243
        //  * also mark last-seen, do it one single transaction?
×
4244

×
4245
        s.addPeer(p)
×
4246

×
4247
        // Once we have successfully added the peer to the server, we can
×
4248
        // delete the previous error buffer from the server's map of error
×
4249
        // buffers.
×
4250
        delete(s.peerErrors, pkStr)
×
4251

×
4252
        // Dispatch a goroutine to asynchronously start the peer. This process
×
4253
        // includes sending and receiving Init messages, which would be a DOS
×
4254
        // vector if we held the server's mutex throughout the procedure.
×
4255
        s.wg.Add(1)
×
4256
        go s.peerInitializer(p)
×
4257
}
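// The TowerClient handling above works around a classic Go pitfall: assigning
// a typed nil to an interface yields an interface that is not nil, because it
// still carries the concrete type. A minimal standalone demonstration of the
// gotcha with made-up types (not lnd's wtclient API):
package main

import "fmt"

type towerClient interface {
        Start() error
}

type clientManager struct{}

func (m *clientManager) Start() error { return nil }

func main() {
        var mgr *clientManager // nil pointer, like an unset s.towerClientMgr

        // Assigning the nil pointer directly produces a non-nil interface: it
        // carries a type (*clientManager) with a nil value inside.
        var direct towerClient = mgr
        fmt.Println(direct == nil) // false

        // Guarding the assignment, as peerConnected does, keeps the interface
        // truly nil when the manager is absent.
        var guarded towerClient
        if mgr != nil {
                guarded = mgr
        }
        fmt.Println(guarded == nil) // true
}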
4258

4259
// addPeer adds the passed peer to the server's global state of all active
4260
// peers.
4261
func (s *server) addPeer(p *peer.Brontide) {
×
4262
        if p == nil {
×
4263
                return
×
4264
        }
×
4265

4266
        pubBytes := p.IdentityKey().SerializeCompressed()
×
4267

×
4268
        // Ignore new peers if we're shutting down.
×
4269
        if s.Stopped() {
×
4270
                srvrLog.Infof("Server stopped, skipped adding peer=%x",
×
4271
                        pubBytes)
×
4272
                p.Disconnect(ErrServerShuttingDown)
×
4273

×
4274
                return
×
4275
        }
×
4276

4277
        // Track the new peer in our indexes so we can quickly look it up either
4278
        // according to its public key, or its peer ID.
4279
        // TODO(roasbeef): pipe all requests through to the
4280
        // queryHandler/peerManager
4281

4282
        // NOTE: This pubStr is a raw bytes to string conversion and will NOT
4283
        // be human-readable.
4284
        pubStr := string(pubBytes)
×
4285

×
4286
        s.peersByPub[pubStr] = p
×
4287

×
4288
        if p.Inbound() {
×
4289
                s.inboundPeers[pubStr] = p
×
4290
        } else {
×
4291
                s.outboundPeers[pubStr] = p
×
4292
        }
×
4293

4294
        // Inform the peer notifier of a peer online event so that it can be reported
4295
        // to clients listening for peer events.
4296
        var pubKey [33]byte
×
4297
        copy(pubKey[:], pubBytes)
×
4298

×
4299
        s.peerNotifier.NotifyPeerOnline(pubKey)
×
4300
}
4301

4302
// peerInitializer asynchronously starts a newly connected peer after it has
4303
// been added to the server's peer map. This method sets up a
4304
// peerTerminationWatcher for the given peer, and ensures that it executes even
4305
// if the peer failed to start. In the event of a successful connection, this
4306
// method reads the negotiated, local feature-bits and spawns the appropriate
4307
// graph synchronization method. Any registered clients of NotifyWhenOnline will
4308
// be signaled of the new peer once the method returns.
4309
//
4310
// NOTE: This MUST be launched as a goroutine.
4311
func (s *server) peerInitializer(p *peer.Brontide) {
×
4312
        defer s.wg.Done()
×
4313

×
4314
        pubBytes := p.IdentityKey().SerializeCompressed()
×
4315

×
4316
        // Avoid initializing peers while the server is exiting.
×
4317
        if s.Stopped() {
×
4318
                srvrLog.Infof("Server stopped, skipped initializing peer=%x",
×
4319
                        pubBytes)
×
4320
                return
×
4321
        }
×
4322

4323
        // Create a channel that will be used to signal a successful start of
4324
        // the link. This prevents the peer termination watcher from beginning
4325
        // its duty too early.
4326
        ready := make(chan struct{})
×
4327

×
4328
        // Before starting the peer, launch a goroutine to watch for the
×
4329
        // unexpected termination of this peer, which will ensure all resources
×
4330
        // are properly cleaned up, and re-establish persistent connections when
×
4331
        // necessary. The peer termination watcher will be short circuited if
×
4332
        // the peer is ever added to the ignorePeerTermination map, indicating
×
4333
        // that the server has already handled the removal of this peer.
×
4334
        s.wg.Add(1)
×
4335
        go s.peerTerminationWatcher(p, ready)
×
4336

×
4337
        // Start the peer! If an error occurs, we Disconnect the peer, which
×
4338
        // will unblock the peerTerminationWatcher.
×
4339
        if err := p.Start(); err != nil {
×
4340
                srvrLog.Warnf("Starting peer=%x got error: %v", pubBytes, err)
×
4341

×
4342
                p.Disconnect(fmt.Errorf("unable to start peer: %w", err))
×
4343
                return
×
4344
        }
×
4345

4346
        // Otherwise, signal to the peerTerminationWatcher that the peer startup
4347
        // was successful, and to begin watching the peer's wait group.
4348
        close(ready)
×
4349

×
4350
        s.mu.Lock()
×
4351
        defer s.mu.Unlock()
×
4352

×
4353
        // Check if there are listeners waiting for this peer to come online.
×
4354
        srvrLog.Debugf("Notifying that peer %v is online", p)
×
4355

×
4356
        // TODO(guggero): Do a proper conversion to a string everywhere, or use
×
4357
        // route.Vertex as the key type of peerConnectedListeners.
×
4358
        pubStr := string(pubBytes)
×
4359
        for _, peerChan := range s.peerConnectedListeners[pubStr] {
×
4360
                select {
×
4361
                case peerChan <- p:
×
4362
                case <-s.quit:
×
4363
                        return
×
4364
                }
4365
        }
4366
        delete(s.peerConnectedListeners, pubStr)
×
4367
}
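// peerInitializer and peerTerminationWatcher coordinate through the ready
// channel: the watcher is launched before the peer is started, but it only
// begins its disconnect watch once ready is closed (successful start) or the
// peer has already been torn down. A stripped-down standalone sketch of that
// handshake with stand-in types (not lnd code):
package main

import "fmt"

type fakePeer struct {
        quit chan struct{}
}

// WaitForDisconnect mirrors the real peer's contract: do not begin watching
// for termination until a successful start has been signalled via ready, or
// the peer has already quit.
func (p *fakePeer) WaitForDisconnect(ready <-chan struct{}) {
        select {
        case <-ready:
        case <-p.quit:
        }
        <-p.quit
}

func main() {
        p := &fakePeer{quit: make(chan struct{})}
        ready := make(chan struct{})
        done := make(chan struct{})

        // Launch the watcher before the peer starts, as peerInitializer does.
        go func() {
                p.WaitForDisconnect(ready)
                fmt.Println("peer gone, cleaning up")
                close(done)
        }()

        // Pretend p.Start() succeeded: unblock the watcher.
        close(ready)

        // Some time later the peer disconnects.
        close(p.quit)
        <-done
}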
4368

4369
// peerTerminationWatcher waits until a peer has been disconnected unexpectedly,
4370
// and then cleans up all resources allocated to the peer, notifies relevant
4371
// sub-systems of its demise, and finally handles re-connecting to the peer if
4372
// it's persistent. If the server intentionally disconnects a peer, it should
4373
// have a corresponding entry in the ignorePeerTermination map which will cause
4374
// the cleanup routine to exit early. The passed `ready` chan is used to
4375
// synchronize when WaitForDisconnect should begin watching on the peer's
4376
// waitgroup. The ready chan should only be signaled if the peer starts
4377
// successfully, otherwise the peer should be disconnected instead.
4378
//
4379
// NOTE: This MUST be launched as a goroutine.
4380
func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
×
4381
        defer s.wg.Done()
×
4382

×
4383
        p.WaitForDisconnect(ready)
×
4384

×
4385
        srvrLog.Debugf("Peer %v has been disconnected", p)
×
4386

×
4387
        // If the server is exiting then we can bail out early ourselves as all
×
4388
        // the other sub-systems will already be shutting down.
×
4389
        if s.Stopped() {
×
4390
                srvrLog.Debugf("Server quitting, exit early for peer %v", p)
×
4391
                return
×
4392
        }
×
4393

4394
        // Next, we'll cancel all pending funding reservations with this node.
4395
        // If we tried to initiate any funding flows that haven't yet finished,
4396
        // then we need to unlock those committed outputs so they're still
4397
        // available for use.
4398
        s.fundingMgr.CancelPeerReservations(p.PubKey())
×
4399

×
4400
        pubKey := p.IdentityKey()
×
4401

×
4402
        // We'll also inform the gossiper that this peer is no longer active,
×
4403
        // so we don't need to maintain sync state for it any longer.
×
4404
        s.authGossiper.PruneSyncState(p.PubKey())
×
4405

×
4406
        // Tell the switch to remove all links associated with this peer.
×
4407
        // Passing nil as the target link indicates that all links associated
×
4408
        // with this interface should be closed.
×
4409
        //
×
4410
        // TODO(roasbeef): instead add a PurgeInterfaceLinks function?
×
4411
        links, err := s.htlcSwitch.GetLinksByInterface(p.PubKey())
×
4412
        if err != nil && err != htlcswitch.ErrNoLinksFound {
×
4413
                srvrLog.Errorf("Unable to get channel links for %v: %v", p, err)
×
4414
        }
×
4415

4416
        for _, link := range links {
×
4417
                s.htlcSwitch.RemoveLink(link.ChanID())
×
4418
        }
×
4419

4420
        s.mu.Lock()
×
4421
        defer s.mu.Unlock()
×
4422

×
4423
        // If there were any notification requests for when this peer
×
4424
        // disconnected, we can trigger them now.
×
4425
        srvrLog.Debugf("Notifying that peer %v is offline", p)
×
4426
        pubStr := string(pubKey.SerializeCompressed())
×
4427
        for _, offlineChan := range s.peerDisconnectedListeners[pubStr] {
×
4428
                close(offlineChan)
×
4429
        }
×
4430
        delete(s.peerDisconnectedListeners, pubStr)
×
4431

×
4432
        // If the server has already removed this peer, we can short circuit the
×
4433
        // peer termination watcher and skip cleanup.
×
4434
        if _, ok := s.ignorePeerTermination[p]; ok {
×
4435
                delete(s.ignorePeerTermination, p)
×
4436

×
4437
                pubKey := p.PubKey()
×
4438
                pubStr := string(pubKey[:])
×
4439

×
4440
                // If a connection callback is present, we'll go ahead and
×
4441
                // execute it now that previous peer has fully disconnected. If
×
4442
                // the callback is not present, this likely implies the peer was
×
4443
                // purposefully disconnected via RPC, and that no reconnect
×
4444
                // should be attempted.
×
4445
                connCallback, ok := s.scheduledPeerConnection[pubStr]
×
4446
                if ok {
×
4447
                        delete(s.scheduledPeerConnection, pubStr)
×
4448
                        connCallback()
×
4449
                }
×
4450
                return
×
4451
        }
4452

4453
        // First, clean up any remaining state the server has regarding the peer
4454
        // in question.
4455
        s.removePeer(p)
×
4456

×
4457
        // Next, check to see if this is a persistent peer or not.
×
4458
        if _, ok := s.persistentPeers[pubStr]; !ok {
×
4459
                return
×
4460
        }
×
4461

4462
        // Get the last address that we used to connect to the peer.
4463
        addrs := []net.Addr{
×
4464
                p.NetAddress().Address,
×
4465
        }
×
4466

×
4467
        // We'll ensure that we locate all the peer's advertised addresses for
×
4468
        // reconnection purposes.
×
4469
        advertisedAddrs, err := s.fetchNodeAdvertisedAddrs(pubKey)
×
4470
        switch {
×
4471
        // We found advertised addresses, so use them.
4472
        case err == nil:
×
4473
                addrs = advertisedAddrs
×
4474

4475
        // The peer doesn't have an advertised address.
4476
        case err == errNoAdvertisedAddr:
×
4477
                // If it is an outbound peer then we fall back to the existing
×
4478
                // peer address.
×
4479
                if !p.Inbound() {
×
4480
                        break
×
4481
                }
4482

4483
                // Fall back to the existing peer address if
4484
                // we're not accepting connections over Tor.
4485
                if s.torController == nil {
×
4486
                        break
×
4487
                }
4488

4489
                // If we are, the peer's address won't be known
4490
                // to us (we'll see a private address, which is
4491
                // the address used by our onion service to dial
4492
                // to lnd), so we don't have enough information
4493
                // to attempt a reconnect.
4494
                srvrLog.Debugf("Ignoring reconnection attempt "+
×
4495
                        "to inbound peer %v without "+
×
4496
                        "advertised address", p)
×
4497
                return
×
4498

4499
        // We came across an error retrieving an advertised
4500
        // address, log it, and fall back to the existing peer
4501
        // address.
4502
        default:
×
4503
                srvrLog.Errorf("Unable to retrieve advertised "+
×
4504
                        "address for node %x: %v", p.PubKey(),
×
4505
                        err)
×
4506
        }
4507

4508
        // Make an easy lookup map so that we can check if an address
4509
        // is already in the address list that we have stored for this peer.
4510
        existingAddrs := make(map[string]bool)
×
4511
        for _, addr := range s.persistentPeerAddrs[pubStr] {
×
4512
                existingAddrs[addr.String()] = true
×
4513
        }
×
4514

4515
        // Add any missing addresses for this peer to persistentPeerAddr.
4516
        for _, addr := range addrs {
×
4517
                if existingAddrs[addr.String()] {
×
4518
                        continue
×
4519
                }
4520

4521
                s.persistentPeerAddrs[pubStr] = append(
×
4522
                        s.persistentPeerAddrs[pubStr],
×
4523
                        &lnwire.NetAddress{
×
4524
                                IdentityKey: p.IdentityKey(),
×
4525
                                Address:     addr,
×
4526
                                ChainNet:    p.NetAddress().ChainNet,
×
4527
                        },
×
4528
                )
×
4529
        }
4530

4531
        // Record the computed backoff in the backoff map.
4532
        backoff := s.nextPeerBackoff(pubStr, p.StartTime())
×
4533
        s.persistentPeersBackoff[pubStr] = backoff
×
4534

×
4535
        // Initialize a retry canceller for this peer if one does not
×
4536
        // exist.
×
4537
        cancelChan, ok := s.persistentRetryCancels[pubStr]
×
4538
        if !ok {
×
4539
                cancelChan = make(chan struct{})
×
4540
                s.persistentRetryCancels[pubStr] = cancelChan
×
4541
        }
×
4542

4543
        // We choose not to wait group this go routine since the Connect
4544
        // call can stall for arbitrarily long if we shut down while an
4545
        // outbound connection attempt is being made.
4546
        go func() {
×
4547
                srvrLog.Debugf("Scheduling connection re-establishment to "+
×
4548
                        "persistent peer %x in %s",
×
4549
                        p.IdentityKey().SerializeCompressed(), backoff)
×
4550

×
4551
                select {
×
4552
                case <-time.After(backoff):
×
4553
                case <-cancelChan:
×
4554
                        return
×
4555
                case <-s.quit:
×
4556
                        return
×
4557
                }
4558

4559
                srvrLog.Debugf("Attempting to re-establish persistent "+
×
4560
                        "connection to peer %x",
×
4561
                        p.IdentityKey().SerializeCompressed())
×
4562

×
4563
                s.connectToPersistentPeer(pubStr)
×
4564
        }()
4565
}
4566

4567
// connectToPersistentPeer uses all the stored addresses for a peer to attempt
4568
// to connect to the peer. It creates connection requests if there are
4569
// currently none for a given address and it removes old connection requests
4570
// if the associated address is no longer in the latest address list for the
4571
// peer.
4572
func (s *server) connectToPersistentPeer(pubKeyStr string) {
×
4573
        s.mu.Lock()
×
4574
        defer s.mu.Unlock()
×
4575

×
4576
        // Create an easy lookup map of the addresses we have stored for the
×
4577
        // peer. We will remove entries from this map if we have existing
×
4578
        // connection requests for the associated address and then any leftover
×
4579
        // entries will indicate which addresses we should create new
×
4580
        // connection requests for.
×
4581
        addrMap := make(map[string]*lnwire.NetAddress)
×
4582
        for _, addr := range s.persistentPeerAddrs[pubKeyStr] {
×
4583
                addrMap[addr.String()] = addr
×
4584
        }
×
4585

4586
        // Go through each of the existing connection requests and
4587
        // check if they correspond to the latest set of addresses. If
4588
        // there is a connection request that does not use one of the latest
4589
        // advertised addresses then remove that connection request.
4590
        var updatedConnReqs []*connmgr.ConnReq
×
4591
        for _, connReq := range s.persistentConnReqs[pubKeyStr] {
×
4592
                lnAddr := connReq.Addr.(*lnwire.NetAddress).Address.String()
×
4593

×
4594
                switch _, ok := addrMap[lnAddr]; ok {
×
4595
                // If the existing connection request is using one of the
4596
                // latest advertised addresses for the peer then we add it to
4597
                // updatedConnReqs and remove the associated address from
4598
                // addrMap so that we don't recreate this connReq later on.
4599
                case true:
×
4600
                        updatedConnReqs = append(
×
4601
                                updatedConnReqs, connReq,
×
4602
                        )
×
4603
                        delete(addrMap, lnAddr)
×
4604

4605
                // If the existing connection request is using an address that
4606
                // is not one of the latest advertised addresses for the peer
4607
                // then we remove the connection request from the connection
4608
                // manager.
4609
                case false:
×
4610
                        srvrLog.Info(
×
4611
                                "Removing conn req:", connReq.Addr.String(),
×
4612
                        )
×
4613
                        s.connMgr.Remove(connReq.ID())
×
4614
                }
4615
        }
4616

4617
        s.persistentConnReqs[pubKeyStr] = updatedConnReqs
×
4618

×
4619
        cancelChan, ok := s.persistentRetryCancels[pubKeyStr]
×
4620
        if !ok {
×
4621
                cancelChan = make(chan struct{})
×
4622
                s.persistentRetryCancels[pubKeyStr] = cancelChan
×
4623
        }
×
4624

4625
        // Any addresses left in addrMap are new ones that we have not made
4626
        // connection requests for yet, so create new requests for them now.
4627
        // If there is more than one address in the address map, stagger the
4628
        // creation of the connection requests.
4629
        go func() {
×
4630
                ticker := time.NewTicker(multiAddrConnectionStagger)
×
4631
                defer ticker.Stop()
×
4632

×
4633
                for _, addr := range addrMap {
×
4634
                        // Send the persistent connection request to the
×
4635
                        // connection manager, saving the request itself so we
×
4636
                        // can cancel/restart the process as needed.
×
4637
                        connReq := &connmgr.ConnReq{
×
4638
                                Addr:      addr,
×
4639
                                Permanent: true,
×
4640
                        }
×
4641

×
4642
                        s.mu.Lock()
×
4643
                        s.persistentConnReqs[pubKeyStr] = append(
×
4644
                                s.persistentConnReqs[pubKeyStr], connReq,
×
4645
                        )
×
4646
                        s.mu.Unlock()
×
4647

×
4648
                        srvrLog.Debugf("Attempting persistent connection to "+
×
4649
                                "channel peer %v", addr)
×
4650

×
4651
                        go s.connMgr.Connect(connReq)
×
4652

×
4653
                        select {
×
4654
                        case <-s.quit:
×
4655
                                return
×
4656
                        case <-cancelChan:
×
4657
                                return
×
4658
                        case <-ticker.C:
×
4659
                        }
4660
                }
4661
        }()
4662
}
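
// Editor's note: a hedged sketch, not part of server.go, showing the
// stagger-and-cancel pattern used above in isolation. One connection attempt
// is launched per address, separated by the stagger interval, and the loop
// stops as soon as the cancel or quit channel fires. All names here are
// illustrative assumptions.
func staggeredConnect(addrs []net.Addr, stagger time.Duration,
        cancel, quit <-chan struct{}, connect func(net.Addr)) {

        ticker := time.NewTicker(stagger)
        defer ticker.Stop()

        for _, addr := range addrs {
                // Launch the attempt without blocking the stagger loop.
                go connect(addr)

                select {
                case <-quit:
                        return
                case <-cancel:
                        return
                case <-ticker.C:
                }
        }
}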
4663

4664
// removePeer removes the passed peer from the server's state of all active
4665
// peers.
4666
func (s *server) removePeer(p *peer.Brontide) {
×
4667
        if p == nil {
×
4668
                return
×
4669
        }
×
4670

4671
        srvrLog.Debugf("removing peer %v", p)
×
4672

×
4673
        // As the peer is now finished, ensure that the TCP connection is
×
4674
        // closed and all of its related goroutines have exited.
×
4675
        p.Disconnect(fmt.Errorf("server: disconnecting peer %v", p))
×
4676

×
4677
        // If this peer had an active persistent connection request, remove it.
×
4678
        if p.ConnReq() != nil {
×
4679
                s.connMgr.Remove(p.ConnReq().ID())
×
4680
        }
×
4681

4682
        // Skip deleting the peer from our maps if we're shutting down.
4683
        if s.Stopped() {
×
4684
                return
×
4685
        }
×
4686

4687
        pKey := p.PubKey()
×
4688
        pubSer := pKey[:]
×
4689
        pubStr := string(pubSer)
×
4690

×
4691
        delete(s.peersByPub, pubStr)
×
4692

×
4693
        if p.Inbound() {
×
4694
                delete(s.inboundPeers, pubStr)
×
4695
        } else {
×
4696
                delete(s.outboundPeers, pubStr)
×
4697
        }
×
4698

4699
        // Copy the peer's error buffer across to the server if it has any items
4700
        // in it so that we can restore peer errors across connections.
4701
        if p.ErrorBuffer().Total() > 0 {
×
4702
                s.peerErrors[pubStr] = p.ErrorBuffer()
×
4703
        }
×
4704

4705
        // Inform the peer notifier of a peer offline event so that it can be
4706
        // reported to clients listening for peer events.
4707
        var pubKey [33]byte
×
4708
        copy(pubKey[:], pubSer)
×
4709

×
4710
        s.peerNotifier.NotifyPeerOffline(pubKey)
×
4711
}
4712

4713
// ConnectToPeer requests that the server connect to a Lightning Network peer
4714
// at the specified address. This function will *block* until either a
4715
// connection is established, or the initial handshake process fails.
4716
//
4717
// NOTE: This function is safe for concurrent access.
4718
func (s *server) ConnectToPeer(addr *lnwire.NetAddress,
4719
        perm bool, timeout time.Duration) error {
×
4720

×
4721
        targetPub := string(addr.IdentityKey.SerializeCompressed())
×
4722

×
4723
        // Acquire mutex, but use explicit unlocking instead of defer for
×
4724
        // better granularity.  In certain conditions, this method requires
×
4725
        // making an outbound connection to a remote peer, which requires the
×
4726
        // lock to be released, and subsequently reacquired.
×
4727
        s.mu.Lock()
×
4728

×
4729
        // Ensure we're not already connected to this peer.
×
4730
        peer, err := s.findPeerByPubStr(targetPub)
×
4731
        if err == nil {
×
4732
                s.mu.Unlock()
×
4733
                return &errPeerAlreadyConnected{peer: peer}
×
4734
        }
×
4735

4736
        // Peer was not found, continue to pursue connection with peer.
4737

4738
        // If there's already a pending connection request for this pubkey,
4739
        // then we ignore this request to ensure we don't create a redundant
4740
        // connection.
4741
        if reqs, ok := s.persistentConnReqs[targetPub]; ok {
×
4742
                srvrLog.Warnf("Already have %d persistent connection "+
×
4743
                        "requests for %v, connecting anyway.", len(reqs), addr)
×
4744
        }
×
4745

4746
        // If there's not already a pending or active connection to this node,
4747
        // then instruct the connection manager to attempt to establish a
4748
        // persistent connection to the peer.
4749
        srvrLog.Debugf("Connecting to %v", addr)
×
4750
        if perm {
×
4751
                connReq := &connmgr.ConnReq{
×
4752
                        Addr:      addr,
×
4753
                        Permanent: true,
×
4754
                }
×
4755

×
4756
                // Since the user requested a permanent connection, we'll set
×
4757
                // the entry to true which will tell the server to continue
×
4758
                // reconnecting even if the number of channels with this peer is
×
4759
                // zero.
×
4760
                s.persistentPeers[targetPub] = true
×
4761
                if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
×
4762
                        s.persistentPeersBackoff[targetPub] = s.cfg.MinBackoff
×
4763
                }
×
4764
                s.persistentConnReqs[targetPub] = append(
×
4765
                        s.persistentConnReqs[targetPub], connReq,
×
4766
                )
×
4767
                s.mu.Unlock()
×
4768

×
4769
                go s.connMgr.Connect(connReq)
×
4770

×
4771
                return nil
×
4772
        }
4773
        s.mu.Unlock()
×
4774

×
4775
        // If we're not making a persistent connection, then we'll attempt to
×
4776
        // connect to the target peer. If the we can't make the connection, or
×
4777
        // the crypto negotiation breaks down, then return an error to the
×
4778
        // caller.
×
4779
        errChan := make(chan error, 1)
×
4780
        s.connectToPeer(addr, errChan, timeout)
×
4781

×
4782
        select {
×
4783
        case err := <-errChan:
×
4784
                return err
×
4785
        case <-s.quit:
×
4786
                return ErrServerShuttingDown
×
4787
        }
4788
}
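
// Editor's note: a hedged usage sketch, not part of server.go. With
// perm=false the call blocks until the handshake succeeds or fails; with
// perm=true it returns immediately and leaves retries to the connection
// manager. The helper name and the 30s timeout are assumptions.
func exampleConnect(s *server, addr *lnwire.NetAddress) error {
        return s.ConnectToPeer(addr, false, 30*time.Second)
}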
4789

4790
// connectToPeer establishes a connection to a remote peer. errChan is used to
4791
// notify the caller if the connection attempt has failed. Otherwise, it will be
4792
// closed.
4793
func (s *server) connectToPeer(addr *lnwire.NetAddress,
4794
        errChan chan<- error, timeout time.Duration) {
×
4795

×
4796
        conn, err := brontide.Dial(
×
4797
                s.identityECDH, addr, timeout, s.cfg.net.Dial,
×
4798
        )
×
4799
        if err != nil {
×
4800
                srvrLog.Errorf("Unable to connect to %v: %v", addr, err)
×
4801
                select {
×
4802
                case errChan <- err:
×
4803
                case <-s.quit:
×
4804
                }
4805
                return
×
4806
        }
4807

4808
        close(errChan)
×
4809

×
4810
        srvrLog.Tracef("Brontide dialer made local=%v, remote=%v",
×
4811
                conn.LocalAddr(), conn.RemoteAddr())
×
4812

×
4813
        s.OutboundPeerConnected(nil, conn)
×
4814
}
4815

4816
// DisconnectPeer sends a request to the server to close the connection with
4817
// the peer identified by the given public key.
4818
//
4819
// NOTE: This function is safe for concurrent access.
4820
func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
×
4821
        pubBytes := pubKey.SerializeCompressed()
×
4822
        pubStr := string(pubBytes)
×
4823

×
4824
        s.mu.Lock()
×
4825
        defer s.mu.Unlock()
×
4826

×
4827
        // Check that we're actually connected to this peer. If not, then we'll
×
4828
        // exit with an error as we can't disconnect from a peer that we're not
×
4829
        // currently connected to.
×
4830
        peer, err := s.findPeerByPubStr(pubStr)
×
4831
        if err == ErrPeerNotConnected {
×
4832
                return fmt.Errorf("peer %x is not connected", pubBytes)
×
4833
        }
×
4834

4835
        srvrLog.Infof("Disconnecting from %v", peer)
×
4836

×
4837
        s.cancelConnReqs(pubStr, nil)
×
4838

×
4839
        // If this peer was formerly a persistent connection, then we'll remove
×
4840
        // them from this map so we don't attempt to re-connect after we
×
4841
        // disconnect.
×
4842
        delete(s.persistentPeers, pubStr)
×
4843
        delete(s.persistentPeersBackoff, pubStr)
×
4844

×
4845
        // Remove the peer by calling Disconnect. Previously this was done with
×
4846
        // removePeer, which bypassed the peerTerminationWatcher.
×
4847
        peer.Disconnect(fmt.Errorf("server: DisconnectPeer called"))
×
4848

×
4849
        return nil
×
4850
}
4851

4852
// OpenChannel sends a request to the server to open a channel to the specified
4853
// peer identified by nodeKey with the passed channel funding parameters.
4854
//
4855
// NOTE: This function is safe for concurrent access.
4856
func (s *server) OpenChannel(
4857
        req *funding.InitFundingMsg) (chan *lnrpc.OpenStatusUpdate, chan error) {
×
4858

×
4859
        // The updateChan will have a buffer of 2, since we expect a ChanPending
×
4860
        // + a ChanOpen update, and we want to make sure the funding process is
×
4861
        // not blocked if the caller is not reading the updates.
×
4862
        req.Updates = make(chan *lnrpc.OpenStatusUpdate, 2)
×
4863
        req.Err = make(chan error, 1)
×
4864

×
4865
        // First attempt to locate the target peer to open a channel with; if
×
4866
        // we're unable to locate the peer then this request will fail.
×
4867
        pubKeyBytes := req.TargetPubkey.SerializeCompressed()
×
4868
        s.mu.RLock()
×
4869
        peer, ok := s.peersByPub[string(pubKeyBytes)]
×
4870
        if !ok {
×
4871
                s.mu.RUnlock()
×
4872

×
4873
                req.Err <- fmt.Errorf("peer %x is not online", pubKeyBytes)
×
4874
                return req.Updates, req.Err
×
4875
        }
×
4876
        req.Peer = peer
×
4877
        s.mu.RUnlock()
×
4878

×
4879
        // We'll wait until the peer is active before beginning the channel
×
4880
        // opening process.
×
4881
        select {
×
4882
        case <-peer.ActiveSignal():
×
4883
        case <-peer.QuitSignal():
×
4884
                req.Err <- fmt.Errorf("peer %x disconnected", pubKeyBytes)
×
4885
                return req.Updates, req.Err
×
4886
        case <-s.quit:
×
4887
                req.Err <- ErrServerShuttingDown
×
4888
                return req.Updates, req.Err
×
4889
        }
4890

4891
        // If the fee rate wasn't specified at this point we fail the funding
4892
        // because of the missing fee rate information. The caller of the
4893
        // `OpenChannel` method needs to make sure that default values for the
4894
        // fee rate are set beforehand.
4895
        if req.FundingFeePerKw == 0 {
×
4896
                req.Err <- fmt.Errorf("no FundingFeePerKw specified for " +
×
4897
                        "the channel opening transaction")
×
4898

×
4899
                return req.Updates, req.Err
×
4900
        }
×
4901

4902
        // Spawn a goroutine to send the funding workflow request to the funding
4903
        // manager. This allows the server to continue handling queries instead
4904
        // of blocking on this request which is exported as a synchronous
4905
        // request to the outside world.
4906
        go s.fundingMgr.InitFundingWorkflow(req)
×
4907

×
4908
        return req.Updates, req.Err
×
4909
}
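
// Editor's note: a hedged usage sketch, not part of server.go, showing how a
// caller might consume the asynchronous channels returned by OpenChannel. On
// a successful flow the first update is the ChanPending notification; the
// error channel fires if the request is rejected. The helper name is an
// assumption.
func exampleOpenChannel(s *server, req *funding.InitFundingMsg) error {
        updates, errChan := s.OpenChannel(req)

        select {
        case update := <-updates:
                srvrLog.Infof("Received funding update: %v", update)
                return nil
        case err := <-errChan:
                return err
        }
}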
4910

4911
// Peers returns a slice of all active peers.
4912
//
4913
// NOTE: This function is safe for concurrent access.
4914
func (s *server) Peers() []*peer.Brontide {
×
4915
        s.mu.RLock()
×
4916
        defer s.mu.RUnlock()
×
4917

×
4918
        peers := make([]*peer.Brontide, 0, len(s.peersByPub))
×
4919
        for _, peer := range s.peersByPub {
×
4920
                peers = append(peers, peer)
×
4921
        }
×
4922

4923
        return peers
×
4924
}
4925

4926
// computeNextBackoff uses a truncated exponential backoff to compute the next
4927
// backoff using the value of the existing backoff. The returned duration is
4928
// randomized in either direction by 1/20 to prevent tight loops from
4929
// stabilizing.
4930
func computeNextBackoff(currBackoff, maxBackoff time.Duration) time.Duration {
×
4931
        // Double the current backoff, truncating if it exceeds our maximum.
×
4932
        nextBackoff := 2 * currBackoff
×
4933
        if nextBackoff > maxBackoff {
×
4934
                nextBackoff = maxBackoff
×
4935
        }
×
4936

4937
        // Using 1/10 of our duration as a margin, compute a random offset to
4938
        // avoid the nodes entering connection cycles.
4939
        margin := nextBackoff / 10
×
4940

×
4941
        var wiggle big.Int
×
4942
        wiggle.SetUint64(uint64(margin))
×
4943
        if _, err := rand.Int(rand.Reader, &wiggle); err != nil {
×
4944
                // Randomizing is not mission critical, so we'll just return the
×
4945
                // current backoff.
×
4946
                return nextBackoff
×
4947
        }
×
4948

4949
        // Otherwise add in our wiggle, but subtract out half of the margin so
4950
        // that the backoff can tweaked by 1/20 in either direction.
4951
        return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2)
×
4952
}
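
// Editor's note: a hedged illustration, not part of server.go, of how the
// truncated exponential backoff above evolves, assuming a 1s starting value
// and a 1h cap. Each step roughly doubles (give or take the 1/20 jitter)
// until it saturates at the maximum.
func exampleBackoffSchedule() []time.Duration {
        const maxBackoff = time.Hour

        backoff := time.Second
        schedule := make([]time.Duration, 0, 15)
        for i := 0; i < 15; i++ {
                schedule = append(schedule, backoff)
                backoff = computeNextBackoff(backoff, maxBackoff)
        }

        return schedule
}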
4953

4954
// errNoAdvertisedAddr is an error returned when we attempt to retrieve the
4955
// advertised address of a node, but they don't have one.
4956
var errNoAdvertisedAddr = errors.New("no advertised address found")
4957

4958
// fetchNodeAdvertisedAddrs attempts to fetch the advertised addresses of a node.
4959
func (s *server) fetchNodeAdvertisedAddrs(pub *btcec.PublicKey) ([]net.Addr, error) {
×
4960
        vertex, err := route.NewVertexFromBytes(pub.SerializeCompressed())
×
4961
        if err != nil {
×
4962
                return nil, err
×
4963
        }
×
4964

4965
        node, err := s.graphDB.FetchLightningNode(vertex)
×
4966
        if err != nil {
×
4967
                return nil, err
×
4968
        }
×
4969

4970
        if len(node.Addresses) == 0 {
×
4971
                return nil, errNoAdvertisedAddr
×
4972
        }
×
4973

4974
        return node.Addresses, nil
×
4975
}
4976

4977
// fetchLastChanUpdate returns a function which is able to retrieve our latest
4978
// channel update for a target channel.
4979
func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) (
4980
        *lnwire.ChannelUpdate1, error) {
×
4981

×
4982
        ourPubKey := s.identityECDH.PubKey().SerializeCompressed()
×
4983
        return func(cid lnwire.ShortChannelID) (*lnwire.ChannelUpdate1, error) {
×
4984
                info, edge1, edge2, err := s.graphBuilder.GetChannelByID(cid)
×
4985
                if err != nil {
×
4986
                        return nil, err
×
4987
                }
×
4988

4989
                return netann.ExtractChannelUpdate(
×
4990
                        ourPubKey[:], info, edge1, edge2,
×
4991
                )
×
4992
        }
4993
}
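
// Editor's note: a hedged usage sketch, not part of server.go. The closure
// returned above captures our node pubkey once and can then be queried
// repeatedly for different channels. The helper name is an assumption.
func exampleLastUpdate(s *server,
        scid lnwire.ShortChannelID) (*lnwire.ChannelUpdate1, error) {

        fetchUpdate := s.fetchLastChanUpdate()

        return fetchUpdate(scid)
}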
4994

4995
// applyChannelUpdate applies the channel update to the different sub-systems of
4996
// the server. The useAlias boolean denotes whether or not to send an alias in
4997
// place of the real SCID.
4998
func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate1,
4999
        op *wire.OutPoint, useAlias bool) error {
×
5000

×
5001
        var (
×
5002
                peerAlias    *lnwire.ShortChannelID
×
5003
                defaultAlias lnwire.ShortChannelID
×
5004
        )
×
5005

×
5006
        chanID := lnwire.NewChanIDFromOutPoint(*op)
×
5007

×
5008
        // Fetch the peer's alias from the lnwire.ChannelID so it can be used
×
5009
        // in the ChannelUpdate if it hasn't been announced yet.
×
5010
        if useAlias {
×
5011
                foundAlias, _ := s.aliasMgr.GetPeerAlias(chanID)
×
5012
                if foundAlias != defaultAlias {
×
5013
                        peerAlias = &foundAlias
×
5014
                }
×
5015
        }
5016

5017
        errChan := s.authGossiper.ProcessLocalAnnouncement(
×
5018
                update, discovery.RemoteAlias(peerAlias),
×
5019
        )
×
5020
        select {
×
5021
        case err := <-errChan:
×
5022
                return err
×
5023
        case <-s.quit:
×
5024
                return ErrServerShuttingDown
×
5025
        }
5026
}
5027

5028
// SendCustomMessage sends a custom message to the peer with the specified
5029
// pubkey.
5030
func (s *server) SendCustomMessage(peerPub [33]byte, msgType lnwire.MessageType,
5031
        data []byte) error {
×
5032

×
5033
        peer, err := s.FindPeerByPubStr(string(peerPub[:]))
×
5034
        if err != nil {
×
5035
                return err
×
5036
        }
×
5037

5038
        // We'll wait until the peer is active.
5039
        select {
×
5040
        case <-peer.ActiveSignal():
×
5041
        case <-peer.QuitSignal():
×
5042
                return fmt.Errorf("peer %x disconnected", peerPub)
×
5043
        case <-s.quit:
×
5044
                return ErrServerShuttingDown
×
5045
        }
5046

5047
        msg, err := lnwire.NewCustom(msgType, data)
×
5048
        if err != nil {
×
5049
                return err
×
5050
        }
×
5051

5052
        // Send the message as low-priority. For now we assume that all
5053
        // application-defined messages are low priority.
5054
        return peer.SendMessageLazy(true, msg)
×
5055
}
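
// Editor's note: a hedged usage sketch, not part of server.go, for sending an
// application-defined message to a connected peer. Custom message types are
// expected to live in the custom range starting at lnwire.CustomTypeStart;
// the helper name and payload are assumptions for illustration.
func exampleSendCustom(s *server, peerPub [33]byte) error {
        return s.SendCustomMessage(
                peerPub, lnwire.CustomTypeStart, []byte("hello"),
        )
}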
5056

5057
// newSweepPkScriptGen creates a closure that generates a new public key script
5058
// which should be used to sweep any funds into the on-chain wallet.
5059
// Specifically, the script generated is a version 1, pay-to-taproot (p2tr)
5060
// output.
5061
func newSweepPkScriptGen(
5062
        wallet lnwallet.WalletController,
5063
        netParams *chaincfg.Params) func() fn.Result[lnwallet.AddrWithKey] {
×
5064

×
5065
        return func() fn.Result[lnwallet.AddrWithKey] {
×
5066
                sweepAddr, err := wallet.NewAddress(
×
5067
                        lnwallet.TaprootPubkey, false,
×
5068
                        lnwallet.DefaultAccountName,
×
5069
                )
×
5070
                if err != nil {
×
5071
                        return fn.Err[lnwallet.AddrWithKey](err)
×
5072
                }
×
5073

5074
                addr, err := txscript.PayToAddrScript(sweepAddr)
×
5075
                if err != nil {
×
5076
                        return fn.Err[lnwallet.AddrWithKey](err)
×
5077
                }
×
5078

5079
                internalKeyDesc, err := lnwallet.InternalKeyForAddr(
×
5080
                        wallet, netParams, addr,
×
5081
                )
×
5082
                if err != nil {
×
5083
                        return fn.Err[lnwallet.AddrWithKey](err)
×
5084
                }
×
5085

5086
                return fn.Ok(lnwallet.AddrWithKey{
×
5087
                        DeliveryAddress: addr,
×
5088
                        InternalKey:     internalKeyDesc,
×
5089
                })
×
5090
        }
5091
}
5092

5093
// shouldPeerBootstrap returns true if we should attempt to perform peer
5094
// bootstrapping to actively seek our peers using the set of active network
5095
// bootstrappers.
5096
func shouldPeerBootstrap(cfg *Config) bool {
6✔
5097
        isSimnet := cfg.Bitcoin.SimNet
6✔
5098
        isSignet := cfg.Bitcoin.SigNet
6✔
5099
        isRegtest := cfg.Bitcoin.RegTest
6✔
5100
        isDevNetwork := isSimnet || isSignet || isRegtest
6✔
5101

6✔
5102
        // TODO(yy): remove the check on simnet/regtest such that the itest is
6✔
5103
        // covering the bootstrapping process.
6✔
5104
        return !cfg.NoNetBootstrap && !isDevNetwork
6✔
5105
}
6✔
5106

5107
// fetchClosedChannelSCIDs returns the set of SCIDs for channels whose force
5108
// close has finished.
5109
func (s *server) fetchClosedChannelSCIDs() map[lnwire.ShortChannelID]struct{} {
×
5110
        // Get a list of closed channels.
×
5111
        channels, err := s.chanStateDB.FetchClosedChannels(false)
×
5112
        if err != nil {
×
5113
                srvrLog.Errorf("Failed to fetch closed channels: %v", err)
×
5114
                return nil
×
5115
        }
×
5116

5117
        // Save the SCIDs in a map.
5118
        closedSCIDs := make(map[lnwire.ShortChannelID]struct{}, len(channels))
×
5119
        for _, c := range channels {
×
5120
                // If the channel is not pending, its FC has been finalized.
×
5121
                if !c.IsPending {
×
5122
                        closedSCIDs[c.ShortChanID] = struct{}{}
×
5123
                }
×
5124
        }
5125

5126
        // Double check whether the reported closed channel has indeed finished
5127
        // closing.
5128
        //
5129
        // NOTE: There are misalignments regarding when a channel's FC is
5130
        // marked as finalized. We double check the pending channels to make
5131
        // sure the returned SCIDs are indeed terminated.
5132
        //
5133
        // TODO(yy): fix the misalignments in `FetchClosedChannels`.
5134
        pendings, err := s.chanStateDB.FetchPendingChannels()
×
5135
        if err != nil {
×
5136
                srvrLog.Errorf("Failed to fetch pending channels: %v", err)
×
5137
                return nil
×
5138
        }
×
5139

5140
        for _, c := range pendings {
×
5141
                if _, ok := closedSCIDs[c.ShortChannelID]; !ok {
×
5142
                        continue
×
5143
                }
5144

5145
                // If the channel is still reported as pending, remove it from
5146
                // the map.
5147
                delete(closedSCIDs, c.ShortChannelID)
×
5148

×
5149
                srvrLog.Warnf("Channel=%v is prematurely marked as finalized",
×
5150
                        c.ShortChannelID)
×
5151
        }
5152

5153
        return closedSCIDs
×
5154
}