
lightningnetwork / lnd / 12428593038

20 Dec 2024 09:02AM UTC coverage: 58.33% (-0.2%) from 58.576%

Pull Request #9382: lint: deprecate old linters, use new ref commit
Commit by guggero (via github): .golangci.yml: speed up linter by updating start commit

With this we allow the linter to only look at recent changes, since
everything between that old commit and this most recent one has been
linted correctly anyway.

133769 of 229330 relevant lines covered (58.33%)

19284.53 hits per line

Source File

/peer/brontide.go (76.49% covered)
1
package peer
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "errors"
7
        "fmt"
8
        "math/rand"
9
        "net"
10
        "strings"
11
        "sync"
12
        "sync/atomic"
13
        "time"
14

15
        "github.com/btcsuite/btcd/btcec/v2"
16
        "github.com/btcsuite/btcd/chaincfg/chainhash"
17
        "github.com/btcsuite/btcd/connmgr"
18
        "github.com/btcsuite/btcd/txscript"
19
        "github.com/btcsuite/btcd/wire"
20
        "github.com/btcsuite/btclog/v2"
21
        "github.com/davecgh/go-spew/spew"
22
        "github.com/lightningnetwork/lnd/buffer"
23
        "github.com/lightningnetwork/lnd/chainntnfs"
24
        "github.com/lightningnetwork/lnd/channeldb"
25
        "github.com/lightningnetwork/lnd/channelnotifier"
26
        "github.com/lightningnetwork/lnd/contractcourt"
27
        "github.com/lightningnetwork/lnd/discovery"
28
        "github.com/lightningnetwork/lnd/feature"
29
        "github.com/lightningnetwork/lnd/fn/v2"
30
        "github.com/lightningnetwork/lnd/funding"
31
        graphdb "github.com/lightningnetwork/lnd/graph/db"
32
        "github.com/lightningnetwork/lnd/graph/db/models"
33
        "github.com/lightningnetwork/lnd/htlcswitch"
34
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
35
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
36
        "github.com/lightningnetwork/lnd/input"
37
        "github.com/lightningnetwork/lnd/invoices"
38
        "github.com/lightningnetwork/lnd/keychain"
39
        "github.com/lightningnetwork/lnd/lnpeer"
40
        "github.com/lightningnetwork/lnd/lntypes"
41
        "github.com/lightningnetwork/lnd/lnutils"
42
        "github.com/lightningnetwork/lnd/lnwallet"
43
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
44
        "github.com/lightningnetwork/lnd/lnwallet/chancloser"
45
        "github.com/lightningnetwork/lnd/lnwire"
46
        "github.com/lightningnetwork/lnd/msgmux"
47
        "github.com/lightningnetwork/lnd/netann"
48
        "github.com/lightningnetwork/lnd/pool"
49
        "github.com/lightningnetwork/lnd/queue"
50
        "github.com/lightningnetwork/lnd/subscribe"
51
        "github.com/lightningnetwork/lnd/ticker"
52
        "github.com/lightningnetwork/lnd/tlv"
53
        "github.com/lightningnetwork/lnd/watchtower/wtclient"
54
)
55

56
const (
57
        // pingInterval is the interval at which ping messages are sent.
58
        pingInterval = 1 * time.Minute
59

60
        // pingTimeout is the amount of time we will wait for a pong response
61
        // before considering the peer to be unresponsive.
62
        //
63
        // This MUST be a smaller value than the pingInterval.
64
        pingTimeout = 30 * time.Second
65

66
        // idleTimeout is the duration of inactivity before we time out a peer.
67
        idleTimeout = 5 * time.Minute
68

69
        // writeMessageTimeout is the timeout used when writing a message to the
70
        // peer.
71
        writeMessageTimeout = 5 * time.Second
72

73
        // readMessageTimeout is the timeout used when reading a message from a
74
        // peer.
75
        readMessageTimeout = 5 * time.Second
76

77
        // handshakeTimeout is the timeout used when waiting for the peer's init
78
        // message.
79
        handshakeTimeout = 15 * time.Second
80

81
        // ErrorBufferSize is the number of historic peer errors that we store.
82
        ErrorBufferSize = 10
83

84
        // pongSizeCeiling is the upper bound on a uniformly distributed random
85
        // variable that we use for requesting pong responses. We don't use the
86
        // MaxPongBytes (upper bound accepted by the protocol) because it is
87
        // needlessly wasteful of precious Tor bandwidth for little to no gain.
88
        pongSizeCeiling = 4096
89

90
        // torTimeoutMultiplier is the scaling factor we use on network timeouts
91
        // for Tor peers.
92
        torTimeoutMultiplier = 3
93
)
94

95
var (
96
        // ErrChannelNotFound is an error returned when a channel is queried and
97
        // either the Brontide doesn't know of it, or the channel in question
98
        // is pending.
99
        ErrChannelNotFound = fmt.Errorf("channel not found")
100
)
101

102
// outgoingMsg packages an lnwire.Message to be sent out on the wire, along with
103
// a buffered channel which will be sent upon once the write is complete. This
104
// buffered channel acts as a semaphore to be used for synchronization purposes.
105
type outgoingMsg struct {
106
        priority bool
107
        msg      lnwire.Message
108
        errChan  chan error // MUST be buffered.
109
}
110

111
// newChannelMsg packages a channeldb.OpenChannel with a channel that allows
112
// the receiver of the request to report when the channel creation process has
113
// completed.
114
type newChannelMsg struct {
115
        // channel is used when the pending channel becomes active.
116
        channel *lnpeer.NewChannel
117

118
        // channelID is used when there's a new pending channel.
119
        channelID lnwire.ChannelID
120

121
        err chan error
122
}
123

124
type customMsg struct {
125
        peer [33]byte
126
        msg  lnwire.Custom
127
}
128

129
// closeMsg is a wrapper struct around any wire messages that deal with the
130
// cooperative channel closure negotiation process. This struct includes the
131
// raw channel ID targeted along with the original message.
132
type closeMsg struct {
133
        cid lnwire.ChannelID
134
        msg lnwire.Message
135
}
136

137
// PendingUpdate describes the pending state of a closing channel.
138
type PendingUpdate struct {
139
        Txid        []byte
140
        OutputIndex uint32
141
}
142

143
// ChannelCloseUpdate contains the outcome of the close channel operation.
144
type ChannelCloseUpdate struct {
145
        ClosingTxid []byte
146
        Success     bool
147

148
        // LocalCloseOutput is an optional, additional output on the closing
149
        // transaction that the local party should be paid to. This will only be
150
        // populated if the local balance isn't dust.
151
        LocalCloseOutput fn.Option[chancloser.CloseOutput]
152

153
        // RemoteCloseOutput is an optional, additional output on the closing
154
        // transaction that the remote party should be paid to. This will only
155
        // be populated if the remote balance isn't dust.
156
        RemoteCloseOutput fn.Option[chancloser.CloseOutput]
157

158
        // AuxOutputs is an optional set of additional outputs that might be
159
        // included in the closing transaction. These are used for custom
160
        // channel types.
161
        AuxOutputs fn.Option[chancloser.AuxCloseOutputs]
162
}
163

164
// TimestampedError is a timestamped error that is used to store the most recent
165
// errors we have experienced with our peers.
166
type TimestampedError struct {
167
        Error     error
168
        Timestamp time.Time
169
}
170

171
// Config defines configuration fields that are necessary for a peer object
172
// to function.
173
type Config struct {
174
        // Conn is the underlying network connection for this peer.
175
        Conn MessageConn
176

177
        // ConnReq stores information related to the persistent connection request
178
        // for this peer.
179
        ConnReq *connmgr.ConnReq
180

181
        // PubKeyBytes is the serialized, compressed public key of this peer.
182
        PubKeyBytes [33]byte
183

184
        // Addr is the network address of the peer.
185
        Addr *lnwire.NetAddress
186

187
        // Inbound indicates whether or not the peer is an inbound peer.
188
        Inbound bool
189

190
        // Features is the set of features that we advertise to the remote party.
191
        Features *lnwire.FeatureVector
192

193
        // LegacyFeatures is the set of features that we advertise to the remote
194
        // peer for backwards compatibility. Nodes that have not implemented
195
        // flat features will still be able to read our feature bits from the
196
        // legacy global field, but we will also advertise everything in the
197
        // default features field.
198
        LegacyFeatures *lnwire.FeatureVector
199

200
        // OutgoingCltvRejectDelta defines the number of blocks before expiry of
201
        // an htlc where we don't offer it anymore.
202
        OutgoingCltvRejectDelta uint32
203

204
        // ChanActiveTimeout specifies the duration the peer will wait to request
205
        // a channel reenable, beginning from the time the peer was started.
206
        ChanActiveTimeout time.Duration
207

208
        // ErrorBuffer stores a set of errors related to a peer. It contains error
209
        // messages that our peer has recently sent us over the wire and records of
210
        // unknown messages that were sent to us so that we can have a full track
211
        // record of the communication errors we have had with our peer. If we
212
        // choose to disconnect from a peer, it also stores the reason we had for
213
        // disconnecting.
214
        ErrorBuffer *queue.CircularBuffer
215

216
        // WritePool is the task pool that manages reuse of write buffers. Write
217
        // tasks are submitted to the pool in order to conserve the total number of
218
        // write buffers allocated at any one time, and decouple write buffer
219
        // allocation from the peer life cycle.
220
        WritePool *pool.Write
221

222
        // ReadPool is the task pool that manages reuse of read buffers.
223
        ReadPool *pool.Read
224

225
        // Switch is a pointer to the htlcswitch. It is used to setup, get, and
226
        // tear-down ChannelLinks.
227
        Switch messageSwitch
228

229
        // InterceptSwitch is a pointer to the InterceptableSwitch, a wrapper around
230
        // the regular Switch. We only export it here to pass ForwardPackets to the
231
        // ChannelLinkConfig.
232
        InterceptSwitch *htlcswitch.InterceptableSwitch
233

234
        // ChannelDB is used to fetch opened channels, and closed channels.
235
        ChannelDB *channeldb.ChannelStateDB
236

237
        // ChannelGraph is a pointer to the channel graph which is used to
238
        // query information about the set of known active channels.
239
        ChannelGraph *graphdb.ChannelGraph
240

241
        // ChainArb is used to subscribe to channel events, update contract signals,
242
        // and force close channels.
243
        ChainArb *contractcourt.ChainArbitrator
244

245
        // AuthGossiper is needed so that the Brontide impl can register with the
246
        // gossiper and process remote channel announcements.
247
        AuthGossiper *discovery.AuthenticatedGossiper
248

249
        // ChanStatusMgr is used to set or un-set the disabled bit in channel
250
        // updates.
251
        ChanStatusMgr *netann.ChanStatusManager
252

253
        // ChainIO is used to retrieve the best block.
254
        ChainIO lnwallet.BlockChainIO
255

256
        // FeeEstimator is used to compute our target ideal fee-per-kw when
257
        // initializing the coop close process.
258
        FeeEstimator chainfee.Estimator
259

260
        // Signer is used when creating *lnwallet.LightningChannel instances.
261
        Signer input.Signer
262

263
        // SigPool is used when creating *lnwallet.LightningChannel instances.
264
        SigPool *lnwallet.SigPool
265

266
        // Wallet is used to publish transactions and generates delivery
267
        // scripts during the coop close process.
268
        Wallet *lnwallet.LightningWallet
269

270
        // ChainNotifier is used to receive confirmations of a coop close
271
        // transaction.
272
        ChainNotifier chainntnfs.ChainNotifier
273

274
        // BestBlockView is used to efficiently query for up-to-date
275
        // blockchain state information.
276
        BestBlockView chainntnfs.BestBlockView
277

278
        // RoutingPolicy is used to set the forwarding policy for links created by
279
        // the Brontide.
280
        RoutingPolicy models.ForwardingPolicy
281

282
        // Sphinx is used when setting up ChannelLinks so they can decode sphinx
283
        // onion blobs.
284
        Sphinx *hop.OnionProcessor
285

286
        // WitnessBeacon is used when setting up ChannelLinks so they can add any
287
        // preimages that they learn.
288
        WitnessBeacon contractcourt.WitnessBeacon
289

290
        // Invoices is passed to the ChannelLink on creation and handles all
291
        // invoice-related logic.
292
        Invoices *invoices.InvoiceRegistry
293

294
        // ChannelNotifier is used by the link to notify other sub-systems about
295
        // channel-related events and by the Brontide to subscribe to
296
        // ActiveLinkEvents.
297
        ChannelNotifier *channelnotifier.ChannelNotifier
298

299
        // HtlcNotifier is used when creating a ChannelLink.
300
        HtlcNotifier *htlcswitch.HtlcNotifier
301

302
        // TowerClient is used to backup revoked states.
303
        TowerClient wtclient.ClientManager
304

305
        // DisconnectPeer is used to disconnect this peer if the cooperative close
306
        // process fails.
307
        DisconnectPeer func(*btcec.PublicKey) error
308

309
        // GenNodeAnnouncement is used to send our node announcement to the remote
310
        // on startup.
311
        GenNodeAnnouncement func(...netann.NodeAnnModifier) (
312
                lnwire.NodeAnnouncement, error)
313

314
        // PrunePersistentPeerConnection is used to remove all internal state
315
        // related to this peer in the server.
316
        PrunePersistentPeerConnection func([33]byte)
317

318
        // FetchLastChanUpdate fetches our latest channel update for a target
319
        // channel.
320
        FetchLastChanUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate1,
321
                error)
322

323
        // FundingManager is an implementation of the funding.Controller interface.
324
        FundingManager funding.Controller
325

326
        // Hodl is used when creating ChannelLinks to specify HodlFlags as
327
        // breakpoints in dev builds.
328
        Hodl *hodl.Config
329

330
        // UnsafeReplay is used when creating ChannelLinks to specify whether or
331
        // not to replay adds on its commitment tx.
332
        UnsafeReplay bool
333

334
        // MaxOutgoingCltvExpiry is used when creating ChannelLinks and is the max
335
        // number of blocks that funds could be locked up for when forwarding
336
        // payments.
337
        MaxOutgoingCltvExpiry uint32
338

339
        // MaxChannelFeeAllocation is used when creating ChannelLinks and is the
340
        // maximum percentage of total funds that can be allocated to a channel's
341
        // commitment fee. This only applies for the initiator of the channel.
342
        MaxChannelFeeAllocation float64
343

344
        // MaxAnchorsCommitFeeRate is the maximum fee rate we'll use as an
345
        // initiator for anchor channel commitments.
346
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight
347

348
        // CoopCloseTargetConfs is the confirmation target that will be used
349
        // to estimate the fee rate to use during a cooperative channel
350
        // closure initiated by the remote peer.
351
        CoopCloseTargetConfs uint32
352

353
        // ServerPubKey is the serialized, compressed public key of our lnd node.
354
        // It is used to determine which policy (channel edge) to pass to the
355
        // ChannelLink.
356
        ServerPubKey [33]byte
357

358
        // ChannelCommitInterval is the maximum time that is allowed to pass between
359
        // receiving a channel state update and signing the next commitment.
360
        // Setting this to a longer duration allows for more efficient channel
361
        // operations at the cost of latency.
362
        ChannelCommitInterval time.Duration
363

364
        // PendingCommitInterval is the maximum time that is allowed to pass
365
        // while waiting for the remote party to revoke a locally initiated
366
        // commitment state. Set this to a longer duration if a slow
367
        // response is expected from the remote party or a large number of
368
        // payments are attempted at the same time.
369
        PendingCommitInterval time.Duration
370

371
        // ChannelCommitBatchSize is the maximum number of channel state updates
372
        // that is accumulated before signing a new commitment.
373
        ChannelCommitBatchSize uint32
374

375
        // HandleCustomMessage is called whenever a custom message is received
376
        // from the peer.
377
        HandleCustomMessage func(peer [33]byte, msg *lnwire.Custom) error
378

379
        // GetAliases is passed to created links so the Switch and link can be
380
        // aware of the channel's aliases.
381
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID
382

383
        // RequestAlias allows the Brontide struct to request an alias to send
384
        // to the peer.
385
        RequestAlias func() (lnwire.ShortChannelID, error)
386

387
        // AddLocalAlias persists an alias to an underlying alias store.
388
        AddLocalAlias func(alias, base lnwire.ShortChannelID,
389
                gossip, liveUpdate bool) error
390

391
        // AuxLeafStore is an optional store that can be used to store auxiliary
392
        // leaves for certain custom channel types.
393
        AuxLeafStore fn.Option[lnwallet.AuxLeafStore]
394

395
        // AuxSigner is an optional signer that can be used to sign auxiliary
396
        // leaves for certain custom channel types.
397
        AuxSigner fn.Option[lnwallet.AuxSigner]
398

399
        // AuxResolver is an optional interface that can be used to modify the
400
        // way contracts are resolved.
401
        AuxResolver fn.Option[lnwallet.AuxContractResolver]
402

403
        // AuxTrafficShaper is an optional auxiliary traffic shaper that can be
404
        // used to manage the bandwidth of peer links.
405
        AuxTrafficShaper fn.Option[htlcswitch.AuxTrafficShaper]
406

407
        // PongBuf is a slice we'll reuse instead of allocating memory on the
408
        // heap. Since only reads will occur and no writes, there is no need
409
        // for any synchronization primitives. As a result, it's safe to share
410
        // this across multiple Peer struct instances.
411
        PongBuf []byte
412

413
        // Adds the option to disable forwarding payments in blinded routes
414
        // by failing back any blinding-related payloads as if they were
415
        // invalid.
416
        DisallowRouteBlinding bool
417

418
        // DisallowQuiescence is a flag that indicates whether the Brontide
419
        // should have the quiescence feature disabled.
420
        DisallowQuiescence bool
421

422
        // MaxFeeExposure limits the number of outstanding fees in a channel.
423
        // This value will be passed to created links.
424
        MaxFeeExposure lnwire.MilliSatoshi
425

426
        // MsgRouter is an optional instance of the main message router that
427
        // the peer will use. If None, then a new default version will be used
428
        // in place.
429
        MsgRouter fn.Option[msgmux.Router]
430

431
        // AuxChanCloser is an optional instance of an abstraction that can be
432
        // used to modify the way the co-op close transaction is constructed.
433
        AuxChanCloser fn.Option[chancloser.AuxChanCloser]
434

435
        // ShouldFwdExpEndorsement is a closure that indicates whether
436
        // experimental endorsement signals should be set.
437
        ShouldFwdExpEndorsement func() bool
438

439
        // Quit is the server's quit channel. If this is closed, we halt operation.
440
        Quit chan struct{}
441
}
442

443
// Brontide is an active peer on the Lightning Network. This struct is responsible
444
// for managing any channel state related to this peer. To do so, it has
445
// several helper goroutines to handle events such as HTLC timeouts, new
446
// funding workflow, and detecting an uncooperative closure of any active
447
// channels.
448
// TODO(roasbeef): proper reconnection logic.
449
type Brontide struct {
450
        // MUST be used atomically.
451
        started    int32
452
        disconnect int32
453

454
        // MUST be used atomically.
455
        bytesReceived uint64
456
        bytesSent     uint64
457

458
        // isTorConnection is a flag that indicates whether or not we believe
459
        // the remote peer is a tor connection. It is not always possible to
460
        // know this with certainty but we have heuristics we use that should
461
        // catch most cases.
462
        //
463
        // NOTE: We judge the tor-ness of a connection by if the remote peer has
464
        // ".onion" in the address OR if it's connected over localhost.
465
        // This will miss cases where our peer is connected to our clearnet
466
        // address over the tor network (via exit nodes). It will also misjudge
467
        // actual localhost connections as tor. We need to include this because
468
        // inbound connections to our tor address will appear to come from the
469
        // local socks5 proxy. This heuristic is only used to expand the timeout
470
        // window for peers so it is OK to misjudge this. If you use this field
471
        // for any other purpose you should seriously consider whether or not
472
        // this heuristic is good enough for your use case.
473
        isTorConnection bool
474

475
        pingManager *PingManager
476

477
        // lastPingPayload stores an unsafe pointer wrapped as an atomic
478
        // variable which points to the last payload the remote party sent us
479
        // as their ping.
480
        //
481
        // MUST be used atomically.
482
        lastPingPayload atomic.Value
483

484
        cfg Config
485

486
        // activeSignal when closed signals that the peer is now active and
487
        // ready to process messages.
488
        activeSignal chan struct{}
489

490
        // startTime is the time this peer connection was successfully established.
491
        // It will be zero for peers that did not successfully call Start().
492
        startTime time.Time
493

494
        // sendQueue is the channel which is used to queue outgoing messages to be
495
        // written onto the wire. Note that this channel is unbuffered.
496
        sendQueue chan outgoingMsg
497

498
        // outgoingQueue is a buffered channel which allows second/third party
499
        // objects to queue messages to be sent out on the wire.
500
        outgoingQueue chan outgoingMsg
501

502
        // activeChannels is a map which stores the state machines of all
503
        // active channels. Channels are indexed into the map by the txid of
504
        // the funding transaction which opened the channel.
505
        //
506
        // NOTE: On startup, pending channels are stored as nil in this map.
507
        // Confirmed channels have channel data populated in the map. This means
508
        // that accesses to this map should nil-check the LightningChannel to
509
        // see if this is a pending channel or not. The tradeoff here is either
510
        // having two maps everywhere (one for pending, one for confirmed chans)
511
        // or having an extra nil-check per access.
512
        activeChannels *lnutils.SyncMap[
513
                lnwire.ChannelID, *lnwallet.LightningChannel]
514

515
        // addedChannels tracks any new channels opened during this peer's
516
        // lifecycle. We use this to filter out these new channels when the time
517
        // comes to request a reenable for active channels, since they will have
518
        // waited a shorter duration.
519
        addedChannels *lnutils.SyncMap[lnwire.ChannelID, struct{}]
520

521
        // newActiveChannel is used by the fundingManager to send fully opened
522
        // channels to the source peer which handled the funding workflow.
523
        newActiveChannel chan *newChannelMsg
524

525
        // newPendingChannel is used by the fundingManager to send pending open
526
        // channels to the source peer which handled the funding workflow.
527
        newPendingChannel chan *newChannelMsg
528

529
        // removePendingChannel is used by the fundingManager to cancel pending
530
        // open channels to the source peer when the funding flow is failed.
531
        removePendingChannel chan *newChannelMsg
532

533
        // activeMsgStreams is a map from channel id to the channel streams that
534
        // proxy messages to individual, active links.
535
        activeMsgStreams map[lnwire.ChannelID]*msgStream
536

537
        // activeChanCloses is a map that keeps track of all the active
538
        // cooperative channel closures. Any channel closing messages are directed
539
        // to one of these active state machines. Once the channel has been closed,
540
        // the state machine will be deleted from the map.
541
        activeChanCloses map[lnwire.ChannelID]*chancloser.ChanCloser
542

543
        // localCloseChanReqs is a channel in which any local requests to close
544
        // a particular channel are sent over.
545
        localCloseChanReqs chan *htlcswitch.ChanClose
546

547
        // linkFailures receives all reported channel failures from the switch,
548
        // and instructs the channelManager to clean remaining channel state.
549
        linkFailures chan linkFailureReport
550

551
        // chanCloseMsgs is a channel that any message related to channel
552
        // closures are sent over. This includes lnwire.Shutdown message as
553
        // well as lnwire.ClosingSigned messages.
554
        chanCloseMsgs chan *closeMsg
555

556
        // remoteFeatures is the feature vector received from the peer during
557
        // the connection handshake.
558
        remoteFeatures *lnwire.FeatureVector
559

560
        // resentChanSyncMsg is a set that keeps track of which channels we
561
        // have re-sent channel reestablishment messages for. This is done to
562
        // avoid getting into a loop where both peers will respond to the other
563
        // peer's chansync message with its own over and over again.
564
        resentChanSyncMsg map[lnwire.ChannelID]struct{}
565

566
        // channelEventClient is the channel event subscription client that's
567
        // used to assist retry enabling the channels. This client is only
568
        // created when the reenableTimeout is no greater than 1 minute. Once
569
        // created, it is canceled once the reenabling has been finished.
570
        //
571
        // NOTE: we choose to create the client conditionally to avoid
572
        // potentially holding lots of un-consumed events.
573
        channelEventClient *subscribe.Client
574

575
        // msgRouter is an instance of the msgmux.Router which is used to send
576
        // off new wire messages for handling.
577
        msgRouter fn.Option[msgmux.Router]
578

579
        // globalMsgRouter is a flag that indicates whether we have a global
580
        // msg router. If so, then we don't worry about stopping the msg router
581
        // when a peer disconnects.
582
        globalMsgRouter bool
583

584
        startReady chan struct{}
585
        quit       chan struct{}
586
        wg         sync.WaitGroup
587

588
        // log is a peer-specific logging instance.
589
        log btclog.Logger
590
}
591

592
// A compile-time check to ensure that Brontide satisfies the lnpeer.Peer interface.
593
var _ lnpeer.Peer = (*Brontide)(nil)
594

595
// NewBrontide creates a new Brontide from a peer.Config struct.
596
func NewBrontide(cfg Config) *Brontide {
26✔
597
        logPrefix := fmt.Sprintf("Peer(%x):", cfg.PubKeyBytes)
26✔
598

26✔
599
        // We have a global message router if one was passed in via the config.
26✔
600
        // In this case, we don't need to attempt to tear it down when the peer
26✔
601
        // is stopped.
26✔
602
        globalMsgRouter := cfg.MsgRouter.IsSome()
26✔
603

26✔
604
        // We'll either use the msg router instance passed in, or create a new
26✔
605
        // blank instance.
26✔
606
        msgRouter := cfg.MsgRouter.Alt(fn.Some[msgmux.Router](
26✔
607
                msgmux.NewMultiMsgRouter(),
26✔
608
        ))
26✔
609

26✔
610
        p := &Brontide{
26✔
611
                cfg:           cfg,
26✔
612
                activeSignal:  make(chan struct{}),
26✔
613
                sendQueue:     make(chan outgoingMsg),
26✔
614
                outgoingQueue: make(chan outgoingMsg),
26✔
615
                addedChannels: &lnutils.SyncMap[lnwire.ChannelID, struct{}]{},
26✔
616
                activeChannels: &lnutils.SyncMap[
26✔
617
                        lnwire.ChannelID, *lnwallet.LightningChannel,
26✔
618
                ]{},
26✔
619
                newActiveChannel:     make(chan *newChannelMsg, 1),
26✔
620
                newPendingChannel:    make(chan *newChannelMsg, 1),
26✔
621
                removePendingChannel: make(chan *newChannelMsg),
26✔
622

26✔
623
                activeMsgStreams:   make(map[lnwire.ChannelID]*msgStream),
26✔
624
                activeChanCloses:   make(map[lnwire.ChannelID]*chancloser.ChanCloser),
26✔
625
                localCloseChanReqs: make(chan *htlcswitch.ChanClose),
26✔
626
                linkFailures:       make(chan linkFailureReport),
26✔
627
                chanCloseMsgs:      make(chan *closeMsg),
26✔
628
                resentChanSyncMsg:  make(map[lnwire.ChannelID]struct{}),
26✔
629
                startReady:         make(chan struct{}),
26✔
630
                quit:               make(chan struct{}),
26✔
631
                log:                peerLog.WithPrefix(logPrefix),
26✔
632
                msgRouter:          msgRouter,
26✔
633
                globalMsgRouter:    globalMsgRouter,
26✔
634
        }
26✔
635

26✔
636
        if cfg.Conn != nil && cfg.Conn.RemoteAddr() != nil {
27✔
637
                remoteAddr := cfg.Conn.RemoteAddr().String()
1✔
638
                p.isTorConnection = strings.Contains(remoteAddr, ".onion") ||
1✔
639
                        strings.Contains(remoteAddr, "127.0.0.1")
1✔
640
        }
1✔
641

642
        var (
26✔
643
                lastBlockHeader           *wire.BlockHeader
26✔
644
                lastSerializedBlockHeader [wire.MaxBlockHeaderPayload]byte
26✔
645
        )
26✔
646
        newPingPayload := func() []byte {
26✔
647
                // We query the BestBlockHeader from our BestBlockView each time
×
648
                // this is called, and update our serialized block header if
×
649
                // they differ.  Over time, we'll use this to disseminate the
×
650
                // latest block header between all our peers, which can later be
×
651
                // used to cross-check our own view of the network to mitigate
×
652
                // various types of eclipse attacks.
×
653
                header, err := p.cfg.BestBlockView.BestBlockHeader()
×
654
                if err != nil && header == lastBlockHeader {
×
655
                        return lastSerializedBlockHeader[:]
×
656
                }
×
657

658
                buf := bytes.NewBuffer(lastSerializedBlockHeader[0:0])
×
659
                err = header.Serialize(buf)
×
660
                if err == nil {
×
661
                        lastBlockHeader = header
×
662
                } else {
×
663
                        p.log.Warn("unable to serialize current block " +
×
664
                                "header for ping payload generation. " +
×
665
                                "This should be impossible and means " +
×
666
                                "there is an implementation bug.")
×
667
                }
×
668

669
                return lastSerializedBlockHeader[:]
×
670
        }
671

672
        // TODO(roasbeef): make dynamic in order to create fake cover traffic.
673
        //
674
        // NOTE(proofofkeags): this was changed to be dynamic to allow better
675
        // pong identification, however, more thought is needed to make this
676
        // actually usable as a traffic decoy.
677
        randPongSize := func() uint16 {
26✔
678
                return uint16(
×
679
                        // We don't need cryptographic randomness here.
×
680
                        /* #nosec */
×
681
                        rand.Intn(pongSizeCeiling) + 1,
×
682
                )
×
683
        }
×
684

685
        p.pingManager = NewPingManager(&PingManagerConfig{
26✔
686
                NewPingPayload:   newPingPayload,
26✔
687
                NewPongSize:      randPongSize,
26✔
688
                IntervalDuration: p.scaleTimeout(pingInterval),
26✔
689
                TimeoutDuration:  p.scaleTimeout(pingTimeout),
26✔
690
                SendPing: func(ping *lnwire.Ping) {
26✔
691
                        p.queueMsg(ping, nil)
×
692
                },
×
693
                OnPongFailure: func(err error) {
×
694
                        eStr := "pong response failure for %s: %v " +
×
695
                                "-- disconnecting"
×
696
                        p.log.Warnf(eStr, p, err)
×
697
                        go p.Disconnect(fmt.Errorf(eStr, p, err))
×
698
                },
×
699
        })
700

701
        return p
26✔
702
}
703

704
// Start starts all helper goroutines the peer needs for normal operations.  In
705
// the case this peer has already been started, then this function is a noop.
706
func (p *Brontide) Start() error {
4✔
707
        if atomic.AddInt32(&p.started, 1) != 1 {
4✔
708
                return nil
×
709
        }
×
710

711
        // Once we've finished starting up the peer, we'll signal to other
712
        // goroutines that they can move forward to tear down the peer, or
713
        // carry out other relevant changes.
714
        defer close(p.startReady)
4✔
715

4✔
716
        p.log.Tracef("starting with conn[%v->%v]",
4✔
717
                p.cfg.Conn.LocalAddr(), p.cfg.Conn.RemoteAddr())
4✔
718

4✔
719
        // Fetch and then load all the active channels we have with this remote
4✔
720
        // peer from the database.
4✔
721
        activeChans, err := p.cfg.ChannelDB.FetchOpenChannels(
4✔
722
                p.cfg.Addr.IdentityKey,
4✔
723
        )
4✔
724
        if err != nil {
4✔
725
                p.log.Errorf("Unable to fetch active chans "+
×
726
                        "for peer: %v", err)
×
727
                return err
×
728
        }
×
729

730
        if len(activeChans) == 0 {
6✔
731
                go p.cfg.PrunePersistentPeerConnection(p.cfg.PubKeyBytes)
2✔
732
        }
2✔
733

734
        // Quickly check if we have any existing legacy channels with this
735
        // peer.
736
        haveLegacyChan := false
4✔
737
        for _, c := range activeChans {
7✔
738
                if c.ChanType.IsTweakless() {
6✔
739
                        continue
3✔
740
                }
741

742
                haveLegacyChan = true
1✔
743
                break
1✔
744
        }
745

746
        // Exchange local and global features, the init message should be very
747
        // first between two nodes.
748
        if err := p.sendInitMsg(haveLegacyChan); err != nil {
4✔
749
                return fmt.Errorf("unable to send init msg: %w", err)
×
750
        }
×
751

752
        // Before we launch any of the helper goroutines off the peer struct,
753
        // we'll first ensure proper adherence to the p2p protocol. The init
754
        // message MUST be sent before any other message.
755
        readErr := make(chan error, 1)
4✔
756
        msgChan := make(chan lnwire.Message, 1)
4✔
757
        p.wg.Add(1)
4✔
758
        go func() {
8✔
759
                defer p.wg.Done()
4✔
760

4✔
761
                msg, err := p.readNextMessage()
4✔
762
                if err != nil {
5✔
763
                        readErr <- err
1✔
764
                        msgChan <- nil
1✔
765
                        return
1✔
766
                }
1✔
767
                readErr <- nil
4✔
768
                msgChan <- msg
4✔
769
        }()
770

771
        select {
4✔
772
        // In order to avoid blocking indefinitely, we'll give the other peer
773
        // an upper timeout to respond before we bail out early.
774
        case <-time.After(handshakeTimeout):
×
775
                return fmt.Errorf("peer did not complete handshake within %v",
×
776
                        handshakeTimeout)
×
777
        case err := <-readErr:
4✔
778
                if err != nil {
5✔
779
                        return fmt.Errorf("unable to read init msg: %w", err)
1✔
780
                }
1✔
781
        }
782

783
        // Once the init message arrives, we can parse it so we can figure out
784
        // the negotiation of features for this session.
785
        msg := <-msgChan
4✔
786
        if msg, ok := msg.(*lnwire.Init); ok {
8✔
787
                if err := p.handleInitMsg(msg); err != nil {
4✔
788
                        p.storeError(err)
×
789
                        return err
×
790
                }
×
791
        } else {
×
792
                return errors.New("very first message between nodes " +
×
793
                        "must be init message")
×
794
        }
×
795

796
        // Next, load all the active channels we have with this peer,
797
        // registering them with the switch and launching the necessary
798
        // goroutines required to operate them.
799
        p.log.Debugf("Loaded %v active channels from database",
4✔
800
                len(activeChans))
4✔
801

4✔
802
        // Conditionally subscribe to channel events before loading channels so
4✔
803
        // we won't miss events. This subscription is used to listen to active
4✔
804
        // channel event when reenabling channels. Once the reenabling process
4✔
805
        // is finished, this subscription will be canceled.
4✔
806
        //
4✔
807
        // NOTE: ChannelNotifier must be started before subscribing events
4✔
808
        // otherwise we'd panic here.
4✔
809
        if err := p.attachChannelEventSubscription(); err != nil {
4✔
810
                return err
×
811
        }
×
812

813
        // Register the message router now as we may need to register some
814
        // endpoints while loading the channels below.
815
        p.msgRouter.WhenSome(func(router msgmux.Router) {
8✔
816
                router.Start()
4✔
817
        })
4✔
818

819
        msgs, err := p.loadActiveChannels(activeChans)
4✔
820
        if err != nil {
4✔
821
                return fmt.Errorf("unable to load channels: %w", err)
×
822
        }
×
823

824
        p.startTime = time.Now()
4✔
825

4✔
826
        // Before launching the writeHandler goroutine, we send any channel
4✔
827
        // sync messages that must be resent for borked channels. We do this to
4✔
828
        // avoid data races with WriteMessage & Flush calls.
4✔
829
        if len(msgs) > 0 {
7✔
830
                p.log.Infof("Sending %d channel sync messages to peer after "+
3✔
831
                        "loading active channels", len(msgs))
3✔
832

3✔
833
                // Send the messages directly via writeMessage and bypass the
3✔
834
                // writeHandler goroutine.
3✔
835
                for _, msg := range msgs {
6✔
836
                        if err := p.writeMessage(msg); err != nil {
3✔
837
                                return fmt.Errorf("unable to send "+
×
838
                                        "reestablish msg: %v", err)
×
839
                        }
×
840
                }
841
        }
842

843
        err = p.pingManager.Start()
4✔
844
        if err != nil {
4✔
845
                return fmt.Errorf("could not start ping manager: %w", err)
×
846
        }
×
847

848
        p.wg.Add(4)
4✔
849
        go p.queueHandler()
4✔
850
        go p.writeHandler()
4✔
851
        go p.channelManager()
4✔
852
        go p.readHandler()
4✔
853

4✔
854
        // Signal to any external processes that the peer is now active.
4✔
855
        close(p.activeSignal)
4✔
856

4✔
857
        // Node announcements don't propagate very well throughout the network
4✔
858
        // as there isn't a way to efficiently query for them through their
4✔
859
        // timestamp, mostly affecting nodes that were offline during the time
4✔
860
        // of broadcast. We'll resend our node announcement to the remote peer
4✔
861
        // as a best-effort delivery such that it can also propagate to their
4✔
862
        // peers. To ensure they can successfully process it in most cases,
4✔
863
        // we'll only resend it as long as we have at least one confirmed
4✔
864
        // advertised channel with the remote peer.
4✔
865
        //
4✔
866
        // TODO(wilmer): Remove this once we're able to query for node
4✔
867
        // announcements through their timestamps.
4✔
868
        p.wg.Add(2)
4✔
869
        go p.maybeSendNodeAnn(activeChans)
4✔
870
        go p.maybeSendChannelUpdates()
4✔
871

4✔
872
        return nil
4✔
873
}
874

875
// initGossipSync initializes either a gossip syncer or an initial routing
876
// dump, depending on the negotiated synchronization method.
877
func (p *Brontide) initGossipSync() {
4✔
878
        // If the remote peer knows of the new gossip queries feature, then
4✔
879
        // we'll create a new gossipSyncer in the AuthenticatedGossiper for it.
4✔
880
        if p.remoteFeatures.HasFeature(lnwire.GossipQueriesOptional) {
8✔
881
                p.log.Info("Negotiated chan series queries")
4✔
882

4✔
883
                if p.cfg.AuthGossiper == nil {
7✔
884
                        // This should only ever be hit in the unit tests.
3✔
885
                        p.log.Warn("No AuthGossiper configured. Abandoning " +
3✔
886
                                "gossip sync.")
3✔
887
                        return
3✔
888
                }
3✔
889

890
                // Register the peer's gossip syncer with the gossiper.
891
                // This blocks synchronously to ensure the gossip syncer is
892
                // registered with the gossiper before attempting to read
893
                // messages from the remote peer.
894
                //
895
                // TODO(wilmer): Only sync updates from non-channel peers. This
896
                // requires an improved version of the current network
897
                // bootstrapper to ensure we can find and connect to non-channel
898
                // peers.
899
                p.cfg.AuthGossiper.InitSyncState(p)
1✔
900
        }
901
}
902

903
// taprootShutdownAllowed returns true if both parties have negotiated the
904
// shutdown-any-segwit feature.
905
func (p *Brontide) taprootShutdownAllowed() bool {
7✔
906
        return p.RemoteFeatures().HasFeature(lnwire.ShutdownAnySegwitOptional) &&
7✔
907
                p.LocalFeatures().HasFeature(lnwire.ShutdownAnySegwitOptional)
7✔
908
}
7✔
909

910
// QuitSignal is a method that should return a channel which will be sent upon
911
// or closed once the backing peer exits. This allows callers using the
912
// interface to cancel any processing in the event the backing implementation
913
// exits.
914
//
915
// NOTE: Part of the lnpeer.Peer interface.
916
func (p *Brontide) QuitSignal() <-chan struct{} {
1✔
917
        return p.quit
1✔
918
}
1✔
919

920
// addrWithInternalKey takes a delivery script, then attempts to supplement it
921
// with information related to the internal key for the addr, but only if it's
922
// a taproot addr.
923
func (p *Brontide) addrWithInternalKey(
924
        deliveryScript []byte) (*chancloser.DeliveryAddrWithKey, error) {
10✔
925

10✔
926
        // Currently, custom channels cannot be created with external upfront
10✔
927
        // shutdown addresses, so this shouldn't be an issue. We only require
10✔
928
        // the internal key for taproot addresses to be able to provide a non
10✔
929
        // inclusion proof of any scripts.
10✔
930
        internalKeyDesc, err := lnwallet.InternalKeyForAddr(
10✔
931
                p.cfg.Wallet, &p.cfg.Wallet.Cfg.NetParams, deliveryScript,
10✔
932
        )
10✔
933
        if err != nil {
10✔
934
                return nil, fmt.Errorf("unable to fetch internal key: %w", err)
×
935
        }
×
936

937
        return &chancloser.DeliveryAddrWithKey{
10✔
938
                DeliveryAddress: deliveryScript,
10✔
939
                InternalKey: fn.MapOption(
10✔
940
                        func(desc keychain.KeyDescriptor) btcec.PublicKey {
11✔
941
                                return *desc.PubKey
1✔
942
                        },
1✔
943
                )(internalKeyDesc),
944
        }, nil
945
}
946

947
// loadActiveChannels creates indexes within the peer for tracking all active
948
// channels returned by the database. It returns a slice of channel reestablish
949
// messages that should be sent to the peer immediately, in case we have borked
950
// channels that haven't been closed yet.
951
func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
952
        []lnwire.Message, error) {
4✔
953

4✔
954
        // Return a slice of messages to send to the peers in case the channel
4✔
955
        // cannot be loaded normally.
4✔
956
        var msgs []lnwire.Message
4✔
957

4✔
958
        scidAliasNegotiated := p.hasNegotiatedScidAlias()
4✔
959

4✔
960
        for _, dbChan := range chans {
7✔
961
                hasScidFeature := dbChan.ChanType.HasScidAliasFeature()
3✔
962
                if scidAliasNegotiated && !hasScidFeature {
4✔
963
                        // We'll request and store an alias, making sure that a
1✔
964
                        // gossiper mapping is not created for the alias to the
1✔
965
                        // real SCID. This is done because the peer and funding
1✔
966
                        // manager are not aware of each other's states and if
1✔
967
                        // we did not do this, we would accept alias channel
1✔
968
                        // updates after 6 confirmations, which would be buggy.
1✔
969
                        // We'll queue a channel_ready message with the new
1✔
970
                        // alias. This should technically be done *after* the
1✔
971
                        // reestablish, but this behavior is pre-existing since
1✔
972
                        // the funding manager may already queue a
1✔
973
                        // channel_ready before the channel_reestablish.
1✔
974
                        if !dbChan.IsPending {
2✔
975
                                aliasScid, err := p.cfg.RequestAlias()
1✔
976
                                if err != nil {
1✔
977
                                        return nil, err
×
978
                                }
×
979

980
                                err = p.cfg.AddLocalAlias(
1✔
981
                                        aliasScid, dbChan.ShortChanID(), false,
1✔
982
                                        false,
1✔
983
                                )
1✔
984
                                if err != nil {
1✔
985
                                        return nil, err
×
986
                                }
×
987

988
                                chanID := lnwire.NewChanIDFromOutPoint(
1✔
989
                                        dbChan.FundingOutpoint,
1✔
990
                                )
1✔
991

1✔
992
                                // Fetch the second commitment point to send in
1✔
993
                                // the channel_ready message.
1✔
994
                                second, err := dbChan.SecondCommitmentPoint()
1✔
995
                                if err != nil {
1✔
996
                                        return nil, err
×
997
                                }
×
998

999
                                channelReadyMsg := lnwire.NewChannelReady(
1✔
1000
                                        chanID, second,
1✔
1001
                                )
1✔
1002
                                channelReadyMsg.AliasScid = &aliasScid
1✔
1003

1✔
1004
                                msgs = append(msgs, channelReadyMsg)
1✔
1005
                        }
1006

1007
                        // If we've negotiated the option-scid-alias feature
1008
                        // and this channel does not have ScidAliasFeature set
1009
                        // to true due to an upgrade where the feature bit was
1010
                        // turned on, we'll update the channel's database
1011
                        // state.
1012
                        err := dbChan.MarkScidAliasNegotiated()
1✔
1013
                        if err != nil {
1✔
1014
                                return nil, err
×
1015
                        }
×
1016
                }
1017

1018
                var chanOpts []lnwallet.ChannelOpt
3✔
1019
                p.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
3✔
1020
                        chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
×
1021
                })
×
1022
                p.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
3✔
1023
                        chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
×
1024
                })
×
1025
                p.cfg.AuxResolver.WhenSome(
3✔
1026
                        func(s lnwallet.AuxContractResolver) {
3✔
1027
                                chanOpts = append(
×
1028
                                        chanOpts, lnwallet.WithAuxResolver(s),
×
1029
                                )
×
1030
                        },
×
1031
                )
1032

1033
                lnChan, err := lnwallet.NewLightningChannel(
3✔
1034
                        p.cfg.Signer, dbChan, p.cfg.SigPool, chanOpts...,
3✔
1035
                )
3✔
1036
                if err != nil {
3✔
1037
                        return nil, fmt.Errorf("unable to create channel "+
×
1038
                                "state machine: %w", err)
×
1039
                }
×
1040

1041
                chanPoint := dbChan.FundingOutpoint
3✔
1042

3✔
1043
                chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
3✔
1044

3✔
1045
                p.log.Infof("Loading ChannelPoint(%v), isPending=%v",
3✔
1046
                        chanPoint, lnChan.IsPending())
3✔
1047

3✔
1048
                // Skip adding any permanently irreconcilable channels to the
3✔
1049
                // htlcswitch.
3✔
1050
                if !dbChan.HasChanStatus(channeldb.ChanStatusDefault) &&
3✔
1051
                        !dbChan.HasChanStatus(channeldb.ChanStatusRestored) {
6✔
1052

3✔
1053
                        p.log.Warnf("ChannelPoint(%v) has status %v, won't "+
3✔
1054
                                "start.", chanPoint, dbChan.ChanStatus())
3✔
1055

3✔
1056
                        // To help our peer recover from a potential data loss,
3✔
1057
                        // we resend our channel reestablish message if the
3✔
1058
                        // channel is in a borked state. We won't process any
3✔
1059
                        // channel reestablish message sent from the peer, but
3✔
1060
                        // that's okay since the assumption is that we did when
3✔
1061
                        // marking the channel borked.
3✔
1062
                        chanSync, err := dbChan.ChanSyncMsg()
3✔
1063
                        if err != nil {
3✔
1064
                                p.log.Errorf("Unable to create channel "+
×
1065
                                        "reestablish message for channel %v: "+
×
1066
                                        "%v", chanPoint, err)
×
1067
                                continue
×
1068
                        }
1069

1070
                        msgs = append(msgs, chanSync)
3✔
1071

3✔
1072
                        // Check if this channel needs to have the cooperative
3✔
1073
                        // close process restarted. If so, we'll need to send
3✔
1074
                        // the Shutdown message that is returned.
3✔
1075
                        if dbChan.HasChanStatus(
3✔
1076
                                channeldb.ChanStatusCoopBroadcasted,
3✔
1077
                        ) {
3✔
1078

×
1079
                                shutdownMsg, err := p.restartCoopClose(lnChan)
×
1080
                                if err != nil {
×
1081
                                        p.log.Errorf("Unable to restart "+
×
1082
                                                "coop close for channel: %v",
×
1083
                                                err)
×
1084
                                        continue
×
1085
                                }
1086

1087
                                if shutdownMsg == nil {
×
1088
                                        continue
×
1089
                                }
1090

1091
                                // Append the message to the set of messages to
1092
                                // send.
1093
                                msgs = append(msgs, shutdownMsg)
×
1094
                        }
1095

1096
                        continue
3✔
1097
                }
1098

1099
                // Before we register this new link with the HTLC Switch, we'll
1100
                // need to fetch its current link-layer forwarding policy from
1101
                // the database.
1102
                graph := p.cfg.ChannelGraph
1✔
1103
                info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(
1✔
1104
                        &chanPoint,
1✔
1105
                )
1✔
1106
                if err != nil && !errors.Is(err, graphdb.ErrEdgeNotFound) {
1✔
1107
                        return nil, err
×
1108
                }
×
1109

1110
                // We'll filter out our policy from the directional channel
1111
                // edges based on whom the edge connects to. If it doesn't connect
1112
                // to us, then we know that we were the one that advertised the
1113
                // policy.
1114
                //
1115
                // TODO(roasbeef): can add helper method to get policy for
1116
                // particular channel.
1117
                var selfPolicy *models.ChannelEdgePolicy
1✔
1118
                if info != nil && bytes.Equal(info.NodeKey1Bytes[:],
1✔
1119
                        p.cfg.ServerPubKey[:]) {
2✔
1120

1✔
1121
                        selfPolicy = p1
1✔
1122
                } else {
2✔
1123
                        selfPolicy = p2
1✔
1124
                }
1✔
1125

1126
                // If we don't yet have an advertised routing policy, then
1127
                // we'll use the current default, otherwise we'll translate the
1128
                // routing policy into a forwarding policy.
1129
                var forwardingPolicy *models.ForwardingPolicy
1✔
1130
                if selfPolicy != nil {
2✔
1131
                        var inboundWireFee lnwire.Fee
1✔
1132
                        _, err := selfPolicy.ExtraOpaqueData.ExtractRecords(
1✔
1133
                                &inboundWireFee,
1✔
1134
                        )
1✔
1135
                        if err != nil {
1✔
1136
                                return nil, err
×
1137
                        }
×
1138

1139
                        inboundFee := models.NewInboundFeeFromWire(
1✔
1140
                                inboundWireFee,
1✔
1141
                        )
1✔
1142

1✔
1143
                        forwardingPolicy = &models.ForwardingPolicy{
1✔
1144
                                MinHTLCOut:    selfPolicy.MinHTLC,
1✔
1145
                                MaxHTLC:       selfPolicy.MaxHTLC,
1✔
1146
                                BaseFee:       selfPolicy.FeeBaseMSat,
1✔
1147
                                FeeRate:       selfPolicy.FeeProportionalMillionths,
1✔
1148
                                TimeLockDelta: uint32(selfPolicy.TimeLockDelta),
1✔
1149

1✔
1150
                                InboundFee: inboundFee,
1✔
1151
                        }
1✔
1152
                } else {
1✔
1153
                        p.log.Warnf("Unable to find our forwarding policy "+
1✔
1154
                                "for channel %v, using default values",
1✔
1155
                                chanPoint)
1✔
1156
                        forwardingPolicy = &p.cfg.RoutingPolicy
1✔
1157
                }
1✔
1158

1159
                p.log.Tracef("Using link policy of: %v",
1✔
1160
                        spew.Sdump(forwardingPolicy))
1✔
1161

1✔
1162
                // If the channel is pending, set the value to nil in the
1✔
1163
                // activeChannels map. This is done to signify that the channel
1✔
1164
                // is pending. We don't add the link to the switch here - it's
1✔
1165
                // the funding manager's responsibility to spin up pending
1✔
1166
                // channels. Adding them here would just be extra work as we'll
1✔
1167
                // tear them down when creating + adding the final link.
1✔
1168
                if lnChan.IsPending() {
2✔
1169
                        p.activeChannels.Store(chanID, nil)
1✔
1170

1✔
1171
                        continue
1✔
1172
                }
1173

1174
                shutdownInfo, err := lnChan.State().ShutdownInfo()
1✔
1175
                if err != nil && !errors.Is(err, channeldb.ErrNoShutdownInfo) {
1✔
1176
                        return nil, err
×
1177
                }
×
1178

1179
                var (
1✔
1180
                        shutdownMsg     fn.Option[lnwire.Shutdown]
1✔
1181
                        shutdownInfoErr error
1✔
1182
                )
1✔
1183
                shutdownInfo.WhenSome(func(info channeldb.ShutdownInfo) {
2✔
1184
                        // Compute an ideal fee.
1✔
1185
                        feePerKw, err := p.cfg.FeeEstimator.EstimateFeePerKW(
1✔
1186
                                p.cfg.CoopCloseTargetConfs,
1✔
1187
                        )
1✔
1188
                        if err != nil {
1✔
1189
                                shutdownInfoErr = fmt.Errorf("unable to "+
×
1190
                                        "estimate fee: %w", err)
×
1191

×
1192
                                return
×
1193
                        }
×
1194

1195
                        addr, err := p.addrWithInternalKey(
1✔
1196
                                info.DeliveryScript.Val,
1✔
1197
                        )
1✔
1198
                        if err != nil {
1✔
1199
                                shutdownInfoErr = fmt.Errorf("unable to make "+
×
1200
                                        "delivery addr: %w", err)
×
1201
                                return
×
1202
                        }
×
1203
                        chanCloser, err := p.createChanCloser(
1✔
1204
                                lnChan, addr, feePerKw, nil, info.Closer(),
1✔
1205
                        )
1✔
1206
                        if err != nil {
1✔
1207
                                shutdownInfoErr = fmt.Errorf("unable to "+
×
1208
                                        "create chan closer: %w", err)
×
1209

×
1210
                                return
×
1211
                        }
×
1212

1213
                        chanID := lnwire.NewChanIDFromOutPoint(
1✔
1214
                                lnChan.State().FundingOutpoint,
1✔
1215
                        )
1✔
1216

1✔
1217
                        p.activeChanCloses[chanID] = chanCloser
1✔
1218

1✔
1219
                        // Create the Shutdown message.
1✔
1220
                        shutdown, err := chanCloser.ShutdownChan()
1✔
1221
                        if err != nil {
1✔
1222
                                delete(p.activeChanCloses, chanID)
×
1223
                                shutdownInfoErr = err
×
1224

×
1225
                                return
×
1226
                        }
×
1227

1228
                        shutdownMsg = fn.Some(*shutdown)
1✔
1229
                })
1230
                if shutdownInfoErr != nil {
1✔
1231
                        return nil, shutdownInfoErr
×
1232
                }
×
1233

1234
                // Subscribe to the set of on-chain events for this channel.
1235
                chainEvents, err := p.cfg.ChainArb.SubscribeChannelEvents(
1✔
1236
                        chanPoint,
1✔
1237
                )
1✔
1238
                if err != nil {
1✔
1239
                        return nil, err
×
1240
                }
×
1241

1242
                err = p.addLink(
1✔
1243
                        &chanPoint, lnChan, forwardingPolicy, chainEvents,
1✔
1244
                        true, shutdownMsg,
1✔
1245
                )
1✔
1246
                if err != nil {
1✔
1247
                        return nil, fmt.Errorf("unable to add link %v to "+
×
1248
                                "switch: %v", chanPoint, err)
×
1249
                }
×
1250

1251
                p.activeChannels.Store(chanID, lnChan)
1✔
1252
        }
1253

1254
        return msgs, nil
4✔
1255
}
1256

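// The loop above derives the link's forwarding policy from our own advertised
// channel edge policy, pulling the optional inbound fee out of the policy's
// extra TLV records. Below is an illustrative sketch (not part of the original
// source) that factors that translation into a hypothetical standalone helper,
// using only the calls already shown above; a nil policy signals that the
// caller should fall back to the default routing policy.
func forwardingPolicyFromEdge(
        selfPolicy *models.ChannelEdgePolicy) (*models.ForwardingPolicy, error) {

        if selfPolicy == nil {
                // No advertised policy yet; the caller falls back to
                // cfg.RoutingPolicy.
                return nil, nil
        }

        // The inbound fee, if present, is carried as an extra TLV record on
        // the advertised policy.
        var inboundWireFee lnwire.Fee
        _, err := selfPolicy.ExtraOpaqueData.ExtractRecords(&inboundWireFee)
        if err != nil {
                return nil, err
        }

        return &models.ForwardingPolicy{
                MinHTLCOut:    selfPolicy.MinHTLC,
                MaxHTLC:       selfPolicy.MaxHTLC,
                BaseFee:       selfPolicy.FeeBaseMSat,
                FeeRate:       selfPolicy.FeeProportionalMillionths,
                TimeLockDelta: uint32(selfPolicy.TimeLockDelta),
                InboundFee:    models.NewInboundFeeFromWire(inboundWireFee),
        }, nil
}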
1257
// addLink creates and adds a new ChannelLink from the specified channel.
1258
func (p *Brontide) addLink(chanPoint *wire.OutPoint,
1259
        lnChan *lnwallet.LightningChannel,
1260
        forwardingPolicy *models.ForwardingPolicy,
1261
        chainEvents *contractcourt.ChainEventSubscription,
1262
        syncStates bool, shutdownMsg fn.Option[lnwire.Shutdown]) error {
1✔
1263

1✔
1264
        // onChannelFailure will be called by the link in case the channel
1✔
1265
        // fails for some reason.
1✔
1266
        onChannelFailure := func(chanID lnwire.ChannelID,
1✔
1267
                shortChanID lnwire.ShortChannelID,
1✔
1268
                linkErr htlcswitch.LinkFailureError) {
2✔
1269

1✔
1270
                failure := linkFailureReport{
1✔
1271
                        chanPoint:   *chanPoint,
1✔
1272
                        chanID:      chanID,
1✔
1273
                        shortChanID: shortChanID,
1✔
1274
                        linkErr:     linkErr,
1✔
1275
                }
1✔
1276

1✔
1277
                select {
1✔
1278
                case p.linkFailures <- failure:
1✔
1279
                case <-p.quit:
×
1280
                case <-p.cfg.Quit:
×
1281
                }
1282
        }
1283

1284
        updateContractSignals := func(signals *contractcourt.ContractSignals) error {
2✔
1285
                return p.cfg.ChainArb.UpdateContractSignals(*chanPoint, signals)
1✔
1286
        }
1✔
1287

1288
        notifyContractUpdate := func(update *contractcourt.ContractUpdate) error {
2✔
1289
                return p.cfg.ChainArb.NotifyContractUpdate(*chanPoint, update)
1✔
1290
        }
1✔
1291

1292
        //nolint:ll
1293
        linkCfg := htlcswitch.ChannelLinkConfig{
1✔
1294
                Peer:                   p,
1✔
1295
                DecodeHopIterators:     p.cfg.Sphinx.DecodeHopIterators,
1✔
1296
                ExtractErrorEncrypter:  p.cfg.Sphinx.ExtractErrorEncrypter,
1✔
1297
                FetchLastChannelUpdate: p.cfg.FetchLastChanUpdate,
1✔
1298
                HodlMask:               p.cfg.Hodl.Mask(),
1✔
1299
                Registry:               p.cfg.Invoices,
1✔
1300
                BestHeight:             p.cfg.Switch.BestHeight,
1✔
1301
                Circuits:               p.cfg.Switch.CircuitModifier(),
1✔
1302
                ForwardPackets:         p.cfg.InterceptSwitch.ForwardPackets,
1✔
1303
                FwrdingPolicy:          *forwardingPolicy,
1✔
1304
                FeeEstimator:           p.cfg.FeeEstimator,
1✔
1305
                PreimageCache:          p.cfg.WitnessBeacon,
1✔
1306
                ChainEvents:            chainEvents,
1✔
1307
                UpdateContractSignals:  updateContractSignals,
1✔
1308
                NotifyContractUpdate:   notifyContractUpdate,
1✔
1309
                OnChannelFailure:       onChannelFailure,
1✔
1310
                SyncStates:             syncStates,
1✔
1311
                BatchTicker:            ticker.New(p.cfg.ChannelCommitInterval),
1✔
1312
                FwdPkgGCTicker:         ticker.New(time.Hour),
1✔
1313
                PendingCommitTicker: ticker.New(
1✔
1314
                        p.cfg.PendingCommitInterval,
1✔
1315
                ),
1✔
1316
                BatchSize:               p.cfg.ChannelCommitBatchSize,
1✔
1317
                UnsafeReplay:            p.cfg.UnsafeReplay,
1✔
1318
                MinUpdateTimeout:        htlcswitch.DefaultMinLinkFeeUpdateTimeout,
1✔
1319
                MaxUpdateTimeout:        htlcswitch.DefaultMaxLinkFeeUpdateTimeout,
1✔
1320
                OutgoingCltvRejectDelta: p.cfg.OutgoingCltvRejectDelta,
1✔
1321
                TowerClient:             p.cfg.TowerClient,
1✔
1322
                MaxOutgoingCltvExpiry:   p.cfg.MaxOutgoingCltvExpiry,
1✔
1323
                MaxFeeAllocation:        p.cfg.MaxChannelFeeAllocation,
1✔
1324
                MaxAnchorsCommitFeeRate: p.cfg.MaxAnchorsCommitFeeRate,
1✔
1325
                NotifyActiveLink:        p.cfg.ChannelNotifier.NotifyActiveLinkEvent,
1✔
1326
                NotifyActiveChannel:     p.cfg.ChannelNotifier.NotifyActiveChannelEvent,
1✔
1327
                NotifyInactiveChannel:   p.cfg.ChannelNotifier.NotifyInactiveChannelEvent,
1✔
1328
                NotifyInactiveLinkEvent: p.cfg.ChannelNotifier.NotifyInactiveLinkEvent,
1✔
1329
                HtlcNotifier:            p.cfg.HtlcNotifier,
1✔
1330
                GetAliases:              p.cfg.GetAliases,
1✔
1331
                PreviouslySentShutdown:  shutdownMsg,
1✔
1332
                DisallowRouteBlinding:   p.cfg.DisallowRouteBlinding,
1✔
1333
                MaxFeeExposure:          p.cfg.MaxFeeExposure,
1✔
1334
                ShouldFwdExpEndorsement: p.cfg.ShouldFwdExpEndorsement,
1✔
1335
                DisallowQuiescence: p.cfg.DisallowQuiescence ||
1✔
1336
                        !p.remoteFeatures.HasFeature(lnwire.QuiescenceOptional),
1✔
1337
                AuxTrafficShaper: p.cfg.AuxTrafficShaper,
1✔
1338
        }
1✔
1339

1✔
1340
        // Before adding our new link, purge the switch of any pending or live
1✔
1341
        // links going by the same channel id. If one is found, we'll shut it
1✔
1342
        // down to ensure that the mailboxes are only ever under the control of
1✔
1343
        // one link.
1✔
1344
        chanID := lnwire.NewChanIDFromOutPoint(*chanPoint)
1✔
1345
        p.cfg.Switch.RemoveLink(chanID)
1✔
1346

1✔
1347
        // With the channel link created, we'll now notify the htlc switch so
1✔
1348
        // this channel can be used to dispatch local payments and also
1✔
1349
        // passively forward payments.
1✔
1350
        return p.cfg.Switch.CreateAndAddLink(linkCfg, lnChan)
1✔
1351
}
1352

1353
// maybeSendNodeAnn sends our node announcement to the remote peer if at least
1354
// one confirmed public channel exists with them.
1355
func (p *Brontide) maybeSendNodeAnn(channels []*channeldb.OpenChannel) {
4✔
1356
        defer p.wg.Done()
4✔
1357

4✔
1358
        hasConfirmedPublicChan := false
4✔
1359
        for _, channel := range channels {
7✔
1360
                if channel.IsPending {
4✔
1361
                        continue
1✔
1362
                }
1363
                if channel.ChannelFlags&lnwire.FFAnnounceChannel == 0 {
6✔
1364
                        continue
3✔
1365
                }
1366

1367
                hasConfirmedPublicChan = true
1✔
1368
                break
1✔
1369
        }
1370
        if !hasConfirmedPublicChan {
8✔
1371
                return
4✔
1372
        }
4✔
1373

1374
        ourNodeAnn, err := p.cfg.GenNodeAnnouncement()
1✔
1375
        if err != nil {
1✔
1376
                p.log.Debugf("Unable to retrieve node announcement: %v", err)
×
1377
                return
×
1378
        }
×
1379

1380
        if err := p.SendMessageLazy(false, &ourNodeAnn); err != nil {
1✔
1381
                p.log.Debugf("Unable to resend node announcement: %v", err)
×
1382
        }
×
1383
}
1384

1385
// maybeSendChannelUpdates sends our channel updates to the remote peer if we
1386
// have any active channels with them.
1387
func (p *Brontide) maybeSendChannelUpdates() {
4✔
1388
        defer p.wg.Done()
4✔
1389

4✔
1390
        // If we don't have any active channels, then we can exit early.
4✔
1391
        if p.activeChannels.Len() == 0 {
6✔
1392
                return
2✔
1393
        }
2✔
1394

1395
        maybeSendUpd := func(cid lnwire.ChannelID,
3✔
1396
                lnChan *lnwallet.LightningChannel) error {
6✔
1397

3✔
1398
                // Nil channels are pending, so we'll skip them.
3✔
1399
                if lnChan == nil {
4✔
1400
                        return nil
1✔
1401
                }
1✔
1402

1403
                dbChan := lnChan.State()
3✔
1404
                scid := func() lnwire.ShortChannelID {
6✔
1405
                        switch {
3✔
1406
                        // If it's a zero conf channel and confirmed,
1407
                        // then we need to use the "real" scid.
1408
                        case dbChan.IsZeroConf() && dbChan.ZeroConfConfirmed():
1✔
1409
                                return dbChan.ZeroConfRealScid()
1✔
1410

1411
                        // Otherwise, we can use the normal scid.
1412
                        default:
3✔
1413
                                return dbChan.ShortChanID()
3✔
1414
                        }
1415
                }()
1416

1417
                // Now that we know the channel is in a good state, we'll try
1418
                // to fetch the update to send to the remote peer. If the
1419
                // channel is pending, and not a zero conf channel, we'll get
1420
                // an error here which we'll ignore.
1421
                chanUpd, err := p.cfg.FetchLastChanUpdate(scid)
3✔
1422
                if err != nil {
4✔
1423
                        p.log.Debugf("Unable to fetch channel update for "+
1✔
1424
                                "ChannelPoint(%v), scid=%v: %v",
1✔
1425
                                dbChan.FundingOutpoint, dbChan.ShortChanID(), err)
1✔
1426

1✔
1427
                        return nil
1✔
1428
                }
1✔
1429

1430
                p.log.Debugf("Sending channel update for ChannelPoint(%v), "+
3✔
1431
                        "scid=%v", dbChan.FundingOutpoint, dbChan.ShortChanID)
3✔
1432

3✔
1433
                // We'll send it as a normal message instead of using the lazy
3✔
1434
                // queue to prioritize transmission of the fresh update.
3✔
1435
                if err := p.SendMessage(false, chanUpd); err != nil {
3✔
1436
                        err := fmt.Errorf("unable to send channel update for "+
×
1437
                                "ChannelPoint(%v), scid=%v: %w",
×
1438
                                dbChan.FundingOutpoint, dbChan.ShortChanID(),
×
1439
                                err)
×
1440
                        p.log.Errorf(err.Error())
×
1441

×
1442
                        return err
×
1443
                }
×
1444

1445
                return nil
3✔
1446
        }
1447

1448
        p.activeChannels.ForEach(maybeSendUpd)
3✔
1449
}
1450

1451
// WaitForDisconnect waits until the peer has disconnected. A peer may be
1452
// disconnected if the local or remote side terminates the connection, or an
1453
// irrecoverable protocol error has been encountered. This method will only
1454
// begin watching the peer's waitgroup after the ready channel or the peer's
1455
// quit channel are signaled. The ready channel should only be signaled if a
1456
// call to Start returns no error. Otherwise, if the peer fails to start,
1457
// calling Disconnect will signal the quit channel and the method will not
1458
// block, since no goroutines were spawned.
1459
func (p *Brontide) WaitForDisconnect(ready chan struct{}) {
1✔
1460
        // Before we try to call the `Wait` goroutine, we'll make sure the main
1✔
1461
        // set of goroutines are already active.
1✔
1462
        select {
1✔
1463
        case <-p.startReady:
1✔
1464
        case <-p.quit:
×
1465
                return
×
1466
        }
1467

1468
        select {
1✔
1469
        case <-ready:
1✔
1470
        case <-p.quit:
1✔
1471
        }
1472

1473
        p.wg.Wait()
1✔
1474
}
1475

1476
// Disconnect terminates the connection with the remote peer. Additionally, a
1477
// signal is sent to the server and htlcSwitch indicating the resources
1478
// allocated to the peer can now be cleaned up.
1479
func (p *Brontide) Disconnect(reason error) {
2✔
1480
        if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) {
3✔
1481
                return
1✔
1482
        }
1✔
1483

1484
        // Make sure initialization has completed before we try to tear things
1485
        // down.
1486
        //
1487
        // NOTE: We only read the `startReady` chan if the peer has been
1488
        // started, otherwise we will skip reading it as this chan won't be
1489
        // closed, hence it would block forever.
1490
        if atomic.LoadInt32(&p.started) == 1 {
3✔
1491
                p.log.Debugf("Started, waiting on startReady signal")
1✔
1492

1✔
1493
                select {
1✔
1494
                case <-p.startReady:
1✔
1495
                case <-p.quit:
×
1496
                        return
×
1497
                }
1498
        }
1499

1500
        err := fmt.Errorf("disconnecting %s, reason: %v", p, reason)
2✔
1501
        p.storeError(err)
2✔
1502

2✔
1503
        p.log.Infof(err.Error())
2✔
1504

2✔
1505
        // Stop PingManager before closing TCP connection.
2✔
1506
        p.pingManager.Stop()
2✔
1507

2✔
1508
        // Ensure that the TCP connection is properly closed before continuing.
2✔
1509
        p.cfg.Conn.Close()
2✔
1510

2✔
1511
        close(p.quit)
2✔
1512

2✔
1513
        // If our msg router isn't global (local to this instance), then we'll
2✔
1514
        // stop it. Otherwise, we'll leave it running.
2✔
1515
        if !p.globalMsgRouter {
4✔
1516
                p.msgRouter.WhenSome(func(router msgmux.Router) {
4✔
1517
                        router.Stop()
2✔
1518
                })
2✔
1519
        }
1520
}
1521

1522
// String returns the string representation of this peer.
1523
func (p *Brontide) String() string {
2✔
1524
        return fmt.Sprintf("%x@%s", p.cfg.PubKeyBytes, p.cfg.Conn.RemoteAddr())
2✔
1525
}
2✔
1526

1527
// readNextMessage reads, and returns the next message on the wire along with
1528
// any additional raw payload.
1529
func (p *Brontide) readNextMessage() (lnwire.Message, error) {
8✔
1530
        noiseConn := p.cfg.Conn
8✔
1531
        err := noiseConn.SetReadDeadline(time.Time{})
8✔
1532
        if err != nil {
8✔
1533
                return nil, err
×
1534
        }
×
1535

1536
        pktLen, err := noiseConn.ReadNextHeader()
8✔
1537
        if err != nil {
9✔
1538
                return nil, fmt.Errorf("read next header: %w", err)
1✔
1539
        }
1✔
1540

1541
        // First we'll read the next _full_ message. We do this rather than
1542
        // reading incrementally from the stream as the Lightning wire protocol
1543
        // is message oriented and allows nodes to pad on additional data to
1544
        // the message stream.
1545
        var (
5✔
1546
                nextMsg lnwire.Message
5✔
1547
                msgLen  uint64
5✔
1548
        )
5✔
1549
        err = p.cfg.ReadPool.Submit(func(buf *buffer.Read) error {
10✔
1550
                // Before reading the body of the message, set the read timeout
5✔
1551
                // accordingly to ensure we don't block other readers using the
5✔
1552
                // pool. We do so only after the task has been scheduled to
5✔
1553
                // ensure the deadline doesn't expire while the message is in
5✔
1554
                // the process of being scheduled.
5✔
1555
                readDeadline := time.Now().Add(
5✔
1556
                        p.scaleTimeout(readMessageTimeout),
5✔
1557
                )
5✔
1558
                readErr := noiseConn.SetReadDeadline(readDeadline)
5✔
1559
                if readErr != nil {
5✔
1560
                        return readErr
×
1561
                }
×
1562

1563
                // The ReadNextBody method will actually end up re-using the
1564
                // buffer, so within this closure, we can continue to use
1565
                // rawMsg as it's just a slice into the buf from the buffer
1566
                // pool.
1567
                rawMsg, readErr := noiseConn.ReadNextBody(buf[:pktLen])
5✔
1568
                if readErr != nil {
5✔
1569
                        return fmt.Errorf("read next body: %w", readErr)
×
1570
                }
×
1571
                msgLen = uint64(len(rawMsg))
5✔
1572

5✔
1573
                // Next, create a new io.Reader implementation from the raw
5✔
1574
                // message, and use this to decode the message directly from.
5✔
1575
                msgReader := bytes.NewReader(rawMsg)
5✔
1576
                nextMsg, err = lnwire.ReadMessage(msgReader, 0)
5✔
1577
                if err != nil {
6✔
1578
                        return err
1✔
1579
                }
1✔
1580

1581
                // At this point, rawMsg and buf will be returned back to the
1582
                // buffer pool for re-use.
1583
                return nil
5✔
1584
        })
1585
        atomic.AddUint64(&p.bytesReceived, msgLen)
5✔
1586
        if err != nil {
6✔
1587
                return nil, err
1✔
1588
        }
1✔
1589

1590
        p.logWireMessage(nextMsg, true)
5✔
1591

5✔
1592
        return nextMsg, nil
5✔
1593
}
1594

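// readNextMessage above reads one full framed message (header, then body)
// before handing the raw payload to lnwire.ReadMessage. A minimal sketch of
// just the decode step, using the same call as the closure above (illustrative
// only; the second argument is the wire protocol version, passed as 0 as in
// the code above):
func decodeWireMessage(rawMsg []byte) (lnwire.Message, error) {
        // lnwire.ReadMessage parses a single wire message from any io.Reader.
        return lnwire.ReadMessage(bytes.NewReader(rawMsg), 0)
}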
1595
// msgStream implements a goroutine-safe, in-order stream of messages to be
1596
// delivered via closure to a receiver. These messages MUST be in order due to
1597
// the nature of the lightning channel commitment and gossiper state machines.
1598
// TODO(conner): use stream handler interface to abstract out stream
1599
// state/logging.
1600
type msgStream struct {
1601
        streamShutdown int32 // To be used atomically.
1602

1603
        peer *Brontide
1604

1605
        apply func(lnwire.Message)
1606

1607
        startMsg string
1608
        stopMsg  string
1609

1610
        msgCond *sync.Cond
1611
        msgs    []lnwire.Message
1612

1613
        mtx sync.Mutex
1614

1615
        producerSema chan struct{}
1616

1617
        wg   sync.WaitGroup
1618
        quit chan struct{}
1619
}
1620

1621
// newMsgStream creates a new instance of a chanMsgStream for a particular
1622
// channel identified by its channel ID. bufSize is the max number of messages
1623
// that should be buffered in the internal queue. Callers should set this to a
1624
// sane value that avoids blocking unnecessarily, but doesn't allow an
1625
// unbounded amount of memory to be allocated to buffer incoming messages.
1626
func newMsgStream(p *Brontide, startMsg, stopMsg string, bufSize uint32,
1627
        apply func(lnwire.Message)) *msgStream {
4✔
1628

4✔
1629
        stream := &msgStream{
4✔
1630
                peer:         p,
4✔
1631
                apply:        apply,
4✔
1632
                startMsg:     startMsg,
4✔
1633
                stopMsg:      stopMsg,
4✔
1634
                producerSema: make(chan struct{}, bufSize),
4✔
1635
                quit:         make(chan struct{}),
4✔
1636
        }
4✔
1637
        stream.msgCond = sync.NewCond(&stream.mtx)
4✔
1638

4✔
1639
        // Before we return the active stream, we'll populate the producer's
4✔
1640
        // semaphore channel. We'll use this to ensure that the producer won't
4✔
1641
        // attempt to allocate memory in the queue for an item until it has
4✔
1642
        // sufficient extra space.
4✔
1643
        for i := uint32(0); i < bufSize; i++ {
3,005✔
1644
                stream.producerSema <- struct{}{}
3,001✔
1645
        }
3,001✔
1646

1647
        return stream
4✔
1648
}
1649

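// The producerSema channel that newMsgStream pre-fills above acts as a
// counting semaphore: AddMsg takes a token before enqueueing and msgConsumer
// returns a token after applying a message, which caps the queue at bufSize
// outstanding messages. Below is a self-contained sketch of the same pattern,
// reduced to plain strings (illustrative only; relies solely on the standard
// library sync package):
type boundedQueue struct {
        sema chan struct{} // one token per free slot
        cond *sync.Cond
        msgs []string
}

func newBoundedQueue(size int) *boundedQueue {
        q := &boundedQueue{sema: make(chan struct{}, size)}
        q.cond = sync.NewCond(&sync.Mutex{})

        // Pre-fill the semaphore so that all slots start out free, mirroring
        // the loop in newMsgStream.
        for i := 0; i < size; i++ {
                q.sema <- struct{}{}
        }

        return q
}

// add blocks until a slot is free, then enqueues and wakes the consumer.
func (q *boundedQueue) add(msg string) {
        <-q.sema

        q.cond.L.Lock()
        q.msgs = append(q.msgs, msg)
        q.cond.L.Unlock()

        q.cond.Signal()
}

// next blocks until a message is queued, dequeues it, and frees its slot.
func (q *boundedQueue) next() string {
        q.cond.L.Lock()
        for len(q.msgs) == 0 {
                q.cond.Wait()
        }
        msg := q.msgs[0]
        q.msgs = q.msgs[1:]
        q.cond.L.Unlock()

        // Hand the slot back to the producer side.
        q.sema <- struct{}{}

        return msg
}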
1650
// Start starts the chanMsgStream.
1651
func (ms *msgStream) Start() {
4✔
1652
        ms.wg.Add(1)
4✔
1653
        go ms.msgConsumer()
4✔
1654
}
4✔
1655

1656
// Stop stops the chanMsgStream.
1657
func (ms *msgStream) Stop() {
1✔
1658
        // TODO(roasbeef): signal too?
1✔
1659

1✔
1660
        close(ms.quit)
1✔
1661

1✔
1662
        // Now that we've closed the channel, we'll repeatedly signal the msg
1✔
1663
        // consumer until we've detected that it has exited.
1✔
1664
        for atomic.LoadInt32(&ms.streamShutdown) == 0 {
2✔
1665
                ms.msgCond.Signal()
1✔
1666
                time.Sleep(time.Millisecond * 100)
1✔
1667
        }
1✔
1668

1669
        ms.wg.Wait()
1✔
1670
}
1671

1672
// msgConsumer is the main goroutine that streams messages from the peer's
1673
// readHandler directly to the target channel.
1674
func (ms *msgStream) msgConsumer() {
4✔
1675
        defer ms.wg.Done()
4✔
1676
        defer peerLog.Tracef(ms.stopMsg)
4✔
1677
        defer atomic.StoreInt32(&ms.streamShutdown, 1)
4✔
1678

4✔
1679
        peerLog.Tracef(ms.startMsg)
4✔
1680

4✔
1681
        for {
8✔
1682
                // First, we'll check our condition. If the queue of messages
4✔
1683
                // is empty, then we'll wait until a new item is added.
4✔
1684
                ms.msgCond.L.Lock()
4✔
1685
                for len(ms.msgs) == 0 {
8✔
1686
                        ms.msgCond.Wait()
4✔
1687

4✔
1688
                        // If we woke up in order to exit, then we'll do so.
4✔
1689
                        // Otherwise, we'll check the message queue for any new
4✔
1690
                        // items.
4✔
1691
                        select {
4✔
1692
                        case <-ms.peer.quit:
1✔
1693
                                ms.msgCond.L.Unlock()
1✔
1694
                                return
1✔
1695
                        case <-ms.quit:
1✔
1696
                                ms.msgCond.L.Unlock()
1✔
1697
                                return
1✔
1698
                        default:
1✔
1699
                        }
1700
                }
1701

1702
                // Grab the message off the front of the queue, shifting the
1703
                // slice's reference down one in order to remove the message
1704
                // from the queue.
1705
                msg := ms.msgs[0]
1✔
1706
                ms.msgs[0] = nil // Set to nil to prevent GC leak.
1✔
1707
                ms.msgs = ms.msgs[1:]
1✔
1708

1✔
1709
                ms.msgCond.L.Unlock()
1✔
1710

1✔
1711
                ms.apply(msg)
1✔
1712

1✔
1713
                // We've just successfully processed an item, so we'll signal
1✔
1714
                // to the producer that a new slot is free in the buffer. We'll use
1✔
1715
                // this to bound the size of the buffer to avoid allowing it to
1✔
1716
                // grow indefinitely.
1✔
1717
                select {
1✔
1718
                case ms.producerSema <- struct{}{}:
1✔
1719
                case <-ms.peer.quit:
1✔
1720
                        return
1✔
1721
                case <-ms.quit:
1✔
1722
                        return
1✔
1723
                }
1724
        }
1725
}
1726

1727
// AddMsg adds a new message to the msgStream. This function is safe for
1728
// concurrent access.
1729
func (ms *msgStream) AddMsg(msg lnwire.Message) {
1✔
1730
        // First, we'll attempt to receive from the producerSema channel. This
1✔
1731
        // acts as a semaphore to prevent us from indefinitely buffering
1✔
1732
        // incoming items from the wire. Either the msg queue isn't full, and
1✔
1733
        // we'll not block, or the queue is full, and we'll block until either
1✔
1734
        // we're signalled to quit, or a slot is freed up.
1✔
1735
        select {
1✔
1736
        case <-ms.producerSema:
1✔
1737
        case <-ms.peer.quit:
×
1738
                return
×
1739
        case <-ms.quit:
×
1740
                return
×
1741
        }
1742

1743
        // Next, we'll lock the condition, and add the message to the end of
1744
        // the message queue.
1745
        ms.msgCond.L.Lock()
1✔
1746
        ms.msgs = append(ms.msgs, msg)
1✔
1747
        ms.msgCond.L.Unlock()
1✔
1748

1✔
1749
        // With the message added, we signal to the msgConsumer that there are
1✔
1750
        // additional messages to consume.
1✔
1751
        ms.msgCond.Signal()
1✔
1752
}
1753

1754
// waitUntilLinkActive waits until the target link is active and returns a
1755
// ChannelLink to pass messages to. It accomplishes this by subscribing to
1756
// an ActiveLinkEvent which is emitted by the link when it first starts up.
1757
func waitUntilLinkActive(p *Brontide,
1758
        cid lnwire.ChannelID) htlcswitch.ChannelUpdateHandler {
1✔
1759

1✔
1760
        p.log.Tracef("Waiting for link=%v to be active", cid)
1✔
1761

1✔
1762
        // Subscribe to receive channel events.
1✔
1763
        //
1✔
1764
        // NOTE: If the link is already active by the time we subscribe, then
1✔
1765
        // GetLink will retrieve the link and we can send messages. If the link
1✔
1766
        // becomes active between SubscribeChannelEvents and GetLink, then GetLink
1✔
1767
        // will retrieve the link. If the link becomes active after GetLink, then
1✔
1768
        // we will get an ActiveLinkEvent notification and retrieve the link. If
1✔
1769
        // the call to GetLink is before SubscribeChannelEvents, however, there
1✔
1770
        // will be a race condition.
1✔
1771
        sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
1✔
1772
        if err != nil {
2✔
1773
                // If we have a non-nil error, then the server is shutting down and we
1✔
1774
                // can exit here and return nil. This means no message will be delivered
1✔
1775
                // to the link.
1✔
1776
                return nil
1✔
1777
        }
1✔
1778
        defer sub.Cancel()
1✔
1779

1✔
1780
        // The link may already be active by this point, and we may have missed the
1✔
1781
        // ActiveLinkEvent. Check if the link exists.
1✔
1782
        link := p.fetchLinkFromKeyAndCid(cid)
1✔
1783
        if link != nil {
2✔
1784
                return link
1✔
1785
        }
1✔
1786

1787
        // If the link is nil, we must wait for it to be active.
1788
        for {
2✔
1789
                select {
1✔
1790
                // A new event has been sent by the ChannelNotifier. We first check
1791
                // whether the event is an ActiveLinkEvent. If it is, we'll check
1792
                // that the event is for this channel. Otherwise, we discard the
1793
                // message.
1794
                case e := <-sub.Updates():
1✔
1795
                        event, ok := e.(channelnotifier.ActiveLinkEvent)
1✔
1796
                        if !ok {
2✔
1797
                                // Ignore this notification.
1✔
1798
                                continue
1✔
1799
                        }
1800

1801
                        chanPoint := event.ChannelPoint
1✔
1802

1✔
1803
                        // Check whether the retrieved chanPoint matches the target
1✔
1804
                        // channel id.
1✔
1805
                        if !cid.IsChanPoint(chanPoint) {
1✔
1806
                                continue
×
1807
                        }
1808

1809
                        // The link shouldn't be nil as we received an
1810
                        // ActiveLinkEvent. If it is nil, we return nil and the
1811
                        // calling function should catch it.
1812
                        return p.fetchLinkFromKeyAndCid(cid)
1✔
1813

1814
                case <-p.quit:
1✔
1815
                        return nil
1✔
1816
                }
1817
        }
1818
}
1819

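// The NOTE in waitUntilLinkActive describes the general subscribe-then-check
// ordering that avoids a lost-notification race: subscribe first, then look at
// the current state, and only then fall back to consuming events. A minimal,
// generic sketch of that ordering follows (illustrative only; the event
// channel and isReady callback are hypothetical stand-ins for sub.Updates()
// and fetchLinkFromKeyAndCid above):
func waitUntilReady(events <-chan struct{}, isReady func() bool,
        quit <-chan struct{}) bool {

        // Because we are already subscribed, any transition to "ready" either
        // shows up in this check or as an event below. Checking before
        // subscribing could miss the transition entirely.
        if isReady() {
                return true
        }

        for {
                select {
                case <-events:
                        if isReady() {
                                return true
                        }

                case <-quit:
                        return false
                }
        }
}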
1820
// newChanMsgStream is used to create a msgStream between the peer and
1821
// a particular channel link in the htlcswitch. We utilize additional
1822
// synchronization with the fundingManager to ensure we don't attempt to
1823
// dispatch a message to a channel before it is fully active. A reference to the
1824
// channel this stream forwards to is held in scope to prevent unnecessary
1825
// lookups.
1826
func newChanMsgStream(p *Brontide, cid lnwire.ChannelID) *msgStream {
1✔
1827
        var chanLink htlcswitch.ChannelUpdateHandler
1✔
1828

1✔
1829
        apply := func(msg lnwire.Message) {
2✔
1830
                // This check is fine because if the link no longer exists, it will
1✔
1831
                // be removed from the activeChannels map and subsequent messages
1✔
1832
                // shouldn't reach the chan msg stream.
1✔
1833
                if chanLink == nil {
2✔
1834
                        chanLink = waitUntilLinkActive(p, cid)
1✔
1835

1✔
1836
                        // If the link is still not active and the calling function
1✔
1837
                        // errored out, just return.
1✔
1838
                        if chanLink == nil {
2✔
1839
                                p.log.Warnf("Link=%v is not active", cid)
1✔
1840
                                return
1✔
1841
                        }
1✔
1842
                }
1843

1844
                // In order to avoid unnecessarily delivering messages
1845
                // as the peer is exiting, we'll check quickly to see
1846
                // if we need to exit.
1847
                select {
1✔
1848
                case <-p.quit:
×
1849
                        return
×
1850
                default:
1✔
1851
                }
1852

1853
                chanLink.HandleChannelUpdate(msg)
1✔
1854
        }
1855

1856
        return newMsgStream(p,
1✔
1857
                fmt.Sprintf("Update stream for ChannelID(%x) created", cid[:]),
1✔
1858
                fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]),
1✔
1859
                1000,
1✔
1860
                apply,
1✔
1861
        )
1✔
1862
}
1863

1864
// newDiscMsgStream is used to set up a msgStream between the peer and the
1865
// authenticated gossiper. This stream should be used to forward all remote
1866
// channel announcements.
1867
func newDiscMsgStream(p *Brontide) *msgStream {
4✔
1868
        apply := func(msg lnwire.Message) {
5✔
1869
                // TODO(yy): `ProcessRemoteAnnouncement` returns an error chan
1✔
1870
                // and we need to process it.
1✔
1871
                p.cfg.AuthGossiper.ProcessRemoteAnnouncement(msg, p)
1✔
1872
        }
1✔
1873

1874
        return newMsgStream(
4✔
1875
                p,
4✔
1876
                "Update stream for gossiper created",
4✔
1877
                "Update stream for gossiper exited",
4✔
1878
                1000,
4✔
1879
                apply,
4✔
1880
        )
4✔
1881
}
1882

1883
// readHandler is responsible for reading messages off the wire in series, then
1884
// properly dispatching the handling of the message to the proper subsystem.
1885
//
1886
// NOTE: This method MUST be run as a goroutine.
1887
func (p *Brontide) readHandler() {
4✔
1888
        defer p.wg.Done()
4✔
1889

4✔
1890
        // We'll stop the timer after a new messages is received, and also
4✔
1891
        // reset it after we process the next message.
4✔
1892
        idleTimer := time.AfterFunc(idleTimeout, func() {
4✔
1893
                err := fmt.Errorf("peer %s no answer for %s -- disconnecting",
×
1894
                        p, idleTimeout)
×
1895
                p.Disconnect(err)
×
1896
        })
×
1897

1898
        // Initialize our negotiated gossip sync method before reading messages
1899
        // off the wire. When using gossip queries, this ensures a gossip
1900
        // syncer is active by the time query messages arrive.
1901
        //
1902
        // TODO(conner): have peer store gossip syncer directly and bypass
1903
        // gossiper?
1904
        p.initGossipSync()
4✔
1905

4✔
1906
        discStream := newDiscMsgStream(p)
4✔
1907
        discStream.Start()
4✔
1908
        defer discStream.Stop()
4✔
1909
out:
4✔
1910
        for atomic.LoadInt32(&p.disconnect) == 0 {
9✔
1911
                nextMsg, err := p.readNextMessage()
5✔
1912
                if !idleTimer.Stop() {
5✔
1913
                        select {
×
1914
                        case <-idleTimer.C:
×
1915
                        default:
×
1916
                        }
1917
                }
1918
                if err != nil {
3✔
1919
                        p.log.Infof("unable to read message from peer: %v", err)
1✔
1920

1✔
1921
                        // If we could not read our peer's message due to an
1✔
1922
                        // unknown type or invalid alias, we continue processing
1✔
1923
                        // as normal. We store unknown message and address
1✔
1924
                        // types, as they may provide debugging insight.
1✔
1925
                        switch e := err.(type) {
1✔
1926
                        // If this is just a message we don't yet recognize,
1927
                        // we'll continue processing as normal as this allows
1928
                        // us to introduce new messages in a forwards
1929
                        // compatible manner.
1930
                        case *lnwire.UnknownMessage:
1✔
1931
                                p.storeError(e)
1✔
1932
                                idleTimer.Reset(idleTimeout)
1✔
1933
                                continue
1✔
1934

1935
                        // If they sent us an address type that we don't yet
1936
                        // know of, then this isn't a wire error, so we'll
1937
                        // simply continue parsing the remainder of their
1938
                        // messages.
1939
                        case *lnwire.ErrUnknownAddrType:
×
1940
                                p.storeError(e)
×
1941
                                idleTimer.Reset(idleTimeout)
×
1942
                                continue
×
1943

1944
                        // If the NodeAnnouncement has an invalid alias, then
1945
                        // we'll log that error above and continue so we can
1946
                        // continue to read messages from the peer. We do not
1947
                        // store this error because it is of little debugging
1948
                        // value.
1949
                        case *lnwire.ErrInvalidNodeAlias:
×
1950
                                idleTimer.Reset(idleTimeout)
×
1951
                                continue
×
1952

1953
                        // If the error we encountered wasn't just a message we
1954
                        // didn't recognize, then we'll stop all processing as
1955
                        // this is a fatal error.
1956
                        default:
1✔
1957
                                break out
1✔
1958
                        }
1959
                }
1960

1961
                // If a message router is active, then we'll try to have it
1962
                // handle this message. If it can, then we're able to skip the
1963
                // rest of the message handling logic.
1964
                err = fn.MapOptionZ(p.msgRouter, func(r msgmux.Router) error {
4✔
1965
                        return r.RouteMsg(msgmux.PeerMsg{
2✔
1966
                                PeerPub: *p.IdentityKey(),
2✔
1967
                                Message: nextMsg,
2✔
1968
                        })
2✔
1969
                })
2✔
1970

1971
                // No error occurred, and the message was handled by the
1972
                // router.
1973
                if err == nil {
2✔
1974
                        continue
×
1975
                }
1976

1977
                var (
2✔
1978
                        targetChan   lnwire.ChannelID
2✔
1979
                        isLinkUpdate bool
2✔
1980
                )
2✔
1981

2✔
1982
                switch msg := nextMsg.(type) {
2✔
1983
                case *lnwire.Pong:
×
1984
                        // When we receive a Pong message in response to our
×
1985
                        // last ping message, we send it to the pingManager.
×
1986
                        p.pingManager.ReceivedPong(msg)
×
1987

1988
                case *lnwire.Ping:
×
1989
                        // First, we'll store their latest ping payload within
×
1990
                        // the relevant atomic variable.
×
1991
                        p.lastPingPayload.Store(msg.PaddingBytes[:])
×
1992

×
1993
                        // Next, we'll send over the amount of specified pong
×
1994
                        // bytes.
×
1995
                        pong := lnwire.NewPong(p.cfg.PongBuf[0:msg.NumPongBytes])
×
1996
                        p.queueMsg(pong, nil)
×
1997

1998
                case *lnwire.OpenChannel,
1999
                        *lnwire.AcceptChannel,
2000
                        *lnwire.FundingCreated,
2001
                        *lnwire.FundingSigned,
2002
                        *lnwire.ChannelReady:
1✔
2003

1✔
2004
                        p.cfg.FundingManager.ProcessFundingMsg(msg, p)
1✔
2005

2006
                case *lnwire.Shutdown:
1✔
2007
                        select {
1✔
2008
                        case p.chanCloseMsgs <- &closeMsg{msg.ChannelID, msg}:
1✔
2009
                        case <-p.quit:
×
2010
                                break out
×
2011
                        }
2012
                case *lnwire.ClosingSigned:
1✔
2013
                        select {
1✔
2014
                        case p.chanCloseMsgs <- &closeMsg{msg.ChannelID, msg}:
1✔
2015
                        case <-p.quit:
×
2016
                                break out
×
2017
                        }
2018

2019
                case *lnwire.Warning:
×
2020
                        targetChan = msg.ChanID
×
2021
                        isLinkUpdate = p.handleWarningOrError(targetChan, msg)
×
2022

2023
                case *lnwire.Error:
1✔
2024
                        targetChan = msg.ChanID
1✔
2025
                        isLinkUpdate = p.handleWarningOrError(targetChan, msg)
1✔
2026

2027
                case *lnwire.ChannelReestablish:
1✔
2028
                        targetChan = msg.ChanID
1✔
2029
                        isLinkUpdate = p.hasChannel(targetChan)
1✔
2030

1✔
2031
                        // If we failed to find the link in question, and the
1✔
2032
                        // message received was a channel sync message, then
1✔
2033
                        // this might be a peer trying to resync a closed channel.
1✔
2034
                        // In this case we'll try to resend our last channel
1✔
2035
                        // sync message, such that the peer can recover funds
1✔
2036
                        // from the closed channel.
1✔
2037
                        if !isLinkUpdate {
2✔
2038
                                err := p.resendChanSyncMsg(targetChan)
1✔
2039
                                if err != nil {
2✔
2040
                                        // TODO(halseth): send error to peer?
1✔
2041
                                        p.log.Errorf("resend failed: %v",
1✔
2042
                                                err)
1✔
2043
                                }
1✔
2044
                        }
2045

2046
                // For messages that implement the LinkUpdater interface, we
2047
                // will consider them as link updates and send them to
2048
                // chanStream. These messages will be queued inside chanStream
2049
                // if the channel is not active yet.
2050
                case lnwire.LinkUpdater:
1✔
2051
                        targetChan = msg.TargetChanID()
1✔
2052
                        isLinkUpdate = p.hasChannel(targetChan)
1✔
2053

1✔
2054
                        // Log an error if we don't have this channel. This
1✔
2055
                        // means the peer has sent us a message with an unknown
1✔
2056
                        // channel ID.
1✔
2057
                        if !isLinkUpdate {
2✔
2058
                                p.log.Errorf("Unknown channel ID: %v found "+
1✔
2059
                                        "in received msg=%s", targetChan,
1✔
2060
                                        nextMsg.MsgType())
1✔
2061
                        }
1✔
2062

2063
                case *lnwire.ChannelUpdate1,
2064
                        *lnwire.ChannelAnnouncement1,
2065
                        *lnwire.NodeAnnouncement,
2066
                        *lnwire.AnnounceSignatures1,
2067
                        *lnwire.GossipTimestampRange,
2068
                        *lnwire.QueryShortChanIDs,
2069
                        *lnwire.QueryChannelRange,
2070
                        *lnwire.ReplyChannelRange,
2071
                        *lnwire.ReplyShortChanIDsEnd:
1✔
2072

1✔
2073
                        discStream.AddMsg(msg)
1✔
2074

2075
                case *lnwire.Custom:
2✔
2076
                        err := p.handleCustomMessage(msg)
2✔
2077
                        if err != nil {
2✔
2078
                                p.storeError(err)
×
2079
                                p.log.Errorf("%v", err)
×
2080
                        }
×
2081

2082
                default:
×
2083
                        // If the message we received is unknown to us, store
×
2084
                        // the type to track the failure.
×
2085
                        err := fmt.Errorf("unknown message type %v received",
×
2086
                                uint16(msg.MsgType()))
×
2087
                        p.storeError(err)
×
2088

×
2089
                        p.log.Errorf("%v", err)
×
2090
                }
2091

2092
                if isLinkUpdate {
3✔
2093
                        // If this is a channel update, then we need to feed it
1✔
2094
                        // into the channel's in-order message stream.
1✔
2095
                        p.sendLinkUpdateMsg(targetChan, nextMsg)
1✔
2096
                }
1✔
2097

2098
                idleTimer.Reset(idleTimeout)
2✔
2099
        }
2100

2101
        p.Disconnect(errors.New("read handler closed"))
1✔
2102

1✔
2103
        p.log.Trace("readHandler for peer done")
1✔
2104
}
2105

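// readHandler stops the idle timer after each read and resets it once the
// message has been dispatched. The Stop-then-drain guard it uses is the usual
// idiom for re-arming a *time.Timer whose channel may already hold an expiry.
// A standalone sketch of that idiom with time.NewTimer follows (illustrative
// only; standard library):
func resetIdleTimer(t *time.Timer, d time.Duration) {
        if !t.Stop() {
                // The timer already fired (or was stopped earlier); drain the
                // channel non-blockingly so the next receive doesn't observe
                // a stale expiry.
                select {
                case <-t.C:
                default:
                }
        }

        t.Reset(d)
}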
2106
// handleCustomMessage handles the given custom message if a handler is
2107
// registered.
2108
func (p *Brontide) handleCustomMessage(msg *lnwire.Custom) error {
2✔
2109
        if p.cfg.HandleCustomMessage == nil {
2✔
2110
                return fmt.Errorf("no custom message handler for "+
×
2111
                        "message type %v", uint16(msg.MsgType()))
×
2112
        }
×
2113

2114
        return p.cfg.HandleCustomMessage(p.PubKey(), msg)
2✔
2115
}
2116

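// handleCustomMessage simply delegates to the optional HandleCustomMessage
// callback in the peer config. A sketch of what such a callback might look
// like follows (illustrative only; the signature is assumed from the call
// above, i.e. the peer's serialized public key plus the *lnwire.Custom
// message, and it uses the standard library log package):
var exampleCustomMsgHandler = func(peer [33]byte, msg *lnwire.Custom) error {
        // Custom messages carry an opaque payload; here we just log its type
        // and size.
        log.Printf("peer=%x sent custom msg type=%d (%d bytes)",
                peer[:], uint16(msg.Type), len(msg.Data))

        return nil
}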
2117
// isLoadedFromDisk returns true if the provided channel ID is loaded from
2118
// disk.
2119
//
2120
// NOTE: only returns true for pending channels.
2121
func (p *Brontide) isLoadedFromDisk(chanID lnwire.ChannelID) bool {
1✔
2122
        // If this is a newly added channel, no need to reestablish.
1✔
2123
        _, added := p.addedChannels.Load(chanID)
1✔
2124
        if added {
2✔
2125
                return false
1✔
2126
        }
1✔
2127

2128
        // Return false if the channel is unknown.
2129
        channel, ok := p.activeChannels.Load(chanID)
1✔
2130
        if !ok {
1✔
2131
                return false
×
2132
        }
×
2133

2134
        // During startup, we will use a nil value to mark a pending channel
2135
        // that's loaded from disk.
2136
        return channel == nil
1✔
2137
}
2138

2139
// isActiveChannel returns true if the provided channel id is active, otherwise
2140
// returns false.
2141
func (p *Brontide) isActiveChannel(chanID lnwire.ChannelID) bool {
9✔
2142
        // The channel would be nil if,
9✔
2143
        // - the channel doesn't exist, or,
9✔
2144
        // - the channel exists, but is pending. In this case, we don't
9✔
2145
        //   consider this channel active.
9✔
2146
        channel, _ := p.activeChannels.Load(chanID)
9✔
2147

9✔
2148
        return channel != nil
9✔
2149
}
9✔
2150

2151
// isPendingChannel returns true if the provided channel ID is pending, and
2152
// returns false if the channel is active or unknown.
2153
func (p *Brontide) isPendingChannel(chanID lnwire.ChannelID) bool {
7✔
2154
        // Return false if the channel is unknown.
7✔
2155
        channel, ok := p.activeChannels.Load(chanID)
7✔
2156
        if !ok {
11✔
2157
                return false
4✔
2158
        }
4✔
2159

2160
        return channel == nil
3✔
2161
}
2162

2163
// hasChannel returns true if the peer has a pending/active channel specified
2164
// by the channel ID.
2165
func (p *Brontide) hasChannel(chanID lnwire.ChannelID) bool {
1✔
2166
        _, ok := p.activeChannels.Load(chanID)
1✔
2167
        return ok
1✔
2168
}
1✔
2169

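// Taken together, isLoadedFromDisk, isActiveChannel, isPendingChannel and
// hasChannel encode a three-state convention for the activeChannels map: no
// entry means the channel is unknown, a nil value marks a pending channel
// loaded from disk, and a non-nil value marks an active channel. A small
// illustrative helper (not in the original source) that spells the mapping
// out:
func channelState(loaded bool, channel *lnwallet.LightningChannel) string {
        switch {
        case !loaded:
                // No entry in activeChannels at all.
                return "unknown"

        case channel == nil:
                // A nil entry is the placeholder for a pending channel.
                return "pending"

        default:
                // A non-nil entry is a fully active channel.
                return "active"
        }
}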
2170
// storeError stores an error in our peer's buffer of recent errors with the
2171
// current timestamp. Errors are only stored if we have at least one active
2172
// channel with the peer to mitigate a dos vector where a peer costlessly
2173
// connects to us and spams us with errors.
2174
func (p *Brontide) storeError(err error) {
2✔
2175
        var haveChannels bool
2✔
2176

2✔
2177
        p.activeChannels.Range(func(_ lnwire.ChannelID,
2✔
2178
                channel *lnwallet.LightningChannel) bool {
4✔
2179

2✔
2180
                // Pending channels will be nil in the activeChannels map.
2✔
2181
                if channel == nil {
3✔
2182
                        // Return true to continue the iteration.
1✔
2183
                        return true
1✔
2184
                }
1✔
2185

2186
                haveChannels = true
2✔
2187

2✔
2188
                // Return false to break the iteration.
2✔
2189
                return false
2✔
2190
        })
2191

2192
        // If we do not have any active channels with the peer, we do not store
2193
        // errors as a dos mitigation.
2194
        if !haveChannels {
3✔
2195
                p.log.Trace("no channels with peer, not storing err")
1✔
2196
                return
1✔
2197
        }
1✔
2198

2199
        p.cfg.ErrorBuffer.Add(
2✔
2200
                &TimestampedError{Timestamp: time.Now(), Error: err},
2✔
2201
        )
2✔
2202
}
2203

2204
// handleWarningOrError processes a warning or error msg and returns true if
2205
// msg should be forwarded to the associated channel link. False is returned if
2206
// any necessary forwarding of msg was already handled by this method. If msg is
2207
// an error from a peer with an active channel, we'll store it in memory.
2208
//
2209
// NOTE: This method should only be called from within the readHandler.
2210
func (p *Brontide) handleWarningOrError(chanID lnwire.ChannelID,
2211
        msg lnwire.Message) bool {
1✔
2212

1✔
2213
        if errMsg, ok := msg.(*lnwire.Error); ok {
2✔
2214
                p.storeError(errMsg)
1✔
2215
        }
1✔
2216

2217
        switch {
1✔
2218
        // Connection wide messages should be forwarded to all channel links
2219
        // with this peer.
2220
        case chanID == lnwire.ConnectionWideID:
×
2221
                for _, chanStream := range p.activeMsgStreams {
×
2222
                        chanStream.AddMsg(msg)
×
2223
                }
×
2224

2225
                return false
×
2226

2227
        // If the channel ID for the message corresponds to a pending channel,
2228
        // then the funding manager will handle it.
2229
        case p.cfg.FundingManager.IsPendingChannel(chanID, p):
1✔
2230
                p.cfg.FundingManager.ProcessFundingMsg(msg, p)
1✔
2231
                return false
1✔
2232

2233
        // If not we hand the message to the channel link for this channel.
2234
        case p.isActiveChannel(chanID):
1✔
2235
                return true
1✔
2236

2237
        default:
1✔
2238
                return false
1✔
2239
        }
2240
}
2241

2242
// messageSummary returns a human-readable string that summarizes a
2243
// incoming/outgoing message. Not all messages will have a summary, only those
2244
// which have additional data that can be informative at a glance.
2245
func messageSummary(msg lnwire.Message) string {
1✔
2246
        switch msg := msg.(type) {
1✔
2247
        case *lnwire.Init:
1✔
2248
                // No summary.
1✔
2249
                return ""
1✔
2250

2251
        case *lnwire.OpenChannel:
1✔
2252
                return fmt.Sprintf("temp_chan_id=%x, chain=%v, csv=%v, amt=%v, "+
1✔
2253
                        "push_amt=%v, reserve=%v, flags=%v",
1✔
2254
                        msg.PendingChannelID[:], msg.ChainHash,
1✔
2255
                        msg.CsvDelay, msg.FundingAmount, msg.PushAmount,
1✔
2256
                        msg.ChannelReserve, msg.ChannelFlags)
1✔
2257

2258
        case *lnwire.AcceptChannel:
1✔
2259
                return fmt.Sprintf("temp_chan_id=%x, reserve=%v, csv=%v, num_confs=%v",
1✔
2260
                        msg.PendingChannelID[:], msg.ChannelReserve, msg.CsvDelay,
1✔
2261
                        msg.MinAcceptDepth)
1✔
2262

2263
        case *lnwire.FundingCreated:
1✔
2264
                return fmt.Sprintf("temp_chan_id=%x, chan_point=%v",
1✔
2265
                        msg.PendingChannelID[:], msg.FundingPoint)
1✔
2266

2267
        case *lnwire.FundingSigned:
1✔
2268
                return fmt.Sprintf("chan_id=%v", msg.ChanID)
1✔
2269

2270
        case *lnwire.ChannelReady:
1✔
2271
                return fmt.Sprintf("chan_id=%v, next_point=%x",
1✔
2272
                        msg.ChanID, msg.NextPerCommitmentPoint.SerializeCompressed())
1✔
2273

2274
        case *lnwire.Shutdown:
1✔
2275
                return fmt.Sprintf("chan_id=%v, script=%x", msg.ChannelID,
1✔
2276
                        msg.Address[:])
1✔
2277

2278
        case *lnwire.ClosingComplete:
×
2279
                return fmt.Sprintf("chan_id=%v, fee_sat=%v, locktime=%v",
×
2280
                        msg.ChannelID, msg.FeeSatoshis, msg.LockTime)
×
2281

2282
        case *lnwire.ClosingSig:
×
2283
                return fmt.Sprintf("chan_id=%v", msg.ChannelID)
×
2284

2285
        case *lnwire.ClosingSigned:
1✔
2286
                return fmt.Sprintf("chan_id=%v, fee_sat=%v", msg.ChannelID,
1✔
2287
                        msg.FeeSatoshis)
1✔
2288

2289
        case *lnwire.UpdateAddHTLC:
1✔
2290
                var blindingPoint []byte
1✔
2291
                msg.BlindingPoint.WhenSome(
1✔
2292
                        func(b tlv.RecordT[lnwire.BlindingPointTlvType,
1✔
2293
                                *btcec.PublicKey]) {
2✔
2294

1✔
2295
                                blindingPoint = b.Val.SerializeCompressed()
1✔
2296
                        },
1✔
2297
                )
2298

2299
                return fmt.Sprintf("chan_id=%v, id=%v, amt=%v, expiry=%v, "+
1✔
2300
                        "hash=%x, blinding_point=%x, custom_records=%v",
1✔
2301
                        msg.ChanID, msg.ID, msg.Amount, msg.Expiry,
1✔
2302
                        msg.PaymentHash[:], blindingPoint, msg.CustomRecords)
1✔
2303

2304
        case *lnwire.UpdateFailHTLC:
1✔
2305
                return fmt.Sprintf("chan_id=%v, id=%v, reason=%x", msg.ChanID,
1✔
2306
                        msg.ID, msg.Reason)
1✔
2307

2308
        case *lnwire.UpdateFulfillHTLC:
1✔
2309
                return fmt.Sprintf("chan_id=%v, id=%v, pre_image=%x, "+
1✔
2310
                        "custom_records=%v", msg.ChanID, msg.ID,
1✔
2311
                        msg.PaymentPreimage[:], msg.CustomRecords)
1✔
2312

2313
        case *lnwire.CommitSig:
1✔
2314
                return fmt.Sprintf("chan_id=%v, num_htlcs=%v", msg.ChanID,
1✔
2315
                        len(msg.HtlcSigs))
1✔
2316

2317
        case *lnwire.RevokeAndAck:
1✔
2318
                return fmt.Sprintf("chan_id=%v, rev=%x, next_point=%x",
1✔
2319
                        msg.ChanID, msg.Revocation[:],
1✔
2320
                        msg.NextRevocationKey.SerializeCompressed())
1✔
2321

2322
        case *lnwire.UpdateFailMalformedHTLC:
1✔
2323
                return fmt.Sprintf("chan_id=%v, id=%v, fail_code=%v",
1✔
2324
                        msg.ChanID, msg.ID, msg.FailureCode)
1✔
2325

2326
        case *lnwire.Warning:
×
2327
                return fmt.Sprintf("%v", msg.Warning())
×
2328

2329
        case *lnwire.Error:
1✔
2330
                return fmt.Sprintf("%v", msg.Error())
1✔
2331

2332
        case *lnwire.AnnounceSignatures1:
1✔
2333
                return fmt.Sprintf("chan_id=%v, short_chan_id=%v", msg.ChannelID,
1✔
2334
                        msg.ShortChannelID.ToUint64())
1✔
2335

2336
        case *lnwire.ChannelAnnouncement1:
1✔
2337
                return fmt.Sprintf("chain_hash=%v, short_chan_id=%v",
1✔
2338
                        msg.ChainHash, msg.ShortChannelID.ToUint64())
1✔
2339

2340
        case *lnwire.ChannelUpdate1:
1✔
2341
                return fmt.Sprintf("chain_hash=%v, short_chan_id=%v, "+
1✔
2342
                        "mflags=%v, cflags=%v, update_time=%v", msg.ChainHash,
1✔
2343
                        msg.ShortChannelID.ToUint64(), msg.MessageFlags,
1✔
2344
                        msg.ChannelFlags, time.Unix(int64(msg.Timestamp), 0))
1✔
2345

2346
        case *lnwire.NodeAnnouncement:
1✔
2347
                return fmt.Sprintf("node=%x, update_time=%v",
1✔
2348
                        msg.NodeID, time.Unix(int64(msg.Timestamp), 0))
1✔
2349

2350
        case *lnwire.Ping:
×
2351
                return fmt.Sprintf("ping_bytes=%x", msg.PaddingBytes[:])
×
2352

2353
        case *lnwire.Pong:
×
2354
                return fmt.Sprintf("len(pong_bytes)=%d", len(msg.PongBytes[:]))
×
2355

2356
        case *lnwire.UpdateFee:
×
2357
                return fmt.Sprintf("chan_id=%v, fee_update_sat=%v",
×
2358
                        msg.ChanID, int64(msg.FeePerKw))
×
2359

2360
        case *lnwire.ChannelReestablish:
1✔
2361
                return fmt.Sprintf("chan_id=%v, next_local_height=%v, "+
1✔
2362
                        "remote_tail_height=%v", msg.ChanID,
1✔
2363
                        msg.NextLocalCommitHeight, msg.RemoteCommitTailHeight)
1✔
2364

2365
        case *lnwire.ReplyShortChanIDsEnd:
1✔
2366
                return fmt.Sprintf("chain_hash=%v, complete=%v", msg.ChainHash,
1✔
2367
                        msg.Complete)
1✔
2368

2369
        case *lnwire.ReplyChannelRange:
1✔
2370
                return fmt.Sprintf("start_height=%v, end_height=%v, "+
1✔
2371
                        "num_chans=%v, encoding=%v", msg.FirstBlockHeight,
1✔
2372
                        msg.LastBlockHeight(), len(msg.ShortChanIDs),
1✔
2373
                        msg.EncodingType)
1✔
2374

2375
        case *lnwire.QueryShortChanIDs:
1✔
2376
                return fmt.Sprintf("chain_hash=%v, encoding=%v, num_chans=%v",
1✔
2377
                        msg.ChainHash, msg.EncodingType, len(msg.ShortChanIDs))
1✔
2378

2379
        case *lnwire.QueryChannelRange:
1✔
2380
                return fmt.Sprintf("chain_hash=%v, start_height=%v, "+
1✔
2381
                        "end_height=%v", msg.ChainHash, msg.FirstBlockHeight,
1✔
2382
                        msg.LastBlockHeight())
1✔
2383

2384
        case *lnwire.GossipTimestampRange:
1✔
2385
                return fmt.Sprintf("chain_hash=%v, first_stamp=%v, "+
1✔
2386
                        "stamp_range=%v", msg.ChainHash,
1✔
2387
                        time.Unix(int64(msg.FirstTimestamp), 0),
1✔
2388
                        msg.TimestampRange)
1✔
2389

2390
        case *lnwire.Stfu:
1✔
2391
                return fmt.Sprintf("chan_id=%v, initiator=%v", msg.ChanID,
1✔
2392
                        msg.Initiator)
1✔
2393

2394
        case *lnwire.Custom:
1✔
2395
                return fmt.Sprintf("type=%d", msg.Type)
1✔
2396
        }
2397

2398
        return fmt.Sprintf("unknown msg type=%T", msg)
×
2399
}
2400

2401
// logWireMessage logs the receipt or sending of a particular wire message. This
2402
// function is used rather than just logging the message in order to produce
2403
// less spammy log messages in trace mode by setting the 'Curve' parameter to
2404
// nil. Doing this avoids printing out each of the field elements in the curve
2405
// parameters for secp256k1.
2406
func (p *Brontide) logWireMessage(msg lnwire.Message, read bool) {
18✔
2407
        summaryPrefix := "Received"
18✔
2408
        if !read {
32✔
2409
                summaryPrefix = "Sending"
14✔
2410
        }
14✔
2411

2412
        p.log.Debugf("%v", lnutils.NewLogClosure(func() string {
19✔
2413
                // Debug summary of message.
1✔
2414
                summary := messageSummary(msg)
1✔
2415
                if len(summary) > 0 {
2✔
2416
                        summary = "(" + summary + ")"
1✔
2417
                }
1✔
2418

2419
                preposition := "to"
1✔
2420
                if read {
2✔
2421
                        preposition = "from"
1✔
2422
                }
1✔
2423

2424
                var msgType string
1✔
2425
                if msg.MsgType() < lnwire.CustomTypeStart {
2✔
2426
                        msgType = msg.MsgType().String()
1✔
2427
                } else {
2✔
2428
                        msgType = "custom"
1✔
2429
                }
1✔
2430

2431
                return fmt.Sprintf("%v %v%s %v %s", summaryPrefix,
1✔
2432
                        msgType, summary, preposition, p)
1✔
2433
        }))
2434

2435
        prefix := "readMessage from peer"
18✔
2436
        if !read {
32✔
2437
                prefix = "writeMessage to peer"
14✔
2438
        }
14✔
2439

2440
        p.log.Tracef(prefix+": %v", lnutils.SpewLogClosure(msg))
18✔
2441
}
2442

2443
// writeMessage writes and flushes the target lnwire.Message to the remote peer.
2444
// If the passed message is nil, this method will only try to flush an existing
2445
// message buffered on the connection. It is safe to call this method again
2446
// with a nil message iff a timeout error is returned. This will continue to
2447
// flush the pending message to the wire.
2448
//
2449
// NOTE:
2450
// Besides its usage in Start, this function should not be used elsewhere
2451
// except in writeHandler. If multiple goroutines call writeMessage at the same
2452
// time, panics can occur because WriteMessage and Flush don't use any locking
2453
// internally.
2454
func (p *Brontide) writeMessage(msg lnwire.Message) error {
14✔
2455
        // Only log the message on the first attempt.
14✔
2456
        if msg != nil {
28✔
2457
                p.logWireMessage(msg, false)
14✔
2458
        }
14✔
2459

2460
        noiseConn := p.cfg.Conn
14✔
2461

14✔
2462
        flushMsg := func() error {
28✔
2463
                // Ensure the write deadline is set before we attempt to send
14✔
2464
                // the message.
14✔
2465
                writeDeadline := time.Now().Add(
14✔
2466
                        p.scaleTimeout(writeMessageTimeout),
14✔
2467
                )
14✔
2468
                err := noiseConn.SetWriteDeadline(writeDeadline)
14✔
2469
                if err != nil {
14✔
2470
                        return err
×
2471
                }
×
2472

2473
                // Flush the pending message to the wire. If an error is
2474
                // encountered, e.g. write timeout, the number of bytes written
2475
                // so far will be returned.
2476
                n, err := noiseConn.Flush()
14✔
2477

14✔
2478
                // Record the number of bytes written on the wire, if any.
14✔
2479
                if n > 0 {
15✔
2480
                        atomic.AddUint64(&p.bytesSent, uint64(n))
1✔
2481
                }
1✔
2482

2483
                return err
14✔
2484
        }
2485

2486
        // If the current message has already been serialized, encrypted, and
2487
        // buffered on the underlying connection we will skip straight to
2488
        // flushing it to the wire.
2489
        if msg == nil {
14✔
2490
                return flushMsg()
×
2491
        }
×
2492

2493
        // Otherwise, this is a new message. We'll acquire a write buffer to
2494
        // serialize the message and buffer the ciphertext on the connection.
2495
        err := p.cfg.WritePool.Submit(func(buf *bytes.Buffer) error {
28✔
2496
                // Using a buffer allocated by the write pool, encode the
14✔
2497
                // message directly into the buffer.
14✔
2498
                _, writeErr := lnwire.WriteMessage(buf, msg, 0)
14✔
2499
                if writeErr != nil {
14✔
2500
                        return writeErr
×
2501
                }
×
2502

2503
                // Finally, write the message itself in a single swoop. This
2504
                // will buffer the ciphertext on the underlying connection. We
2505
                // will defer flushing the message until the write pool has been
2506
                // released.
2507
                return noiseConn.WriteMessage(buf.Bytes())
14✔
2508
        })
2509
        if err != nil {
14✔
2510
                return err
×
2511
        }
×
2512

2513
        return flushMsg()
14✔
2514
}
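// A minimal caller-side sketch of the contract documented on writeMessage:
// writeMessageWithFlushRetry is a hypothetical helper (not part of lnd's peer
// API) that assumes a timeout means the message is already buffered on the
// connection, so it retries with a nil message to flush only. The goto-based
// loop in writeHandler below is the actual implementation of the same idea.
func (p *Brontide) writeMessageWithFlushRetry(msg lnwire.Message) error {
        err := p.writeMessage(msg)
        for {
                nerr, ok := err.(net.Error)
                if !ok || !nerr.Timeout() {
                        // Either the write fully succeeded or it failed with
                        // a non-timeout error; in both cases we are done.
                        return err
                }

                // A timeout means the message was serialized and buffered on
                // the connection, so pass nil to only flush it on this pass.
                err = p.writeMessage(nil)
        }
}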
2515

2516
// writeHandler is a goroutine dedicated to reading messages off of an incoming
2517
// queue, and writing them out to the wire. This goroutine coordinates with the
2518
// queueHandler in order to ensure the incoming message queue is quickly
2519
// drained.
2520
//
2521
// NOTE: This method MUST be run as a goroutine.
2522
func (p *Brontide) writeHandler() {
4✔
2523
        // We'll stop the timer after a new message is sent, and also reset it
4✔
2524
        // after we process the next message.
4✔
2525
        idleTimer := time.AfterFunc(idleTimeout, func() {
4✔
2526
                err := fmt.Errorf("peer %s no write for %s -- disconnecting",
×
2527
                        p, idleTimeout)
×
2528
                p.Disconnect(err)
×
2529
        })
×
2530

2531
        var exitErr error
4✔
2532

4✔
2533
out:
4✔
2534
        for {
12✔
2535
                select {
8✔
2536
                case outMsg := <-p.sendQueue:
5✔
2537
                        // Record the time at which we first attempt to send the
5✔
2538
                        // message.
5✔
2539
                        startTime := time.Now()
5✔
2540

5✔
2541
                retry:
5✔
2542
                        // Write out the message to the socket. If a timeout
2543
                        // error is encountered, we will catch this and retry
2544
                        // after backing off in case the remote peer is just
2545
                        // slow to process messages from the wire.
2546
                        err := p.writeMessage(outMsg.msg)
5✔
2547
                        if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
5✔
2548
                                p.log.Debugf("Write timeout detected for "+
×
2549
                                        "peer, first write for message "+
×
2550
                                        "attempted %v ago",
×
2551
                                        time.Since(startTime))
×
2552

×
2553
                                // If we received a timeout error, this implies
×
2554
                                // that the message was buffered on the
×
2555
                                // connection successfully and that a flush was
×
2556
                                // attempted. We'll set the message to nil so
×
2557
                                // that on a subsequent pass we only try to
×
2558
                                // flush the buffered message, and forgo
×
2559
                                // reserializing or reencrypting it.
×
2560
                                outMsg.msg = nil
×
2561

×
2562
                                goto retry
×
2563
                        }
2564

2565
                        // The write succeeded, reset the idle timer to prevent
2566
                        // us from disconnecting the peer.
2567
                        if !idleTimer.Stop() {
5✔
2568
                                select {
×
2569
                                case <-idleTimer.C:
×
2570
                                default:
×
2571
                                }
2572
                        }
2573
                        idleTimer.Reset(idleTimeout)
5✔
2574

5✔
2575
                        // If the peer requested a synchronous write, respond
5✔
2576
                        // with the error.
5✔
2577
                        if outMsg.errChan != nil {
7✔
2578
                                outMsg.errChan <- err
2✔
2579
                        }
2✔
2580

2581
                        if err != nil {
5✔
2582
                                exitErr = fmt.Errorf("unable to write "+
×
2583
                                        "message: %v", err)
×
2584
                                break out
×
2585
                        }
2586

2587
                case <-p.quit:
1✔
2588
                        exitErr = lnpeer.ErrPeerExiting
1✔
2589
                        break out
1✔
2590
                }
2591
        }
2592

2593
        // Avoid an exit deadlock by ensuring WaitGroups are decremented before
2594
        // disconnect.
2595
        p.wg.Done()
1✔
2596

1✔
2597
        p.Disconnect(exitErr)
1✔
2598

1✔
2599
        p.log.Trace("writeHandler for peer done")
1✔
2600
}
2601

2602
// queueHandler is responsible for accepting messages from outside subsystems
2603
// to be eventually sent out on the wire by the writeHandler.
2604
//
2605
// NOTE: This method MUST be run as a goroutine.
2606
func (p *Brontide) queueHandler() {
4✔
2607
        defer p.wg.Done()
4✔
2608

4✔
2609
        // priorityMsgs holds an in order list of messages deemed high-priority
4✔
2610
        // to be added to the sendQueue. This predominately includes messages
4✔
2611
        // from the funding manager and htlcswitch.
4✔
2612
        priorityMsgs := list.New()
4✔
2613

4✔
2614
        // lazyMsgs holds an in order list of messages deemed low-priority to be
4✔
2615
        // added to the sendQueue only after all high-priority messages have
4✔
2616
        // been queued. This predominately includes messages from the gossiper.
4✔
2617
        lazyMsgs := list.New()
4✔
2618

4✔
2619
        for {
16✔
2620
                // Examine the front of the priority queue; if it is empty, check
12✔
2621
                // the low priority queue.
12✔
2622
                elem := priorityMsgs.Front()
12✔
2623
                if elem == nil {
21✔
2624
                        elem = lazyMsgs.Front()
9✔
2625
                }
9✔
2626

2627
                if elem != nil {
17✔
2628
                        front := elem.Value.(outgoingMsg)
5✔
2629

5✔
2630
                        // There's an element on the queue, try adding
5✔
2631
                        // it to the sendQueue. We also watch for
5✔
2632
                        // messages on the outgoingQueue, in case the
5✔
2633
                        // writeHandler cannot accept messages on the
5✔
2634
                        // sendQueue.
5✔
2635
                        select {
5✔
2636
                        case p.sendQueue <- front:
5✔
2637
                                if front.priority {
9✔
2638
                                        priorityMsgs.Remove(elem)
4✔
2639
                                } else {
6✔
2640
                                        lazyMsgs.Remove(elem)
2✔
2641
                                }
2✔
2642
                        case msg := <-p.outgoingQueue:
1✔
2643
                                if msg.priority {
2✔
2644
                                        priorityMsgs.PushBack(msg)
1✔
2645
                                } else {
2✔
2646
                                        lazyMsgs.PushBack(msg)
1✔
2647
                                }
1✔
2648
                        case <-p.quit:
×
2649
                                return
×
2650
                        }
2651
                } else {
8✔
2652
                        // If there weren't any messages to send to the
8✔
2653
                        // writeHandler, then we'll accept a new message
8✔
2654
                        // into the queue from outside sub-systems.
8✔
2655
                        select {
8✔
2656
                        case msg := <-p.outgoingQueue:
5✔
2657
                                if msg.priority {
9✔
2658
                                        priorityMsgs.PushBack(msg)
4✔
2659
                                } else {
6✔
2660
                                        lazyMsgs.PushBack(msg)
2✔
2661
                                }
2✔
2662
                        case <-p.quit:
1✔
2663
                                return
1✔
2664
                        }
2665
                }
2666
        }
2667
}
2668

2669
// PingTime returns the estimated ping time to the peer in microseconds.
2670
func (p *Brontide) PingTime() int64 {
1✔
2671
        return p.pingManager.GetPingTimeMicroSeconds()
1✔
2672
}
1✔
2673

2674
// queueMsg adds the lnwire.Message to the back of the high priority send queue.
2675
// If the errChan is non-nil, an error is sent back if the msg failed to queue
2676
// or failed to write, and nil otherwise.
2677
func (p *Brontide) queueMsg(msg lnwire.Message, errChan chan error) {
25✔
2678
        p.queue(true, msg, errChan)
25✔
2679
}
25✔
2680

2681
// queueMsgLazy adds the lnwire.Message to the back of the low priority send
2682
// queue. If the errChan is non-nil, an error is sent back if the msg failed to
2683
// queue or failed to write, and nil otherwise.
2684
func (p *Brontide) queueMsgLazy(msg lnwire.Message, errChan chan error) {
2✔
2685
        p.queue(false, msg, errChan)
2✔
2686
}
2✔
2687

2688
// queue sends a given message to the queueHandler using the passed priority. If
2689
// the errChan is non-nil, an error is sent back if the msg failed to queue or
2690
// failed to write, and nil otherwise.
2691
func (p *Brontide) queue(priority bool, msg lnwire.Message,
2692
        errChan chan error) {
26✔
2693

26✔
2694
        select {
26✔
2695
        case p.outgoingQueue <- outgoingMsg{priority, msg, errChan}:
26✔
2696
        case <-p.quit:
1✔
2697
                p.log.Tracef("Peer shutting down, could not enqueue msg: %v.",
1✔
2698
                        spew.Sdump(msg))
1✔
2699
                if errChan != nil {
1✔
2700
                        errChan <- lnpeer.ErrPeerExiting
×
2701
                }
×
2702
        }
2703
}
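// A minimal usage sketch of the queueing helpers above: sendAndWaitForWrite is
// a hypothetical helper (not part of lnd's peer API) that enqueues a message
// with a buffered error channel and blocks until the writeHandler reports the
// outcome, much like the synchronous p.SendMessage(true, ...) call used
// further below.
func (p *Brontide) sendAndWaitForWrite(msg lnwire.Message) error {
        // A buffered channel so neither the writeHandler nor the quit path
        // blocks when reporting the result.
        errChan := make(chan error, 1)
        p.queueMsg(msg, errChan)

        select {
        case err := <-errChan:
                // Nil if the message made it onto the wire; otherwise the
                // write error or lnpeer.ErrPeerExiting.
                return err
        case <-p.quit:
                return lnpeer.ErrPeerExiting
        }
}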
2704

2705
// ChannelSnapshots returns a slice of channel snapshots detailing all
2706
// currently active channels maintained with the remote peer.
2707
func (p *Brontide) ChannelSnapshots() []*channeldb.ChannelSnapshot {
1✔
2708
        snapshots := make(
1✔
2709
                []*channeldb.ChannelSnapshot, 0, p.activeChannels.Len(),
1✔
2710
        )
1✔
2711

1✔
2712
        p.activeChannels.ForEach(func(_ lnwire.ChannelID,
1✔
2713
                activeChan *lnwallet.LightningChannel) error {
2✔
2714

1✔
2715
                // If the activeChan is nil, then we skip it as the channel is
1✔
2716
                // pending.
1✔
2717
                if activeChan == nil {
2✔
2718
                        return nil
1✔
2719
                }
1✔
2720

2721
                // We'll only return a snapshot for channels that are
2722
                // *immediately* available for routing payments over.
2723
                if activeChan.RemoteNextRevocation() == nil {
2✔
2724
                        return nil
1✔
2725
                }
1✔
2726

2727
                snapshot := activeChan.StateSnapshot()
1✔
2728
                snapshots = append(snapshots, snapshot)
1✔
2729

1✔
2730
                return nil
1✔
2731
        })
2732

2733
        return snapshots
1✔
2734
}
2735

2736
// genDeliveryScript returns a new script to be used to send our funds to in
2737
// the case of a cooperative channel close negotiation.
2738
func (p *Brontide) genDeliveryScript() ([]byte, error) {
7✔
2739
        // We'll send a normal p2wkh address unless we've negotiated the
7✔
2740
        // shutdown-any-segwit feature.
7✔
2741
        addrType := lnwallet.WitnessPubKey
7✔
2742
        if p.taprootShutdownAllowed() {
8✔
2743
                addrType = lnwallet.TaprootPubkey
1✔
2744
        }
1✔
2745

2746
        deliveryAddr, err := p.cfg.Wallet.NewAddress(
7✔
2747
                addrType, false, lnwallet.DefaultAccountName,
7✔
2748
        )
7✔
2749
        if err != nil {
7✔
2750
                return nil, err
×
2751
        }
×
2752
        p.log.Infof("Delivery addr for channel close: %v",
7✔
2753
                deliveryAddr)
7✔
2754

7✔
2755
        return txscript.PayToAddrScript(deliveryAddr)
7✔
2756
}
2757

2758
// channelManager is a goroutine dedicated to handling all requests/signals
2759
// pertaining to the opening, cooperative closing, and force closing of all
2760
// channels maintained with the remote peer.
2761
//
2762
// NOTE: This method MUST be run as a goroutine.
2763
func (p *Brontide) channelManager() {
18✔
2764
        defer p.wg.Done()
18✔
2765

18✔
2766
        // reenableTimeout will fire once after the configured channel status
18✔
2767
        // interval has elapsed. This will trigger us to sign new channel
18✔
2768
        // updates and broadcast them with the "disabled" flag unset.
18✔
2769
        reenableTimeout := time.After(p.cfg.ChanActiveTimeout)
18✔
2770

18✔
2771
out:
18✔
2772
        for {
58✔
2773
                select {
40✔
2774
                // A new pending channel has arrived which means we are about
2775
                // to complete a funding workflow and are waiting for the final
2776
                // `ChannelReady` messages to be exchanged. We will add this
2777
                // channel to the `activeChannels` with a nil value to indicate
2778
                // this is a pending channel.
2779
                case req := <-p.newPendingChannel:
2✔
2780
                        p.handleNewPendingChannel(req)
2✔
2781

2782
                // A new channel has arrived which means we've just completed a
2783
                // funding workflow. We'll initialize the necessary local
2784
                // state, and notify the htlc switch of a new link.
2785
                case req := <-p.newActiveChannel:
1✔
2786
                        p.handleNewActiveChannel(req)
1✔
2787

2788
                // The funding flow for a pending channel has failed, so we will
2789
                // remove it from Brontide.
2790
                case req := <-p.removePendingChannel:
2✔
2791
                        p.handleRemovePendingChannel(req)
2✔
2792

2793
                // We've just received a local request to close an active
2794
                // channel. It will either kick off a cooperative channel
2795
                // closure negotiation, or be a notification of a breached
2796
                // contract that should be abandoned.
2797
                case req := <-p.localCloseChanReqs:
8✔
2798
                        p.handleLocalCloseReq(req)
8✔
2799

2800
                // We've received a link failure from a link that was added to
2801
                // the switch. This will initiate the teardown of the link, and
2802
                // initiate any on-chain closures if necessary.
2803
                case failure := <-p.linkFailures:
1✔
2804
                        p.handleLinkFailure(failure)
1✔
2805

2806
                // We've received a new cooperative channel closure related
2807
                // message from the remote peer, we'll use this message to
2808
                // advance the chan closer state machine.
2809
                case closeMsg := <-p.chanCloseMsgs:
14✔
2810
                        p.handleCloseMsg(closeMsg)
14✔
2811

2812
                // The channel reannounce delay has elapsed, broadcast the
2813
                // reenabled channel updates to the network. This should only
2814
                // fire once, so we set the reenableTimeout channel to nil to
2815
                // mark it for garbage collection. If the peer is torn down
2816
                // before firing, reenabling will not be attempted.
2817
                // TODO(conner): consolidate reenables timers inside chan status
2818
                // manager
2819
                case <-reenableTimeout:
1✔
2820
                        p.reenableActiveChannels()
1✔
2821

1✔
2822
                        // Since this channel will never fire again during the
1✔
2823
                        // lifecycle of the peer, we nil the channel to mark it
1✔
2824
                        // eligible for garbage collection, and make this
1✔
2825
                        // explicitly ineligible to receive in future calls to
1✔
2826
                        // select. This also shaves a few CPU cycles since the
1✔
2827
                        // select will ignore this case entirely.
1✔
2828
                        reenableTimeout = nil
1✔
2829

1✔
2830
                        // Once the reenabling is attempted, we also cancel the
1✔
2831
                        // channel event subscription to free up the overflow
1✔
2832
                        // queue used in channel notifier.
1✔
2833
                        //
1✔
2834
                        // NOTE: channelEventClient will be nil if the
1✔
2835
                        // reenableTimeout is greater than 1 minute.
1✔
2836
                        if p.channelEventClient != nil {
2✔
2837
                                p.channelEventClient.Cancel()
1✔
2838
                        }
1✔
2839

2840
                case <-p.quit:
2✔
2841
                        // As we've been signalled to exit, we'll reset all
2✔
2842
                        // our active channels back to their default state.
2✔
2843
                        p.activeChannels.ForEach(func(_ lnwire.ChannelID,
2✔
2844
                                lc *lnwallet.LightningChannel) error {
4✔
2845

2✔
2846
                                // Exit if the channel is nil as it's a pending
2✔
2847
                                // channel.
2✔
2848
                                if lc == nil {
3✔
2849
                                        return nil
1✔
2850
                                }
1✔
2851

2852
                                lc.ResetState()
2✔
2853

2✔
2854
                                return nil
2✔
2855
                        })
2856

2857
                        break out
2✔
2858
                }
2859
        }
2860
}
2861

2862
// reenableActiveChannels searches the index of channels maintained with this
2863
// peer, and reenables each public, non-pending channel. This is done at the
2864
// gossip level by broadcasting a new ChannelUpdate with the disabled bit unset.
2865
// No message will be sent if the channel is already enabled.
2866
func (p *Brontide) reenableActiveChannels() {
1✔
2867
        // First, filter all known channels with this peer for ones that are
1✔
2868
        // both public and not pending.
1✔
2869
        activePublicChans := p.filterChannelsToEnable()
1✔
2870

1✔
2871
        // Create a map to hold channels that need to be retried.
1✔
2872
        retryChans := make(map[wire.OutPoint]struct{}, len(activePublicChans))
1✔
2873

1✔
2874
        // For each of the public, non-pending channels, set the channel
1✔
2875
        // disabled bit to false and send out a new ChannelUpdate. If this
1✔
2876
        // channel is already active, the update won't be sent.
1✔
2877
        for _, chanPoint := range activePublicChans {
2✔
2878
                err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)
1✔
2879

1✔
2880
                switch {
1✔
2881
                // No error occurred, continue to request the next channel.
2882
                case err == nil:
1✔
2883
                        continue
1✔
2884

2885
                // Cannot auto enable a manually disabled channel so we do
2886
                // nothing but proceed to the next channel.
2887
                case errors.Is(err, netann.ErrEnableManuallyDisabledChan):
1✔
2888
                        p.log.Debugf("Channel(%v) was manually disabled, "+
1✔
2889
                                "ignoring automatic enable request", chanPoint)
1✔
2890

1✔
2891
                        continue
1✔
2892

2893
                // If the channel is reported as inactive, we will give it
2894
                // another chance. When handling the request, ChanStatusManager
2895
                // will check whether the link is active or not. One of the
2896
                // conditions is whether the link has been marked as
2897
                // reestablished, which happens inside a goroutine(htlcManager)
2898
                // after the link is started. And we may get a false negative
2899
                // saying the link is not active because that goroutine hasn't
2900
                // reached the line to mark the reestablishment. Thus we give
2901
                // it a second chance to send the request.
2902
                case errors.Is(err, netann.ErrEnableInactiveChan):
×
2903
                        // If we don't have a client created, it means we
×
2904
                        // shouldn't retry enabling the channel.
×
2905
                        if p.channelEventClient == nil {
×
2906
                                p.log.Errorf("Channel(%v) request enabling "+
×
2907
                                        "failed due to inactive link",
×
2908
                                        chanPoint)
×
2909

×
2910
                                continue
×
2911
                        }
2912

2913
                        p.log.Warnf("Channel(%v) cannot be enabled as " +
×
2914
                                "ChanStatusManager reported inactive, retrying")
×
2915

×
2916
                        // Add the channel to the retry map.
×
2917
                        retryChans[chanPoint] = struct{}{}
×
2918
                }
2919
        }
2920

2921
        // Retry the channels if we have any.
2922
        if len(retryChans) != 0 {
1✔
2923
                p.retryRequestEnable(retryChans)
×
2924
        }
×
2925
}
2926

2927
// fetchActiveChanCloser attempts to fetch the active chan closer state machine
2928
// for the target channel ID. If the channel isn't active an error is returned.
2929
// Otherwise, either an existing state machine will be returned, or a new one
2930
// will be created.
2931
func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
2932
        *chancloser.ChanCloser, error) {
14✔
2933

14✔
2934
        chanCloser, found := p.activeChanCloses[chanID]
14✔
2935
        if found {
25✔
2936
                // An entry will only be found if the closer has already been
11✔
2937
                // created for a non-pending channel or for a channel that had
11✔
2938
                // previously started the shutdown process but the connection
11✔
2939
                // was restarted.
11✔
2940
                return chanCloser, nil
11✔
2941
        }
11✔
2942

2943
        // First, we'll ensure that we actually know of the target channel. If
2944
        // not, we'll ignore this message.
2945
        channel, ok := p.activeChannels.Load(chanID)
4✔
2946

4✔
2947
        // If the channel isn't in the map or the channel is nil, return
4✔
2948
        // ErrChannelNotFound as the channel is pending.
4✔
2949
        if !ok || channel == nil {
5✔
2950
                return nil, ErrChannelNotFound
1✔
2951
        }
1✔
2952

2953
        // We'll create a valid closing state machine in order to respond to
2954
        // the initiated cooperative channel closure. First, we set the
2955
        // delivery script that our funds will be paid out to. If an upfront
2956
        // shutdown script was set, we will use it. Otherwise, we get a fresh
2957
        // delivery script.
2958
        //
2959
        // TODO: Expose option to allow upfront shutdown script from watch-only
2960
        // accounts.
2961
        deliveryScript := channel.LocalUpfrontShutdownScript()
4✔
2962
        if len(deliveryScript) == 0 {
8✔
2963
                var err error
4✔
2964
                deliveryScript, err = p.genDeliveryScript()
4✔
2965
                if err != nil {
4✔
2966
                        p.log.Errorf("unable to gen delivery script: %v",
×
2967
                                err)
×
2968
                        return nil, fmt.Errorf("close addr unavailable")
×
2969
                }
×
2970
        }
2971

2972
        // In order to begin fee negotiations, we'll first compute our target
2973
        // ideal fee-per-kw.
2974
        feePerKw, err := p.cfg.FeeEstimator.EstimateFeePerKW(
4✔
2975
                p.cfg.CoopCloseTargetConfs,
4✔
2976
        )
4✔
2977
        if err != nil {
4✔
2978
                p.log.Errorf("unable to query fee estimator: %v", err)
×
2979
                return nil, fmt.Errorf("unable to estimate fee")
×
2980
        }
×
2981

2982
        addr, err := p.addrWithInternalKey(deliveryScript)
4✔
2983
        if err != nil {
4✔
2984
                return nil, fmt.Errorf("unable to parse addr: %w", err)
×
2985
        }
×
2986
        chanCloser, err = p.createChanCloser(
4✔
2987
                channel, addr, feePerKw, nil, lntypes.Remote,
4✔
2988
        )
4✔
2989
        if err != nil {
4✔
2990
                p.log.Errorf("unable to create chan closer: %v", err)
×
2991
                return nil, fmt.Errorf("unable to create chan closer")
×
2992
        }
×
2993

2994
        p.activeChanCloses[chanID] = chanCloser
4✔
2995

4✔
2996
        return chanCloser, nil
4✔
2997
}
2998

2999
// filterChannelsToEnable filters a list of channels to be enabled upon start.
3000
// The filtered channels are active channels that are neither private nor
3001
// pending.
3002
func (p *Brontide) filterChannelsToEnable() []wire.OutPoint {
1✔
3003
        var activePublicChans []wire.OutPoint
1✔
3004

1✔
3005
        p.activeChannels.Range(func(chanID lnwire.ChannelID,
1✔
3006
                lnChan *lnwallet.LightningChannel) bool {
2✔
3007

1✔
3008
                // If the lnChan is nil, continue as this is a pending channel.
1✔
3009
                if lnChan == nil {
2✔
3010
                        return true
1✔
3011
                }
1✔
3012

3013
                dbChan := lnChan.State()
1✔
3014
                isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0
1✔
3015
                if !isPublic || dbChan.IsPending {
1✔
3016
                        return true
×
3017
                }
×
3018

3019
                // We'll also skip any channels added during this peer's
3020
                // lifecycle since they haven't waited out the timeout. Their
3021
                // first announcement will be enabled, and the chan status
3022
                // manager will begin monitoring them passively since they exist
3023
                // in the database.
3024
                if _, ok := p.addedChannels.Load(chanID); ok {
1✔
3025
                        return true
×
3026
                }
×
3027

3028
                activePublicChans = append(
1✔
3029
                        activePublicChans, dbChan.FundingOutpoint,
1✔
3030
                )
1✔
3031

1✔
3032
                return true
1✔
3033
        })
3034

3035
        return activePublicChans
1✔
3036
}
3037

3038
// retryRequestEnable takes a map of channel outpoints and listens on the
3039
// peer's channel event client. It removes a channel from the map
3040
// if it's matched to the event. Upon receiving an active channel event, it
3041
// will send the enabling request again.
3042
func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{}) {
×
3043
        p.log.Debugf("Retry enabling %v channels", len(activeChans))
×
3044

×
3045
        // retryEnable is a helper closure that sends an enable request and
×
3046
        // removes the channel from the map if it's matched.
×
3047
        retryEnable := func(chanPoint wire.OutPoint) error {
×
3048
                // If this is an active channel event, check whether it's in
×
3049
                // our targeted channels map.
×
3050
                _, found := activeChans[chanPoint]
×
3051

×
3052
                // If this channel is irrelevant, return nil so the loop can
×
3053
                // jump to next iteration.
×
3054
                if !found {
×
3055
                        return nil
×
3056
                }
×
3057

3058
                // Otherwise we've just received an active signal for a channel
3059
                // that's previously failed to be enabled, we send the request
3060
                // again.
3061
                //
3062
                // We only give the channel one more shot, so we delete it from
3063
                // our map first to keep it from being attempted again.
3064
                delete(activeChans, chanPoint)
×
3065

×
3066
                // Send the request.
×
3067
                err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)
×
3068
                if err != nil {
×
3069
                        return fmt.Errorf("request enabling channel %v "+
×
3070
                                "failed: %w", chanPoint, err)
×
3071
                }
×
3072

3073
                return nil
×
3074
        }
3075

3076
        for {
×
3077
                // If activeChans is empty, we're done processing all the
×
3078
                // channels.
×
3079
                if len(activeChans) == 0 {
×
3080
                        p.log.Debug("Finished retry enabling channels")
×
3081
                        return
×
3082
                }
×
3083

3084
                select {
×
3085
                // A new event has been sent by the ChannelNotifier. We now
3086
                // check whether it's an active or inactive channel event.
3087
                case e := <-p.channelEventClient.Updates():
×
3088
                        // If this is an active channel event, try enabling the
×
3089
                        // channel then jump to the next iteration.
×
3090
                        active, ok := e.(channelnotifier.ActiveChannelEvent)
×
3091
                        if ok {
×
3092
                                chanPoint := *active.ChannelPoint
×
3093

×
3094
                                // If we received an error for this particular
×
3095
                                // channel, we log an error and won't quit as
×
3096
                                // we still want to retry other channels.
×
3097
                                if err := retryEnable(chanPoint); err != nil {
×
3098
                                        p.log.Errorf("Retry failed: %v", err)
×
3099
                                }
×
3100

3101
                                continue
×
3102
                        }
3103

3104
                        // Otherwise check for inactive link event, and jump to
3105
                        // next iteration if it's not.
3106
                        inactive, ok := e.(channelnotifier.InactiveLinkEvent)
×
3107
                        if !ok {
×
3108
                                continue
×
3109
                        }
3110

3111
                        // Found an inactive link event, if this is our
3112
                        // targeted channel, remove it from our map.
3113
                        chanPoint := *inactive.ChannelPoint
×
3114
                        _, found := activeChans[chanPoint]
×
3115
                        if !found {
×
3116
                                continue
×
3117
                        }
3118

3119
                        delete(activeChans, chanPoint)
×
3120
                        p.log.Warnf("Re-enable channel %v failed, received "+
×
3121
                                "inactive link event", chanPoint)
×
3122

3123
                case <-p.quit:
×
3124
                        p.log.Debugf("Peer shutdown during retry enabling")
×
3125
                        return
×
3126
                }
3127
        }
3128
}
3129

3130
// chooseDeliveryScript takes two optionally set shutdown scripts and returns
3131
// a suitable script to close out to. This may be nil if neither script is
3132
// set. If both scripts are set, this function will error if they do not match.
3133
func chooseDeliveryScript(upfront,
3134
        requested lnwire.DeliveryAddress) (lnwire.DeliveryAddress, error) {
13✔
3135

13✔
3136
        // If no upfront shutdown script was provided, return the user
13✔
3137
        // requested address (which may be nil).
13✔
3138
        if len(upfront) == 0 {
20✔
3139
                return requested, nil
7✔
3140
        }
7✔
3141

3142
        // If an upfront shutdown script was provided, and the user did not
3143
        // request a custom shutdown script, return the upfront address.
3144
        if len(requested) == 0 {
10✔
3145
                return upfront, nil
3✔
3146
        }
3✔
3147

3148
        // If both an upfront shutdown script and a custom close script were
3149
        // provided, error if the user provided shutdown script does not match
3150
        // the upfront shutdown script (because closing out to a different
3151
        // script would violate upfront shutdown).
3152
        if !bytes.Equal(upfront, requested) {
6✔
3153
                return nil, chancloser.ErrUpfrontShutdownScriptMismatch
2✔
3154
        }
2✔
3155

3156
        // The user requested script matches the upfront shutdown script, so we
3157
        // can return it without error.
3158
        return upfront, nil
2✔
3159
}
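// To make the three branches above concrete, a small hypothetical example
// (not part of the original file or its tests) using arbitrary placeholder
// scripts rather than real delivery addresses.
func exampleChooseDeliveryScript() error {
        upfront := lnwire.DeliveryAddress{0x00, 0x14}
        requested := lnwire.DeliveryAddress{0x00, 0x20}

        // No upfront script: whatever was requested wins (possibly nil).
        if _, err := chooseDeliveryScript(nil, requested); err != nil {
                return err
        }

        // Upfront script set, nothing requested: the upfront script wins.
        if _, err := chooseDeliveryScript(upfront, nil); err != nil {
                return err
        }

        // Both set but different: ErrUpfrontShutdownScriptMismatch.
        _, err := chooseDeliveryScript(upfront, requested)
        return err
}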
3160

3161
// restartCoopClose checks whether we need to restart the cooperative close
3162
// process for a given channel.
3163
func (p *Brontide) restartCoopClose(lnChan *lnwallet.LightningChannel) (
3164
        *lnwire.Shutdown, error) {
×
3165

×
3166
        // If this channel has status ChanStatusCoopBroadcasted and does not
×
3167
        // have a closing transaction, then the cooperative close process was
×
3168
        // started but never finished. We'll re-create the chanCloser state
×
3169
        // machine and resend Shutdown. BOLT#2 requires that we retransmit
×
3170
        // Shutdown exactly, but doing so would mean persisting the RPC
×
3171
        // provided close script. Instead use the LocalUpfrontShutdownScript
×
3172
        // or generate a script.
×
3173
        c := lnChan.State()
×
3174
        _, err := c.BroadcastedCooperative()
×
3175
        if err != nil && err != channeldb.ErrNoCloseTx {
×
3176
                // An error other than ErrNoCloseTx was encountered.
×
3177
                return nil, err
×
3178
        } else if err == nil {
×
3179
                // This channel has already completed the coop close
×
3180
                // negotiation.
×
3181
                return nil, nil
×
3182
        }
×
3183

3184
        var deliveryScript []byte
×
3185

×
3186
        shutdownInfo, err := c.ShutdownInfo()
×
3187
        switch {
×
3188
        // We have previously stored the delivery script that we need to use
3189
        // in the shutdown message. Re-use this script.
3190
        case err == nil:
×
3191
                shutdownInfo.WhenSome(func(info channeldb.ShutdownInfo) {
×
3192
                        deliveryScript = info.DeliveryScript.Val
×
3193
                })
×
3194

3195
        // An error other than ErrNoShutdownInfo was returned
3196
        case !errors.Is(err, channeldb.ErrNoShutdownInfo):
×
3197
                return nil, err
×
3198

3199
        case errors.Is(err, channeldb.ErrNoShutdownInfo):
×
3200
                deliveryScript = c.LocalShutdownScript
×
3201
                if len(deliveryScript) == 0 {
×
3202
                        var err error
×
3203
                        deliveryScript, err = p.genDeliveryScript()
×
3204
                        if err != nil {
×
3205
                                p.log.Errorf("unable to gen delivery script: "+
×
3206
                                        "%v", err)
×
3207

×
3208
                                return nil, fmt.Errorf("close addr unavailable")
×
3209
                        }
×
3210
                }
3211
        }
3212

3213
        // Compute an ideal fee.
3214
        feePerKw, err := p.cfg.FeeEstimator.EstimateFeePerKW(
×
3215
                p.cfg.CoopCloseTargetConfs,
×
3216
        )
×
3217
        if err != nil {
×
3218
                p.log.Errorf("unable to query fee estimator: %v", err)
×
3219
                return nil, fmt.Errorf("unable to estimate fee")
×
3220
        }
×
3221

3222
        // Determine whether we or the peer are the initiator of the coop
3223
        // close attempt by looking at the channel's status.
3224
        closingParty := lntypes.Remote
×
3225
        if c.HasChanStatus(channeldb.ChanStatusLocalCloseInitiator) {
×
3226
                closingParty = lntypes.Local
×
3227
        }
×
3228

3229
        addr, err := p.addrWithInternalKey(deliveryScript)
×
3230
        if err != nil {
×
3231
                return nil, fmt.Errorf("unable to parse addr: %w", err)
×
3232
        }
×
3233
        chanCloser, err := p.createChanCloser(
×
3234
                lnChan, addr, feePerKw, nil, closingParty,
×
3235
        )
×
3236
        if err != nil {
×
3237
                p.log.Errorf("unable to create chan closer: %v", err)
×
3238
                return nil, fmt.Errorf("unable to create chan closer")
×
3239
        }
×
3240

3241
        // This does not need a mutex even though it is in a different
3242
        // goroutine since this is done before the channelManager goroutine is
3243
        // created.
3244
        chanID := lnwire.NewChanIDFromOutPoint(c.FundingOutpoint)
×
3245
        p.activeChanCloses[chanID] = chanCloser
×
3246

×
3247
        // Create the Shutdown message.
×
3248
        shutdownMsg, err := chanCloser.ShutdownChan()
×
3249
        if err != nil {
×
3250
                p.log.Errorf("unable to create shutdown message: %v", err)
×
3251
                delete(p.activeChanCloses, chanID)
×
3252
                return nil, err
×
3253
        }
×
3254

3255
        return shutdownMsg, nil
×
3256
}
3257

3258
// createChanCloser constructs a ChanCloser from the passed parameters and is
3259
// used to de-duplicate code.
3260
func (p *Brontide) createChanCloser(channel *lnwallet.LightningChannel,
3261
        deliveryScript *chancloser.DeliveryAddrWithKey,
3262
        fee chainfee.SatPerKWeight, req *htlcswitch.ChanClose,
3263
        closer lntypes.ChannelParty) (*chancloser.ChanCloser, error) {
10✔
3264

10✔
3265
        _, startingHeight, err := p.cfg.ChainIO.GetBestBlock()
10✔
3266
        if err != nil {
10✔
3267
                p.log.Errorf("unable to obtain best block: %v", err)
×
3268
                return nil, fmt.Errorf("cannot obtain best block")
×
3269
        }
×
3270

3271
        // The req will only be set if we initiated the co-op closing flow.
3272
        var maxFee chainfee.SatPerKWeight
10✔
3273
        if req != nil {
17✔
3274
                maxFee = req.MaxFee
7✔
3275
        }
7✔
3276

3277
        chanCloser := chancloser.NewChanCloser(
10✔
3278
                chancloser.ChanCloseCfg{
10✔
3279
                        Channel:      channel,
10✔
3280
                        MusigSession: NewMusigChanCloser(channel),
10✔
3281
                        FeeEstimator: &chancloser.SimpleCoopFeeEstimator{},
10✔
3282
                        BroadcastTx:  p.cfg.Wallet.PublishTransaction,
10✔
3283
                        AuxCloser:    p.cfg.AuxChanCloser,
10✔
3284
                        DisableChannel: func(op wire.OutPoint) error {
20✔
3285
                                return p.cfg.ChanStatusMgr.RequestDisable(
10✔
3286
                                        op, false,
10✔
3287
                                )
10✔
3288
                        },
10✔
3289
                        MaxFee: maxFee,
3290
                        Disconnect: func() error {
×
3291
                                return p.cfg.DisconnectPeer(p.IdentityKey())
×
3292
                        },
×
3293
                        ChainParams: &p.cfg.Wallet.Cfg.NetParams,
3294
                        Quit:        p.quit,
3295
                },
3296
                *deliveryScript,
3297
                fee,
3298
                uint32(startingHeight),
3299
                req,
3300
                closer,
3301
        )
3302

3303
        return chanCloser, nil
10✔
3304
}
3305

3306
// handleLocalCloseReq kicks off the workflow to execute a cooperative or
3307
// forced unilateral closure of the channel initiated by a local subsystem.
3308
func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) {
8✔
3309
        chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint)
8✔
3310

8✔
3311
        channel, ok := p.activeChannels.Load(chanID)
8✔
3312

8✔
3313
        // Though this function can't be called for pending channels, we still
8✔
3314
        // check whether channel is nil for safety.
8✔
3315
        if !ok || channel == nil {
8✔
3316
                err := fmt.Errorf("unable to close channel, ChannelID(%v) is "+
×
3317
                        "unknown", chanID)
×
3318
                p.log.Errorf(err.Error())
×
3319
                req.Err <- err
×
3320
                return
×
3321
        }
×
3322

3323
        switch req.CloseType {
8✔
3324
        // A type of CloseRegular indicates that the user has opted to close
3325
        // out this channel on-chain, so we execute the cooperative channel
3326
        // closure workflow.
3327
        case contractcourt.CloseRegular:
8✔
3328
                // First, we'll choose a delivery address that we'll use to send the
8✔
3329
                // funds to in the case of a successful negotiation.
8✔
3330

8✔
3331
                // An upfront shutdown and user provided script are both optional,
8✔
3332
                // but must be equal if both are set (because we cannot serve a request
8✔
3333
                // to close out to a script which violates upfront shutdown). Get the
8✔
3334
                // appropriate address to close out to (which may be nil if neither
8✔
3335
                // are set) and error if they are both set and do not match.
8✔
3336
                deliveryScript, err := chooseDeliveryScript(
8✔
3337
                        channel.LocalUpfrontShutdownScript(), req.DeliveryScript,
8✔
3338
                )
8✔
3339
                if err != nil {
9✔
3340
                        p.log.Errorf("cannot close channel %v: %v", req.ChanPoint, err)
1✔
3341
                        req.Err <- err
1✔
3342
                        return
1✔
3343
                }
1✔
3344

3345
                // If neither an upfront address nor a user set address was
3346
                // provided, generate a fresh script.
3347
                if len(deliveryScript) == 0 {
11✔
3348
                        deliveryScript, err = p.genDeliveryScript()
4✔
3349
                        if err != nil {
4✔
3350
                                p.log.Errorf(err.Error())
×
3351
                                req.Err <- err
×
3352
                                return
×
3353
                        }
×
3354
                }
3355
                addr, err := p.addrWithInternalKey(deliveryScript)
7✔
3356
                if err != nil {
7✔
3357
                        err = fmt.Errorf("unable to parse addr for channel "+
×
3358
                                "%v: %w", req.ChanPoint, err)
×
3359
                        p.log.Errorf(err.Error())
×
3360
                        req.Err <- err
×
3361

×
3362
                        return
×
3363
                }
×
3364
                chanCloser, err := p.createChanCloser(
7✔
3365
                        channel, addr, req.TargetFeePerKw, req, lntypes.Local,
7✔
3366
                )
7✔
3367
                if err != nil {
7✔
3368
                        p.log.Errorf(err.Error())
×
3369
                        req.Err <- err
×
3370
                        return
×
3371
                }
×
3372

3373
                p.activeChanCloses[chanID] = chanCloser
7✔
3374

7✔
3375
                // Finally, we'll initiate the channel shutdown within the
7✔
3376
                // chanCloser, and send the shutdown message to the remote
7✔
3377
                // party to kick things off.
7✔
3378
                shutdownMsg, err := chanCloser.ShutdownChan()
7✔
3379
                if err != nil {
7✔
3380
                        p.log.Errorf(err.Error())
×
3381
                        req.Err <- err
×
3382
                        delete(p.activeChanCloses, chanID)
×
3383

×
3384
                        // As we were unable to shutdown the channel, we'll
×
3385
                        // return it back to its normal state.
×
3386
                        channel.ResetState()
×
3387
                        return
×
3388
                }
×
3389

3390
                link := p.fetchLinkFromKeyAndCid(chanID)
7✔
3391
                if link == nil {
7✔
3392
                        // If the link is nil then it means it was already
×
3393
                        // removed from the switch or it never existed in the
×
3394
                        // first place. The latter case is handled at the
×
3395
                        // beginning of this function, so in the case where it
×
3396
                        // has already been removed, we can skip adding the
×
3397
                        // commit hook to queue a Shutdown message.
×
3398
                        p.log.Warnf("link not found during attempted closure: "+
×
3399
                                "%v", chanID)
×
3400
                        return
×
3401
                }
×
3402

3403
                if !link.DisableAdds(htlcswitch.Outgoing) {
7✔
3404
                        p.log.Warnf("Outgoing link adds already "+
×
3405
                                "disabled: %v", link.ChanID())
×
3406
                }
×
3407

3408
                link.OnCommitOnce(htlcswitch.Outgoing, func() {
14✔
3409
                        p.queueMsg(shutdownMsg, nil)
7✔
3410
                })
7✔
3411

3412
        // A type of CloseBreach indicates that the counterparty has breached
3413
        // the channel, so we need to clean up our local state.
3414
        case contractcourt.CloseBreach:
×
3415
                // TODO(roasbeef): no longer need with newer breach logic?
×
3416
                p.log.Infof("ChannelPoint(%v) has been breached, wiping "+
×
3417
                        "channel", req.ChanPoint)
×
3418
                p.WipeChannel(req.ChanPoint)
×
3419
        }
3420
}
3421

3422
// linkFailureReport is sent to the channelManager whenever a link reports a
3423
// link failure, and is forced to exit. The report houses the necessary
3424
// information to clean up the channel state, send back the error message, and
3425
// force close if necessary.
3426
type linkFailureReport struct {
3427
        chanPoint   wire.OutPoint
3428
        chanID      lnwire.ChannelID
3429
        shortChanID lnwire.ShortChannelID
3430
        linkErr     htlcswitch.LinkFailureError
3431
}
3432

3433
// handleLinkFailure processes a link failure report when a link in the switch
3434
// fails. It facilitates the removal of all channel state within the peer,
3435
// force closing the channel depending on severity, and sending the error
3436
// message back to the remote party.
3437
func (p *Brontide) handleLinkFailure(failure linkFailureReport) {
1✔
3438
        // Retrieve the channel from the map of active channels. We do this to
1✔
3439
        // have access to it even after WipeChannel removes it from the map.
1✔
3440
        chanID := lnwire.NewChanIDFromOutPoint(failure.chanPoint)
1✔
3441
        lnChan, _ := p.activeChannels.Load(chanID)
1✔
3442

1✔
3443
        // We begin by wiping the link, which will remove it from the switch,
1✔
3444
        // such that it won't be used for any more updates.
1✔
3445
        //
1✔
3446
        // TODO(halseth): should introduce a way to atomically stop/pause the
1✔
3447
        // link and cancel back any adds in its mailboxes such that we can
1✔
3448
        // safely force close without the link being added again and updates
1✔
3449
        // being applied.
1✔
3450
        p.WipeChannel(&failure.chanPoint)
1✔
3451

1✔
3452
        // If the error encountered was severe enough, we'll now force close
1✔
3453
        // the channel to prevent re-adding it to the switch in the future.
1✔
3454
        if failure.linkErr.FailureAction == htlcswitch.LinkFailureForceClose {
2✔
3455
                p.log.Warnf("Force closing link(%v)", failure.shortChanID)
1✔
3456

1✔
3457
                closeTx, err := p.cfg.ChainArb.ForceCloseContract(
1✔
3458
                        failure.chanPoint,
1✔
3459
                )
1✔
3460
                if err != nil {
2✔
3461
                        p.log.Errorf("unable to force close "+
1✔
3462
                                "link(%v): %v", failure.shortChanID, err)
1✔
3463
                } else {
2✔
3464
                        p.log.Infof("channel(%v) force "+
1✔
3465
                                "closed with txid %v",
1✔
3466
                                failure.shortChanID, closeTx.TxHash())
1✔
3467
                }
1✔
3468
        }
3469

3470
        // If this is a permanent failure, we will mark the channel borked.
3471
        if failure.linkErr.PermanentFailure && lnChan != nil {
1✔
3472
                p.log.Warnf("Marking link(%v) borked due to permanent "+
×
3473
                        "failure", failure.shortChanID)
×
3474

×
3475
                if err := lnChan.State().MarkBorked(); err != nil {
×
3476
                        p.log.Errorf("Unable to mark channel %v borked: %v",
×
3477
                                failure.shortChanID, err)
×
3478
                }
×
3479
        }
3480

3481
        // Send an error to the peer explaining why we failed the channel.
3482
        if failure.linkErr.ShouldSendToPeer() {
2✔
3483
                // If SendData is set, send it to the peer. If not, we'll use
1✔
3484
                // the standard error messages in the payload. We only include
1✔
3485
                // sendData in the cases where the error data does not contain
1✔
3486
                // sensitive information.
1✔
3487
                data := []byte(failure.linkErr.Error())
1✔
3488
                if failure.linkErr.SendData != nil {
1✔
3489
                        data = failure.linkErr.SendData
×
3490
                }
×
3491

3492
                var networkMsg lnwire.Message
1✔
3493
                if failure.linkErr.Warning {
1✔
3494
                        networkMsg = &lnwire.Warning{
×
3495
                                ChanID: failure.chanID,
×
3496
                                Data:   data,
×
3497
                        }
×
3498
                } else {
1✔
3499
                        networkMsg = &lnwire.Error{
1✔
3500
                                ChanID: failure.chanID,
1✔
3501
                                Data:   data,
1✔
3502
                        }
1✔
3503
                }
1✔
3504

3505
                err := p.SendMessage(true, networkMsg)
1✔
3506
                if err != nil {
1✔
3507
                        p.log.Errorf("unable to send msg to "+
×
3508
                                "remote peer: %v", err)
×
3509
                }
×
3510
        }
3511

3512
        // If the failure action is disconnect, then we'll execute that now. If
3513
        // we had to send an error above, it was a sync call, so we expect the
3514
        // message to be flushed on the wire by now.
3515
        if failure.linkErr.FailureAction == htlcswitch.LinkFailureDisconnect {
1✔
3516
                p.Disconnect(fmt.Errorf("link requested disconnect"))
×
3517
        }
×
3518
}
3519

3520
// fetchLinkFromKeyAndCid fetches a link from the switch via the remote's
3521
// public key and the channel id.
3522
func (p *Brontide) fetchLinkFromKeyAndCid(
3523
        cid lnwire.ChannelID) htlcswitch.ChannelUpdateHandler {
20✔
3524

20✔
3525
        var chanLink htlcswitch.ChannelUpdateHandler
20✔
3526

20✔
3527
        // We don't need to check the error here, and can instead just loop
20✔
3528
        // over the slice and return nil.
20✔
3529
        links, _ := p.cfg.Switch.GetLinksByInterface(p.cfg.PubKeyBytes)
20✔
3530
        for _, link := range links {
39✔
3531
                if link.ChanID() == cid {
38✔
3532
                        chanLink = link
19✔
3533
                        break
19✔
3534
                }
3535
        }
3536

3537
        return chanLink
20✔
3538
}
3539

3540
// finalizeChanClosure performs the final clean up steps once the cooperative
3541
// closure transaction has been fully broadcast. The finalized closing state
3542
// machine should be passed in. Once the transaction has been sufficiently
3543
// confirmed, the channel will be marked as fully closed within the database,
3544
// and any clients will be notified of updates to the closing state.
3545
func (p *Brontide) finalizeChanClosure(chanCloser *chancloser.ChanCloser) {
5✔
3546
        closeReq := chanCloser.CloseRequest()
5✔
3547

5✔
3548
        // First, we'll clear all indexes related to the channel in question.
5✔
3549
        chanPoint := chanCloser.Channel().ChannelPoint()
5✔
3550
        p.WipeChannel(&chanPoint)
5✔
3551

5✔
3552
        // Also clear the activeChanCloses map of this channel.
5✔
3553
        cid := lnwire.NewChanIDFromOutPoint(chanPoint)
5✔
3554
        delete(p.activeChanCloses, cid)
5✔
3555

5✔
3556
        // Next, we'll launch a goroutine which will request to be notified by
5✔
3557
        // the ChainNotifier once the closure transaction obtains a single
5✔
3558
        // confirmation.
5✔
3559
        notifier := p.cfg.ChainNotifier
5✔
3560

5✔
3561
        // If any error happens during waitForChanToClose, forward it to
5✔
3562
        // closeReq. If this channel closure is not locally initiated, closeReq
5✔
3563
        // will be nil, so just ignore the error.
5✔
3564
        errChan := make(chan error, 1)
5✔
3565
        if closeReq != nil {
8✔
3566
                errChan = closeReq.Err
3✔
3567
        }
3✔
3568

3569
        closingTx, err := chanCloser.ClosingTx()
5✔
3570
        if err != nil {
5✔
3571
                if closeReq != nil {
×
3572
                        p.log.Error(err)
×
3573
                        closeReq.Err <- err
×
3574
                }
×
3575
        }
3576

3577
        closingTxid := closingTx.TxHash()
5✔
3578

5✔
3579
        // If this is a locally requested shutdown, update the caller with a
5✔
3580
        // new event detailing the current pending state of this request.
5✔
3581
        if closeReq != nil {
8✔
3582
                closeReq.Updates <- &PendingUpdate{
3✔
3583
                        Txid: closingTxid[:],
3✔
3584
                }
3✔
3585
        }
3✔
3586

3587
        localOut := chanCloser.LocalCloseOutput()
5✔
3588
        remoteOut := chanCloser.RemoteCloseOutput()
5✔
3589
        auxOut := chanCloser.AuxOutputs()
5✔
3590
        go WaitForChanToClose(
5✔
3591
                chanCloser.NegotiationHeight(), notifier, errChan,
5✔
3592
                &chanPoint, &closingTxid, closingTx.TxOut[0].PkScript, func() {
10✔
3593
                        // Respond to the local subsystem which requested the
5✔
3594
                        // channel closure.
5✔
3595
                        if closeReq != nil {
8✔
3596
                                closeReq.Updates <- &ChannelCloseUpdate{
3✔
3597
                                        ClosingTxid:       closingTxid[:],
3✔
3598
                                        Success:           true,
3✔
3599
                                        LocalCloseOutput:  localOut,
3✔
3600
                                        RemoteCloseOutput: remoteOut,
3✔
3601
                                        AuxOutputs:        auxOut,
3✔
3602
                                }
3✔
3603
                        }
3✔
3604
                },
3605
        )
3606
}
3607

3608
// WaitForChanToClose uses the passed notifier to wait until the channel has
3609
// been detected as closed on chain and then concludes by executing the
3610
// following actions: the channel point will be sent over the settleChan, and
3611
// finally the callback will be executed. If any error is encountered within
3612
// the function, then it will be sent over the errChan.
3613
func WaitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier,
3614
        errChan chan error, chanPoint *wire.OutPoint,
3615
        closingTxID *chainhash.Hash, closeScript []byte, cb func()) {
5✔
3616

5✔
3617
        peerLog.Infof("Waiting for confirmation of close of ChannelPoint(%v) "+
5✔
3618
                "with txid: %v", chanPoint, closingTxID)
5✔
3619

5✔
3620
        // TODO(roasbeef): add param for num needed confs
5✔
3621
        confNtfn, err := notifier.RegisterConfirmationsNtfn(
5✔
3622
                closingTxID, closeScript, 1, bestHeight,
5✔
3623
        )
5✔
3624
        if err != nil {
5✔
3625
                if errChan != nil {
×
3626
                        errChan <- err
×
3627
                }
×
3628
                return
×
3629
        }
3630

3631
        // In the case that the ChainNotifier is shutting down, all subscriber
3632
        // notification channels will be closed, generating a nil receive.
3633
        height, ok := <-confNtfn.Confirmed
5✔
3634
        if !ok {
6✔
3635
                return
1✔
3636
        }
1✔
3637

3638
        // The channel has been closed, remove it from any active indexes, and
3639
        // the database state.
3640
        peerLog.Infof("ChannelPoint(%v) is now closed at "+
5✔
3641
                "height %v", chanPoint, height.BlockHeight)
5✔
3642

5✔
3643
        // Finally, execute the closure call back to mark the confirmation of
5✔
3644
        // the transaction closing the contract.
5✔
3645
        cb()
5✔
3646
}
3647
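// Illustrative usage sketch (not part of brontide.go; the notifier,
// bestHeight, chanPoint and closingTx values are assumed to exist in the
// caller's scope): driving WaitForChanToClose after broadcasting a
// cooperative close transaction.
//
//	done := make(chan struct{})
//	errChan := make(chan error, 1)
//	closingTxid := closingTx.TxHash()
//
//	go WaitForChanToClose(
//	        bestHeight, notifier, errChan, &chanPoint, &closingTxid,
//	        closingTx.TxOut[0].PkScript, func() {
//	                // Called once the closing tx has one confirmation.
//	                close(done)
//	        },
//	)
//
//	select {
//	case <-done:
//	        // The cooperative close is confirmed on chain.
//	case err := <-errChan:
//	        // Registering the confirmation notification failed.
//	        peerLog.Errorf("unable to wait for close: %v", err)
//	}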

3648
// WipeChannel removes the passed channel point from all indexes associated with
3649
// the peer and the switch.
3650
func (p *Brontide) WipeChannel(chanPoint *wire.OutPoint) {
5✔
3651
        chanID := lnwire.NewChanIDFromOutPoint(*chanPoint)
5✔
3652

5✔
3653
        p.activeChannels.Delete(chanID)
5✔
3654

5✔
3655
        // Instruct the HtlcSwitch to close this link as the channel is no
5✔
3656
        // longer active.
5✔
3657
        p.cfg.Switch.RemoveLink(chanID)
5✔
3658
}
5✔
3659

3660
// handleInitMsg handles the incoming init message which contains global and
3661
// local feature vectors. If feature vectors are incompatible then disconnect.
3662
func (p *Brontide) handleInitMsg(msg *lnwire.Init) error {
4✔
3663
        // First, merge any features from the legacy global features field into
4✔
3664
        // those presented in the local features fields.
4✔
3665
        err := msg.Features.Merge(msg.GlobalFeatures)
4✔
3666
        if err != nil {
4✔
3667
                return fmt.Errorf("unable to merge legacy global features: %w",
×
3668
                        err)
×
3669
        }
×
3670

3671
        // Then, finalize the remote feature vector providing the flattened
3672
        // feature bit namespace.
3673
        p.remoteFeatures = lnwire.NewFeatureVector(
4✔
3674
                msg.Features, lnwire.Features,
4✔
3675
        )
4✔
3676

4✔
3677
        // Now that we have their features loaded, we'll ensure that they
4✔
3678
        // didn't set any required bits that we don't know of.
4✔
3679
        err = feature.ValidateRequired(p.remoteFeatures)
4✔
3680
        if err != nil {
4✔
3681
                return fmt.Errorf("invalid remote features: %w", err)
×
3682
        }
×
3683

3684
        // Ensure the remote party's feature vector contains all transitive
3685
        // dependencies. We know ours are correct since they are validated
3686
        // during the feature manager's instantiation.
3687
        err = feature.ValidateDeps(p.remoteFeatures)
4✔
3688
        if err != nil {
4✔
3689
                return fmt.Errorf("invalid remote features: %w", err)
×
3690
        }
×
3691

3692
        // Now that we know we understand their requirements, we'll check to
3693
        // see if they don't support anything that we deem to be mandatory.
3694
        if !p.remoteFeatures.HasFeature(lnwire.DataLossProtectRequired) {
4✔
3695
                return fmt.Errorf("data loss protection required")
×
3696
        }
×
3697

3698
        return nil
4✔
3699
}
3700
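// Condensed, standalone restatement of the validation steps above, purely for
// illustration; the helper name validateRemoteInit is hypothetical and not
// part of brontide.go.
//
//	func validateRemoteInit(msg *lnwire.Init) (*lnwire.FeatureVector, error) {
//	        // Fold the legacy global feature bits into the local ones.
//	        if err := msg.Features.Merge(msg.GlobalFeatures); err != nil {
//	                return nil, fmt.Errorf("unable to merge features: %w", err)
//	        }
//
//	        fv := lnwire.NewFeatureVector(msg.Features, lnwire.Features)
//
//	        // Reject unknown required bits and missing dependencies.
//	        if err := feature.ValidateRequired(fv); err != nil {
//	                return nil, err
//	        }
//	        if err := feature.ValidateDeps(fv); err != nil {
//	                return nil, err
//	        }
//
//	        // Data loss protection is mandatory for us.
//	        if !fv.HasFeature(lnwire.DataLossProtectRequired) {
//	                return nil, errors.New("data loss protection required")
//	        }
//
//	        return fv, nil
//	}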

3701
// LocalFeatures returns the set of global features that has been advertised by
3702
// the local node. This allows sub-systems that use this interface to gate their
3703
// behavior off the set of negotiated feature bits.
3704
//
3705
// NOTE: Part of the lnpeer.Peer interface.
3706
func (p *Brontide) LocalFeatures() *lnwire.FeatureVector {
1✔
3707
        return p.cfg.Features
1✔
3708
}
1✔
3709

3710
// RemoteFeatures returns the set of global features that has been advertised by
3711
// the remote node. This allows sub-systems that use this interface to gate
3712
// their behavior off the set of negotiated feature bits.
3713
//
3714
// NOTE: Part of the lnpeer.Peer interface.
3715
func (p *Brontide) RemoteFeatures() *lnwire.FeatureVector {
7✔
3716
        return p.remoteFeatures
7✔
3717
}
7✔
3718

3719
// hasNegotiatedScidAlias returns true if we've negotiated the
3720
// option-scid-alias feature bit with the peer.
3721
func (p *Brontide) hasNegotiatedScidAlias() bool {
4✔
3722
        peerHas := p.remoteFeatures.HasFeature(lnwire.ScidAliasOptional)
4✔
3723
        localHas := p.cfg.Features.HasFeature(lnwire.ScidAliasOptional)
4✔
3724
        return peerHas && localHas
4✔
3725
}
4✔
3726

3727
// sendInitMsg sends the Init message to the remote peer. This message contains
3728
// our currently supported local and global features.
3729
func (p *Brontide) sendInitMsg(legacyChan bool) error {
8✔
3730
        features := p.cfg.Features.Clone()
8✔
3731
        legacyFeatures := p.cfg.LegacyFeatures.Clone()
8✔
3732

8✔
3733
        // If we have a legacy channel open with a peer, we downgrade static
8✔
3734
        // remote required to optional in case the peer does not understand the
8✔
3735
        // required feature bit. If we do not do this, the peer will reject our
8✔
3736
        // connection because it does not understand a required feature bit, and
8✔
3737
        // our channel will be unusable.
8✔
3738
        if legacyChan && features.RequiresFeature(lnwire.StaticRemoteKeyRequired) {
9✔
3739
                p.log.Infof("Legacy channel open with peer, " +
1✔
3740
                        "downgrading static remote required feature bit to " +
1✔
3741
                        "optional")
1✔
3742

1✔
3743
                // Unset and set in both the local and global features to
1✔
3744
                // ensure both sets are consistent and mergeable by old and
1✔
3745
                // new nodes.
1✔
3746
                features.Unset(lnwire.StaticRemoteKeyRequired)
1✔
3747
                legacyFeatures.Unset(lnwire.StaticRemoteKeyRequired)
1✔
3748

1✔
3749
                features.Set(lnwire.StaticRemoteKeyOptional)
1✔
3750
                legacyFeatures.Set(lnwire.StaticRemoteKeyOptional)
1✔
3751
        }
1✔
3752

3753
        msg := lnwire.NewInitMessage(
8✔
3754
                legacyFeatures.RawFeatureVector,
8✔
3755
                features.RawFeatureVector,
8✔
3756
        )
8✔
3757

8✔
3758
        return p.writeMessage(msg)
8✔
3759
}
3760

3761
// resendChanSyncMsg will attempt to find a channel sync message for the closed
3762
// channel and resend it to our peer.
3763
func (p *Brontide) resendChanSyncMsg(cid lnwire.ChannelID) error {
1✔
3764
        // If we already re-sent the message for this channel, we won't do it
1✔
3765
        // again.
1✔
3766
        if _, ok := p.resentChanSyncMsg[cid]; ok {
1✔
3767
                return nil
×
3768
        }
×
3769

3770
        // Check if we have any channel sync messages stored for this channel.
3771
        c, err := p.cfg.ChannelDB.FetchClosedChannelForID(cid)
1✔
3772
        if err != nil {
2✔
3773
                return fmt.Errorf("unable to fetch channel sync messages for "+
1✔
3774
                        "peer %v: %v", p, err)
1✔
3775
        }
1✔
3776

3777
        if c.LastChanSyncMsg == nil {
1✔
3778
                return fmt.Errorf("no chan sync message stored for channel %v",
×
3779
                        cid)
×
3780
        }
×
3781

3782
        if !c.RemotePub.IsEqual(p.IdentityKey()) {
1✔
3783
                return fmt.Errorf("ignoring channel reestablish from "+
×
3784
                        "peer=%x", p.IdentityKey().SerializeCompressed())
×
3785
        }
×
3786

3787
        p.log.Debugf("Re-sending channel sync message for channel %v to "+
1✔
3788
                "peer", cid)
1✔
3789

1✔
3790
        if err := p.SendMessage(true, c.LastChanSyncMsg); err != nil {
1✔
3791
                return fmt.Errorf("failed resending channel sync "+
×
3792
                        "message to peer %v: %v", p, err)
×
3793
        }
×
3794

3795
        p.log.Debugf("Re-sent channel sync message for channel %v to peer",
1✔
3796
                cid)
1✔
3797

1✔
3798
        // Note down that we sent the message, so we won't resend it again for
1✔
3799
        // this connection.
1✔
3800
        p.resentChanSyncMsg[cid] = struct{}{}
1✔
3801

1✔
3802
        return nil
1✔
3803
}
3804

3805
// SendMessage sends a variadic number of high-priority messages to the remote
3806
// peer. The first argument denotes if the method should block until the
3807
// messages have been sent to the remote peer or an error is returned,
3808
// otherwise it returns immediately after queuing.
3809
//
3810
// NOTE: Part of the lnpeer.Peer interface.
3811
func (p *Brontide) SendMessage(sync bool, msgs ...lnwire.Message) error {
4✔
3812
        return p.sendMessage(sync, true, msgs...)
4✔
3813
}
4✔
3814

3815
// SendMessageLazy sends a variadic number of low-priority messages to the
3816
// remote peer. The first argument denotes if the method should block until
3817
// the messages have been sent to the remote peer or an error is returned,
3818
// otherwise it returns immediately after queueing.
3819
//
3820
// NOTE: Part of the lnpeer.Peer interface.
3821
func (p *Brontide) SendMessageLazy(sync bool, msgs ...lnwire.Message) error {
2✔
3822
        return p.sendMessage(sync, false, msgs...)
2✔
3823
}
2✔
3824

3825
// sendMessage queues a variadic number of messages using the passed priority
3826
// to the remote peer. If sync is true, this method will block until the
3827
// messages have been sent to the remote peer or an error is returned, otherwise
3828
// it returns immediately after queueing.
3829
func (p *Brontide) sendMessage(sync, priority bool, msgs ...lnwire.Message) error {
5✔
3830
        // Add all incoming messages to the outgoing queue. A list of error
5✔
3831
        // chans is populated for each message if the caller requested a sync
5✔
3832
        // send.
5✔
3833
        var errChans []chan error
5✔
3834
        if sync {
7✔
3835
                errChans = make([]chan error, 0, len(msgs))
2✔
3836
        }
2✔
3837
        for _, msg := range msgs {
10✔
3838
                // If a sync send was requested, create an error chan to listen
5✔
3839
                // for an ack from the writeHandler.
5✔
3840
                var errChan chan error
5✔
3841
                if sync {
7✔
3842
                        errChan = make(chan error, 1)
2✔
3843
                        errChans = append(errChans, errChan)
2✔
3844
                }
2✔
3845

3846
                if priority {
9✔
3847
                        p.queueMsg(msg, errChan)
4✔
3848
                } else {
6✔
3849
                        p.queueMsgLazy(msg, errChan)
2✔
3850
                }
2✔
3851
        }
3852

3853
        // Wait for all replies from the writeHandler. For async sends, this
3854
        // will be a NOP as the list of error chans is nil.
3855
        for _, errChan := range errChans {
7✔
3856
                select {
2✔
3857
                case err := <-errChan:
2✔
3858
                        return err
2✔
3859
                case <-p.quit:
×
3860
                        return lnpeer.ErrPeerExiting
×
3861
                case <-p.cfg.Quit:
×
3862
                        return lnpeer.ErrPeerExiting
×
3863
                }
3864
        }
3865

3866
        return nil
4✔
3867
}
3868
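// Illustrative usage sketch (not part of brontide.go; chanID is assumed to be
// in the caller's scope): sending a high-priority message synchronously
// versus queueing a low-priority one without blocking.
//
//	errMsg := &lnwire.Error{
//	        ChanID: chanID,
//	        Data:   lnwire.ErrorData("channel failure"),
//	}
//
//	// Block until the message has been written out or an error occurs.
//	if err := p.SendMessage(true, errMsg); err != nil {
//	        peerLog.Errorf("unable to send error: %v", err)
//	}
//
//	// Fire-and-forget: queue lazily and return immediately.
//	_ = p.SendMessageLazy(false, errMsg)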

3869
// PubKey returns the pubkey of the peer in compressed serialized format.
3870
//
3871
// NOTE: Part of the lnpeer.Peer interface.
3872
func (p *Brontide) PubKey() [33]byte {
3✔
3873
        return p.cfg.PubKeyBytes
3✔
3874
}
3✔
3875

3876
// IdentityKey returns the public key of the remote peer.
3877
//
3878
// NOTE: Part of the lnpeer.Peer interface.
3879
func (p *Brontide) IdentityKey() *btcec.PublicKey {
16✔
3880
        return p.cfg.Addr.IdentityKey
16✔
3881
}
16✔
3882

3883
// Address returns the network address of the remote peer.
3884
//
3885
// NOTE: Part of the lnpeer.Peer interface.
3886
func (p *Brontide) Address() net.Addr {
1✔
3887
        return p.cfg.Addr.Address
1✔
3888
}
1✔
3889

3890
// AddNewChannel adds a new channel to the peer. The channel should fail to be
3891
// added if the cancel channel is closed.
3892
//
3893
// NOTE: Part of the lnpeer.Peer interface.
3894
func (p *Brontide) AddNewChannel(newChan *lnpeer.NewChannel,
3895
        cancel <-chan struct{}) error {
1✔
3896

1✔
3897
        errChan := make(chan error, 1)
1✔
3898
        newChanMsg := &newChannelMsg{
1✔
3899
                channel: newChan,
1✔
3900
                err:     errChan,
1✔
3901
        }
1✔
3902

1✔
3903
        select {
1✔
3904
        case p.newActiveChannel <- newChanMsg:
1✔
3905
        case <-cancel:
×
3906
                return errors.New("canceled adding new channel")
×
3907
        case <-p.quit:
×
3908
                return lnpeer.ErrPeerExiting
×
3909
        }
3910

3911
        // We pause here to wait for the peer to recognize the new channel
3912
        // before we close the channel barrier corresponding to the channel.
3913
        select {
1✔
3914
        case err := <-errChan:
1✔
3915
                return err
1✔
3916
        case <-p.quit:
×
3917
                return lnpeer.ErrPeerExiting
×
3918
        }
3919
}
3920

3921
// AddPendingChannel adds a pending open channel to the peer. The channel
3922
// should fail to be added if the cancel channel is closed.
3923
//
3924
// NOTE: Part of the lnpeer.Peer interface.
3925
func (p *Brontide) AddPendingChannel(cid lnwire.ChannelID,
3926
        cancel <-chan struct{}) error {
1✔
3927

1✔
3928
        errChan := make(chan error, 1)
1✔
3929
        newChanMsg := &newChannelMsg{
1✔
3930
                channelID: cid,
1✔
3931
                err:       errChan,
1✔
3932
        }
1✔
3933

1✔
3934
        select {
1✔
3935
        case p.newPendingChannel <- newChanMsg:
1✔
3936

3937
        case <-cancel:
×
3938
                return errors.New("canceled adding pending channel")
×
3939

3940
        case <-p.quit:
×
3941
                return lnpeer.ErrPeerExiting
×
3942
        }
3943

3944
        // We pause here to wait for the peer to recognize the new pending
3945
        // channel before we close the channel barrier corresponding to the
3946
        // channel.
3947
        select {
1✔
3948
        case err := <-errChan:
1✔
3949
                return err
1✔
3950

3951
        case <-cancel:
×
3952
                return errors.New("canceled adding pending channel")
×
3953

3954
        case <-p.quit:
×
3955
                return lnpeer.ErrPeerExiting
×
3956
        }
3957
}
3958

3959
// RemovePendingChannel removes a pending open channel from the peer.
3960
//
3961
// NOTE: Part of the lnpeer.Peer interface.
3962
func (p *Brontide) RemovePendingChannel(cid lnwire.ChannelID) error {
1✔
3963
        errChan := make(chan error, 1)
1✔
3964
        newChanMsg := &newChannelMsg{
1✔
3965
                channelID: cid,
1✔
3966
                err:       errChan,
1✔
3967
        }
1✔
3968

1✔
3969
        select {
1✔
3970
        case p.removePendingChannel <- newChanMsg:
1✔
3971
        case <-p.quit:
×
3972
                return lnpeer.ErrPeerExiting
×
3973
        }
3974

3975
        // We pause here to wait for the peer to respond to the cancellation of
3976
        // the pending channel before we close the channel barrier
3977
        // corresponding to the channel.
3978
        select {
1✔
3979
        case err := <-errChan:
1✔
3980
                return err
1✔
3981

3982
        case <-p.quit:
×
3983
                return lnpeer.ErrPeerExiting
×
3984
        }
3985
}
3986

3987
// StartTime returns the time at which the connection was established if the
3988
// peer started successfully, and zero otherwise.
3989
func (p *Brontide) StartTime() time.Time {
1✔
3990
        return p.startTime
1✔
3991
}
1✔
3992

3993
// handleCloseMsg is called when a new cooperative channel closure related
3994
// message is received from the remote peer. We'll use this message to advance
3995
// the chan closer state machine.
3996
func (p *Brontide) handleCloseMsg(msg *closeMsg) {
14✔
3997
        link := p.fetchLinkFromKeyAndCid(msg.cid)
14✔
3998

14✔
3999
        // We'll now fetch the matching closing state machine in order to continue,
14✔
4000
        // or finalize the channel closure process.
14✔
4001
        chanCloser, err := p.fetchActiveChanCloser(msg.cid)
14✔
4002
        if err != nil {
15✔
4003
                // If the channel is not known to us, we'll simply ignore this message.
1✔
4004
                if err == ErrChannelNotFound {
2✔
4005
                        return
1✔
4006
                }
1✔
4007

4008
                p.log.Errorf("Unable to respond to remote close msg: %v", err)
×
4009

×
4010
                errMsg := &lnwire.Error{
×
4011
                        ChanID: msg.cid,
×
4012
                        Data:   lnwire.ErrorData(err.Error()),
×
4013
                }
×
4014
                p.queueMsg(errMsg, nil)
×
4015
                return
×
4016
        }
4017

4018
        handleErr := func(err error) {
15✔
4019
                err = fmt.Errorf("unable to process close msg: %w", err)
1✔
4020
                p.log.Error(err)
1✔
4021

1✔
4022
                // As the negotiations failed, we'll reset the channel state machine to
1✔
4023
                // ensure we react to on-chain events as normal.
1✔
4024
                chanCloser.Channel().ResetState()
1✔
4025

1✔
4026
                if chanCloser.CloseRequest() != nil {
1✔
4027
                        chanCloser.CloseRequest().Err <- err
×
4028
                }
×
4029
                delete(p.activeChanCloses, msg.cid)
1✔
4030

1✔
4031
                p.Disconnect(err)
1✔
4032
        }
4033

4034
        // Next, we'll process the next message using the target state machine.
4035
        // We'll either continue negotiation, or halt.
4036
        switch typed := msg.msg.(type) {
14✔
4037
        case *lnwire.Shutdown:
6✔
4038
                // Disable incoming adds immediately.
6✔
4039
                if link != nil && !link.DisableAdds(htlcswitch.Incoming) {
6✔
4040
                        p.log.Warnf("Incoming link adds already disabled: %v",
×
4041
                                link.ChanID())
×
4042
                }
×
4043

4044
                oShutdown, err := chanCloser.ReceiveShutdown(*typed)
6✔
4045
                if err != nil {
6✔
4046
                        handleErr(err)
×
4047
                        return
×
4048
                }
×
4049

4050
                oShutdown.WhenSome(func(msg lnwire.Shutdown) {
10✔
4051
                        // If the link is nil it means we can immediately queue
4✔
4052
                        // the Shutdown message since we don't have to wait for
4✔
4053
                        // commitment transaction synchronization.
4✔
4054
                        if link == nil {
5✔
4055
                                p.queueMsg(&msg, nil)
1✔
4056
                                return
1✔
4057
                        }
1✔
4058

4059
                        // Immediately disallow any new HTLC's from being added
4060
                        // in the outgoing direction.
4061
                        if !link.DisableAdds(htlcswitch.Outgoing) {
3✔
4062
                                p.log.Warnf("Outgoing link adds already "+
×
4063
                                        "disabled: %v", link.ChanID())
×
4064
                        }
×
4065

4066
                        // When we have a Shutdown to send, we defer it till the
4067
                        // next time we send a CommitSig to remain spec
4068
                        // compliant.
4069
                        link.OnCommitOnce(htlcswitch.Outgoing, func() {
6✔
4070
                                p.queueMsg(&msg, nil)
3✔
4071
                        })
3✔
4072
                })
4073

4074
                beginNegotiation := func() {
12✔
4075
                        oClosingSigned, err := chanCloser.BeginNegotiation()
6✔
4076
                        if err != nil {
7✔
4077
                                handleErr(err)
1✔
4078
                                return
1✔
4079
                        }
1✔
4080

4081
                        oClosingSigned.WhenSome(func(msg lnwire.ClosingSigned) {
10✔
4082
                                p.queueMsg(&msg, nil)
5✔
4083
                        })
5✔
4084
                }
4085

4086
                if link == nil {
7✔
4087
                        beginNegotiation()
1✔
4088
                } else {
6✔
4089
                        // Now we register a flush hook to advance the
5✔
4090
                        // ChanCloser and possibly send out a ClosingSigned
5✔
4091
                        // when the link finishes draining.
5✔
4092
                        link.OnFlushedOnce(func() {
10✔
4093
                                // Remove link in goroutine to prevent deadlock.
5✔
4094
                                go p.cfg.Switch.RemoveLink(msg.cid)
5✔
4095
                                beginNegotiation()
5✔
4096
                        })
5✔
4097
                }
4098

4099
        case *lnwire.ClosingSigned:
9✔
4100
                oClosingSigned, err := chanCloser.ReceiveClosingSigned(*typed)
9✔
4101
                if err != nil {
9✔
4102
                        handleErr(err)
×
4103
                        return
×
4104
                }
×
4105

4106
                oClosingSigned.WhenSome(func(msg lnwire.ClosingSigned) {
18✔
4107
                        p.queueMsg(&msg, nil)
9✔
4108
                })
9✔
4109

4110
        default:
×
4111
                panic("impossible closeMsg type")
×
4112
        }
4113

4114
        // If we haven't finished close negotiations, then we'll continue as we
4115
        // can't yet finalize the closure.
4116
        if _, err := chanCloser.ClosingTx(); err != nil {
24✔
4117
                return
10✔
4118
        }
10✔
4119

4120
        // Otherwise, we've agreed on a closing fee! In this case, we'll wrap up
4121
        // the channel closure by notifying relevant sub-systems and launching a
4122
        // goroutine to wait for close tx conf.
4123
        p.finalizeChanClosure(chanCloser)
5✔
4124
}
4125

4126
// HandleLocalCloseChanReqs accepts a *htlcswitch.ChanClose and passes it onto
4127
// the channelManager goroutine, which will shut down the link and possibly
4128
// close the channel.
4129
func (p *Brontide) HandleLocalCloseChanReqs(req *htlcswitch.ChanClose) {
1✔
4130
        select {
1✔
4131
        case p.localCloseChanReqs <- req:
1✔
4132
                p.log.Info("Local close channel request is going to be " +
1✔
4133
                        "delivered to the peer")
1✔
4134
        case <-p.quit:
×
4135
                p.log.Info("Unable to deliver local close channel request " +
×
4136
                        "to peer")
×
4137
        }
4138
}
4139

4140
// NetAddress returns the network of the remote peer as an lnwire.NetAddress.
4141
func (p *Brontide) NetAddress() *lnwire.NetAddress {
1✔
4142
        return p.cfg.Addr
1✔
4143
}
1✔
4144

4145
// Inbound is a getter for the Brontide's Inbound boolean in cfg.
4146
func (p *Brontide) Inbound() bool {
1✔
4147
        return p.cfg.Inbound
1✔
4148
}
1✔
4149

4150
// ConnReq is a getter for the Brontide's connReq in cfg.
4151
func (p *Brontide) ConnReq() *connmgr.ConnReq {
1✔
4152
        return p.cfg.ConnReq
1✔
4153
}
1✔
4154

4155
// ErrorBuffer is a getter for the Brontide's errorBuffer in cfg.
4156
func (p *Brontide) ErrorBuffer() *queue.CircularBuffer {
1✔
4157
        return p.cfg.ErrorBuffer
1✔
4158
}
1✔
4159

4160
// SetAddress sets the remote peer's address given an address.
4161
func (p *Brontide) SetAddress(address net.Addr) {
×
4162
        p.cfg.Addr.Address = address
×
4163
}
×
4164

4165
// ActiveSignal returns the peer's active signal.
4166
func (p *Brontide) ActiveSignal() chan struct{} {
1✔
4167
        return p.activeSignal
1✔
4168
}
1✔
4169

4170
// Conn returns a pointer to the peer's connection struct.
4171
func (p *Brontide) Conn() net.Conn {
1✔
4172
        return p.cfg.Conn
1✔
4173
}
1✔
4174

4175
// BytesReceived returns the number of bytes received from the peer.
4176
func (p *Brontide) BytesReceived() uint64 {
1✔
4177
        return atomic.LoadUint64(&p.bytesReceived)
1✔
4178
}
1✔
4179

4180
// BytesSent returns the number of bytes sent to the peer.
4181
func (p *Brontide) BytesSent() uint64 {
1✔
4182
        return atomic.LoadUint64(&p.bytesSent)
1✔
4183
}
1✔
4184

4185
// LastRemotePingPayload returns the last payload the remote party sent as part
4186
// of their ping.
4187
func (p *Brontide) LastRemotePingPayload() []byte {
1✔
4188
        pingPayload := p.lastPingPayload.Load()
1✔
4189
        if pingPayload == nil {
2✔
4190
                return []byte{}
1✔
4191
        }
1✔
4192

4193
        pingBytes, ok := pingPayload.(lnwire.PingPayload)
×
4194
        if !ok {
×
4195
                return nil
×
4196
        }
×
4197

4198
        return pingBytes
×
4199
}
4200

4201
// attachChannelEventSubscription creates a channel event subscription and
4202
// attaches the client to Brontide if the reenableTimeout is no greater than 1
4203
// minute.
4204
func (p *Brontide) attachChannelEventSubscription() error {
4✔
4205
        // If the timeout is greater than 1 minute, it's unlikely that the link
4✔
4206
        // hasn't yet finished its reestablishment. Return nil without
4✔
4207
        // creating the client to specify that we don't want to retry.
4✔
4208
        if p.cfg.ChanActiveTimeout > 1*time.Minute {
5✔
4209
                return nil
1✔
4210
        }
1✔
4211

4212
        // When the reenable timeout is less than 1 minute, it's likely the
4213
        // channel link hasn't finished its reestablishment yet. In that case,
4214
        // we'll give it a second chance by subscribing to the channel update
4215
        // events. Upon receiving the `ActiveLinkEvent`, we'll then request
4216
        // enabling the channel again.
4217
        sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
4✔
4218
        if err != nil {
4✔
4219
                return fmt.Errorf("SubscribeChannelEvents failed: %w", err)
×
4220
        }
×
4221

4222
        p.channelEventClient = sub
4✔
4223

4✔
4224
        return nil
4✔
4225
}
4226

4227
// updateNextRevocation updates the existing channel's next revocation if it's
4228
// nil.
4229
func (p *Brontide) updateNextRevocation(c *channeldb.OpenChannel) error {
4✔
4230
        chanPoint := c.FundingOutpoint
4✔
4231
        chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
4✔
4232

4✔
4233
        // Read the current channel.
4✔
4234
        currentChan, loaded := p.activeChannels.Load(chanID)
4✔
4235

4✔
4236
        // currentChan should exist, but we perform a check anyway to avoid nil
4✔
4237
        // pointer dereference.
4✔
4238
        if !loaded {
5✔
4239
                return fmt.Errorf("missing active channel with chanID=%v",
1✔
4240
                        chanID)
1✔
4241
        }
1✔
4242

4243
        // currentChan should not be nil, but we perform a check anyway to
4244
        // avoid nil pointer dereference.
4245
        if currentChan == nil {
4✔
4246
                return fmt.Errorf("found nil active channel with chanID=%v",
1✔
4247
                        chanID)
1✔
4248
        }
1✔
4249

4250
        // If we're being sent a new channel, and our existing channel doesn't
4251
        // have the next revocation, then we need to update the current
4252
        // existing channel.
4253
        if currentChan.RemoteNextRevocation() != nil {
2✔
4254
                return nil
×
4255
        }
×
4256

4257
        p.log.Infof("Processing retransmitted ChannelReady for "+
2✔
4258
                "ChannelPoint(%v)", chanPoint)
2✔
4259

2✔
4260
        nextRevoke := c.RemoteNextRevocation
2✔
4261

2✔
4262
        err := currentChan.InitNextRevocation(nextRevoke)
2✔
4263
        if err != nil {
2✔
4264
                return fmt.Errorf("unable to init next revocation: %w", err)
×
4265
        }
×
4266

4267
        return nil
2✔
4268
}
4269

4270
// addActiveChannel adds a new active channel to the `activeChannels` map. It
4271
// takes a `channeldb.OpenChannel`, creates a `lnwallet.LightningChannel` from
4272
// it and assembles it with a channel link.
4273
func (p *Brontide) addActiveChannel(c *lnpeer.NewChannel) error {
1✔
4274
        chanPoint := c.FundingOutpoint
1✔
4275
        chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
1✔
4276

1✔
4277
        // If we've reached this point, there are two possible scenarios.  If
1✔
4278
        // the channel was in the active channels map as nil, then it was
1✔
4279
        // loaded from disk and we need to send reestablish. Else, it was not
1✔
4280
        // loaded from disk and we don't need to send reestablish as this is a
1✔
4281
        // fresh channel.
1✔
4282
        shouldReestablish := p.isLoadedFromDisk(chanID)
1✔
4283

1✔
4284
        chanOpts := c.ChanOpts
1✔
4285
        if shouldReestablish {
2✔
4286
                // If we have to do the reestablish dance for this channel,
1✔
4287
                // ensure that we don't try to call InitRemoteMusigNonces twice
1✔
4288
                // by calling SkipNonceInit.
1✔
4289
                chanOpts = append(chanOpts, lnwallet.WithSkipNonceInit())
1✔
4290
        }
1✔
4291

4292
        p.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
1✔
4293
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
×
4294
        })
×
4295
        p.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
1✔
4296
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
×
4297
        })
×
4298
        p.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
1✔
4299
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
×
4300
        })
×
4301

4302
        // If not already active, we'll add this channel to the set of active
4303
        // channels, so we can look it up later easily according to its channel
4304
        // ID.
4305
        lnChan, err := lnwallet.NewLightningChannel(
1✔
4306
                p.cfg.Signer, c.OpenChannel, p.cfg.SigPool, chanOpts...,
1✔
4307
        )
1✔
4308
        if err != nil {
1✔
4309
                return fmt.Errorf("unable to create LightningChannel: %w", err)
×
4310
        }
×
4311

4312
        // Store the channel in the activeChannels map.
4313
        p.activeChannels.Store(chanID, lnChan)
1✔
4314

1✔
4315
        p.log.Infof("New channel active ChannelPoint(%v) with peer", chanPoint)
1✔
4316

1✔
4317
        // Next, we'll assemble a ChannelLink along with the necessary items it
1✔
4318
        // needs to function.
1✔
4319
        chainEvents, err := p.cfg.ChainArb.SubscribeChannelEvents(chanPoint)
1✔
4320
        if err != nil {
1✔
4321
                return fmt.Errorf("unable to subscribe to chain events: %w",
×
4322
                        err)
×
4323
        }
×
4324

4325
        // We'll query the channel DB for the new channel's initial forwarding
4326
        // policies to determine the policy we start out with.
4327
        initialPolicy, err := p.cfg.ChannelDB.GetInitialForwardingPolicy(chanID)
1✔
4328
        if err != nil {
1✔
4329
                return fmt.Errorf("unable to query for initial forwarding "+
×
4330
                        "policy: %v", err)
×
4331
        }
×
4332

4333
        // Create the link and add it to the switch.
4334
        err = p.addLink(
1✔
4335
                &chanPoint, lnChan, initialPolicy, chainEvents,
1✔
4336
                shouldReestablish, fn.None[lnwire.Shutdown](),
1✔
4337
        )
1✔
4338
        if err != nil {
1✔
4339
                return fmt.Errorf("can't register new channel link(%v) with "+
×
4340
                        "peer", chanPoint)
×
4341
        }
×
4342

4343
        return nil
1✔
4344
}
4345

4346
// handleNewActiveChannel handles a `newChannelMsg` request. Depending on whether
4347
// we know this channel ID or not, we'll either add it to the `activeChannels` map
4348
// or init the next revocation for it.
4349
func (p *Brontide) handleNewActiveChannel(req *newChannelMsg) {
1✔
4350
        newChan := req.channel
1✔
4351
        chanPoint := newChan.FundingOutpoint
1✔
4352
        chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
1✔
4353

1✔
4354
        // Only update RemoteNextRevocation if the channel is in the
1✔
4355
        // activeChannels map and if we added the link to the switch. Only
1✔
4356
        // active channels will be added to the switch.
1✔
4357
        if p.isActiveChannel(chanID) {
2✔
4358
                p.log.Infof("Already have ChannelPoint(%v), ignoring",
1✔
4359
                        chanPoint)
1✔
4360

1✔
4361
                // Handle it and close the err chan on the request.
1✔
4362
                close(req.err)
1✔
4363

1✔
4364
                // Update the next revocation point.
1✔
4365
                err := p.updateNextRevocation(newChan.OpenChannel)
1✔
4366
                if err != nil {
1✔
4367
                        p.log.Errorf(err.Error())
×
4368
                }
×
4369

4370
                return
1✔
4371
        }
4372

4373
        // This is a new channel, we now add it to the map.
4374
        if err := p.addActiveChannel(req.channel); err != nil {
1✔
4375
                // Log and send back the error to the request.
×
4376
                p.log.Errorf(err.Error())
×
4377
                req.err <- err
×
4378

×
4379
                return
×
4380
        }
×
4381

4382
        // Close the err chan if everything went fine.
4383
        close(req.err)
1✔
4384
}
4385

4386
// handleNewPendingChannel takes a `newChannelMsg` request and adds it to the
4387
// `activeChannels` map with a nil value. This pending channel will be saved as
4388
// it may become active in the future. Once active, the funding manager will
4389
// send it again via `AddNewChannel`, and we'd handle the link creation there.
4390
func (p *Brontide) handleNewPendingChannel(req *newChannelMsg) {
5✔
4391
        defer close(req.err)
5✔
4392

5✔
4393
        chanID := req.channelID
5✔
4394

5✔
4395
        // If we already have this channel, something is wrong with the funding
5✔
4396
        // flow as it will only be marked as active after `ChannelReady` is
5✔
4397
        // handled. In this case, we will do nothing but log an error, just in
5✔
4398
        // case this is a legit channel.
5✔
4399
        if p.isActiveChannel(chanID) {
6✔
4400
                p.log.Errorf("Channel(%v) is already active, ignoring "+
1✔
4401
                        "pending channel request", chanID)
1✔
4402

1✔
4403
                return
1✔
4404
        }
1✔
4405

4406
        // The channel has already been added, we will do nothing and return.
4407
        if p.isPendingChannel(chanID) {
5✔
4408
                p.log.Infof("Channel(%v) is already added, ignoring "+
1✔
4409
                        "pending channel request", chanID)
1✔
4410

1✔
4411
                return
1✔
4412
        }
1✔
4413

4414
        // This is a new channel, we now add it to the map `activeChannels`
4415
        // with nil value and mark it as a newly added channel in
4416
        // `addedChannels`.
4417
        p.activeChannels.Store(chanID, nil)
3✔
4418
        p.addedChannels.Store(chanID, struct{}{})
3✔
4419
}
4420

4421
// handleRemovePendingChannel takes a `newChannelMsg` request and removes it
4422
// from `activeChannels` map. The request will be ignored if the channel is
4423
// considered active by Brontide. Noop if the channel ID cannot be found.
4424
func (p *Brontide) handleRemovePendingChannel(req *newChannelMsg) {
5✔
4425
        defer close(req.err)
5✔
4426

5✔
4427
        chanID := req.channelID
5✔
4428

5✔
4429
        // If we already have this channel, something is wrong with the funding
5✔
4430
        // flow as it will only be marked as active after `ChannelReady` is
5✔
4431
        // handled. In this case, we will log an error and exit.
5✔
4432
        if p.isActiveChannel(chanID) {
6✔
4433
                p.log.Errorf("Channel(%v) is active, ignoring remove request",
1✔
4434
                        chanID)
1✔
4435
                return
1✔
4436
        }
1✔
4437

4438
        // The channel has not been added yet, we will log a warning as there
4439
        // is an unexpected call from funding manager.
4440
        if !p.isPendingChannel(chanID) {
6✔
4441
                p.log.Warnf("Channel(%v) not found, removing it anyway", chanID)
2✔
4442
        }
2✔
4443

4444
        // Remove the record of this pending channel.
4445
        p.activeChannels.Delete(chanID)
4✔
4446
        p.addedChannels.Delete(chanID)
4✔
4447
}
4448

4449
// sendLinkUpdateMsg sends a channel update message to the
4450
// channel's message stream.
4451
func (p *Brontide) sendLinkUpdateMsg(cid lnwire.ChannelID, msg lnwire.Message) {
1✔
4452
        p.log.Tracef("Sending link update msg=%v", msg.MsgType())
1✔
4453

1✔
4454
        chanStream, ok := p.activeMsgStreams[cid]
1✔
4455
        if !ok {
2✔
4456
                // If a stream hasn't yet been created, then we'll do so, add
1✔
4457
                // it to the map, and finally start it.
1✔
4458
                chanStream = newChanMsgStream(p, cid)
1✔
4459
                p.activeMsgStreams[cid] = chanStream
1✔
4460
                chanStream.Start()
1✔
4461

1✔
4462
                // Stop the stream when quit.
1✔
4463
                go func() {
2✔
4464
                        <-p.quit
1✔
4465
                        chanStream.Stop()
1✔
4466
                }()
1✔
4467
        }
4468

4469
        // With the stream obtained, add the message to the stream so we can
4470
        // continue processing messages.
4471
        chanStream.AddMsg(msg)
1✔
4472
}
4473

4474
// scaleTimeout multiplies the argument duration by a constant factor depending
4475
// on various heuristics. Currently this is only used to check whether our peer
4476
// appears to be connected over Tor and relaxes the timeout deadline. However,
4477
// this is subject to change and should be treated as opaque.
4478
func (p *Brontide) scaleTimeout(timeout time.Duration) time.Duration {
68✔
4479
        if p.isTorConnection {
69✔
4480
                return timeout * time.Duration(torTimeoutMultiplier)
1✔
4481
        }
1✔
4482

4483
        return timeout
67✔
4484
}
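// Illustrative example of the scaling behaviour (the multiplier value of 3 is
// assumed here purely for the arithmetic; see torTimeoutMultiplier for the
// actual constant):
//
//	p.isTorConnection = false
//	p.scaleTimeout(10 * time.Second) // -> 10s
//
//	p.isTorConnection = true
//	p.scaleTimeout(10 * time.Second) // -> 10s * 3 = 30s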