
lightningnetwork / lnd, build 13157733617

05 Feb 2025 12:49PM UTC coverage: 57.712% (down 1.1% from 58.82%)

Pull #9447 (github) by yyforyongyu:
sweep: rename methods for clarity

We now rename "third party" to "unknown", since the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In the fee bumper we don't have the information to distinguish these cases,
so we leave them to be handled by the sweeper, which has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper
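
As a rough illustration of the distinction described above (a hypothetical sketch only; none of the names below are taken from the PR), a fee bumper that cannot classify a foreign spend can collapse everything it did not broadcast itself into a single "unknown" status and defer the finer classification to the sweeper:

package sweep

// spendKind is a hypothetical classification of how a monitored input was
// ultimately spent.
type spendKind uint8

const (
        // spentBySelf means the input was spent by the sweeping tx that this
        // fee bumper broadcast.
        spentBySelf spendKind = iota

        // spentByUnknown means the input was spent by a tx the fee bumper did
        // not create. That tx could be an older sweeping tx, a third party
        // spend (e.g. an anchor), or a remote party spend (e.g. a pin); the
        // fee bumper lacks the context to tell these apart, so it reports all
        // of them as unknown and lets the sweeper, which has more context,
        // resolve the details.
        spentByUnknown
)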

83 of 87 new or added lines in 2 files covered. (95.4%)

19472 existing lines in 252 files now uncovered.

103634 of 179570 relevant lines covered (57.71%)

24840.31 hits per line
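
For reference, these figures are internally consistent: 83 / 87 ≈ 95.4% of the new or added lines are exercised, and 103634 / 179570 ≈ 57.71% of all relevant lines are covered (down roughly 1.1 points from the previous 58.82%). The hits-per-line value is presumably the total number of recorded hits averaged over the relevant lines.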

Source File

/peer/brontide.go (37.47%)
1
package peer
2

3
import (
4
        "bytes"
5
        "container/list"
6
        "errors"
7
        "fmt"
8
        "math/rand"
9
        "net"
10
        "strings"
11
        "sync"
12
        "sync/atomic"
13
        "time"
14

15
        "github.com/btcsuite/btcd/btcec/v2"
16
        "github.com/btcsuite/btcd/chaincfg/chainhash"
17
        "github.com/btcsuite/btcd/connmgr"
18
        "github.com/btcsuite/btcd/txscript"
19
        "github.com/btcsuite/btcd/wire"
20
        "github.com/btcsuite/btclog/v2"
21
        "github.com/davecgh/go-spew/spew"
22
        "github.com/lightningnetwork/lnd/buffer"
23
        "github.com/lightningnetwork/lnd/chainntnfs"
24
        "github.com/lightningnetwork/lnd/channeldb"
25
        "github.com/lightningnetwork/lnd/channelnotifier"
26
        "github.com/lightningnetwork/lnd/contractcourt"
27
        "github.com/lightningnetwork/lnd/discovery"
28
        "github.com/lightningnetwork/lnd/feature"
29
        "github.com/lightningnetwork/lnd/fn/v2"
30
        "github.com/lightningnetwork/lnd/funding"
31
        graphdb "github.com/lightningnetwork/lnd/graph/db"
32
        "github.com/lightningnetwork/lnd/graph/db/models"
33
        "github.com/lightningnetwork/lnd/htlcswitch"
34
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
35
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
36
        "github.com/lightningnetwork/lnd/input"
37
        "github.com/lightningnetwork/lnd/invoices"
38
        "github.com/lightningnetwork/lnd/keychain"
39
        "github.com/lightningnetwork/lnd/lnpeer"
40
        "github.com/lightningnetwork/lnd/lntypes"
41
        "github.com/lightningnetwork/lnd/lnutils"
42
        "github.com/lightningnetwork/lnd/lnwallet"
43
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
44
        "github.com/lightningnetwork/lnd/lnwallet/chancloser"
45
        "github.com/lightningnetwork/lnd/lnwire"
46
        "github.com/lightningnetwork/lnd/msgmux"
47
        "github.com/lightningnetwork/lnd/netann"
48
        "github.com/lightningnetwork/lnd/pool"
49
        "github.com/lightningnetwork/lnd/queue"
50
        "github.com/lightningnetwork/lnd/subscribe"
51
        "github.com/lightningnetwork/lnd/ticker"
52
        "github.com/lightningnetwork/lnd/tlv"
53
        "github.com/lightningnetwork/lnd/watchtower/wtclient"
54
)
55

56
const (
57
        // pingInterval is the interval at which ping messages are sent.
58
        pingInterval = 1 * time.Minute
59

60
        // pingTimeout is the amount of time we will wait for a pong response
61
        // before considering the peer to be unresponsive.
62
        //
63
        // This MUST be a smaller value than the pingInterval.
64
        pingTimeout = 30 * time.Second
65

66
        // idleTimeout is the duration of inactivity before we time out a peer.
67
        idleTimeout = 5 * time.Minute
68

69
        // writeMessageTimeout is the timeout used when writing a message to the
70
        // peer.
71
        writeMessageTimeout = 5 * time.Second
72

73
        // readMessageTimeout is the timeout used when reading a message from a
74
        // peer.
75
        readMessageTimeout = 5 * time.Second
76

77
        // handshakeTimeout is the timeout used when waiting for the peer's init
78
        // message.
79
        handshakeTimeout = 15 * time.Second
80

81
        // ErrorBufferSize is the number of historic peer errors that we store.
82
        ErrorBufferSize = 10
83

84
        // pongSizeCeiling is the upper bound on a uniformly distributed random
85
        // variable that we use for requesting pong responses. We don't use the
86
        // MaxPongBytes (upper bound accepted by the protocol) because it is
87
        // needlessly wasteful of precious Tor bandwidth for little to no gain.
88
        pongSizeCeiling = 4096
89

90
        // torTimeoutMultiplier is the scaling factor we use on network timeouts
91
        // for Tor peers.
92
        torTimeoutMultiplier = 3
93
)
94

95
var (
96
        // ErrChannelNotFound is an error returned when a channel is queried and
97
        // either the Brontide doesn't know of it, or the channel in question
98
        // is pending.
99
        ErrChannelNotFound = fmt.Errorf("channel not found")
100
)
101

102
// outgoingMsg packages an lnwire.Message to be sent out on the wire, along with
103
// a buffered channel which will be sent upon once the write is complete. This
104
// buffered channel acts as a semaphore to be used for synchronization purposes.
105
type outgoingMsg struct {
106
        priority bool
107
        msg      lnwire.Message
108
        errChan  chan error // MUST be buffered.
109
}
110

111
// newChannelMsg packages a channeldb.OpenChannel with a channel that allows
112
// the receiver of the request to report when the channel creation process has
113
// completed.
114
type newChannelMsg struct {
115
        // channel is used when the pending channel becomes active.
116
        channel *lnpeer.NewChannel
117

118
        // channelID is used when there's a new pending channel.
119
        channelID lnwire.ChannelID
120

121
        err chan error
122
}
123

124
type customMsg struct {
125
        peer [33]byte
126
        msg  lnwire.Custom
127
}
128

129
// closeMsg is a wrapper struct around any wire messages that deal with the
130
// cooperative channel closure negotiation process. This struct includes the
131
// raw channel ID targeted along with the original message.
132
type closeMsg struct {
133
        cid lnwire.ChannelID
134
        msg lnwire.Message
135
}
136

137
// PendingUpdate describes the pending state of a closing channel.
138
type PendingUpdate struct {
139
        Txid        []byte
140
        OutputIndex uint32
141
}
142

143
// ChannelCloseUpdate contains the outcome of the close channel operation.
144
type ChannelCloseUpdate struct {
145
        ClosingTxid []byte
146
        Success     bool
147

148
        // LocalCloseOutput is an optional, additional output on the closing
149
        // transaction that the local party should be paid to. This will only be
150
        // populated if the local balance isn't dust.
151
        LocalCloseOutput fn.Option[chancloser.CloseOutput]
152

153
        // RemoteCloseOutput is an optional, additional output on the closing
154
        // transaction that the remote party should be paid to. This will only
155
        // be populated if the remote balance isn't dust.
156
        RemoteCloseOutput fn.Option[chancloser.CloseOutput]
157

158
        // AuxOutputs is an optional set of additional outputs that might be
159
        // included in the closing transaction. These are used for custom
160
        // channel types.
161
        AuxOutputs fn.Option[chancloser.AuxCloseOutputs]
162
}
163

164
// TimestampedError is a timestamped error that is used to store the most recent
165
// errors we have experienced with our peers.
166
type TimestampedError struct {
167
        Error     error
168
        Timestamp time.Time
169
}
170

171
// Config defines configuration fields that are necessary for a peer object
172
// to function.
173
type Config struct {
174
        // Conn is the underlying network connection for this peer.
175
        Conn MessageConn
176

177
        // ConnReq stores information related to the persistent connection request
178
        // for this peer.
179
        ConnReq *connmgr.ConnReq
180

181
        // PubKeyBytes is the serialized, compressed public key of this peer.
182
        PubKeyBytes [33]byte
183

184
        // Addr is the network address of the peer.
185
        Addr *lnwire.NetAddress
186

187
        // Inbound indicates whether or not the peer is an inbound peer.
188
        Inbound bool
189

190
        // Features is the set of features that we advertise to the remote party.
191
        Features *lnwire.FeatureVector
192

193
        // LegacyFeatures is the set of features that we advertise to the remote
194
        // peer for backwards compatibility. Nodes that have not implemented
195
        // flat features will still be able to read our feature bits from the
196
        // legacy global field, but we will also advertise everything in the
197
        // default features field.
198
        LegacyFeatures *lnwire.FeatureVector
199

200
        // OutgoingCltvRejectDelta defines the number of blocks before expiry of
201
        // an htlc where we don't offer it anymore.
202
        OutgoingCltvRejectDelta uint32
203

204
        // ChanActiveTimeout specifies the duration the peer will wait to request
205
        // a channel reenable, beginning from the time the peer was started.
206
        ChanActiveTimeout time.Duration
207

208
        // ErrorBuffer stores a set of errors related to a peer. It contains error
209
        // messages that our peer has recently sent us over the wire and records of
210
        // unknown messages that were sent to us so that we can have a full track
211
        // record of the communication errors we have had with our peer. If we
212
        // choose to disconnect from a peer, it also stores the reason we had for
213
        // disconnecting.
214
        ErrorBuffer *queue.CircularBuffer
215

216
        // WritePool is the task pool that manages reuse of write buffers. Write
217
        // tasks are submitted to the pool in order to conserve the total number of
218
        // write buffers allocated at any one time, and decouple write buffer
219
        // allocation from the peer life cycle.
220
        WritePool *pool.Write
221

222
        // ReadPool is the task pool that manages reuse of read buffers.
223
        ReadPool *pool.Read
224

225
        // Switch is a pointer to the htlcswitch. It is used to setup, get, and
226
        // tear-down ChannelLinks.
227
        Switch messageSwitch
228

229
        // InterceptSwitch is a pointer to the InterceptableSwitch, a wrapper around
230
        // the regular Switch. We only export it here to pass ForwardPackets to the
231
        // ChannelLinkConfig.
232
        InterceptSwitch *htlcswitch.InterceptableSwitch
233

234
        // ChannelDB is used to fetch opened channels, and closed channels.
235
        ChannelDB *channeldb.ChannelStateDB
236

237
        // ChannelGraph is a pointer to the channel graph which is used to
238
        // query information about the set of known active channels.
239
        ChannelGraph *graphdb.ChannelGraph
240

241
        // ChainArb is used to subscribe to channel events, update contract signals,
242
        // and force close channels.
243
        ChainArb *contractcourt.ChainArbitrator
244

245
        // AuthGossiper is needed so that the Brontide impl can register with the
246
        // gossiper and process remote channel announcements.
247
        AuthGossiper *discovery.AuthenticatedGossiper
248

249
        // ChanStatusMgr is used to set or un-set the disabled bit in channel
250
        // updates.
251
        ChanStatusMgr *netann.ChanStatusManager
252

253
        // ChainIO is used to retrieve the best block.
254
        ChainIO lnwallet.BlockChainIO
255

256
        // FeeEstimator is used to compute our target ideal fee-per-kw when
257
        // initializing the coop close process.
258
        FeeEstimator chainfee.Estimator
259

260
        // Signer is used when creating *lnwallet.LightningChannel instances.
261
        Signer input.Signer
262

263
        // SigPool is used when creating *lnwallet.LightningChannel instances.
264
        SigPool *lnwallet.SigPool
265

266
        // Wallet is used to publish transactions and generates delivery
267
        // scripts during the coop close process.
268
        Wallet *lnwallet.LightningWallet
269

270
        // ChainNotifier is used to receive confirmations of a coop close
271
        // transaction.
272
        ChainNotifier chainntnfs.ChainNotifier
273

274
        // BestBlockView is used to efficiently query for up-to-date
275
        // blockchain state information
276
        BestBlockView chainntnfs.BestBlockView
277

278
        // RoutingPolicy is used to set the forwarding policy for links created by
279
        // the Brontide.
280
        RoutingPolicy models.ForwardingPolicy
281

282
        // Sphinx is used when setting up ChannelLinks so they can decode sphinx
283
        // onion blobs.
284
        Sphinx *hop.OnionProcessor
285

286
        // WitnessBeacon is used when setting up ChannelLinks so they can add any
287
        // preimages that they learn.
288
        WitnessBeacon contractcourt.WitnessBeacon
289

290
        // Invoices is passed to the ChannelLink on creation and handles all
291
        // invoice-related logic.
292
        Invoices *invoices.InvoiceRegistry
293

294
        // ChannelNotifier is used by the link to notify other sub-systems about
295
        // channel-related events and by the Brontide to subscribe to
296
        // ActiveLinkEvents.
297
        ChannelNotifier *channelnotifier.ChannelNotifier
298

299
        // HtlcNotifier is used when creating a ChannelLink.
300
        HtlcNotifier *htlcswitch.HtlcNotifier
301

302
        // TowerClient is used to backup revoked states.
303
        TowerClient wtclient.ClientManager
304

305
        // DisconnectPeer is used to disconnect this peer if the cooperative close
306
        // process fails.
307
        DisconnectPeer func(*btcec.PublicKey) error
308

309
        // GenNodeAnnouncement is used to send our node announcement to the remote
310
        // on startup.
311
        GenNodeAnnouncement func(...netann.NodeAnnModifier) (
312
                lnwire.NodeAnnouncement, error)
313

314
        // PrunePersistentPeerConnection is used to remove all internal state
315
        // related to this peer in the server.
316
        PrunePersistentPeerConnection func([33]byte)
317

318
        // FetchLastChanUpdate fetches our latest channel update for a target
319
        // channel.
320
        FetchLastChanUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate1,
321
                error)
322

323
        // FundingManager is an implementation of the funding.Controller interface.
324
        FundingManager funding.Controller
325

326
        // Hodl is used when creating ChannelLinks to specify HodlFlags as
327
        // breakpoints in dev builds.
328
        Hodl *hodl.Config
329

330
        // UnsafeReplay is used when creating ChannelLinks to specify whether or
331
        // not to replay adds on its commitment tx.
332
        UnsafeReplay bool
333

334
        // MaxOutgoingCltvExpiry is used when creating ChannelLinks and is the max
335
        // number of blocks that funds could be locked up for when forwarding
336
        // payments.
337
        MaxOutgoingCltvExpiry uint32
338

339
        // MaxChannelFeeAllocation is used when creating ChannelLinks and is the
340
        // maximum percentage of total funds that can be allocated to a channel's
341
        // commitment fee. This only applies for the initiator of the channel.
342
        MaxChannelFeeAllocation float64
343

344
        // MaxAnchorsCommitFeeRate is the maximum fee rate we'll use as an
345
        // initiator for anchor channel commitments.
346
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight
347

348
        // CoopCloseTargetConfs is the confirmation target that will be used
349
        // to estimate the fee rate to use during a cooperative channel
350
        // closure initiated by the remote peer.
351
        CoopCloseTargetConfs uint32
352

353
        // ServerPubKey is the serialized, compressed public key of our lnd node.
354
        // It is used to determine which policy (channel edge) to pass to the
355
        // ChannelLink.
356
        ServerPubKey [33]byte
357

358
        // ChannelCommitInterval is the maximum time that is allowed to pass between
359
        // receiving a channel state update and signing the next commitment.
360
        // Setting this to a longer duration allows for more efficient channel
361
        // operations at the cost of latency.
362
        ChannelCommitInterval time.Duration
363

364
        // PendingCommitInterval is the maximum time that is allowed to pass
365
        // while waiting for the remote party to revoke a locally initiated
366
        // commitment state. Set this to a longer duration if a slow
367
        // response is expected from the remote party or a large number of
368
        // payments are attempted at the same time.
369
        PendingCommitInterval time.Duration
370

371
        // ChannelCommitBatchSize is the maximum number of channel state updates
372
        // that is accumulated before signing a new commitment.
373
        ChannelCommitBatchSize uint32
374

375
        // HandleCustomMessage is called whenever a custom message is received
376
        // from the peer.
377
        HandleCustomMessage func(peer [33]byte, msg *lnwire.Custom) error
378

379
        // GetAliases is passed to created links so the Switch and link can be
380
        // aware of the channel's aliases.
381
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID
382

383
        // RequestAlias allows the Brontide struct to request an alias to send
384
        // to the peer.
385
        RequestAlias func() (lnwire.ShortChannelID, error)
386

387
        // AddLocalAlias persists an alias to an underlying alias store.
388
        AddLocalAlias func(alias, base lnwire.ShortChannelID,
389
                gossip, liveUpdate bool) error
390

391
        // AuxLeafStore is an optional store that can be used to store auxiliary
392
        // leaves for certain custom channel types.
393
        AuxLeafStore fn.Option[lnwallet.AuxLeafStore]
394

395
        // AuxSigner is an optional signer that can be used to sign auxiliary
396
        // leaves for certain custom channel types.
397
        AuxSigner fn.Option[lnwallet.AuxSigner]
398

399
        // AuxResolver is an optional interface that can be used to modify the
400
        // way contracts are resolved.
401
        AuxResolver fn.Option[lnwallet.AuxContractResolver]
402

403
        // AuxTrafficShaper is an optional auxiliary traffic shaper that can be
404
        // used to manage the bandwidth of peer links.
405
        AuxTrafficShaper fn.Option[htlcswitch.AuxTrafficShaper]
406

407
        // PongBuf is a slice we'll reuse instead of allocating memory on the
408
        // heap. Since only reads will occur and no writes, there is no need
409
        // for any synchronization primitives. As a result, it's safe to share
410
        // this across multiple Peer struct instances.
411
        PongBuf []byte
412

413
        // DisallowRouteBlinding disables forwarding payments in blinded routes
414
        // by failing back any blinding-related payloads as if they were
415
        // invalid.
416
        DisallowRouteBlinding bool
417

418
        // DisallowQuiescence is a flag that indicates whether the Brontide
419
        // should have the quiescence feature disabled.
420
        DisallowQuiescence bool
421

422
        // MaxFeeExposure limits the number of outstanding fees in a channel.
423
        // This value will be passed to created links.
424
        MaxFeeExposure lnwire.MilliSatoshi
425

426
        // MsgRouter is an optional instance of the main message router that
427
        // the peer will use. If None, then a new default version will be used
428
        // in place.
429
        MsgRouter fn.Option[msgmux.Router]
430

431
        // AuxChanCloser is an optional instance of an abstraction that can be
432
        // used to modify the way the co-op close transaction is constructed.
433
        AuxChanCloser fn.Option[chancloser.AuxChanCloser]
434

435
        // ShouldFwdExpEndorsement is a closure that indicates whether
436
        // experimental endorsement signals should be set.
437
        ShouldFwdExpEndorsement func() bool
438

439
        // Quit is the server's quit channel. If this is closed, we halt operation.
440
        Quit chan struct{}
441
}
442

443
// Brontide is an active peer on the Lightning Network. This struct is responsible
444
// for managing any channel state related to this peer. To do so, it has
445
// several helper goroutines to handle events such as HTLC timeouts, new
446
// funding workflow, and detecting an uncooperative closure of any active
447
// channels.
448
// TODO(roasbeef): proper reconnection logic.
449
type Brontide struct {
450
        // MUST be used atomically.
451
        started    int32
452
        disconnect int32
453

454
        // MUST be used atomically.
455
        bytesReceived uint64
456
        bytesSent     uint64
457

458
        // isTorConnection is a flag that indicates whether or not we believe
459
        // the remote peer is a tor connection. It is not always possible to
460
        // know this with certainty but we have heuristics we use that should
461
        // catch most cases.
462
        //
463
        // NOTE: We judge the tor-ness of a connection by if the remote peer has
464
        // ".onion" in the address OR if it's connected over localhost.
465
        // This will miss cases where our peer is connected to our clearnet
466
        // address over the tor network (via exit nodes). It will also misjudge
467
        // actual localhost connections as tor. We need to include this because
468
        // inbound connections to our tor address will appear to come from the
469
        // local socks5 proxy. This heuristic is only used to expand the timeout
470
        // window for peers so it is OK to misjudge this. If you use this field
471
        // for any other purpose you should seriously consider whether or not
472
        // this heuristic is good enough for your use case.
473
        isTorConnection bool
474

475
        pingManager *PingManager
476

477
        // lastPingPayload stores an unsafe pointer wrapped as an atomic
478
        // variable which points to the last payload the remote party sent us
479
        // as their ping.
480
        //
481
        // MUST be used atomically.
482
        lastPingPayload atomic.Value
483

484
        cfg Config
485

486
        // activeSignal when closed signals that the peer is now active and
487
        // ready to process messages.
488
        activeSignal chan struct{}
489

490
        // startTime is the time this peer connection was successfully established.
491
        // It will be zero for peers that did not successfully call Start().
492
        startTime time.Time
493

494
        // sendQueue is the channel which is used to queue outgoing messages to be
495
        // written onto the wire. Note that this channel is unbuffered.
496
        sendQueue chan outgoingMsg
497

498
        // outgoingQueue is a buffered channel which allows second/third party
499
        // objects to queue messages to be sent out on the wire.
500
        outgoingQueue chan outgoingMsg
501

502
        // activeChannels is a map which stores the state machines of all
503
        // active channels. Channels are indexed into the map by the txid of
504
        // the funding transaction which opened the channel.
505
        //
506
        // NOTE: On startup, pending channels are stored as nil in this map.
507
        // Confirmed channels have channel data populated in the map. This means
508
        // that accesses to this map should nil-check the LightningChannel to
509
        // see if this is a pending channel or not. The tradeoff here is either
510
        // having two maps everywhere (one for pending, one for confirmed chans)
511
        // or having an extra nil-check per access.
512
        activeChannels *lnutils.SyncMap[
513
                lnwire.ChannelID, *lnwallet.LightningChannel]
514

515
        // addedChannels tracks any new channels opened during this peer's
516
        // lifecycle. We use this to filter out these new channels when the time
517
        // comes to request a reenable for active channels, since they will have
518
        // waited a shorter duration.
519
        addedChannels *lnutils.SyncMap[lnwire.ChannelID, struct{}]
520

521
        // newActiveChannel is used by the fundingManager to send fully opened
522
        // channels to the source peer which handled the funding workflow.
523
        newActiveChannel chan *newChannelMsg
524

525
        // newPendingChannel is used by the fundingManager to send pending open
526
        // channels to the source peer which handled the funding workflow.
527
        newPendingChannel chan *newChannelMsg
528

529
        // removePendingChannel is used by the fundingManager to cancel pending
530
        // open channels to the source peer when the funding flow is failed.
531
        removePendingChannel chan *newChannelMsg
532

533
        // activeMsgStreams is a map from channel id to the channel streams that
534
        // proxy messages to individual, active links.
535
        activeMsgStreams map[lnwire.ChannelID]*msgStream
536

537
        // activeChanCloses is a map that keeps track of all the active
538
        // cooperative channel closures. Any channel closing messages are directed
539
        // to one of these active state machines. Once the channel has been closed,
540
        // the state machine will be deleted from the map.
541
        activeChanCloses map[lnwire.ChannelID]*chancloser.ChanCloser
542

543
        // localCloseChanReqs is a channel over which any local requests to close
544
        // a particular channel are sent.
545
        localCloseChanReqs chan *htlcswitch.ChanClose
546

547
        // linkFailures receives all reported channel failures from the switch,
548
        // and instructs the channelManager to clean remaining channel state.
549
        linkFailures chan linkFailureReport
550

551
        // chanCloseMsgs is a channel over which any messages related to channel
552
        // closures are sent. This includes the lnwire.Shutdown message as
553
        // well as lnwire.ClosingSigned messages.
554
        chanCloseMsgs chan *closeMsg
555

556
        // remoteFeatures is the feature vector received from the peer during
557
        // the connection handshake.
558
        remoteFeatures *lnwire.FeatureVector
559

560
        // resentChanSyncMsg is a set that keeps track of which channels we
561
        // have re-sent channel reestablishment messages for. This is done to
562
        // avoid getting into a loop where both peers will respond to the other
563
        // peer's chansync message with its own over and over again.
564
        resentChanSyncMsg map[lnwire.ChannelID]struct{}
565

566
        // channelEventClient is the channel event subscription client that's
567
        // used to assist retry enabling the channels. This client is only
568
        // created when the reenableTimeout is no greater than 1 minute. Once
569
        // created, it is canceled once the reenabling has been finished.
570
        //
571
        // NOTE: we choose to create the client conditionally to avoid
572
        // potentially holding lots of un-consumed events.
573
        channelEventClient *subscribe.Client
574

575
        // msgRouter is an instance of the msgmux.Router which is used to send
576
        // off new wire messages for handing.
577
        msgRouter fn.Option[msgmux.Router]
578

579
        // globalMsgRouter is a flag that indicates whether we have a global
580
        // msg router. If so, then we don't worry about stopping the msg router
581
        // when a peer disconnects.
582
        globalMsgRouter bool
583

584
        startReady chan struct{}
585
        quit       chan struct{}
586
        wg         sync.WaitGroup
587

588
        // log is a peer-specific logging instance.
589
        log btclog.Logger
590
}
591

592
// A compile-time check to ensure that Brontide satisfies the lnpeer.Peer interface.
593
var _ lnpeer.Peer = (*Brontide)(nil)
594

595
// NewBrontide creates a new Brontide from a peer.Config struct.
596
func NewBrontide(cfg Config) *Brontide {
25✔
597
        logPrefix := fmt.Sprintf("Peer(%x):", cfg.PubKeyBytes)
25✔
598

25✔
599
        // We have a global message router if one was passed in via the config.
25✔
600
        // In this case, we don't need to attempt to tear it down when the peer
25✔
601
        // is stopped.
25✔
602
        globalMsgRouter := cfg.MsgRouter.IsSome()
25✔
603

25✔
604
        // We'll either use the msg router instance passed in, or create a new
25✔
605
        // blank instance.
25✔
606
        msgRouter := cfg.MsgRouter.Alt(fn.Some[msgmux.Router](
25✔
607
                msgmux.NewMultiMsgRouter(),
25✔
608
        ))
25✔
609

25✔
610
        p := &Brontide{
25✔
611
                cfg:           cfg,
25✔
612
                activeSignal:  make(chan struct{}),
25✔
613
                sendQueue:     make(chan outgoingMsg),
25✔
614
                outgoingQueue: make(chan outgoingMsg),
25✔
615
                addedChannels: &lnutils.SyncMap[lnwire.ChannelID, struct{}]{},
25✔
616
                activeChannels: &lnutils.SyncMap[
25✔
617
                        lnwire.ChannelID, *lnwallet.LightningChannel,
25✔
618
                ]{},
25✔
619
                newActiveChannel:     make(chan *newChannelMsg, 1),
25✔
620
                newPendingChannel:    make(chan *newChannelMsg, 1),
25✔
621
                removePendingChannel: make(chan *newChannelMsg),
25✔
622

25✔
623
                activeMsgStreams:   make(map[lnwire.ChannelID]*msgStream),
25✔
624
                activeChanCloses:   make(map[lnwire.ChannelID]*chancloser.ChanCloser),
25✔
625
                localCloseChanReqs: make(chan *htlcswitch.ChanClose),
25✔
626
                linkFailures:       make(chan linkFailureReport),
25✔
627
                chanCloseMsgs:      make(chan *closeMsg),
25✔
628
                resentChanSyncMsg:  make(map[lnwire.ChannelID]struct{}),
25✔
629
                startReady:         make(chan struct{}),
25✔
630
                quit:               make(chan struct{}),
25✔
631
                log:                peerLog.WithPrefix(logPrefix),
25✔
632
                msgRouter:          msgRouter,
25✔
633
                globalMsgRouter:    globalMsgRouter,
25✔
634
        }
25✔
635

25✔
636
        if cfg.Conn != nil && cfg.Conn.RemoteAddr() != nil {
25✔
UNCOV
637
                remoteAddr := cfg.Conn.RemoteAddr().String()
×
UNCOV
638
                p.isTorConnection = strings.Contains(remoteAddr, ".onion") ||
×
UNCOV
639
                        strings.Contains(remoteAddr, "127.0.0.1")
×
UNCOV
640
        }
×
641

642
        var (
25✔
643
                lastBlockHeader           *wire.BlockHeader
25✔
644
                lastSerializedBlockHeader [wire.MaxBlockHeaderPayload]byte
25✔
645
        )
25✔
646
        newPingPayload := func() []byte {
25✔
647
                // We query the BestBlockHeader from our BestBlockView each time
×
648
                // this is called, and update our serialized block header if
×
649
                // they differ.  Over time, we'll use this to disseminate the
×
650
                // latest block header between all our peers, which can later be
×
651
                // used to cross-check our own view of the network to mitigate
×
652
                // various types of eclipse attacks.
×
653
                header, err := p.cfg.BestBlockView.BestBlockHeader()
×
654
                if err != nil && header == lastBlockHeader {
×
655
                        return lastSerializedBlockHeader[:]
×
656
                }
×
657

658
                buf := bytes.NewBuffer(lastSerializedBlockHeader[0:0])
×
659
                err = header.Serialize(buf)
×
660
                if err == nil {
×
661
                        lastBlockHeader = header
×
662
                } else {
×
663
                        p.log.Warn("unable to serialize current block " +
×
664
                                "header for ping payload generation." +
×
665
                                "This should be impossible and means" +
×
666
                                "there is an implementation bug.")
×
667
                }
×
668

669
                return lastSerializedBlockHeader[:]
×
670
        }
671

672
        // TODO(roasbeef): make dynamic in order to create fake cover traffic.
673
        //
674
        // NOTE(proofofkeags): this was changed to be dynamic to allow better
675
        // pong identification, however, more thought is needed to make this
676
        // actually usable as a traffic decoy.
677
        randPongSize := func() uint16 {
25✔
678
                return uint16(
×
679
                        // We don't need cryptographic randomness here.
×
680
                        /* #nosec */
×
681
                        rand.Intn(pongSizeCeiling) + 1,
×
682
                )
×
683
        }
×
684

685
        p.pingManager = NewPingManager(&PingManagerConfig{
25✔
686
                NewPingPayload:   newPingPayload,
25✔
687
                NewPongSize:      randPongSize,
25✔
688
                IntervalDuration: p.scaleTimeout(pingInterval),
25✔
689
                TimeoutDuration:  p.scaleTimeout(pingTimeout),
25✔
690
                SendPing: func(ping *lnwire.Ping) {
25✔
691
                        p.queueMsg(ping, nil)
×
692
                },
×
693
                OnPongFailure: func(err error) {
×
694
                        eStr := "pong response failure for %s: %v " +
×
695
                                "-- disconnecting"
×
696
                        p.log.Warnf(eStr, p, err)
×
697
                        go p.Disconnect(fmt.Errorf(eStr, p, err))
×
698
                },
×
699
        })
700

701
        return p
25✔
702
}
703

704
// Start starts all helper goroutines the peer needs for normal operations.  In
705
// the case this peer has already been started, then this function is a noop.
706
func (p *Brontide) Start() error {
3✔
707
        if atomic.AddInt32(&p.started, 1) != 1 {
3✔
708
                return nil
×
709
        }
×
710

711
        // Once we've finished starting up the peer, we'll signal to other
712
        // goroutines that they can move forward to tear down the peer, or
713
        // carry out other relevant changes.
714
        defer close(p.startReady)
3✔
715

3✔
716
        p.log.Tracef("starting with conn[%v->%v]",
3✔
717
                p.cfg.Conn.LocalAddr(), p.cfg.Conn.RemoteAddr())
3✔
718

3✔
719
        // Fetch and then load all the active channels we have with this remote
3✔
720
        // peer from the database.
3✔
721
        activeChans, err := p.cfg.ChannelDB.FetchOpenChannels(
3✔
722
                p.cfg.Addr.IdentityKey,
3✔
723
        )
3✔
724
        if err != nil {
3✔
725
                p.log.Errorf("Unable to fetch active chans "+
×
726
                        "for peer: %v", err)
×
727
                return err
×
728
        }
×
729

730
        if len(activeChans) == 0 {
4✔
731
                go p.cfg.PrunePersistentPeerConnection(p.cfg.PubKeyBytes)
1✔
732
        }
1✔
733

734
        // Quickly check if we have any existing legacy channels with this
735
        // peer.
736
        haveLegacyChan := false
3✔
737
        for _, c := range activeChans {
5✔
738
                if c.ChanType.IsTweakless() {
4✔
739
                        continue
2✔
740
                }
741

UNCOV
742
                haveLegacyChan = true
×
UNCOV
743
                break
×
744
        }
745

746
        // Exchange local and global features, the init message should be very
747
        // first between two nodes.
748
        if err := p.sendInitMsg(haveLegacyChan); err != nil {
3✔
749
                return fmt.Errorf("unable to send init msg: %w", err)
×
750
        }
×
751

752
        // Before we launch any of the helper goroutines off the peer struct,
753
        // we'll first ensure proper adherence to the p2p protocol. The init
754
        // message MUST be sent before any other message.
755
        readErr := make(chan error, 1)
3✔
756
        msgChan := make(chan lnwire.Message, 1)
3✔
757
        p.wg.Add(1)
3✔
758
        go func() {
6✔
759
                defer p.wg.Done()
3✔
760

3✔
761
                msg, err := p.readNextMessage()
3✔
762
                if err != nil {
3✔
763
                        readErr <- err
×
764
                        msgChan <- nil
×
765
                        return
×
766
                }
×
767
                readErr <- nil
3✔
768
                msgChan <- msg
3✔
769
        }()
770

771
        select {
3✔
772
        // In order to avoid blocking indefinitely, we'll give the other peer
773
        // an upper timeout to respond before we bail out early.
774
        case <-time.After(handshakeTimeout):
×
775
                return fmt.Errorf("peer did not complete handshake within %v",
×
776
                        handshakeTimeout)
×
777
        case err := <-readErr:
3✔
778
                if err != nil {
3✔
779
                        return fmt.Errorf("unable to read init msg: %w", err)
×
780
                }
×
781
        }
782

783
        // Once the init message arrives, we can parse it so we can figure out
784
        // the negotiation of features for this session.
785
        msg := <-msgChan
3✔
786
        if msg, ok := msg.(*lnwire.Init); ok {
6✔
787
                if err := p.handleInitMsg(msg); err != nil {
3✔
788
                        p.storeError(err)
×
789
                        return err
×
790
                }
×
791
        } else {
×
792
                return errors.New("very first message between nodes " +
×
793
                        "must be init message")
×
794
        }
×
795

796
        // Next, load all the active channels we have with this peer,
797
        // registering them with the switch and launching the necessary
798
        // goroutines required to operate them.
799
        p.log.Debugf("Loaded %v active channels from database",
3✔
800
                len(activeChans))
3✔
801

3✔
802
        // Conditionally subscribe to channel events before loading channels so
3✔
803
        // we won't miss events. This subscription is used to listen to active
3✔
804
        // channel event when reenabling channels. Once the reenabling process
3✔
805
        // is finished, this subscription will be canceled.
3✔
806
        //
3✔
807
        // NOTE: ChannelNotifier must be started before subscribing events
3✔
808
        // otherwise we'd panic here.
3✔
809
        if err := p.attachChannelEventSubscription(); err != nil {
3✔
810
                return err
×
811
        }
×
812

813
        // Register the message router now as we may need to register some
814
        // endpoints while loading the channels below.
815
        p.msgRouter.WhenSome(func(router msgmux.Router) {
6✔
816
                router.Start()
3✔
817
        })
3✔
818

819
        msgs, err := p.loadActiveChannels(activeChans)
3✔
820
        if err != nil {
3✔
821
                return fmt.Errorf("unable to load channels: %w", err)
×
822
        }
×
823

824
        p.startTime = time.Now()
3✔
825

3✔
826
        // Before launching the writeHandler goroutine, we send any channel
3✔
827
        // sync messages that must be resent for borked channels. We do this to
3✔
828
        // avoid data races with WriteMessage & Flush calls.
3✔
829
        if len(msgs) > 0 {
5✔
830
                p.log.Infof("Sending %d channel sync messages to peer after "+
2✔
831
                        "loading active channels", len(msgs))
2✔
832

2✔
833
                // Send the messages directly via writeMessage and bypass the
2✔
834
                // writeHandler goroutine.
2✔
835
                for _, msg := range msgs {
4✔
836
                        if err := p.writeMessage(msg); err != nil {
2✔
837
                                return fmt.Errorf("unable to send "+
×
838
                                        "reestablish msg: %v", err)
×
839
                        }
×
840
                }
841
        }
842

843
        err = p.pingManager.Start()
3✔
844
        if err != nil {
3✔
845
                return fmt.Errorf("could not start ping manager %w", err)
×
846
        }
×
847

848
        p.wg.Add(4)
3✔
849
        go p.queueHandler()
3✔
850
        go p.writeHandler()
3✔
851
        go p.channelManager()
3✔
852
        go p.readHandler()
3✔
853

3✔
854
        // Signal to any external processes that the peer is now active.
3✔
855
        close(p.activeSignal)
3✔
856

3✔
857
        // Node announcements don't propagate very well throughout the network
3✔
858
        // as there isn't a way to efficiently query for them through their
3✔
859
        // timestamp, mostly affecting nodes that were offline during the time
3✔
860
        // of broadcast. We'll resend our node announcement to the remote peer
3✔
861
        // as a best-effort delivery such that it can also propagate to their
3✔
862
        // peers. To ensure they can successfully process it in most cases,
3✔
863
        // we'll only resend it as long as we have at least one confirmed
3✔
864
        // advertised channel with the remote peer.
3✔
865
        //
3✔
866
        // TODO(wilmer): Remove this once we're able to query for node
3✔
867
        // announcements through their timestamps.
3✔
868
        p.wg.Add(2)
3✔
869
        go p.maybeSendNodeAnn(activeChans)
3✔
870
        go p.maybeSendChannelUpdates()
3✔
871

3✔
872
        return nil
3✔
873
}
874

875
// initGossipSync initializes either a gossip syncer or an initial routing
876
// dump, depending on the negotiated synchronization method.
877
func (p *Brontide) initGossipSync() {
3✔
878
        // If the remote peer knows of the new gossip queries feature, then
3✔
879
        // we'll create a new gossipSyncer in the AuthenticatedGossiper for it.
3✔
880
        if p.remoteFeatures.HasFeature(lnwire.GossipQueriesOptional) {
6✔
881
                p.log.Info("Negotiated chan series queries")
3✔
882

3✔
883
                if p.cfg.AuthGossiper == nil {
6✔
884
                        // This should only ever be hit in the unit tests.
3✔
885
                        p.log.Warn("No AuthGossiper configured. Abandoning " +
3✔
886
                                "gossip sync.")
3✔
887
                        return
3✔
888
                }
3✔
889

890
                // Register the peer's gossip syncer with the gossiper.
891
                // This blocks synchronously to ensure the gossip syncer is
892
                // registered with the gossiper before attempting to read
893
                // messages from the remote peer.
894
                //
895
                // TODO(wilmer): Only sync updates from non-channel peers. This
896
                // requires an improved version of the current network
897
                // bootstrapper to ensure we can find and connect to non-channel
898
                // peers.
UNCOV
899
                p.cfg.AuthGossiper.InitSyncState(p)
×
900
        }
901
}
902

903
// taprootShutdownAllowed returns true if both parties have negotiated the
904
// shutdown-any-segwit feature.
905
func (p *Brontide) taprootShutdownAllowed() bool {
6✔
906
        return p.RemoteFeatures().HasFeature(lnwire.ShutdownAnySegwitOptional) &&
6✔
907
                p.LocalFeatures().HasFeature(lnwire.ShutdownAnySegwitOptional)
6✔
908
}
6✔
909

910
// QuitSignal is a method that should return a channel which will be sent upon
911
// or closed once the backing peer exits. This allows callers using the
912
// interface to cancel any processing in the event the backing implementation
913
// exits.
914
//
915
// NOTE: Part of the lnpeer.Peer interface.
UNCOV
916
func (p *Brontide) QuitSignal() <-chan struct{} {
×
UNCOV
917
        return p.quit
×
UNCOV
918
}
×
919

920
// addrWithInternalKey takes a delivery script, then attempts to supplement it
921
// with information related to the internal key for the addr, but only if it's
922
// a taproot addr.
923
func (p *Brontide) addrWithInternalKey(
924
        deliveryScript []byte) (*chancloser.DeliveryAddrWithKey, error) {
9✔
925

9✔
926
        // Currently, custom channels cannot be created with external upfront
9✔
927
        // shutdown addresses, so this shouldn't be an issue. We only require
9✔
928
        // the internal key for taproot addresses to be able to provide a non
9✔
929
        // inclusion proof of any scripts.
9✔
930
        internalKeyDesc, err := lnwallet.InternalKeyForAddr(
9✔
931
                p.cfg.Wallet, &p.cfg.Wallet.Cfg.NetParams, deliveryScript,
9✔
932
        )
9✔
933
        if err != nil {
9✔
934
                return nil, fmt.Errorf("unable to fetch internal key: %w", err)
×
935
        }
×
936

937
        return &chancloser.DeliveryAddrWithKey{
9✔
938
                DeliveryAddress: deliveryScript,
9✔
939
                InternalKey: fn.MapOption(
9✔
940
                        func(desc keychain.KeyDescriptor) btcec.PublicKey {
9✔
UNCOV
941
                                return *desc.PubKey
×
UNCOV
942
                        },
×
943
                )(internalKeyDesc),
944
        }, nil
945
}
946

947
// loadActiveChannels creates indexes within the peer for tracking all active
948
// channels returned by the database. It returns a slice of channel reestablish
949
// messages that should be sent to the peer immediately, in case we have borked
950
// channels that haven't been closed yet.
951
func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
952
        []lnwire.Message, error) {
3✔
953

3✔
954
        // Return a slice of messages to send to the peers in case the channel
3✔
955
        // cannot be loaded normally.
3✔
956
        var msgs []lnwire.Message
3✔
957

3✔
958
        scidAliasNegotiated := p.hasNegotiatedScidAlias()
3✔
959

3✔
960
        for _, dbChan := range chans {
5✔
961
                hasScidFeature := dbChan.ChanType.HasScidAliasFeature()
2✔
962
                if scidAliasNegotiated && !hasScidFeature {
2✔
UNCOV
963
                        // We'll request and store an alias, making sure that a
×
UNCOV
964
                        // gossiper mapping is not created for the alias to the
×
UNCOV
965
                        // real SCID. This is done because the peer and funding
×
UNCOV
966
                        // manager are not aware of each other's states and if
×
UNCOV
967
                        // we did not do this, we would accept alias channel
×
UNCOV
968
                        // updates after 6 confirmations, which would be buggy.
×
UNCOV
969
                        // We'll queue a channel_ready message with the new
×
UNCOV
970
                        // alias. This should technically be done *after* the
×
UNCOV
971
                        // reestablish, but this behavior is pre-existing since
×
UNCOV
972
                        // the funding manager may already queue a
×
UNCOV
973
                        // channel_ready before the channel_reestablish.
×
UNCOV
974
                        if !dbChan.IsPending {
×
UNCOV
975
                                aliasScid, err := p.cfg.RequestAlias()
×
UNCOV
976
                                if err != nil {
×
977
                                        return nil, err
×
978
                                }
×
979

UNCOV
980
                                err = p.cfg.AddLocalAlias(
×
UNCOV
981
                                        aliasScid, dbChan.ShortChanID(), false,
×
UNCOV
982
                                        false,
×
UNCOV
983
                                )
×
UNCOV
984
                                if err != nil {
×
985
                                        return nil, err
×
986
                                }
×
987

UNCOV
988
                                chanID := lnwire.NewChanIDFromOutPoint(
×
UNCOV
989
                                        dbChan.FundingOutpoint,
×
UNCOV
990
                                )
×
UNCOV
991

×
UNCOV
992
                                // Fetch the second commitment point to send in
×
UNCOV
993
                                // the channel_ready message.
×
UNCOV
994
                                second, err := dbChan.SecondCommitmentPoint()
×
UNCOV
995
                                if err != nil {
×
996
                                        return nil, err
×
997
                                }
×
998

UNCOV
999
                                channelReadyMsg := lnwire.NewChannelReady(
×
UNCOV
1000
                                        chanID, second,
×
UNCOV
1001
                                )
×
UNCOV
1002
                                channelReadyMsg.AliasScid = &aliasScid
×
UNCOV
1003

×
UNCOV
1004
                                msgs = append(msgs, channelReadyMsg)
×
1005
                        }
1006

1007
                        // If we've negotiated the option-scid-alias feature
1008
                        // and this channel does not have ScidAliasFeature set
1009
                        // to true due to an upgrade where the feature bit was
1010
                        // turned on, we'll update the channel's database
1011
                        // state.
UNCOV
1012
                        err := dbChan.MarkScidAliasNegotiated()
×
UNCOV
1013
                        if err != nil {
×
1014
                                return nil, err
×
1015
                        }
×
1016
                }
1017

1018
                var chanOpts []lnwallet.ChannelOpt
2✔
1019
                p.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
2✔
1020
                        chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
×
1021
                })
×
1022
                p.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
2✔
1023
                        chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
×
1024
                })
×
1025
                p.cfg.AuxResolver.WhenSome(
2✔
1026
                        func(s lnwallet.AuxContractResolver) {
2✔
1027
                                chanOpts = append(
×
1028
                                        chanOpts, lnwallet.WithAuxResolver(s),
×
1029
                                )
×
1030
                        },
×
1031
                )
1032

1033
                lnChan, err := lnwallet.NewLightningChannel(
2✔
1034
                        p.cfg.Signer, dbChan, p.cfg.SigPool, chanOpts...,
2✔
1035
                )
2✔
1036
                if err != nil {
2✔
1037
                        return nil, fmt.Errorf("unable to create channel "+
×
1038
                                "state machine: %w", err)
×
1039
                }
×
1040

1041
                chanPoint := dbChan.FundingOutpoint
2✔
1042

2✔
1043
                chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
2✔
1044

2✔
1045
                p.log.Infof("Loading ChannelPoint(%v), isPending=%v",
2✔
1046
                        chanPoint, lnChan.IsPending())
2✔
1047

2✔
1048
                // Skip adding any permanently irreconcilable channels to the
2✔
1049
                // htlcswitch.
2✔
1050
                if !dbChan.HasChanStatus(channeldb.ChanStatusDefault) &&
2✔
1051
                        !dbChan.HasChanStatus(channeldb.ChanStatusRestored) {
4✔
1052

2✔
1053
                        p.log.Warnf("ChannelPoint(%v) has status %v, won't "+
2✔
1054
                                "start.", chanPoint, dbChan.ChanStatus())
2✔
1055

2✔
1056
                        // To help our peer recover from a potential data loss,
2✔
1057
                        // we resend our channel reestablish message if the
2✔
1058
                        // channel is in a borked state. We won't process any
2✔
1059
                        // channel reestablish message sent from the peer, but
2✔
1060
                        // that's okay since the assumption is that we did when
2✔
1061
                        // marking the channel borked.
2✔
1062
                        chanSync, err := dbChan.ChanSyncMsg()
2✔
1063
                        if err != nil {
2✔
1064
                                p.log.Errorf("Unable to create channel "+
×
1065
                                        "reestablish message for channel %v: "+
×
1066
                                        "%v", chanPoint, err)
×
1067
                                continue
×
1068
                        }
1069

1070
                        msgs = append(msgs, chanSync)
2✔
1071

2✔
1072
                        // Check if this channel needs to have the cooperative
2✔
1073
                        // close process restarted. If so, we'll need to send
2✔
1074
                        // the Shutdown message that is returned.
2✔
1075
                        if dbChan.HasChanStatus(
2✔
1076
                                channeldb.ChanStatusCoopBroadcasted,
2✔
1077
                        ) {
2✔
1078

×
1079
                                shutdownMsg, err := p.restartCoopClose(lnChan)
×
1080
                                if err != nil {
×
1081
                                        p.log.Errorf("Unable to restart "+
×
1082
                                                "coop close for channel: %v",
×
1083
                                                err)
×
1084
                                        continue
×
1085
                                }
1086

1087
                                if shutdownMsg == nil {
×
1088
                                        continue
×
1089
                                }
1090

1091
                                // Append the message to the set of messages to
1092
                                // send.
1093
                                msgs = append(msgs, shutdownMsg)
×
1094
                        }
1095

1096
                        continue
2✔
1097
                }
1098

1099
                // Before we register this new link with the HTLC Switch, we'll
1100
                // need to fetch its current link-layer forwarding policy from
1101
                // the database.
UNCOV
1102
                graph := p.cfg.ChannelGraph
×
UNCOV
1103
                info, p1, p2, err := graph.FetchChannelEdgesByOutpoint(
×
UNCOV
1104
                        &chanPoint,
×
UNCOV
1105
                )
×
UNCOV
1106
                if err != nil && !errors.Is(err, graphdb.ErrEdgeNotFound) {
×
1107
                        return nil, err
×
1108
                }
×
1109

1110
                // We'll filter out our policy from the directional channel
1111
                // edges based whom the edge connects to. If it doesn't connect
1112
                // to us, then we know that we were the one that advertised the
1113
                // policy.
1114
                //
1115
                // TODO(roasbeef): can add helper method to get policy for
1116
                // particular channel.
UNCOV
1117
                var selfPolicy *models.ChannelEdgePolicy
×
UNCOV
1118
                if info != nil && bytes.Equal(info.NodeKey1Bytes[:],
×
UNCOV
1119
                        p.cfg.ServerPubKey[:]) {
×
UNCOV
1120

×
UNCOV
1121
                        selfPolicy = p1
×
UNCOV
1122
                } else {
×
UNCOV
1123
                        selfPolicy = p2
×
UNCOV
1124
                }
×
1125

1126
                // If we don't yet have an advertised routing policy, then
1127
                // we'll use the current default, otherwise we'll translate the
1128
                // routing policy into a forwarding policy.
UNCOV
1129
                var forwardingPolicy *models.ForwardingPolicy
×
UNCOV
1130
                if selfPolicy != nil {
×
UNCOV
1131
                        var inboundWireFee lnwire.Fee
×
UNCOV
1132
                        _, err := selfPolicy.ExtraOpaqueData.ExtractRecords(
×
UNCOV
1133
                                &inboundWireFee,
×
UNCOV
1134
                        )
×
UNCOV
1135
                        if err != nil {
×
1136
                                return nil, err
×
1137
                        }
×
1138

UNCOV
1139
                        inboundFee := models.NewInboundFeeFromWire(
×
UNCOV
1140
                                inboundWireFee,
×
UNCOV
1141
                        )
×
UNCOV
1142

×
UNCOV
1143
                        forwardingPolicy = &models.ForwardingPolicy{
×
UNCOV
1144
                                MinHTLCOut:    selfPolicy.MinHTLC,
×
UNCOV
1145
                                MaxHTLC:       selfPolicy.MaxHTLC,
×
UNCOV
1146
                                BaseFee:       selfPolicy.FeeBaseMSat,
×
UNCOV
1147
                                FeeRate:       selfPolicy.FeeProportionalMillionths,
×
UNCOV
1148
                                TimeLockDelta: uint32(selfPolicy.TimeLockDelta),
×
UNCOV
1149

×
UNCOV
1150
                                InboundFee: inboundFee,
×
UNCOV
1151
                        }
×
UNCOV
1152
                } else {
×
UNCOV
1153
                        p.log.Warnf("Unable to find our forwarding policy "+
×
UNCOV
1154
                                "for channel %v, using default values",
×
UNCOV
1155
                                chanPoint)
×
UNCOV
1156
                        forwardingPolicy = &p.cfg.RoutingPolicy
×
UNCOV
1157
                }
×
1158

UNCOV
1159
                p.log.Tracef("Using link policy of: %v",
×
UNCOV
1160
                        spew.Sdump(forwardingPolicy))
×
UNCOV
1161

×
UNCOV
1162
                // If the channel is pending, set the value to nil in the
×
UNCOV
1163
                // activeChannels map. This is done to signify that the channel
×
UNCOV
1164
                // is pending. We don't add the link to the switch here - it's
×
UNCOV
1165
                // the funding manager's responsibility to spin up pending
×
UNCOV
1166
                // channels. Adding them here would just be extra work as we'll
×
UNCOV
1167
                // tear them down when creating + adding the final link.
×
UNCOV
1168
                if lnChan.IsPending() {
×
UNCOV
1169
                        p.activeChannels.Store(chanID, nil)
×
UNCOV
1170

×
UNCOV
1171
                        continue
×
1172
                }
1173

UNCOV
1174
                shutdownInfo, err := lnChan.State().ShutdownInfo()
×
UNCOV
1175
                if err != nil && !errors.Is(err, channeldb.ErrNoShutdownInfo) {
×
1176
                        return nil, err
×
1177
                }
×
1178

UNCOV
1179
                var (
×
UNCOV
1180
                        shutdownMsg     fn.Option[lnwire.Shutdown]
×
UNCOV
1181
                        shutdownInfoErr error
×
UNCOV
1182
                )
×
UNCOV
1183
                shutdownInfo.WhenSome(func(info channeldb.ShutdownInfo) {
×
UNCOV
1184
                        // Compute an ideal fee.
×
UNCOV
1185
                        feePerKw, err := p.cfg.FeeEstimator.EstimateFeePerKW(
×
UNCOV
1186
                                p.cfg.CoopCloseTargetConfs,
×
UNCOV
1187
                        )
×
UNCOV
1188
                        if err != nil {
×
1189
                                shutdownInfoErr = fmt.Errorf("unable to "+
×
1190
                                        "estimate fee: %w", err)
×
1191

×
1192
                                return
×
1193
                        }
×
1194

UNCOV
1195
                        addr, err := p.addrWithInternalKey(
×
UNCOV
1196
                                info.DeliveryScript.Val,
×
UNCOV
1197
                        )
×
UNCOV
1198
                        if err != nil {
×
1199
                                shutdownInfoErr = fmt.Errorf("unable to make "+
×
1200
                                        "delivery addr: %w", err)
×
1201
                                return
×
1202
                        }
×
UNCOV
1203
                        chanCloser, err := p.createChanCloser(
×
UNCOV
1204
                                lnChan, addr, feePerKw, nil, info.Closer(),
×
UNCOV
1205
                        )
×
UNCOV
1206
                        if err != nil {
×
1207
                                shutdownInfoErr = fmt.Errorf("unable to "+
×
1208
                                        "create chan closer: %w", err)
×
1209

×
1210
                                return
×
1211
                        }
×
1212

UNCOV
1213
                        chanID := lnwire.NewChanIDFromOutPoint(
×
UNCOV
1214
                                lnChan.State().FundingOutpoint,
×
UNCOV
1215
                        )
×
UNCOV
1216

×
UNCOV
1217
                        p.activeChanCloses[chanID] = chanCloser
×
UNCOV
1218

×
UNCOV
1219
                        // Create the Shutdown message.
×
UNCOV
1220
                        shutdown, err := chanCloser.ShutdownChan()
×
UNCOV
1221
                        if err != nil {
×
1222
                                delete(p.activeChanCloses, chanID)
×
1223
                                shutdownInfoErr = err
×
1224

×
1225
                                return
×
1226
                        }
×
1227

UNCOV
1228
                        shutdownMsg = fn.Some(*shutdown)
×
1229
                })
UNCOV
1230
                if shutdownInfoErr != nil {
×
1231
                        return nil, shutdownInfoErr
×
1232
                }
×
1233

1234
                // Subscribe to the set of on-chain events for this channel.
UNCOV
1235
                chainEvents, err := p.cfg.ChainArb.SubscribeChannelEvents(
×
UNCOV
1236
                        chanPoint,
×
UNCOV
1237
                )
×
UNCOV
1238
                if err != nil {
×
1239
                        return nil, err
×
1240
                }
×
1241

UNCOV
1242
                err = p.addLink(
×
UNCOV
1243
                        &chanPoint, lnChan, forwardingPolicy, chainEvents,
×
UNCOV
1244
                        true, shutdownMsg,
×
UNCOV
1245
                )
×
UNCOV
1246
                if err != nil {
×
1247
                        return nil, fmt.Errorf("unable to add link %v to "+
×
1248
                                "switch: %v", chanPoint, err)
×
1249
                }
×
1250

UNCOV
1251
                p.activeChannels.Store(chanID, lnChan)
×
1252
        }
1253

1254
        return msgs, nil
3✔
1255
}
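The loop above picks our own edge policy by checking whether the edge's first node key (NodeKey1Bytes) equals our server public key, then translates it into a forwarding policy. A minimal, hedged sketch of that selection rule, using stand-in types rather than lnd's graph models:

package main

import (
	"bytes"
	"fmt"
)

// stubPolicy is a stand-in for a directional channel edge policy; only the
// base fee is kept to keep the example small.
type stubPolicy struct {
	feeBaseMSat uint64
}

// selfPolicy mirrors the selection above: if nodeKey1 on the edge is our
// key, the first policy is the one we advertised, otherwise the second is.
func selfPolicy(nodeKey1, ourKey []byte, p1, p2 *stubPolicy) *stubPolicy {
	if bytes.Equal(nodeKey1, ourKey) {
		return p1
	}
	return p2
}

func main() {
	our := []byte{0x02, 0x01}
	their := []byte{0x03, 0x02}

	p1 := &stubPolicy{feeBaseMSat: 1_000}
	p2 := &stubPolicy{feeBaseMSat: 2_000}

	// We are node 1 on this edge, so p1 is our advertised policy.
	fmt.Println(selfPolicy(our, our, p1, p2).feeBaseMSat) // 1000

	// We are node 2 here, so p2 is ours.
	fmt.Println(selfPolicy(their, our, p1, p2).feeBaseMSat) // 2000
}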
1256

1257
// addLink creates and adds a new ChannelLink from the specified channel.
1258
func (p *Brontide) addLink(chanPoint *wire.OutPoint,
1259
        lnChan *lnwallet.LightningChannel,
1260
        forwardingPolicy *models.ForwardingPolicy,
1261
        chainEvents *contractcourt.ChainEventSubscription,
UNCOV
1262
        syncStates bool, shutdownMsg fn.Option[lnwire.Shutdown]) error {
×
UNCOV
1263

×
UNCOV
1264
        // onChannelFailure will be called by the link in case the channel
×
UNCOV
1265
        // fails for some reason.
×
UNCOV
1266
        onChannelFailure := func(chanID lnwire.ChannelID,
×
UNCOV
1267
                shortChanID lnwire.ShortChannelID,
×
UNCOV
1268
                linkErr htlcswitch.LinkFailureError) {
×
UNCOV
1269

×
UNCOV
1270
                failure := linkFailureReport{
×
UNCOV
1271
                        chanPoint:   *chanPoint,
×
UNCOV
1272
                        chanID:      chanID,
×
UNCOV
1273
                        shortChanID: shortChanID,
×
UNCOV
1274
                        linkErr:     linkErr,
×
UNCOV
1275
                }
×
UNCOV
1276

×
UNCOV
1277
                select {
×
UNCOV
1278
                case p.linkFailures <- failure:
×
1279
                case <-p.quit:
×
1280
                case <-p.cfg.Quit:
×
1281
                }
1282
        }
1283

UNCOV
1284
        updateContractSignals := func(signals *contractcourt.ContractSignals) error {
×
UNCOV
1285
                return p.cfg.ChainArb.UpdateContractSignals(*chanPoint, signals)
×
UNCOV
1286
        }
×
1287

UNCOV
1288
        notifyContractUpdate := func(update *contractcourt.ContractUpdate) error {
×
UNCOV
1289
                return p.cfg.ChainArb.NotifyContractUpdate(*chanPoint, update)
×
UNCOV
1290
        }
×
1291

1292
        //nolint:ll
UNCOV
1293
        linkCfg := htlcswitch.ChannelLinkConfig{
×
UNCOV
1294
                Peer:                   p,
×
UNCOV
1295
                DecodeHopIterators:     p.cfg.Sphinx.DecodeHopIterators,
×
UNCOV
1296
                ExtractErrorEncrypter:  p.cfg.Sphinx.ExtractErrorEncrypter,
×
UNCOV
1297
                FetchLastChannelUpdate: p.cfg.FetchLastChanUpdate,
×
UNCOV
1298
                HodlMask:               p.cfg.Hodl.Mask(),
×
UNCOV
1299
                Registry:               p.cfg.Invoices,
×
UNCOV
1300
                BestHeight:             p.cfg.Switch.BestHeight,
×
UNCOV
1301
                Circuits:               p.cfg.Switch.CircuitModifier(),
×
UNCOV
1302
                ForwardPackets:         p.cfg.InterceptSwitch.ForwardPackets,
×
UNCOV
1303
                FwrdingPolicy:          *forwardingPolicy,
×
UNCOV
1304
                FeeEstimator:           p.cfg.FeeEstimator,
×
UNCOV
1305
                PreimageCache:          p.cfg.WitnessBeacon,
×
UNCOV
1306
                ChainEvents:            chainEvents,
×
UNCOV
1307
                UpdateContractSignals:  updateContractSignals,
×
UNCOV
1308
                NotifyContractUpdate:   notifyContractUpdate,
×
UNCOV
1309
                OnChannelFailure:       onChannelFailure,
×
UNCOV
1310
                SyncStates:             syncStates,
×
UNCOV
1311
                BatchTicker:            ticker.New(p.cfg.ChannelCommitInterval),
×
UNCOV
1312
                FwdPkgGCTicker:         ticker.New(time.Hour),
×
UNCOV
1313
                PendingCommitTicker: ticker.New(
×
UNCOV
1314
                        p.cfg.PendingCommitInterval,
×
UNCOV
1315
                ),
×
UNCOV
1316
                BatchSize:               p.cfg.ChannelCommitBatchSize,
×
UNCOV
1317
                UnsafeReplay:            p.cfg.UnsafeReplay,
×
UNCOV
1318
                MinUpdateTimeout:        htlcswitch.DefaultMinLinkFeeUpdateTimeout,
×
UNCOV
1319
                MaxUpdateTimeout:        htlcswitch.DefaultMaxLinkFeeUpdateTimeout,
×
UNCOV
1320
                OutgoingCltvRejectDelta: p.cfg.OutgoingCltvRejectDelta,
×
UNCOV
1321
                TowerClient:             p.cfg.TowerClient,
×
UNCOV
1322
                MaxOutgoingCltvExpiry:   p.cfg.MaxOutgoingCltvExpiry,
×
UNCOV
1323
                MaxFeeAllocation:        p.cfg.MaxChannelFeeAllocation,
×
UNCOV
1324
                MaxAnchorsCommitFeeRate: p.cfg.MaxAnchorsCommitFeeRate,
×
UNCOV
1325
                NotifyActiveLink:        p.cfg.ChannelNotifier.NotifyActiveLinkEvent,
×
UNCOV
1326
                NotifyActiveChannel:     p.cfg.ChannelNotifier.NotifyActiveChannelEvent,
×
UNCOV
1327
                NotifyInactiveChannel:   p.cfg.ChannelNotifier.NotifyInactiveChannelEvent,
×
UNCOV
1328
                NotifyInactiveLinkEvent: p.cfg.ChannelNotifier.NotifyInactiveLinkEvent,
×
UNCOV
1329
                HtlcNotifier:            p.cfg.HtlcNotifier,
×
UNCOV
1330
                GetAliases:              p.cfg.GetAliases,
×
UNCOV
1331
                PreviouslySentShutdown:  shutdownMsg,
×
UNCOV
1332
                DisallowRouteBlinding:   p.cfg.DisallowRouteBlinding,
×
UNCOV
1333
                MaxFeeExposure:          p.cfg.MaxFeeExposure,
×
UNCOV
1334
                ShouldFwdExpEndorsement: p.cfg.ShouldFwdExpEndorsement,
×
UNCOV
1335
                DisallowQuiescence: p.cfg.DisallowQuiescence ||
×
UNCOV
1336
                        !p.remoteFeatures.HasFeature(lnwire.QuiescenceOptional),
×
UNCOV
1337
                AuxTrafficShaper: p.cfg.AuxTrafficShaper,
×
UNCOV
1338
        }
×
UNCOV
1339

×
UNCOV
1340
        // Before adding our new link, purge the switch of any pending or live
×
UNCOV
1341
        // links going by the same channel id. If one is found, we'll shut it
×
UNCOV
1342
        // down to ensure that the mailboxes are only ever under the control of
×
UNCOV
1343
        // one link.
×
UNCOV
1344
        chanID := lnwire.NewChanIDFromOutPoint(*chanPoint)
×
UNCOV
1345
        p.cfg.Switch.RemoveLink(chanID)
×
UNCOV
1346

×
UNCOV
1347
        // With the channel link created, we'll now notify the htlc switch so
×
UNCOV
1348
        // this channel can be used to dispatch local payments and also
×
UNCOV
1349
        // passively forward payments.
×
UNCOV
1350
        return p.cfg.Switch.CreateAndAddLink(linkCfg, lnChan)
×
1351
}
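The onChannelFailure callback above hands failures to the peer's event loop over a channel, selecting on both the peer's and the config's quit channels so a report never blocks once shutdown has begun. A small, self-contained sketch of that pattern (the names here are illustrative, not lnd's):

package main

import "fmt"

type failureReport struct {
	reason string
}

// reportFailure tries to deliver a report, but gives up cleanly if either
// quit channel has been closed, mirroring the select in onChannelFailure.
func reportFailure(reports chan<- failureReport, quit, cfgQuit <-chan struct{},
	r failureReport) bool {

	select {
	case reports <- r:
		return true
	case <-quit:
		return false
	case <-cfgQuit:
		return false
	}
}

func main() {
	reports := make(chan failureReport, 1)
	quit := make(chan struct{})
	cfgQuit := make(chan struct{})

	ok := reportFailure(reports, quit, cfgQuit, failureReport{reason: "link failed"})
	fmt.Println("delivered:", ok, "queued:", len(reports))

	// After shutdown the same call returns immediately instead of blocking.
	close(quit)
	stuck := make(chan failureReport) // unbuffered and never drained
	fmt.Println("delivered:", reportFailure(stuck, quit, cfgQuit,
		failureReport{reason: "late failure"}))
}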
1352

1353
// maybeSendNodeAnn sends our node announcement to the remote peer if at least
1354
// one confirmed public channel exists with them.
1355
func (p *Brontide) maybeSendNodeAnn(channels []*channeldb.OpenChannel) {
3✔
1356
        defer p.wg.Done()
3✔
1357

3✔
1358
        hasConfirmedPublicChan := false
3✔
1359
        for _, channel := range channels {
5✔
1360
                if channel.IsPending {
2✔
UNCOV
1361
                        continue
×
1362
                }
1363
                if channel.ChannelFlags&lnwire.FFAnnounceChannel == 0 {
4✔
1364
                        continue
2✔
1365
                }
1366

UNCOV
1367
                hasConfirmedPublicChan = true
×
UNCOV
1368
                break
×
1369
        }
1370
        if !hasConfirmedPublicChan {
6✔
1371
                return
3✔
1372
        }
3✔
1373

UNCOV
1374
        ourNodeAnn, err := p.cfg.GenNodeAnnouncement()
×
UNCOV
1375
        if err != nil {
×
1376
                p.log.Debugf("Unable to retrieve node announcement: %v", err)
×
1377
                return
×
1378
        }
×
1379

UNCOV
1380
        if err := p.SendMessageLazy(false, &ourNodeAnn); err != nil {
×
1381
                p.log.Debugf("Unable to resend node announcement: %v", err)
×
1382
        }
×
1383
}
1384

1385
// maybeSendChannelUpdates sends our channel updates to the remote peer if we
1386
// have any active channels with them.
1387
func (p *Brontide) maybeSendChannelUpdates() {
3✔
1388
        defer p.wg.Done()
3✔
1389

3✔
1390
        // If we don't have any active channels, then we can exit early.
3✔
1391
        if p.activeChannels.Len() == 0 {
4✔
1392
                return
1✔
1393
        }
1✔
1394

1395
        maybeSendUpd := func(cid lnwire.ChannelID,
2✔
1396
                lnChan *lnwallet.LightningChannel) error {
4✔
1397

2✔
1398
                // Nil channels are pending, so we'll skip them.
2✔
1399
                if lnChan == nil {
2✔
UNCOV
1400
                        return nil
×
UNCOV
1401
                }
×
1402

1403
                dbChan := lnChan.State()
2✔
1404
                scid := func() lnwire.ShortChannelID {
4✔
1405
                        switch {
2✔
1406
                        // If it's a zero conf channel and confirmed,
1407
                        // then we need to use the "real" scid.
UNCOV
1408
                        case dbChan.IsZeroConf() && dbChan.ZeroConfConfirmed():
×
UNCOV
1409
                                return dbChan.ZeroConfRealScid()
×
1410

1411
                        // Otherwise, we can use the normal scid.
1412
                        default:
2✔
1413
                                return dbChan.ShortChanID()
2✔
1414
                        }
1415
                }()
1416

1417
                // Now that we know the channel is in a good state, we'll try
1418
                // to fetch the update to send to the remote peer. If the
1419
                // channel is pending, and not a zero conf channel, we'll get
1420
                // an error here which we'll ignore.
1421
                chanUpd, err := p.cfg.FetchLastChanUpdate(scid)
2✔
1422
                if err != nil {
2✔
UNCOV
1423
                        p.log.Debugf("Unable to fetch channel update for "+
×
UNCOV
1424
                                "ChannelPoint(%v), scid=%v: %v",
×
UNCOV
1425
                                dbChan.FundingOutpoint, dbChan.ShortChanID, err)
×
UNCOV
1426

×
UNCOV
1427
                        return nil
×
UNCOV
1428
                }
×
1429

1430
                p.log.Debugf("Sending channel update for ChannelPoint(%v), "+
2✔
1431
                        "scid=%v", dbChan.FundingOutpoint, dbChan.ShortChanID)
2✔
1432

2✔
1433
                // We'll send it as a normal message instead of using the lazy
2✔
1434
                // queue to prioritize transmission of the fresh update.
2✔
1435
                if err := p.SendMessage(false, chanUpd); err != nil {
2✔
1436
                        err := fmt.Errorf("unable to send channel update for "+
×
1437
                                "ChannelPoint(%v), scid=%v: %w",
×
1438
                                dbChan.FundingOutpoint, dbChan.ShortChanID(),
×
1439
                                err)
×
1440
                        p.log.Errorf(err.Error())
×
1441

×
1442
                        return err
×
1443
                }
×
1444

1445
                return nil
2✔
1446
        }
1447

1448
        p.activeChannels.ForEach(maybeSendUpd)
2✔
1449
}
1450

1451
// WaitForDisconnect waits until the peer has disconnected. A peer may be
1452
// disconnected if the local or remote side terminates the connection, or an
1453
// irrecoverable protocol error has been encountered. This method will only
1454
// begin watching the peer's waitgroup after the ready channel or the peer's
1455
// quit channel is signaled. The ready channel should only be signaled if a
1456
// call to Start returns no error. Otherwise, if the peer fails to start,
1457
// calling Disconnect will signal the quit channel and the method will not
1458
// block, since no goroutines were spawned.
UNCOV
1459
func (p *Brontide) WaitForDisconnect(ready chan struct{}) {
×
UNCOV
1460
        // Before we try to call the `Wait` goroutine, we'll make sure the main
×
UNCOV
1461
        // set of goroutines are already active.
×
UNCOV
1462
        select {
×
UNCOV
1463
        case <-p.startReady:
×
1464
        case <-p.quit:
×
1465
                return
×
1466
        }
1467

UNCOV
1468
        select {
×
UNCOV
1469
        case <-ready:
×
1470
        case <-p.quit:
×
1471
        }
1472

UNCOV
1473
        p.wg.Wait()
×
1474
}
1475

1476
// Disconnect terminates the connection with the remote peer. Additionally, a
1477
// signal is sent to the server and htlcSwitch indicating the resources
1478
// allocated to the peer can now be cleaned up.
UNCOV
1479
func (p *Brontide) Disconnect(reason error) {
×
UNCOV
1480
        if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) {
×
UNCOV
1481
                return
×
UNCOV
1482
        }
×
1483

1484
        // Make sure initialization has completed before we try to tear things
1485
        // down.
1486
        //
1487
        // NOTE: We only read the `startReady` chan if the peer has been
1488
        // started, otherwise we will skip reading it as this chan won't be
1489
        // closed, hence blocks forever.
UNCOV
1490
        if atomic.LoadInt32(&p.started) == 1 {
×
UNCOV
1491
                p.log.Debugf("Started, waiting on startReady signal")
×
UNCOV
1492

×
UNCOV
1493
                select {
×
UNCOV
1494
                case <-p.startReady:
×
1495
                case <-p.quit:
×
1496
                        return
×
1497
                }
1498
        }
1499

UNCOV
1500
        err := fmt.Errorf("disconnecting %s, reason: %v", p, reason)
×
UNCOV
1501
        p.storeError(err)
×
UNCOV
1502

×
UNCOV
1503
        p.log.Infof(err.Error())
×
UNCOV
1504

×
UNCOV
1505
        // Stop PingManager before closing TCP connection.
×
UNCOV
1506
        p.pingManager.Stop()
×
UNCOV
1507

×
UNCOV
1508
        // Ensure that the TCP connection is properly closed before continuing.
×
UNCOV
1509
        p.cfg.Conn.Close()
×
UNCOV
1510

×
UNCOV
1511
        close(p.quit)
×
UNCOV
1512

×
UNCOV
1513
        // If our msg router isn't global (local to this instance), then we'll
×
UNCOV
1514
        // stop it. Otherwise, we'll leave it running.
×
UNCOV
1515
        if !p.globalMsgRouter {
×
UNCOV
1516
                p.msgRouter.WhenSome(func(router msgmux.Router) {
×
UNCOV
1517
                        router.Stop()
×
UNCOV
1518
                })
×
1519
        }
1520
}
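Disconnect uses an atomic compare-and-swap on the disconnect flag so that only the first caller runs the teardown, and closes a quit channel to fan the shutdown signal out to every goroutine. A minimal generic sketch of that idempotent-shutdown pattern (using the newer atomic.Int32 wrapper rather than the package-level CompareAndSwapInt32):

package main

import (
	"fmt"
	"sync/atomic"
)

type conn struct {
	disconnected atomic.Int32
	quit         chan struct{}
}

// disconnect runs teardown exactly once; later calls are no-ops, just like
// the CompareAndSwapInt32 guard above.
func (c *conn) disconnect(reason error) {
	if !c.disconnected.CompareAndSwap(0, 1) {
		return
	}
	fmt.Println("tearing down:", reason)
	close(c.quit) // broadcast shutdown to all readers of c.quit
}

func main() {
	c := &conn{quit: make(chan struct{})}

	c.disconnect(fmt.Errorf("remote peer hung up"))
	c.disconnect(fmt.Errorf("second caller")) // ignored

	select {
	case <-c.quit:
		fmt.Println("quit channel closed")
	default:
		fmt.Println("still running")
	}
}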
1521

1522
// String returns the string representation of this peer.
UNCOV
1523
func (p *Brontide) String() string {
×
UNCOV
1524
        return fmt.Sprintf("%x@%s", p.cfg.PubKeyBytes, p.cfg.Conn.RemoteAddr())
×
UNCOV
1525
}
×
1526

1527
// readNextMessage reads, and returns the next message on the wire along with
1528
// any additional raw payload.
1529
func (p *Brontide) readNextMessage() (lnwire.Message, error) {
7✔
1530
        noiseConn := p.cfg.Conn
7✔
1531
        err := noiseConn.SetReadDeadline(time.Time{})
7✔
1532
        if err != nil {
7✔
1533
                return nil, err
×
1534
        }
×
1535

1536
        pktLen, err := noiseConn.ReadNextHeader()
7✔
1537
        if err != nil {
7✔
UNCOV
1538
                return nil, fmt.Errorf("read next header: %w", err)
×
UNCOV
1539
        }
×
1540

1541
        // First we'll read the next _full_ message. We do this rather than
1542
        // reading incrementally from the stream as the Lightning wire protocol
1543
        // is message oriented and allows nodes to pad on additional data to
1544
        // the message stream.
1545
        var (
4✔
1546
                nextMsg lnwire.Message
4✔
1547
                msgLen  uint64
4✔
1548
        )
4✔
1549
        err = p.cfg.ReadPool.Submit(func(buf *buffer.Read) error {
8✔
1550
                // Before reading the body of the message, set the read timeout
4✔
1551
                // accordingly to ensure we don't block other readers using the
4✔
1552
                // pool. We do so only after the task has been scheduled to
4✔
1553
                // ensure the deadline doesn't expire while the message is in
4✔
1554
                // the process of being scheduled.
4✔
1555
                readDeadline := time.Now().Add(
4✔
1556
                        p.scaleTimeout(readMessageTimeout),
4✔
1557
                )
4✔
1558
                readErr := noiseConn.SetReadDeadline(readDeadline)
4✔
1559
                if readErr != nil {
4✔
1560
                        return readErr
×
1561
                }
×
1562

1563
                // The ReadNextBody method will actually end up re-using the
1564
                // buffer, so within this closure, we can continue to use
1565
                // rawMsg as it's just a slice into the buf from the buffer
1566
                // pool.
1567
                rawMsg, readErr := noiseConn.ReadNextBody(buf[:pktLen])
4✔
1568
                if readErr != nil {
4✔
1569
                        return fmt.Errorf("read next body: %w", readErr)
×
1570
                }
×
1571
                msgLen = uint64(len(rawMsg))
4✔
1572

4✔
1573
                // Next, create a new io.Reader implementation from the raw
4✔
1574
                // message, and use this to decode the message directly from.
4✔
1575
                msgReader := bytes.NewReader(rawMsg)
4✔
1576
                nextMsg, err = lnwire.ReadMessage(msgReader, 0)
4✔
1577
                if err != nil {
4✔
UNCOV
1578
                        return err
×
UNCOV
1579
                }
×
1580

1581
                // At this point, rawMsg and buf will be returned back to the
1582
                // buffer pool for re-use.
1583
                return nil
4✔
1584
        })
1585
        atomic.AddUint64(&p.bytesReceived, msgLen)
4✔
1586
        if err != nil {
4✔
UNCOV
1587
                return nil, err
×
UNCOV
1588
        }
×
1589

1590
        p.logWireMessage(nextMsg, true)
4✔
1591

4✔
1592
        return nextMsg, nil
4✔
1593
}
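readNextMessage reads a length header first and then the full body in one go, because the wire protocol is message oriented rather than a plain byte stream. A simplified, hedged sketch of that two-step read over an ordinary io.Reader (no noise encryption, no read pool, and a toy 2-byte length prefix instead of the real framing):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// readFrame reads a 2-byte big-endian length header followed by exactly that
// many payload bytes, mirroring ReadNextHeader/ReadNextBody at a high level.
func readFrame(r io.Reader) ([]byte, error) {
	var hdr [2]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, fmt.Errorf("read next header: %w", err)
	}
	pktLen := binary.BigEndian.Uint16(hdr[:])

	body := make([]byte, pktLen)
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, fmt.Errorf("read next body: %w", err)
	}
	return body, nil
}

func main() {
	var buf bytes.Buffer
	payload := []byte("ping")
	binary.Write(&buf, binary.BigEndian, uint16(len(payload)))
	buf.Write(payload)

	msg, err := readFrame(&buf)
	fmt.Printf("msg=%q err=%v\n", msg, err)
}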
1594

1595
// msgStream implements a goroutine-safe, in-order stream of messages to be
1596
// delivered via closure to a receiver. These messages MUST be in order due to
1597
// the nature of the lightning channel commitment and gossiper state machines.
1598
// TODO(conner): use stream handler interface to abstract out stream
1599
// state/logging.
1600
type msgStream struct {
1601
        streamShutdown int32 // To be used atomically.
1602

1603
        peer *Brontide
1604

1605
        apply func(lnwire.Message)
1606

1607
        startMsg string
1608
        stopMsg  string
1609

1610
        msgCond *sync.Cond
1611
        msgs    []lnwire.Message
1612

1613
        mtx sync.Mutex
1614

1615
        producerSema chan struct{}
1616

1617
        wg   sync.WaitGroup
1618
        quit chan struct{}
1619
}
1620

1621
// newMsgStream creates a new instance of a chanMsgStream for a particular
1622
// channel identified by its channel ID. bufSize is the max number of messages
1623
// that should be buffered in the internal queue. Callers should set this to a
1624
// sane value that avoids blocking unnecessarily, but doesn't allow an
1625
// unbounded amount of memory to be allocated to buffer incoming messages.
1626
func newMsgStream(p *Brontide, startMsg, stopMsg string, bufSize uint32,
1627
        apply func(lnwire.Message)) *msgStream {
3✔
1628

3✔
1629
        stream := &msgStream{
3✔
1630
                peer:         p,
3✔
1631
                apply:        apply,
3✔
1632
                startMsg:     startMsg,
3✔
1633
                stopMsg:      stopMsg,
3✔
1634
                producerSema: make(chan struct{}, bufSize),
3✔
1635
                quit:         make(chan struct{}),
3✔
1636
        }
3✔
1637
        stream.msgCond = sync.NewCond(&stream.mtx)
3✔
1638

3✔
1639
        // Before we return the active stream, we'll populate the producer's
3✔
1640
        // semaphore channel. We'll use this to ensure that the producer won't
3✔
1641
        // attempt to allocate memory in the queue for an item until it has
3✔
1642
        // sufficient extra space.
3✔
1643
        for i := uint32(0); i < bufSize; i++ {
3,003✔
1644
                stream.producerSema <- struct{}{}
3,000✔
1645
        }
3,000✔
1646

1647
        return stream
3✔
1648
}
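newMsgStream pre-fills a buffered channel with bufSize tokens and has the producer take one before enqueueing, which bounds how many messages can sit in the queue. A small stand-alone sketch of that counting-semaphore idea:

package main

import "fmt"

func main() {
	const bufSize = 3

	// Pre-fill the semaphore with one token per queue slot, just like the
	// producerSema loop above.
	sema := make(chan struct{}, bufSize)
	for i := 0; i < bufSize; i++ {
		sema <- struct{}{}
	}

	var queue []string

	enqueue := func(msg string) bool {
		select {
		case <-sema: // a slot is free
			queue = append(queue, msg)
			return true
		default: // queue full; the real producer blocks here or quits
			return false
		}
	}

	for i := 0; i < 5; i++ {
		fmt.Println("enqueued:", enqueue(fmt.Sprintf("msg-%d", i)))
	}

	// The consumer returns a token after processing an item, freeing a slot.
	queue = queue[1:]
	sema <- struct{}{}
	fmt.Println("enqueued after consume:", enqueue("msg-5"))
}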
1649

1650
// Start starts the chanMsgStream.
1651
func (ms *msgStream) Start() {
3✔
1652
        ms.wg.Add(1)
3✔
1653
        go ms.msgConsumer()
3✔
1654
}
3✔
1655

1656
// Stop stops the chanMsgStream.
UNCOV
1657
func (ms *msgStream) Stop() {
×
UNCOV
1658
        // TODO(roasbeef): signal too?
×
UNCOV
1659

×
UNCOV
1660
        close(ms.quit)
×
UNCOV
1661

×
UNCOV
1662
        // Now that we've closed the channel, we'll repeatedly signal the msg
×
UNCOV
1663
        // consumer until we've detected that it has exited.
×
UNCOV
1664
        for atomic.LoadInt32(&ms.streamShutdown) == 0 {
×
UNCOV
1665
                ms.msgCond.Signal()
×
UNCOV
1666
                time.Sleep(time.Millisecond * 100)
×
UNCOV
1667
        }
×
1668

UNCOV
1669
        ms.wg.Wait()
×
1670
}
1671

1672
// msgConsumer is the main goroutine that streams messages from the peer's
1673
// readHandler directly to the target channel.
1674
func (ms *msgStream) msgConsumer() {
3✔
1675
        defer ms.wg.Done()
3✔
1676
        defer peerLog.Tracef(ms.stopMsg)
3✔
1677
        defer atomic.StoreInt32(&ms.streamShutdown, 1)
3✔
1678

3✔
1679
        peerLog.Tracef(ms.startMsg)
3✔
1680

3✔
1681
        for {
6✔
1682
                // First, we'll check our condition. If the queue of messages
3✔
1683
                // is empty, then we'll wait until a new item is added.
3✔
1684
                ms.msgCond.L.Lock()
3✔
1685
                for len(ms.msgs) == 0 {
6✔
1686
                        ms.msgCond.Wait()
3✔
1687

3✔
1688
                        // If we woke up in order to exit, then we'll do so.
3✔
1689
                        // Otherwise, we'll check the message queue for any new
3✔
1690
                        // items.
3✔
1691
                        select {
3✔
UNCOV
1692
                        case <-ms.peer.quit:
×
UNCOV
1693
                                ms.msgCond.L.Unlock()
×
UNCOV
1694
                                return
×
UNCOV
1695
                        case <-ms.quit:
×
UNCOV
1696
                                ms.msgCond.L.Unlock()
×
UNCOV
1697
                                return
×
UNCOV
1698
                        default:
×
1699
                        }
1700
                }
1701

1702
                // Grab the message off the front of the queue, shifting the
1703
                // slice's reference down one in order to remove the message
1704
                // from the queue.
UNCOV
1705
                msg := ms.msgs[0]
×
UNCOV
1706
                ms.msgs[0] = nil // Set to nil to prevent GC leak.
×
UNCOV
1707
                ms.msgs = ms.msgs[1:]
×
UNCOV
1708

×
UNCOV
1709
                ms.msgCond.L.Unlock()
×
UNCOV
1710

×
UNCOV
1711
                ms.apply(msg)
×
UNCOV
1712

×
UNCOV
1713
                // We've just successfully processed an item, so we'll signal
×
UNCOV
1714
                // to the producer that a new slot is free in the buffer. We'll use
×
UNCOV
1715
                // this to bound the size of the buffer to avoid allowing it to
×
UNCOV
1716
                // grow indefinitely.
×
UNCOV
1717
                select {
×
UNCOV
1718
                case ms.producerSema <- struct{}{}:
×
UNCOV
1719
                case <-ms.peer.quit:
×
UNCOV
1720
                        return
×
UNCOV
1721
                case <-ms.quit:
×
UNCOV
1722
                        return
×
1723
                }
1724
        }
1725
}
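msgConsumer blocks on a sync.Cond while the queue is empty, pops from the front, and clears the vacated slot so the backing array doesn't pin the message for the garbage collector. A compact sketch of that consumer loop (single producer and consumer, no quit channels):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		mtx  sync.Mutex
		cond = sync.NewCond(&mtx)
		msgs []string
	)

	// Consumer: wait until there is something to process, then shift it off
	// the front of the queue, clearing the old slot as msgConsumer does.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for received := 0; received < 2; received++ {
			cond.L.Lock()
			for len(msgs) == 0 {
				cond.Wait()
			}
			msg := msgs[0]
			msgs[0] = "" // drop the reference held by the backing array
			msgs = msgs[1:]
			cond.L.Unlock()

			fmt.Println("consumed:", msg)
		}
	}()

	// Producer: append under the lock and signal the consumer.
	for _, m := range []string{"channel_update", "node_announcement"} {
		cond.L.Lock()
		msgs = append(msgs, m)
		cond.L.Unlock()
		cond.Signal()
	}

	<-done
}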
1726

1727
// AddMsg adds a new message to the msgStream. This function is safe for
1728
// concurrent access.
UNCOV
1729
func (ms *msgStream) AddMsg(msg lnwire.Message) {
×
UNCOV
1730
        // First, we'll attempt to receive from the producerSema struct. This
×
UNCOV
1731
        // acts as a semaphore to prevent us from indefinitely buffering
×
UNCOV
1732
        // incoming items from the wire. Either the msg queue isn't full, and
×
UNCOV
1733
        // we'll not block, or the queue is full, and we'll block until either
×
UNCOV
1734
        // we're signalled to quit, or a slot is freed up.
×
UNCOV
1735
        select {
×
UNCOV
1736
        case <-ms.producerSema:
×
1737
        case <-ms.peer.quit:
×
1738
                return
×
1739
        case <-ms.quit:
×
1740
                return
×
1741
        }
1742

1743
        // Next, we'll lock the condition, and add the message to the end of
1744
        // the message queue.
UNCOV
1745
        ms.msgCond.L.Lock()
×
UNCOV
1746
        ms.msgs = append(ms.msgs, msg)
×
UNCOV
1747
        ms.msgCond.L.Unlock()
×
UNCOV
1748

×
UNCOV
1749
        // With the message added, we signal to the msgConsumer that there are
×
UNCOV
1750
        // additional messages to consume.
×
UNCOV
1751
        ms.msgCond.Signal()
×
1752
}
1753

1754
// waitUntilLinkActive waits until the target link is active and returns a
1755
// ChannelLink to pass messages to. It accomplishes this by subscribing to
1756
// an ActiveLinkEvent which is emitted by the link when it first starts up.
1757
func waitUntilLinkActive(p *Brontide,
UNCOV
1758
        cid lnwire.ChannelID) htlcswitch.ChannelUpdateHandler {
×
UNCOV
1759

×
UNCOV
1760
        p.log.Tracef("Waiting for link=%v to be active", cid)
×
UNCOV
1761

×
UNCOV
1762
        // Subscribe to receive channel events.
×
UNCOV
1763
        //
×
UNCOV
1764
        // NOTE: If the link is already active by SubscribeChannelEvents, then
×
UNCOV
1765
        // GetLink will retrieve the link and we can send messages. If the link
×
UNCOV
1766
        // becomes active between SubscribeChannelEvents and GetLink, then GetLink
×
UNCOV
1767
        // will retrieve the link. If the link becomes active after GetLink, then
×
UNCOV
1768
        // we will get an ActiveLinkEvent notification and retrieve the link. If
×
UNCOV
1769
        // the call to GetLink is before SubscribeChannelEvents, however, there
×
UNCOV
1770
        // will be a race condition.
×
UNCOV
1771
        sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
×
UNCOV
1772
        if err != nil {
×
UNCOV
1773
                // If we have a non-nil error, then the server is shutting down and we
×
UNCOV
1774
                // can exit here and return nil. This means no message will be delivered
×
UNCOV
1775
                // to the link.
×
UNCOV
1776
                return nil
×
UNCOV
1777
        }
×
UNCOV
1778
        defer sub.Cancel()
×
UNCOV
1779

×
UNCOV
1780
        // The link may already be active by this point, and we may have missed the
×
UNCOV
1781
        // ActiveLinkEvent. Check if the link exists.
×
UNCOV
1782
        link := p.fetchLinkFromKeyAndCid(cid)
×
UNCOV
1783
        if link != nil {
×
UNCOV
1784
                return link
×
UNCOV
1785
        }
×
1786

1787
        // If the link is nil, we must wait for it to be active.
UNCOV
1788
        for {
×
UNCOV
1789
                select {
×
1790
                // A new event has been sent by the ChannelNotifier. We first check
1791
                // whether the event is an ActiveLinkEvent. If it is, we'll check
1792
                // that the event is for this channel. Otherwise, we discard the
1793
                // message.
UNCOV
1794
                case e := <-sub.Updates():
×
UNCOV
1795
                        event, ok := e.(channelnotifier.ActiveLinkEvent)
×
UNCOV
1796
                        if !ok {
×
UNCOV
1797
                                // Ignore this notification.
×
UNCOV
1798
                                continue
×
1799
                        }
1800

UNCOV
1801
                        chanPoint := event.ChannelPoint
×
UNCOV
1802

×
UNCOV
1803
                        // Check whether the retrieved chanPoint matches the target
×
UNCOV
1804
                        // channel id.
×
UNCOV
1805
                        if !cid.IsChanPoint(chanPoint) {
×
1806
                                continue
×
1807
                        }
1808

1809
                        // The link shouldn't be nil as we received an
1810
                        // ActiveLinkEvent. If it is nil, we return nil and the
1811
                        // calling function should catch it.
UNCOV
1812
                        return p.fetchLinkFromKeyAndCid(cid)
×
1813

UNCOV
1814
                case <-p.quit:
×
UNCOV
1815
                        return nil
×
1816
                }
1817
        }
1818
}
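waitUntilLinkActive subscribes to channel events first and only then checks whether the link already exists; doing it in the other order could miss an ActiveLinkEvent that fires between the check and the subscription. A generic sketch of that subscribe-then-check ordering with a toy notifier (not lnd's ChannelNotifier API):

package main

import (
	"fmt"
	"sync"
)

// notifier is a toy event source: Subscribe returns a channel that receives
// every activation published after the subscription was registered.
type notifier struct {
	mu     sync.Mutex
	active bool
	subs   []chan struct{}
}

func (n *notifier) Subscribe() <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	ch := make(chan struct{}, 1)
	n.subs = append(n.subs, ch)
	return ch
}

func (n *notifier) Activate() {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.active = true
	for _, ch := range n.subs {
		ch <- struct{}{}
	}
}

func (n *notifier) IsActive() bool {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.active
}

// waitActive subscribes first, then checks current state, then waits. The
// ordering guarantees we either see the state directly or get the event.
func waitActive(n *notifier) {
	events := n.Subscribe()
	if n.IsActive() {
		return
	}
	<-events
}

func main() {
	n := &notifier{}
	go n.Activate()
	waitActive(n)
	fmt.Println("link active")
}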
1819

1820
// newChanMsgStream is used to create a msgStream between the peer and
1821
// particular channel link in the htlcswitch. We utilize additional
1822
// synchronization with the fundingManager to ensure we don't attempt to
1823
// dispatch a message to a channel before it is fully active. A reference to the
1824
// channel this stream forwards to is held in scope to prevent unnecessary
1825
// lookups.
UNCOV
1826
func newChanMsgStream(p *Brontide, cid lnwire.ChannelID) *msgStream {
×
UNCOV
1827
        var chanLink htlcswitch.ChannelUpdateHandler
×
UNCOV
1828

×
UNCOV
1829
        apply := func(msg lnwire.Message) {
×
UNCOV
1830
                // This check is fine because if the link no longer exists, it will
×
UNCOV
1831
                // be removed from the activeChannels map and subsequent messages
×
UNCOV
1832
                // shouldn't reach the chan msg stream.
×
UNCOV
1833
                if chanLink == nil {
×
UNCOV
1834
                        chanLink = waitUntilLinkActive(p, cid)
×
UNCOV
1835

×
UNCOV
1836
                        // If the link is still not active and the calling function
×
UNCOV
1837
                        // errored out, just return.
×
UNCOV
1838
                        if chanLink == nil {
×
UNCOV
1839
                                p.log.Warnf("Link=%v is not active", cid)
×
UNCOV
1840
                                return
×
UNCOV
1841
                        }
×
1842
                }
1843

1844
                // In order to avoid unnecessarily delivering messages
1845
                // as the peer is exiting, we'll check quickly to see
1846
                // if we need to exit.
UNCOV
1847
                select {
×
1848
                case <-p.quit:
×
1849
                        return
×
UNCOV
1850
                default:
×
1851
                }
1852

UNCOV
1853
                chanLink.HandleChannelUpdate(msg)
×
1854
        }
1855

UNCOV
1856
        return newMsgStream(p,
×
UNCOV
1857
                fmt.Sprintf("Update stream for ChannelID(%x) created", cid[:]),
×
UNCOV
1858
                fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]),
×
UNCOV
1859
                1000,
×
UNCOV
1860
                apply,
×
UNCOV
1861
        )
×
1862
}
1863

1864
// newDiscMsgStream is used to setup a msgStream between the peer and the
1865
// authenticated gossiper. This stream should be used to forward all remote
1866
// channel announcements.
1867
func newDiscMsgStream(p *Brontide) *msgStream {
3✔
1868
        apply := func(msg lnwire.Message) {
3✔
UNCOV
1869
                // TODO(yy): `ProcessRemoteAnnouncement` returns an error chan
×
UNCOV
1870
                // and we need to process it.
×
UNCOV
1871
                p.cfg.AuthGossiper.ProcessRemoteAnnouncement(msg, p)
×
UNCOV
1872
        }
×
1873

1874
        return newMsgStream(
3✔
1875
                p,
3✔
1876
                "Update stream for gossiper created",
3✔
1877
                "Update stream for gossiper exited",
3✔
1878
                1000,
3✔
1879
                apply,
3✔
1880
        )
3✔
1881
}
1882

1883
// readHandler is responsible for reading messages off the wire in series, then
1884
// properly dispatching the handling of the message to the proper subsystem.
1885
//
1886
// NOTE: This method MUST be run as a goroutine.
1887
func (p *Brontide) readHandler() {
3✔
1888
        defer p.wg.Done()
3✔
1889

3✔
1890
        // We'll stop the timer after a new message is received, and also
3✔
1891
        // reset it after we process the next message.
3✔
1892
        idleTimer := time.AfterFunc(idleTimeout, func() {
3✔
1893
                err := fmt.Errorf("peer %s no answer for %s -- disconnecting",
×
1894
                        p, idleTimeout)
×
1895
                p.Disconnect(err)
×
1896
        })
×
1897

1898
        // Initialize our negotiated gossip sync method before reading messages
1899
        // off the wire. When using gossip queries, this ensures a gossip
1900
        // syncer is active by the time query messages arrive.
1901
        //
1902
        // TODO(conner): have peer store gossip syncer directly and bypass
1903
        // gossiper?
1904
        p.initGossipSync()
3✔
1905

3✔
1906
        discStream := newDiscMsgStream(p)
3✔
1907
        discStream.Start()
3✔
1908
        defer discStream.Stop()
3✔
1909
out:
3✔
1910
        for atomic.LoadInt32(&p.disconnect) == 0 {
7✔
1911
                nextMsg, err := p.readNextMessage()
4✔
1912
                if !idleTimer.Stop() {
4✔
1913
                        select {
×
1914
                        case <-idleTimer.C:
×
1915
                        default:
×
1916
                        }
1917
                }
1918
                if err != nil {
1✔
UNCOV
1919
                        p.log.Infof("unable to read message from peer: %v", err)
×
UNCOV
1920

×
UNCOV
1921
                        // If we could not read our peer's message due to an
×
UNCOV
1922
                        // unknown type or invalid alias, we continue processing
×
UNCOV
1923
                        // as normal. We store unknown message and address
×
UNCOV
1924
                        // types, as they may provide debugging insight.
×
UNCOV
1925
                        switch e := err.(type) {
×
1926
                        // If this is just a message we don't yet recognize,
1927
                        // we'll continue processing as normal as this allows
1928
                        // us to introduce new messages in a forwards
1929
                        // compatible manner.
UNCOV
1930
                        case *lnwire.UnknownMessage:
×
UNCOV
1931
                                p.storeError(e)
×
UNCOV
1932
                                idleTimer.Reset(idleTimeout)
×
UNCOV
1933
                                continue
×
1934

1935
                        // If they sent us an address type that we don't yet
1936
                        // know of, then this isn't a wire error, so we'll
1937
                        // simply continue parsing the remainder of their
1938
                        // messages.
1939
                        case *lnwire.ErrUnknownAddrType:
×
1940
                                p.storeError(e)
×
1941
                                idleTimer.Reset(idleTimeout)
×
1942
                                continue
×
1943

1944
                        // If the NodeAnnouncement has an invalid alias, then
1945
                        // we'll log that error above and continue so we can
1946
                        // continue to read messages from the peer. We do not
1947
                        // store this error because it is of little debugging
1948
                        // value.
1949
                        case *lnwire.ErrInvalidNodeAlias:
×
1950
                                idleTimer.Reset(idleTimeout)
×
1951
                                continue
×
1952

1953
                        // If the error we encountered wasn't just a message we
1954
                        // didn't recognize, then we'll stop all processing as
1955
                        // this is a fatal error.
UNCOV
1956
                        default:
×
UNCOV
1957
                                break out
×
1958
                        }
1959
                }
1960

1961
                // If a message router is active, then we'll try to have it
1962
                // handle this message. If it can, then we're able to skip the
1963
                // rest of the message handling logic.
1964
                err = fn.MapOptionZ(p.msgRouter, func(r msgmux.Router) error {
2✔
1965
                        return r.RouteMsg(msgmux.PeerMsg{
1✔
1966
                                PeerPub: *p.IdentityKey(),
1✔
1967
                                Message: nextMsg,
1✔
1968
                        })
1✔
1969
                })
1✔
1970

1971
                // No error occurred, and the message was handled by the
1972
                // router.
1973
                if err == nil {
1✔
1974
                        continue
×
1975
                }
1976

1977
                var (
1✔
1978
                        targetChan   lnwire.ChannelID
1✔
1979
                        isLinkUpdate bool
1✔
1980
                )
1✔
1981

1✔
1982
                switch msg := nextMsg.(type) {
1✔
1983
                case *lnwire.Pong:
×
1984
                        // When we receive a Pong message in response to our
×
1985
                        // last ping message, we send it to the pingManager
×
1986
                        p.pingManager.ReceivedPong(msg)
×
1987

1988
                case *lnwire.Ping:
×
1989
                        // First, we'll store their latest ping payload within
×
1990
                        // the relevant atomic variable.
×
1991
                        p.lastPingPayload.Store(msg.PaddingBytes[:])
×
1992

×
1993
                        // Next, we'll send over the amount of specified pong
×
1994
                        // bytes.
×
1995
                        pong := lnwire.NewPong(p.cfg.PongBuf[0:msg.NumPongBytes])
×
1996
                        p.queueMsg(pong, nil)
×
1997

1998
                case *lnwire.OpenChannel,
1999
                        *lnwire.AcceptChannel,
2000
                        *lnwire.FundingCreated,
2001
                        *lnwire.FundingSigned,
UNCOV
2002
                        *lnwire.ChannelReady:
×
UNCOV
2003

×
UNCOV
2004
                        p.cfg.FundingManager.ProcessFundingMsg(msg, p)
×
2005

UNCOV
2006
                case *lnwire.Shutdown:
×
UNCOV
2007
                        select {
×
UNCOV
2008
                        case p.chanCloseMsgs <- &closeMsg{msg.ChannelID, msg}:
×
2009
                        case <-p.quit:
×
2010
                                break out
×
2011
                        }
UNCOV
2012
                case *lnwire.ClosingSigned:
×
UNCOV
2013
                        select {
×
UNCOV
2014
                        case p.chanCloseMsgs <- &closeMsg{msg.ChannelID, msg}:
×
2015
                        case <-p.quit:
×
2016
                                break out
×
2017
                        }
2018

2019
                case *lnwire.Warning:
×
2020
                        targetChan = msg.ChanID
×
2021
                        isLinkUpdate = p.handleWarningOrError(targetChan, msg)
×
2022

UNCOV
2023
                case *lnwire.Error:
×
UNCOV
2024
                        targetChan = msg.ChanID
×
UNCOV
2025
                        isLinkUpdate = p.handleWarningOrError(targetChan, msg)
×
2026

UNCOV
2027
                case *lnwire.ChannelReestablish:
×
UNCOV
2028
                        targetChan = msg.ChanID
×
UNCOV
2029
                        isLinkUpdate = p.hasChannel(targetChan)
×
UNCOV
2030

×
UNCOV
2031
                        // If we failed to find the link in question, and the
×
UNCOV
2032
                        // message received was a channel sync message, then
×
UNCOV
2033
                        // this might be a peer trying to resync closed channel.
×
UNCOV
2034
                        // In this case we'll try to resend our last channel
×
UNCOV
2035
                        // sync message, such that the peer can recover funds
×
UNCOV
2036
                        // from the closed channel.
×
UNCOV
2037
                        if !isLinkUpdate {
×
UNCOV
2038
                                err := p.resendChanSyncMsg(targetChan)
×
UNCOV
2039
                                if err != nil {
×
UNCOV
2040
                                        // TODO(halseth): send error to peer?
×
UNCOV
2041
                                        p.log.Errorf("resend failed: %v",
×
UNCOV
2042
                                                err)
×
UNCOV
2043
                                }
×
2044
                        }
2045

2046
                // For messages that implement the LinkUpdater interface, we
2047
                // will consider them as link updates and send them to
2048
                // chanStream. These messages will be queued inside chanStream
2049
                // if the channel is not active yet.
UNCOV
2050
                case lnwire.LinkUpdater:
×
UNCOV
2051
                        targetChan = msg.TargetChanID()
×
UNCOV
2052
                        isLinkUpdate = p.hasChannel(targetChan)
×
UNCOV
2053

×
UNCOV
2054
                        // Log an error if we don't have this channel. This
×
UNCOV
2055
                        // means the peer has sent us a message with unknown
×
UNCOV
2056
                        // channel ID.
×
UNCOV
2057
                        if !isLinkUpdate {
×
UNCOV
2058
                                p.log.Errorf("Unknown channel ID: %v found "+
×
UNCOV
2059
                                        "in received msg=%s", targetChan,
×
UNCOV
2060
                                        nextMsg.MsgType())
×
UNCOV
2061
                        }
×
2062

2063
                case *lnwire.ChannelUpdate1,
2064
                        *lnwire.ChannelAnnouncement1,
2065
                        *lnwire.NodeAnnouncement,
2066
                        *lnwire.AnnounceSignatures1,
2067
                        *lnwire.GossipTimestampRange,
2068
                        *lnwire.QueryShortChanIDs,
2069
                        *lnwire.QueryChannelRange,
2070
                        *lnwire.ReplyChannelRange,
UNCOV
2071
                        *lnwire.ReplyShortChanIDsEnd:
×
UNCOV
2072

×
UNCOV
2073
                        discStream.AddMsg(msg)
×
2074

2075
                case *lnwire.Custom:
1✔
2076
                        err := p.handleCustomMessage(msg)
1✔
2077
                        if err != nil {
1✔
2078
                                p.storeError(err)
×
2079
                                p.log.Errorf("%v", err)
×
2080
                        }
×
2081

2082
                default:
×
2083
                        // If the message we received is unknown to us, store
×
2084
                        // the type to track the failure.
×
2085
                        err := fmt.Errorf("unknown message type %v received",
×
2086
                                uint16(msg.MsgType()))
×
2087
                        p.storeError(err)
×
2088

×
2089
                        p.log.Errorf("%v", err)
×
2090
                }
2091

2092
                if isLinkUpdate {
1✔
UNCOV
2093
                        // If this is a channel update, then we need to feed it
×
UNCOV
2094
                        // into the channel's in-order message stream.
×
UNCOV
2095
                        p.sendLinkUpdateMsg(targetChan, nextMsg)
×
UNCOV
2096
                }
×
2097

2098
                idleTimer.Reset(idleTimeout)
1✔
2099
        }
2100

UNCOV
2101
        p.Disconnect(errors.New("read handler closed"))
×
UNCOV
2102

×
UNCOV
2103
        p.log.Trace("readHandler for peer done")
×
2104
}
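readHandler reuses a single idle timer across reads: before each Reset it calls Stop and, if the timer already fired, drains the channel so a stale expiry can't leak into the next wait. A minimal sketch of that Stop-drain-Reset idiom (using time.NewTimer here so the drain is observable, whereas readHandler builds its timer with time.AfterFunc):

package main

import (
	"fmt"
	"time"
)

func main() {
	idleTimeout := 50 * time.Millisecond
	idle := time.NewTimer(idleTimeout)

	for i := 0; i < 3; i++ {
		// Simulate a message arriving before the idle timeout.
		time.Sleep(10 * time.Millisecond)

		// Stop the timer before reusing it; if it already fired, drain
		// the pending tick so the next wait doesn't return immediately.
		if !idle.Stop() {
			select {
			case <-idle.C:
			default:
			}
		}

		fmt.Println("processed message", i)
		idle.Reset(idleTimeout)
	}

	// With no further messages, the idle timeout eventually fires.
	<-idle.C
	fmt.Println("peer idle, disconnecting")
}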
2105

2106
// handleCustomMessage handles the given custom message if a handler is
2107
// registered.
2108
func (p *Brontide) handleCustomMessage(msg *lnwire.Custom) error {
1✔
2109
        if p.cfg.HandleCustomMessage == nil {
1✔
2110
                return fmt.Errorf("no custom message handler for "+
×
2111
                        "message type %v", uint16(msg.MsgType()))
×
2112
        }
×
2113

2114
        return p.cfg.HandleCustomMessage(p.PubKey(), msg)
1✔
2115
}
2116

2117
// isLoadedFromDisk returns true if the provided channel ID is loaded from
2118
// disk.
2119
//
2120
// NOTE: only returns true for pending channels.
UNCOV
2121
func (p *Brontide) isLoadedFromDisk(chanID lnwire.ChannelID) bool {
×
UNCOV
2122
        // If this is a newly added channel, no need to reestablish.
×
UNCOV
2123
        _, added := p.addedChannels.Load(chanID)
×
UNCOV
2124
        if added {
×
UNCOV
2125
                return false
×
UNCOV
2126
        }
×
2127

2128
        // Return false if the channel is unknown.
UNCOV
2129
        channel, ok := p.activeChannels.Load(chanID)
×
UNCOV
2130
        if !ok {
×
2131
                return false
×
2132
        }
×
2133

2134
        // During startup, we will use a nil value to mark a pending channel
2135
        // that's loaded from disk.
UNCOV
2136
        return channel == nil
×
2137
}
2138

2139
// isActiveChannel returns true if the provided channel id is active, otherwise
2140
// returns false.
2141
func (p *Brontide) isActiveChannel(chanID lnwire.ChannelID) bool {
8✔
2142
        // The channel would be nil if,
8✔
2143
        // - the channel doesn't exist, or,
8✔
2144
        // - the channel exists, but is pending. In this case, we don't
8✔
2145
        //   consider this channel active.
8✔
2146
        channel, _ := p.activeChannels.Load(chanID)
8✔
2147

8✔
2148
        return channel != nil
8✔
2149
}
8✔
2150

2151
// isPendingChannel returns true if the provided channel ID is pending, and
2152
// returns false if the channel is active or unknown.
2153
func (p *Brontide) isPendingChannel(chanID lnwire.ChannelID) bool {
6✔
2154
        // Return false if the channel is unknown.
6✔
2155
        channel, ok := p.activeChannels.Load(chanID)
6✔
2156
        if !ok {
9✔
2157
                return false
3✔
2158
        }
3✔
2159

2160
        return channel == nil
3✔
2161
}
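isLoadedFromDisk, isActiveChannel, and isPendingChannel all lean on the same convention in activeChannels: a missing key means the channel is unknown, a nil value marks a pending channel loaded from disk, and a non-nil value is an active channel. A tiny sketch of that three-state lookup with an ordinary map in place of the concurrent one:

package main

import "fmt"

type channel struct{ scid uint64 }

// classify reproduces the convention used by activeChannels: absent key =
// unknown, nil value = pending, non-nil value = active.
func classify(chans map[string]*channel, id string) string {
	ch, ok := chans[id]
	switch {
	case !ok:
		return "unknown"
	case ch == nil:
		return "pending"
	default:
		return "active"
	}
}

func main() {
	chans := map[string]*channel{
		"chan-pending": nil,
		"chan-active":  {scid: 123},
	}

	fmt.Println(classify(chans, "chan-missing")) // unknown
	fmt.Println(classify(chans, "chan-pending")) // pending
	fmt.Println(classify(chans, "chan-active"))  // active
}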
2162

2163
// hasChannel returns true if the peer has a pending/active channel specified
2164
// by the channel ID.
UNCOV
2165
func (p *Brontide) hasChannel(chanID lnwire.ChannelID) bool {
×
UNCOV
2166
        _, ok := p.activeChannels.Load(chanID)
×
UNCOV
2167
        return ok
×
UNCOV
2168
}
×
2169

2170
// storeError stores an error in our peer's buffer of recent errors with the
2171
// current timestamp. Errors are only stored if we have at least one active
2172
// channel with the peer to mitigate a DoS vector where a peer costlessly
2173
// connects to us and spams us with errors.
UNCOV
2174
func (p *Brontide) storeError(err error) {
×
UNCOV
2175
        var haveChannels bool
×
UNCOV
2176

×
UNCOV
2177
        p.activeChannels.Range(func(_ lnwire.ChannelID,
×
UNCOV
2178
                channel *lnwallet.LightningChannel) bool {
×
UNCOV
2179

×
UNCOV
2180
                // Pending channels will be nil in the activeChannels map.
×
UNCOV
2181
                if channel == nil {
×
UNCOV
2182
                        // Return true to continue the iteration.
×
UNCOV
2183
                        return true
×
UNCOV
2184
                }
×
2185

UNCOV
2186
                haveChannels = true
×
UNCOV
2187

×
UNCOV
2188
                // Return false to break the iteration.
×
UNCOV
2189
                return false
×
2190
        })
2191

2192
        // If we do not have any active channels with the peer, we do not store
2193
        // errors as a dos mitigation.
UNCOV
2194
        if !haveChannels {
×
UNCOV
2195
                p.log.Trace("no channels with peer, not storing err")
×
UNCOV
2196
                return
×
UNCOV
2197
        }
×
2198

UNCOV
2199
        p.cfg.ErrorBuffer.Add(
×
UNCOV
2200
                &TimestampedError{Timestamp: time.Now(), Error: err},
×
UNCOV
2201
        )
×
2202
}
2203

2204
// handleWarningOrError processes a warning or error msg and returns true if
2205
// msg should be forwarded to the associated channel link. False is returned if
2206
// any necessary forwarding of msg was already handled by this method. If msg is
2207
// an error from a peer with an active channel, we'll store it in memory.
2208
//
2209
// NOTE: This method should only be called from within the readHandler.
2210
func (p *Brontide) handleWarningOrError(chanID lnwire.ChannelID,
UNCOV
2211
        msg lnwire.Message) bool {
×
UNCOV
2212

×
UNCOV
2213
        if errMsg, ok := msg.(*lnwire.Error); ok {
×
UNCOV
2214
                p.storeError(errMsg)
×
UNCOV
2215
        }
×
2216

UNCOV
2217
        switch {
×
2218
        // Connection wide messages should be forwarded to all channel links
2219
        // with this peer.
2220
        case chanID == lnwire.ConnectionWideID:
×
2221
                for _, chanStream := range p.activeMsgStreams {
×
2222
                        chanStream.AddMsg(msg)
×
2223
                }
×
2224

2225
                return false
×
2226

2227
        // If the channel ID for the message corresponds to a pending channel,
2228
        // then the funding manager will handle it.
UNCOV
2229
        case p.cfg.FundingManager.IsPendingChannel(chanID, p):
×
UNCOV
2230
                p.cfg.FundingManager.ProcessFundingMsg(msg, p)
×
UNCOV
2231
                return false
×
2232

2233
        // If not we hand the message to the channel link for this channel.
UNCOV
2234
        case p.isActiveChannel(chanID):
×
UNCOV
2235
                return true
×
2236

UNCOV
2237
        default:
×
UNCOV
2238
                return false
×
2239
        }
2240
}
2241

2242
// messageSummary returns a human-readable string that summarizes a
2243
// incoming/outgoing message. Not all messages will have a summary, only those
2244
// which have additional data that can be informative at a glance.
UNCOV
func messageSummary(msg lnwire.Message) string {
	switch msg := msg.(type) {
	case *lnwire.Init:
		// No summary.
		return ""

	case *lnwire.OpenChannel:
		return fmt.Sprintf("temp_chan_id=%x, chain=%v, csv=%v, amt=%v, "+
			"push_amt=%v, reserve=%v, flags=%v",
			msg.PendingChannelID[:], msg.ChainHash,
			msg.CsvDelay, msg.FundingAmount, msg.PushAmount,
			msg.ChannelReserve, msg.ChannelFlags)

	case *lnwire.AcceptChannel:
		return fmt.Sprintf("temp_chan_id=%x, reserve=%v, csv=%v, num_confs=%v",
			msg.PendingChannelID[:], msg.ChannelReserve, msg.CsvDelay,
			msg.MinAcceptDepth)

	case *lnwire.FundingCreated:
		return fmt.Sprintf("temp_chan_id=%x, chan_point=%v",
			msg.PendingChannelID[:], msg.FundingPoint)

	case *lnwire.FundingSigned:
		return fmt.Sprintf("chan_id=%v", msg.ChanID)

	case *lnwire.ChannelReady:
		return fmt.Sprintf("chan_id=%v, next_point=%x",
			msg.ChanID, msg.NextPerCommitmentPoint.SerializeCompressed())

	case *lnwire.Shutdown:
		return fmt.Sprintf("chan_id=%v, script=%x", msg.ChannelID,
			msg.Address[:])

	case *lnwire.ClosingComplete:
		return fmt.Sprintf("chan_id=%v, fee_sat=%v, locktime=%v",
			msg.ChannelID, msg.FeeSatoshis, msg.LockTime)

	case *lnwire.ClosingSig:
		return fmt.Sprintf("chan_id=%v", msg.ChannelID)

	case *lnwire.ClosingSigned:
		return fmt.Sprintf("chan_id=%v, fee_sat=%v", msg.ChannelID,
			msg.FeeSatoshis)

	case *lnwire.UpdateAddHTLC:
		var blindingPoint []byte
		msg.BlindingPoint.WhenSome(
			func(b tlv.RecordT[lnwire.BlindingPointTlvType,
				*btcec.PublicKey]) {

				blindingPoint = b.Val.SerializeCompressed()
			},
		)

		return fmt.Sprintf("chan_id=%v, id=%v, amt=%v, expiry=%v, "+
			"hash=%x, blinding_point=%x, custom_records=%v",
			msg.ChanID, msg.ID, msg.Amount, msg.Expiry,
			msg.PaymentHash[:], blindingPoint, msg.CustomRecords)

	case *lnwire.UpdateFailHTLC:
		return fmt.Sprintf("chan_id=%v, id=%v, reason=%x", msg.ChanID,
			msg.ID, msg.Reason)

	case *lnwire.UpdateFulfillHTLC:
		return fmt.Sprintf("chan_id=%v, id=%v, pre_image=%x, "+
			"custom_records=%v", msg.ChanID, msg.ID,
			msg.PaymentPreimage[:], msg.CustomRecords)

	case *lnwire.CommitSig:
		return fmt.Sprintf("chan_id=%v, num_htlcs=%v", msg.ChanID,
			len(msg.HtlcSigs))

	case *lnwire.RevokeAndAck:
		return fmt.Sprintf("chan_id=%v, rev=%x, next_point=%x",
			msg.ChanID, msg.Revocation[:],
			msg.NextRevocationKey.SerializeCompressed())

	case *lnwire.UpdateFailMalformedHTLC:
		return fmt.Sprintf("chan_id=%v, id=%v, fail_code=%v",
			msg.ChanID, msg.ID, msg.FailureCode)

	case *lnwire.Warning:
		return fmt.Sprintf("%v", msg.Warning())

	case *lnwire.Error:
		return fmt.Sprintf("%v", msg.Error())

	case *lnwire.AnnounceSignatures1:
		return fmt.Sprintf("chan_id=%v, short_chan_id=%v", msg.ChannelID,
			msg.ShortChannelID.ToUint64())

	case *lnwire.ChannelAnnouncement1:
		return fmt.Sprintf("chain_hash=%v, short_chan_id=%v",
			msg.ChainHash, msg.ShortChannelID.ToUint64())

	case *lnwire.ChannelUpdate1:
		return fmt.Sprintf("chain_hash=%v, short_chan_id=%v, "+
			"mflags=%v, cflags=%v, update_time=%v", msg.ChainHash,
			msg.ShortChannelID.ToUint64(), msg.MessageFlags,
			msg.ChannelFlags, time.Unix(int64(msg.Timestamp), 0))

	case *lnwire.NodeAnnouncement:
		return fmt.Sprintf("node=%x, update_time=%v",
			msg.NodeID, time.Unix(int64(msg.Timestamp), 0))

	case *lnwire.Ping:
		return fmt.Sprintf("ping_bytes=%x", msg.PaddingBytes[:])

	case *lnwire.Pong:
		return fmt.Sprintf("len(pong_bytes)=%d", len(msg.PongBytes[:]))

	case *lnwire.UpdateFee:
		return fmt.Sprintf("chan_id=%v, fee_update_sat=%v",
			msg.ChanID, int64(msg.FeePerKw))

	case *lnwire.ChannelReestablish:
		return fmt.Sprintf("chan_id=%v, next_local_height=%v, "+
			"remote_tail_height=%v", msg.ChanID,
			msg.NextLocalCommitHeight, msg.RemoteCommitTailHeight)

	case *lnwire.ReplyShortChanIDsEnd:
		return fmt.Sprintf("chain_hash=%v, complete=%v", msg.ChainHash,
			msg.Complete)

	case *lnwire.ReplyChannelRange:
		return fmt.Sprintf("start_height=%v, end_height=%v, "+
			"num_chans=%v, encoding=%v", msg.FirstBlockHeight,
			msg.LastBlockHeight(), len(msg.ShortChanIDs),
			msg.EncodingType)

	case *lnwire.QueryShortChanIDs:
		return fmt.Sprintf("chain_hash=%v, encoding=%v, num_chans=%v",
			msg.ChainHash, msg.EncodingType, len(msg.ShortChanIDs))

	case *lnwire.QueryChannelRange:
		return fmt.Sprintf("chain_hash=%v, start_height=%v, "+
			"end_height=%v", msg.ChainHash, msg.FirstBlockHeight,
			msg.LastBlockHeight())

	case *lnwire.GossipTimestampRange:
		return fmt.Sprintf("chain_hash=%v, first_stamp=%v, "+
			"stamp_range=%v", msg.ChainHash,
			time.Unix(int64(msg.FirstTimestamp), 0),
			msg.TimestampRange)

	case *lnwire.Stfu:
		return fmt.Sprintf("chan_id=%v, initiator=%v", msg.ChanID,
			msg.Initiator)

	case *lnwire.Custom:
		return fmt.Sprintf("type=%d", msg.Type)
	}

	return fmt.Sprintf("unknown msg type=%T", msg)
}

// logWireMessage logs the receipt or sending of a particular wire message.
// This function is used rather than just logging the message in order to
// produce less spammy log messages in trace mode by setting the 'Curve'
// parameter to nil. Doing this avoids printing out each of the field elements
// in the curve parameters for secp256k1.
func (p *Brontide) logWireMessage(msg lnwire.Message, read bool) {
	summaryPrefix := "Received"
	if !read {
		summaryPrefix = "Sending"
	}

	p.log.Debugf("%v", lnutils.NewLogClosure(func() string {
		// Debug summary of message.
		summary := messageSummary(msg)
		if len(summary) > 0 {
			summary = "(" + summary + ")"
		}

		preposition := "to"
		if read {
			preposition = "from"
		}

		var msgType string
		if msg.MsgType() < lnwire.CustomTypeStart {
			msgType = msg.MsgType().String()
		} else {
			msgType = "custom"
		}

		return fmt.Sprintf("%v %v%s %v %s", summaryPrefix,
			msgType, summary, preposition, p)
	}))

	prefix := "readMessage from peer"
	if !read {
		prefix = "writeMessage to peer"
	}

	p.log.Tracef(prefix+": %v", lnutils.SpewLogClosure(msg))
}

// writeMessage writes and flushes the target lnwire.Message to the remote peer.
// If the passed message is nil, this method will only try to flush an existing
// message buffered on the connection. It is safe to call this method again
// with a nil message iff a timeout error is returned. This will continue to
// flush the pending message to the wire.
//
// NOTE:
// Besides its usage in Start, this function should not be used elsewhere
// except in writeHandler. If multiple goroutines call writeMessage at the same
// time, panics can occur because WriteMessage and Flush don't use any locking
// internally.
func (p *Brontide) writeMessage(msg lnwire.Message) error {
	// Only log the message on the first attempt.
	if msg != nil {
		p.logWireMessage(msg, false)
	}

	noiseConn := p.cfg.Conn

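	// flushMsg writes any ciphertext already buffered on the noise
	// connection out to the wire, bounded by a freshly scaled write
	// deadline.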
	flushMsg := func() error {
		// Ensure the write deadline is set before we attempt to send
		// the message.
		writeDeadline := time.Now().Add(
			p.scaleTimeout(writeMessageTimeout),
		)
		err := noiseConn.SetWriteDeadline(writeDeadline)
		if err != nil {
			return err
		}

		// Flush the pending message to the wire. If an error is
		// encountered, e.g. write timeout, the number of bytes written
		// so far will be returned.
		n, err := noiseConn.Flush()

		// Record the number of bytes written on the wire, if any.
		if n > 0 {
			atomic.AddUint64(&p.bytesSent, uint64(n))
		}

		return err
	}

	// If the current message has already been serialized, encrypted, and
	// buffered on the underlying connection we will skip straight to
	// flushing it to the wire.
	if msg == nil {
		return flushMsg()
	}

	// Otherwise, this is a new message. We'll acquire a write buffer to
	// serialize the message and buffer the ciphertext on the connection.
	err := p.cfg.WritePool.Submit(func(buf *bytes.Buffer) error {
		// Using a buffer allocated by the write pool, encode the
		// message directly into the buffer.
		_, writeErr := lnwire.WriteMessage(buf, msg, 0)
		if writeErr != nil {
			return writeErr
		}

		// Finally, write the message itself in a single swoop. This
		// will buffer the ciphertext on the underlying connection. We
		// will defer flushing the message until the write pool has been
		// released.
		return noiseConn.WriteMessage(buf.Bytes())
	})
	if err != nil {
		return err
	}

	return flushMsg()
}

// writeHandler is a goroutine dedicated to reading messages off of an incoming
// queue, and writing them out to the wire. This goroutine coordinates with the
// queueHandler in order to ensure the incoming message queue is quickly
// drained.
//
// NOTE: This method MUST be run as a goroutine.
func (p *Brontide) writeHandler() {
	// We'll stop the timer after a new message is sent, and also reset it
	// after we process the next message.
	idleTimer := time.AfterFunc(idleTimeout, func() {
		err := fmt.Errorf("peer %s no write for %s -- disconnecting",
			p, idleTimeout)
		p.Disconnect(err)
	})

	var exitErr error

out:
	for {
		select {
		case outMsg := <-p.sendQueue:
			// Record the time at which we first attempt to send the
			// message.
			startTime := time.Now()

		retry:
			// Write out the message to the socket. If a timeout
			// error is encountered, we will catch this and retry
			// after backing off in case the remote peer is just
			// slow to process messages from the wire.
			err := p.writeMessage(outMsg.msg)
			if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
				p.log.Debugf("Write timeout detected for "+
					"peer, first write for message "+
					"attempted %v ago",
					time.Since(startTime))

				// If we received a timeout error, this implies
				// that the message was buffered on the
				// connection successfully and that a flush was
				// attempted. We'll set the message to nil so
				// that on a subsequent pass we only try to
				// flush the buffered message, and forgo
				// reserializing or reencrypting it.
				outMsg.msg = nil

				goto retry
			}

			// The write succeeded, reset the idle timer to prevent
			// us from disconnecting the peer.
			if !idleTimer.Stop() {
				select {
				case <-idleTimer.C:
				default:
				}
			}
			idleTimer.Reset(idleTimeout)

			// If the peer requested a synchronous write, respond
			// with the error.
			if outMsg.errChan != nil {
				outMsg.errChan <- err
			}

			if err != nil {
				exitErr = fmt.Errorf("unable to write "+
					"message: %v", err)
				break out
			}

		case <-p.quit:
			exitErr = lnpeer.ErrPeerExiting
			break out
		}
	}

	// Avoid an exit deadlock by ensuring WaitGroups are decremented before
	// disconnect.
	p.wg.Done()

	p.Disconnect(exitErr)

	p.log.Trace("writeHandler for peer done")
}

// queueHandler is responsible for accepting messages from outside subsystems
// to be eventually sent out on the wire by the writeHandler.
//
// NOTE: This method MUST be run as a goroutine.
func (p *Brontide) queueHandler() {
	defer p.wg.Done()

	// priorityMsgs holds an in order list of messages deemed high-priority
	// to be added to the sendQueue. This predominately includes messages
	// from the funding manager and htlcswitch.
	priorityMsgs := list.New()

	// lazyMsgs holds an in order list of messages deemed low-priority to be
	// added to the sendQueue only after all high-priority messages have
	// been queued. This predominately includes messages from the gossiper.
	lazyMsgs := list.New()

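	// On each iteration we either hand the oldest queued message (always
	// preferring the priority queue) to the writeHandler, or, if both
	// queues are empty, block until a new message arrives or the peer
	// shuts down.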
	for {
		// Examine the front of the priority queue, if it is empty check
		// the low priority queue.
		elem := priorityMsgs.Front()
		if elem == nil {
			elem = lazyMsgs.Front()
		}

		if elem != nil {
			front := elem.Value.(outgoingMsg)

			// There's an element on the queue, try adding
			// it to the sendQueue. We also watch for
			// messages on the outgoingQueue, in case the
			// writeHandler cannot accept messages on the
			// sendQueue.
			select {
			case p.sendQueue <- front:
				if front.priority {
					priorityMsgs.Remove(elem)
				} else {
					lazyMsgs.Remove(elem)
				}
			case msg := <-p.outgoingQueue:
				if msg.priority {
					priorityMsgs.PushBack(msg)
				} else {
					lazyMsgs.PushBack(msg)
				}
			case <-p.quit:
				return
			}
		} else {
			// If there weren't any messages to send to the
			// writeHandler, then we'll accept a new message
			// into the queue from outside sub-systems.
			select {
			case msg := <-p.outgoingQueue:
				if msg.priority {
					priorityMsgs.PushBack(msg)
				} else {
					lazyMsgs.PushBack(msg)
				}
			case <-p.quit:
				return
			}
		}
	}
}

// PingTime returns the estimated ping time to the peer in microseconds.
func (p *Brontide) PingTime() int64 {
	return p.pingManager.GetPingTimeMicroSeconds()
}

// queueMsg adds the lnwire.Message to the back of the high priority send queue.
// If the errChan is non-nil, an error is sent back if the msg failed to queue
// or failed to write, and nil otherwise.
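// A caller that needs to block until the message has been written to the wire
// can, for example, pass a buffered channel and wait on it:
//
//	errChan := make(chan error, 1)
//	p.queueMsg(msg, errChan)
//	if err := <-errChan; err != nil {
//		// Handle the write (or shutdown) failure.
//	}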
func (p *Brontide) queueMsg(msg lnwire.Message, errChan chan error) {
	p.queue(true, msg, errChan)
}

// queueMsgLazy adds the lnwire.Message to the back of the low priority send
// queue. If the errChan is non-nil, an error is sent back if the msg failed to
// queue or failed to write, and nil otherwise.
func (p *Brontide) queueMsgLazy(msg lnwire.Message, errChan chan error) {
	p.queue(false, msg, errChan)
}

// queue sends a given message to the queueHandler using the passed priority. If
// the errChan is non-nil, an error is sent back if the msg failed to queue or
// failed to write, and nil otherwise.
func (p *Brontide) queue(priority bool, msg lnwire.Message,
	errChan chan error) {

	select {
	case p.outgoingQueue <- outgoingMsg{priority, msg, errChan}:
	case <-p.quit:
		p.log.Tracef("Peer shutting down, could not enqueue msg: %v.",
			spew.Sdump(msg))
		if errChan != nil {
			errChan <- lnpeer.ErrPeerExiting
		}
	}
}

// ChannelSnapshots returns a slice of channel snapshots detailing all
// currently active channels maintained with the remote peer.
func (p *Brontide) ChannelSnapshots() []*channeldb.ChannelSnapshot {
	snapshots := make(
		[]*channeldb.ChannelSnapshot, 0, p.activeChannels.Len(),
	)

	p.activeChannels.ForEach(func(_ lnwire.ChannelID,
		activeChan *lnwallet.LightningChannel) error {

		// If the activeChan is nil, then we skip it as the channel is
		// pending.
		if activeChan == nil {
			return nil
		}

		// We'll only return a snapshot for channels that are
		// *immediately* available for routing payments over.
		if activeChan.RemoteNextRevocation() == nil {
			return nil
		}

		snapshot := activeChan.StateSnapshot()
		snapshots = append(snapshots, snapshot)

		return nil
	})

	return snapshots
}

// genDeliveryScript returns a new script to be used to send our funds to in
// the case of a cooperative channel close negotiation.
func (p *Brontide) genDeliveryScript() ([]byte, error) {
	// We'll send a normal p2wkh address unless we've negotiated the
	// shutdown-any-segwit feature.
	addrType := lnwallet.WitnessPubKey
	if p.taprootShutdownAllowed() {
		addrType = lnwallet.TaprootPubkey
	}

	deliveryAddr, err := p.cfg.Wallet.NewAddress(
		addrType, false, lnwallet.DefaultAccountName,
	)
	if err != nil {
		return nil, err
	}
	p.log.Infof("Delivery addr for channel close: %v",
		deliveryAddr)

	return txscript.PayToAddrScript(deliveryAddr)
}

// channelManager is a goroutine dedicated to handling all requests/signals
// pertaining to the opening, cooperative closing, and force closing of all
// channels maintained with the remote peer.
//
// NOTE: This method MUST be run as a goroutine.
func (p *Brontide) channelManager() {
	defer p.wg.Done()

	// reenableTimeout will fire once after the configured channel status
	// interval has elapsed. This will trigger us to sign new channel
	// updates and broadcast them with the "disabled" flag unset.
	reenableTimeout := time.After(p.cfg.ChanActiveTimeout)

out:
	for {
		select {
		// A new pending channel has arrived which means we are about
		// to complete a funding workflow and is waiting for the final
		// `ChannelReady` messages to be exchanged. We will add this
		// channel to the `activeChannels` with a nil value to indicate
		// this is a pending channel.
		case req := <-p.newPendingChannel:
			p.handleNewPendingChannel(req)

		// A new channel has arrived which means we've just completed a
		// funding workflow. We'll initialize the necessary local
		// state, and notify the htlc switch of a new link.
		case req := <-p.newActiveChannel:
			p.handleNewActiveChannel(req)

		// The funding flow for a pending channel has failed, we will
		// remove it from Brontide.
		case req := <-p.removePendingChannel:
			p.handleRemovePendingChannel(req)

		// We've just received a local request to close an active
		// channel. It will either kick off a cooperative channel
		// closure negotiation, or be a notification of a breached
		// contract that should be abandoned.
		case req := <-p.localCloseChanReqs:
			p.handleLocalCloseReq(req)

		// We've received a link failure from a link that was added to
		// the switch. This will initiate the teardown of the link, and
		// initiate any on-chain closures if necessary.
		case failure := <-p.linkFailures:
			p.handleLinkFailure(failure)

		// We've received a new cooperative channel closure related
		// message from the remote peer, we'll use this message to
		// advance the chan closer state machine.
		case closeMsg := <-p.chanCloseMsgs:
			p.handleCloseMsg(closeMsg)

		// The channel reannounce delay has elapsed, broadcast the
		// reenabled channel updates to the network. This should only
		// fire once, so we set the reenableTimeout channel to nil to
		// mark it for garbage collection. If the peer is torn down
		// before firing, reenabling will not be attempted.
		// TODO(conner): consolidate reenables timers inside chan status
		// manager
		case <-reenableTimeout:
			p.reenableActiveChannels()

			// Since this channel will never fire again during the
			// lifecycle of the peer, we nil the channel to mark it
			// eligible for garbage collection, and make this
			// explicitly ineligible to receive in future calls to
			// select. This also shaves a few CPU cycles since the
			// select will ignore this case entirely.
			reenableTimeout = nil

			// Once the reenabling is attempted, we also cancel the
			// channel event subscription to free up the overflow
			// queue used in channel notifier.
			//
			// NOTE: channelEventClient will be nil if the
			// reenableTimeout is greater than 1 minute.
			if p.channelEventClient != nil {
				p.channelEventClient.Cancel()
			}

		case <-p.quit:
			// As we've been signalled to exit, we'll reset all
			// our active channels back to their default state.
			p.activeChannels.ForEach(func(_ lnwire.ChannelID,
				lc *lnwallet.LightningChannel) error {

				// Exit if the channel is nil as it's a pending
				// channel.
				if lc == nil {
					return nil
				}

				lc.ResetState()

				return nil
			})

			break out
		}
	}
}

// reenableActiveChannels searches the index of channels maintained with this
// peer, and reenables each public, non-pending channel. This is done at the
// gossip level by broadcasting a new ChannelUpdate with the disabled bit unset.
// No message will be sent if the channel is already enabled.
func (p *Brontide) reenableActiveChannels() {
	// First, filter all known channels with this peer for ones that are
	// both public and not pending.
	activePublicChans := p.filterChannelsToEnable()

	// Create a map to hold channels that need to be retried.
	retryChans := make(map[wire.OutPoint]struct{}, len(activePublicChans))

	// For each of the public, non-pending channels, set the channel
	// disabled bit to false and send out a new ChannelUpdate. If this
	// channel is already active, the update won't be sent.
	for _, chanPoint := range activePublicChans {
		err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)

		switch {
		// No error occurred, continue to request the next channel.
		case err == nil:
			continue

		// Cannot auto enable a manually disabled channel so we do
		// nothing but proceed to the next channel.
		case errors.Is(err, netann.ErrEnableManuallyDisabledChan):
			p.log.Debugf("Channel(%v) was manually disabled, "+
				"ignoring automatic enable request", chanPoint)

			continue

		// If the channel is reported as inactive, we will give it
		// another chance. When handling the request, ChanStatusManager
		// will check whether the link is active or not. One of the
		// conditions is whether the link has been marked as
		// reestablished, which happens inside a goroutine (htlcManager)
		// after the link is started. And we may get a false negative
		// saying the link is not active because that goroutine hasn't
		// reached the line to mark the reestablishment. Thus we give
		// it a second chance to send the request.
		case errors.Is(err, netann.ErrEnableInactiveChan):
			// If we don't have a client created, it means we
			// shouldn't retry enabling the channel.
			if p.channelEventClient == nil {
				p.log.Errorf("Channel(%v) request enabling "+
					"failed due to inactive link",
					chanPoint)

				continue
			}

			p.log.Warnf("Channel(%v) cannot be enabled as " +
				"ChanStatusManager reported inactive, retrying")

			// Add the channel to the retry map.
			retryChans[chanPoint] = struct{}{}
		}
	}

	// Retry the channels if we have any.
	if len(retryChans) != 0 {
		p.retryRequestEnable(retryChans)
	}
}

// fetchActiveChanCloser attempts to fetch the active chan closer state machine
// for the target channel ID. If the channel isn't active an error is returned.
// Otherwise, either an existing state machine will be returned, or a new one
// will be created.
func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
	*chancloser.ChanCloser, error) {

	chanCloser, found := p.activeChanCloses[chanID]
	if found {
		// An entry will only be found if the closer has already been
		// created for a non-pending channel or for a channel that had
		// previously started the shutdown process but the connection
		// was restarted.
		return chanCloser, nil
	}

	// First, we'll ensure that we actually know of the target channel. If
	// not, we'll ignore this message.
	channel, ok := p.activeChannels.Load(chanID)

	// If the channel isn't in the map or the channel is nil, return
	// ErrChannelNotFound as the channel is pending.
	if !ok || channel == nil {
		return nil, ErrChannelNotFound
	}

	// We'll create a valid closing state machine in order to respond to
	// the initiated cooperative channel closure. First, we set the
	// delivery script that our funds will be paid out to. If an upfront
	// shutdown script was set, we will use it. Otherwise, we get a fresh
	// delivery script.
	//
	// TODO: Expose option to allow upfront shutdown script from watch-only
	// accounts.
	deliveryScript := channel.LocalUpfrontShutdownScript()
	if len(deliveryScript) == 0 {
		var err error
		deliveryScript, err = p.genDeliveryScript()
		if err != nil {
			p.log.Errorf("unable to gen delivery script: %v",
				err)
			return nil, fmt.Errorf("close addr unavailable")
		}
	}

	// In order to begin fee negotiations, we'll first compute our target
	// ideal fee-per-kw.
	feePerKw, err := p.cfg.FeeEstimator.EstimateFeePerKW(
		p.cfg.CoopCloseTargetConfs,
	)
	if err != nil {
		p.log.Errorf("unable to query fee estimator: %v", err)
		return nil, fmt.Errorf("unable to estimate fee")
	}

	addr, err := p.addrWithInternalKey(deliveryScript)
	if err != nil {
		return nil, fmt.Errorf("unable to parse addr: %w", err)
	}
	chanCloser, err = p.createChanCloser(
		channel, addr, feePerKw, nil, lntypes.Remote,
	)
	if err != nil {
		p.log.Errorf("unable to create chan closer: %v", err)
		return nil, fmt.Errorf("unable to create chan closer")
	}

	p.activeChanCloses[chanID] = chanCloser

	return chanCloser, nil
}

// filterChannelsToEnable filters a list of channels to be enabled upon start.
// The filtered channels are active channels that are neither private nor
// pending.
func (p *Brontide) filterChannelsToEnable() []wire.OutPoint {
	var activePublicChans []wire.OutPoint

	p.activeChannels.Range(func(chanID lnwire.ChannelID,
		lnChan *lnwallet.LightningChannel) bool {

		// If the lnChan is nil, continue as this is a pending channel.
		if lnChan == nil {
			return true
		}

		dbChan := lnChan.State()
		isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0
		if !isPublic || dbChan.IsPending {
			return true
		}

		// We'll also skip any channels added during this peer's
		// lifecycle since they haven't waited out the timeout. Their
		// first announcement will be enabled, and the chan status
		// manager will begin monitoring them passively since they exist
		// in the database.
		if _, ok := p.addedChannels.Load(chanID); ok {
			return true
		}

		activePublicChans = append(
			activePublicChans, dbChan.FundingOutpoint,
		)

		return true
	})

	return activePublicChans
}

// retryRequestEnable takes a map of channel outpoints and a channel event
// client. It listens to the channel events and removes a channel from the map
// if it's matched to the event. Upon receiving an active channel event, it
// will send the enabling request again.
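// This call blocks until every channel in the map has been handled or the
// peer begins shutting down; it is invoked from reenableActiveChannels on the
// channelManager goroutine.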
func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{}) {
	p.log.Debugf("Retry enabling %v channels", len(activeChans))

	// retryEnable is a helper closure that sends an enable request and
	// removes the channel from the map if it's matched.
	retryEnable := func(chanPoint wire.OutPoint) error {
		// If this is an active channel event, check whether it's in
		// our targeted channels map.
		_, found := activeChans[chanPoint]

		// If this channel is irrelevant, return nil so the loop can
		// jump to next iteration.
		if !found {
			return nil
		}

		// Otherwise we've just received an active signal for a channel
		// that's previously failed to be enabled, we send the request
		// again.
		//
		// We only give the channel one more shot, so we delete it from
		// our map first to keep it from being attempted again.
		delete(activeChans, chanPoint)

		// Send the request.
		err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)
		if err != nil {
			return fmt.Errorf("request enabling channel %v "+
				"failed: %w", chanPoint, err)
		}

		return nil
	}

	for {
		// If activeChans is empty, we're done processing all the
		// channels.
		if len(activeChans) == 0 {
			p.log.Debug("Finished retry enabling channels")
			return
		}

		select {
		// A new event has been sent by the ChannelNotifier. We now
		// check whether it's an active or inactive channel event.
		case e := <-p.channelEventClient.Updates():
			// If this is an active channel event, try to enable
			// the channel then jump to the next iteration.
			active, ok := e.(channelnotifier.ActiveChannelEvent)
			if ok {
				chanPoint := *active.ChannelPoint

				// If we received an error for this particular
				// channel, we log an error and won't quit as
				// we still want to retry other channels.
				if err := retryEnable(chanPoint); err != nil {
					p.log.Errorf("Retry failed: %v", err)
				}

				continue
			}

			// Otherwise check for inactive link event, and jump to
			// next iteration if it's not.
			inactive, ok := e.(channelnotifier.InactiveLinkEvent)
			if !ok {
				continue
			}

			// Found an inactive link event, if this is our
			// targeted channel, remove it from our map.
			chanPoint := *inactive.ChannelPoint
			_, found := activeChans[chanPoint]
			if !found {
				continue
			}

			delete(activeChans, chanPoint)
			p.log.Warnf("Re-enable channel %v failed, received "+
				"inactive link event", chanPoint)

		case <-p.quit:
			p.log.Debugf("Peer shutdown during retry enabling")
			return
		}
	}
}

// chooseDeliveryScript takes two optionally set shutdown scripts and returns
// a suitable script to close out to. This may be nil if neither script is
// set. If both scripts are set, this function will error if they do not match.
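// The possible combinations resolve as follows:
//
//	- neither set:            returns nil, nil
//	- only requested set:     returns requested, nil
//	- only upfront set:       returns upfront, nil
//	- both set and equal:     returns upfront, nil
//	- both set and different: returns an error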
func chooseDeliveryScript(upfront,
	requested lnwire.DeliveryAddress) (lnwire.DeliveryAddress, error) {

	// If no upfront shutdown script was provided, return the user
	// requested address (which may be nil).
	if len(upfront) == 0 {
		return requested, nil
	}

	// If an upfront shutdown script was provided, and the user did not
	// request a custom shutdown script, return the upfront address.
	if len(requested) == 0 {
		return upfront, nil
	}

	// If both an upfront shutdown script and a custom close script were
	// provided, error if the user provided shutdown script does not match
	// the upfront shutdown script (because closing out to a different
	// script would violate upfront shutdown).
	if !bytes.Equal(upfront, requested) {
		return nil, chancloser.ErrUpfrontShutdownScriptMismatch
	}

	// The user requested script matches the upfront shutdown script, so we
	// can return it without error.
	return upfront, nil
}

// restartCoopClose checks whether we need to restart the cooperative close
// process for a given channel.
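// This runs during peer startup, before the channelManager goroutine is spun
// up, which is why mutating activeChanCloses here requires no locking.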
func (p *Brontide) restartCoopClose(lnChan *lnwallet.LightningChannel) (
	*lnwire.Shutdown, error) {

	// If this channel has status ChanStatusCoopBroadcasted and does not
	// have a closing transaction, then the cooperative close process was
	// started but never finished. We'll re-create the chanCloser state
	// machine and resend Shutdown. BOLT#2 requires that we retransmit
	// Shutdown exactly, but doing so would mean persisting the RPC
	// provided close script. Instead use the LocalUpfrontShutdownScript
	// or generate a script.
	c := lnChan.State()
	_, err := c.BroadcastedCooperative()
	if err != nil && err != channeldb.ErrNoCloseTx {
		// An error other than ErrNoCloseTx was encountered.
		return nil, err
	} else if err == nil {
		// This channel has already completed the coop close
		// negotiation.
		return nil, nil
	}

	var deliveryScript []byte

	shutdownInfo, err := c.ShutdownInfo()
	switch {
	// We have previously stored the delivery script that we need to use
	// in the shutdown message. Re-use this script.
	case err == nil:
		shutdownInfo.WhenSome(func(info channeldb.ShutdownInfo) {
			deliveryScript = info.DeliveryScript.Val
		})

	// An error other than ErrNoShutdownInfo was returned
	case !errors.Is(err, channeldb.ErrNoShutdownInfo):
		return nil, err

	case errors.Is(err, channeldb.ErrNoShutdownInfo):
		deliveryScript = c.LocalShutdownScript
		if len(deliveryScript) == 0 {
			var err error
			deliveryScript, err = p.genDeliveryScript()
			if err != nil {
				p.log.Errorf("unable to gen delivery script: "+
					"%v", err)

				return nil, fmt.Errorf("close addr unavailable")
			}
		}
	}

	// Compute an ideal fee.
	feePerKw, err := p.cfg.FeeEstimator.EstimateFeePerKW(
		p.cfg.CoopCloseTargetConfs,
	)
	if err != nil {
		p.log.Errorf("unable to query fee estimator: %v", err)
		return nil, fmt.Errorf("unable to estimate fee")
	}

	// Determine whether we or the peer are the initiator of the coop
	// close attempt by looking at the channel's status.
	closingParty := lntypes.Remote
	if c.HasChanStatus(channeldb.ChanStatusLocalCloseInitiator) {
		closingParty = lntypes.Local
	}

	addr, err := p.addrWithInternalKey(deliveryScript)
	if err != nil {
		return nil, fmt.Errorf("unable to parse addr: %w", err)
	}
	chanCloser, err := p.createChanCloser(
		lnChan, addr, feePerKw, nil, closingParty,
	)
	if err != nil {
		p.log.Errorf("unable to create chan closer: %v", err)
		return nil, fmt.Errorf("unable to create chan closer")
	}

	// This does not need a mutex even though it is in a different
	// goroutine since this is done before the channelManager goroutine is
	// created.
	chanID := lnwire.NewChanIDFromOutPoint(c.FundingOutpoint)
	p.activeChanCloses[chanID] = chanCloser

	// Create the Shutdown message.
	shutdownMsg, err := chanCloser.ShutdownChan()
	if err != nil {
		p.log.Errorf("unable to create shutdown message: %v", err)
		delete(p.activeChanCloses, chanID)
		return nil, err
	}

	return shutdownMsg, nil
}

// createChanCloser constructs a ChanCloser from the passed parameters and is
// used to de-duplicate code.
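// Note that the returned ChanCloser is not registered in activeChanCloses
// here; callers such as fetchActiveChanCloser and handleLocalCloseReq add it
// to the map themselves.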
func (p *Brontide) createChanCloser(channel *lnwallet.LightningChannel,
	deliveryScript *chancloser.DeliveryAddrWithKey,
	fee chainfee.SatPerKWeight, req *htlcswitch.ChanClose,
	closer lntypes.ChannelParty) (*chancloser.ChanCloser, error) {

	_, startingHeight, err := p.cfg.ChainIO.GetBestBlock()
	if err != nil {
		p.log.Errorf("unable to obtain best block: %v", err)
		return nil, fmt.Errorf("cannot obtain best block")
	}

	// The req will only be set if we initiated the co-op closing flow.
	var maxFee chainfee.SatPerKWeight
	if req != nil {
		maxFee = req.MaxFee
	}

	chanCloser := chancloser.NewChanCloser(
		chancloser.ChanCloseCfg{
			Channel:      channel,
			MusigSession: NewMusigChanCloser(channel),
			FeeEstimator: &chancloser.SimpleCoopFeeEstimator{},
			BroadcastTx:  p.cfg.Wallet.PublishTransaction,
			AuxCloser:    p.cfg.AuxChanCloser,
			DisableChannel: func(op wire.OutPoint) error {
				return p.cfg.ChanStatusMgr.RequestDisable(
					op, false,
				)
			},
			MaxFee: maxFee,
			Disconnect: func() error {
				return p.cfg.DisconnectPeer(p.IdentityKey())
			},
			ChainParams: &p.cfg.Wallet.Cfg.NetParams,
			Quit:        p.quit,
		},
		*deliveryScript,
		fee,
		uint32(startingHeight),
		req,
		closer,
	)

	return chanCloser, nil
}

// handleLocalCloseReq kicks off the workflow to execute a cooperative or
// forced unilateral closure of the channel initiated by a local subsystem.
func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) {
	chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint)

	channel, ok := p.activeChannels.Load(chanID)

	// Though this function can't be called for pending channels, we still
	// check whether channel is nil for safety.
	if !ok || channel == nil {
		err := fmt.Errorf("unable to close channel, ChannelID(%v) is "+
			"unknown", chanID)
		p.log.Errorf(err.Error())
		req.Err <- err
		return
	}

	switch req.CloseType {
	// A type of CloseRegular indicates that the user has opted to close
	// out this channel on-chain, so we execute the cooperative channel
	// closure workflow.
	case contractcourt.CloseRegular:
		// First, we'll choose a delivery address that we'll use to send the
		// funds to in the case of a successful negotiation.

		// An upfront shutdown and user provided script are both optional,
		// but must be equal if both are set (because we cannot serve a request
		// to close out to a script which violates upfront shutdown). Get the
		// appropriate address to close out to (which may be nil if neither
		// are set) and error if they are both set and do not match.
		deliveryScript, err := chooseDeliveryScript(
			channel.LocalUpfrontShutdownScript(), req.DeliveryScript,
		)
		if err != nil {
			p.log.Errorf("cannot close channel %v: %v", req.ChanPoint, err)
			req.Err <- err
			return
		}

		// If neither an upfront address or a user set address was
		// provided, generate a fresh script.
		if len(deliveryScript) == 0 {
			deliveryScript, err = p.genDeliveryScript()
			if err != nil {
				p.log.Errorf(err.Error())
				req.Err <- err
				return
			}
		}
		addr, err := p.addrWithInternalKey(deliveryScript)
		if err != nil {
			err = fmt.Errorf("unable to parse addr for channel "+
				"%v: %w", req.ChanPoint, err)
			p.log.Errorf(err.Error())
			req.Err <- err

			return
		}
		chanCloser, err := p.createChanCloser(
			channel, addr, req.TargetFeePerKw, req, lntypes.Local,
		)
		if err != nil {
			p.log.Errorf(err.Error())
			req.Err <- err
			return
		}

		p.activeChanCloses[chanID] = chanCloser

		// Finally, we'll initiate the channel shutdown within the
		// chanCloser, and send the shutdown message to the remote
		// party to kick things off.
		shutdownMsg, err := chanCloser.ShutdownChan()
		if err != nil {
			p.log.Errorf(err.Error())
			req.Err <- err
			delete(p.activeChanCloses, chanID)

			// As we were unable to shutdown the channel, we'll
			// return it back to its normal state.
			channel.ResetState()
			return
		}

		link := p.fetchLinkFromKeyAndCid(chanID)
		if link == nil {
			// If the link is nil then it means it was already
			// removed from the switch or it never existed in the
			// first place. The latter case is handled at the
			// beginning of this function, so in the case where it
			// has already been removed, we can skip adding the
			// commit hook to queue a Shutdown message.
			p.log.Warnf("link not found during attempted closure: "+
				"%v", chanID)
			return
		}

		if !link.DisableAdds(htlcswitch.Outgoing) {
			p.log.Warnf("Outgoing link adds already "+
				"disabled: %v", link.ChanID())
		}

		link.OnCommitOnce(htlcswitch.Outgoing, func() {
			p.queueMsg(shutdownMsg, nil)
		})

	// A type of CloseBreach indicates that the counterparty has breached
	// the channel therefore we need to clean up our local state.
	case contractcourt.CloseBreach:
		// TODO(roasbeef): no longer need with newer breach logic?
		p.log.Infof("ChannelPoint(%v) has been breached, wiping "+
			"channel", req.ChanPoint)
		p.WipeChannel(req.ChanPoint)
	}
}

// linkFailureReport is sent to the channelManager whenever a link reports a
// link failure, and is forced to exit. The report houses the necessary
// information to clean up the channel state, send back the error message, and
// force close if necessary.
type linkFailureReport struct {
	chanPoint   wire.OutPoint
	chanID      lnwire.ChannelID
	shortChanID lnwire.ShortChannelID
	linkErr     htlcswitch.LinkFailureError
}

// handleLinkFailure processes a link failure report when a link in the switch
// fails. It facilitates the removal of all channel state within the peer,
// force closing the channel depending on severity, and sending the error
// message back to the remote party.
func (p *Brontide) handleLinkFailure(failure linkFailureReport) {
	// Retrieve the channel from the map of active channels. We do this to
	// have access to it even after WipeChannel removes it from the map.
	chanID := lnwire.NewChanIDFromOutPoint(failure.chanPoint)
	lnChan, _ := p.activeChannels.Load(chanID)

	// We begin by wiping the link, which will remove it from the switch,
	// such that it won't be attempted to be used for any more updates.
	//
	// TODO(halseth): should introduce a way to atomically stop/pause the
	// link and cancel back any adds in its mailboxes such that we can
	// safely force close without the link being added again and updates
	// being applied.
	p.WipeChannel(&failure.chanPoint)

	// If the error encountered was severe enough, we'll now force close
	// the channel to prevent re-adding it to the switch in the future.
	if failure.linkErr.FailureAction == htlcswitch.LinkFailureForceClose {
		p.log.Warnf("Force closing link(%v)", failure.shortChanID)

		closeTx, err := p.cfg.ChainArb.ForceCloseContract(
			failure.chanPoint,
		)
		if err != nil {
			p.log.Errorf("unable to force close "+
				"link(%v): %v", failure.shortChanID, err)
		} else {
			p.log.Infof("channel(%v) force "+
				"closed with txid %v",
				failure.shortChanID, closeTx.TxHash())
		}
	}

	// If this is a permanent failure, we will mark the channel borked.
	if failure.linkErr.PermanentFailure && lnChan != nil {
		p.log.Warnf("Marking link(%v) borked due to permanent "+
			"failure", failure.shortChanID)

		if err := lnChan.State().MarkBorked(); err != nil {
			p.log.Errorf("Unable to mark channel %v borked: %v",
				failure.shortChanID, err)
		}
	}

	// Send an error to the peer explaining why we failed the channel.
	if failure.linkErr.ShouldSendToPeer() {
		// If SendData is set, send it to the peer. If not, we'll use
		// the standard error messages in the payload. We only include
		// sendData in the cases where the error data does not contain
		// sensitive information.
		data := []byte(failure.linkErr.Error())
		if failure.linkErr.SendData != nil {
			data = failure.linkErr.SendData
		}

		var networkMsg lnwire.Message
		if failure.linkErr.Warning {
			networkMsg = &lnwire.Warning{
				ChanID: failure.chanID,
				Data:   data,
			}
		} else {
			networkMsg = &lnwire.Error{
				ChanID: failure.chanID,
				Data:   data,
			}
		}

		err := p.SendMessage(true, networkMsg)
		if err != nil {
			p.log.Errorf("unable to send msg to "+
				"remote peer: %v", err)
		}
	}

	// If the failure action is disconnect, then we'll execute that now. If
	// we had to send an error above, it was a sync call, so we expect the
	// message to be flushed on the wire by now.
	if failure.linkErr.FailureAction == htlcswitch.LinkFailureDisconnect {
		p.Disconnect(fmt.Errorf("link requested disconnect"))
	}
}

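// The following is an illustrative sketch, not part of the original file: it
// isolates the Warning-vs-Error decision seen in handleLinkFailure above.
// The helper name exampleFailureMsg is hypothetical.
func exampleFailureMsg(chanID lnwire.ChannelID, data []byte,
	warning bool) lnwire.Message {

	// A warning signals a recoverable problem, an error a terminal one;
	// both carry the same channel ID and payload.
	if warning {
		return &lnwire.Warning{ChanID: chanID, Data: data}
	}

	return &lnwire.Error{ChanID: chanID, Data: data}
}
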
// fetchLinkFromKeyAndCid fetches a link from the switch via the remote's
// public key and the channel id.
func (p *Brontide) fetchLinkFromKeyAndCid(
	cid lnwire.ChannelID) htlcswitch.ChannelUpdateHandler {

	var chanLink htlcswitch.ChannelUpdateHandler

	// We don't need to check the error here, and can instead just loop
	// over the slice and return nil.
	links, _ := p.cfg.Switch.GetLinksByInterface(p.cfg.PubKeyBytes)
	for _, link := range links {
		if link.ChanID() == cid {
			chanLink = link
			break
		}
	}

	return chanLink
}

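// Illustrative sketch, not part of the original file: callers of
// fetchLinkFromKeyAndCid are expected to nil-check the result before
// registering hooks on it, since the link may already have been removed from
// the switch. The helper name exampleDisableOutgoingAdds is hypothetical.
func (p *Brontide) exampleDisableOutgoingAdds(cid lnwire.ChannelID) {
	link := p.fetchLinkFromKeyAndCid(cid)
	if link == nil {
		// The link is gone (or never existed); nothing to disable.
		return
	}

	// Mirror the cooperative-close pattern: stop new outgoing adds before
	// negotiating shutdown.
	if !link.DisableAdds(htlcswitch.Outgoing) {
		p.log.Warnf("Outgoing link adds already disabled: %v",
			link.ChanID())
	}
}
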
// finalizeChanClosure performs the final clean up steps once the cooperative
// closure transaction has been fully broadcast. The finalized closing state
// machine should be passed in. Once the transaction has been sufficiently
// confirmed, the channel will be marked as fully closed within the database,
// and any clients will be notified of updates to the closing state.
func (p *Brontide) finalizeChanClosure(chanCloser *chancloser.ChanCloser) {
	closeReq := chanCloser.CloseRequest()

	// First, we'll clear all indexes related to the channel in question.
	chanPoint := chanCloser.Channel().ChannelPoint()
	p.WipeChannel(&chanPoint)

	// Also clear the activeChanCloses map of this channel.
	cid := lnwire.NewChanIDFromOutPoint(chanPoint)
	delete(p.activeChanCloses, cid)

	// Next, we'll launch a goroutine which will request to be notified by
	// the ChainNotifier once the closure transaction obtains a single
	// confirmation.
	notifier := p.cfg.ChainNotifier

	// If any error happens during waitForChanToClose, forward it to
	// closeReq. If this channel closure is not locally initiated, closeReq
	// will be nil, so just ignore the error.
	errChan := make(chan error, 1)
	if closeReq != nil {
		errChan = closeReq.Err
	}

	closingTx, err := chanCloser.ClosingTx()
	if err != nil {
		if closeReq != nil {
			p.log.Error(err)
			closeReq.Err <- err
		}
	}

	closingTxid := closingTx.TxHash()

	// If this is a locally requested shutdown, update the caller with a
	// new event detailing the current pending state of this request.
	if closeReq != nil {
		closeReq.Updates <- &PendingUpdate{
			Txid: closingTxid[:],
		}
	}

	localOut := chanCloser.LocalCloseOutput()
	remoteOut := chanCloser.RemoteCloseOutput()
	auxOut := chanCloser.AuxOutputs()
	go WaitForChanToClose(
		chanCloser.NegotiationHeight(), notifier, errChan,
		&chanPoint, &closingTxid, closingTx.TxOut[0].PkScript, func() {
			// Respond to the local subsystem which requested the
			// channel closure.
			if closeReq != nil {
				closeReq.Updates <- &ChannelCloseUpdate{
					ClosingTxid:       closingTxid[:],
					Success:           true,
					LocalCloseOutput:  localOut,
					RemoteCloseOutput: remoteOut,
					AuxOutputs:        auxOut,
				}
			}
		},
	)
}

// WaitForChanToClose uses the passed notifier to wait until the channel has
// been detected as closed on chain, and then concludes by executing the
// passed callback. If any error is encountered within the function, then it
// will be sent over the errChan.
func WaitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier,
	errChan chan error, chanPoint *wire.OutPoint,
	closingTxID *chainhash.Hash, closeScript []byte, cb func()) {

	peerLog.Infof("Waiting for confirmation of close of ChannelPoint(%v) "+
		"with txid: %v", chanPoint, closingTxID)

	// TODO(roasbeef): add param for num needed confs
	confNtfn, err := notifier.RegisterConfirmationsNtfn(
		closingTxID, closeScript, 1, bestHeight,
	)
	if err != nil {
		if errChan != nil {
			errChan <- err
		}
		return
	}

	// In the case that the ChainNotifier is shutting down, all subscriber
	// notification channels will be closed, generating a nil receive.
	height, ok := <-confNtfn.Confirmed
	if !ok {
		return
	}

	// The channel has been closed, remove it from any active indexes, and
	// the database state.
	peerLog.Infof("ChannelPoint(%v) is now closed at "+
		"height %v", chanPoint, height.BlockHeight)

	// Finally, execute the closure callback to mark the confirmation of
	// the transaction closing the contract.
	cb()
}

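// Illustrative sketch, not part of the original file: WaitForChanToClose is
// exported and can be driven directly with a notifier and the closing
// transaction details; the parameter values here are placeholders.
func exampleWaitForClose(notifier chainntnfs.ChainNotifier, bestHeight uint32,
	chanPoint *wire.OutPoint, closingTxid *chainhash.Hash,
	closeScript []byte) {

	errChan := make(chan error, 1)
	go WaitForChanToClose(
		bestHeight, notifier, errChan, chanPoint, closingTxid,
		closeScript, func() {
			// Runs once the closing transaction has one
			// confirmation.
			peerLog.Infof("ChannelPoint(%v) fully closed",
				chanPoint)
		},
	)
}
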
// WipeChannel removes the passed channel point from all indexes associated
// with the peer and the switch.
func (p *Brontide) WipeChannel(chanPoint *wire.OutPoint) {
	chanID := lnwire.NewChanIDFromOutPoint(*chanPoint)

	p.activeChannels.Delete(chanID)

	// Instruct the HtlcSwitch to close this link as the channel is no
	// longer active.
	p.cfg.Switch.RemoveLink(chanID)
}

// handleInitMsg handles the incoming init message which contains global and
// local feature vectors. If feature vectors are incompatible then disconnect.
func (p *Brontide) handleInitMsg(msg *lnwire.Init) error {
	// First, merge any features from the legacy global features field into
	// those presented in the local features fields.
	err := msg.Features.Merge(msg.GlobalFeatures)
	if err != nil {
		return fmt.Errorf("unable to merge legacy global features: %w",
			err)
	}

	// Then, finalize the remote feature vector providing the flattened
	// feature bit namespace.
	p.remoteFeatures = lnwire.NewFeatureVector(
		msg.Features, lnwire.Features,
	)

	// Now that we have their features loaded, we'll ensure that they
	// didn't set any required bits that we don't know of.
	err = feature.ValidateRequired(p.remoteFeatures)
	if err != nil {
		return fmt.Errorf("invalid remote features: %w", err)
	}

	// Ensure the remote party's feature vector contains all transitive
	// dependencies. We know ours are correct since they are validated
	// during the feature manager's instantiation.
	err = feature.ValidateDeps(p.remoteFeatures)
	if err != nil {
		return fmt.Errorf("invalid remote features: %w", err)
	}

	// Now that we know we understand their requirements, we'll check to
	// see if they don't support anything that we deem to be mandatory.
	if !p.remoteFeatures.HasFeature(lnwire.DataLossProtectRequired) {
		return fmt.Errorf("data loss protection required")
	}

	return nil
}

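// Illustrative sketch, not part of the original file: the same validation
// order used by handleInitMsg can be applied to any raw feature vector. The
// helper name exampleValidateFeatures is hypothetical.
func exampleValidateFeatures(raw *lnwire.RawFeatureVector) error {
	fv := lnwire.NewFeatureVector(raw, lnwire.Features)

	// Reject required bits we don't understand.
	if err := feature.ValidateRequired(fv); err != nil {
		return fmt.Errorf("invalid remote features: %w", err)
	}

	// Ensure transitive feature dependencies are satisfied.
	if err := feature.ValidateDeps(fv); err != nil {
		return fmt.Errorf("invalid remote features: %w", err)
	}

	// Finally, enforce the features we consider mandatory.
	if !fv.HasFeature(lnwire.DataLossProtectRequired) {
		return fmt.Errorf("data loss protection required")
	}

	return nil
}
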
// LocalFeatures returns the set of global features that has been advertised by
// the local node. This allows sub-systems that use this interface to gate their
// behavior off the set of negotiated feature bits.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *Brontide) LocalFeatures() *lnwire.FeatureVector {
	return p.cfg.Features
}

// RemoteFeatures returns the set of global features that has been advertised by
// the remote node. This allows sub-systems that use this interface to gate
// their behavior off the set of negotiated feature bits.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *Brontide) RemoteFeatures() *lnwire.FeatureVector {
	return p.remoteFeatures
}

// hasNegotiatedScidAlias returns true if we've negotiated the
// option-scid-alias feature bit with the peer.
func (p *Brontide) hasNegotiatedScidAlias() bool {
	peerHas := p.remoteFeatures.HasFeature(lnwire.ScidAliasOptional)
	localHas := p.cfg.Features.HasFeature(lnwire.ScidAliasOptional)
	return peerHas && localHas
}

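// Illustrative sketch, not part of the original file: any "both sides must
// signal it" check has the same shape as hasNegotiatedScidAlias. The helper
// name exampleBothSupport is hypothetical.
func (p *Brontide) exampleBothSupport(bit lnwire.FeatureBit) bool {
	return p.remoteFeatures.HasFeature(bit) &&
		p.cfg.Features.HasFeature(bit)
}
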
// sendInitMsg sends the Init message to the remote peer. This message contains
// our currently supported local and global features.
func (p *Brontide) sendInitMsg(legacyChan bool) error {
	features := p.cfg.Features.Clone()
	legacyFeatures := p.cfg.LegacyFeatures.Clone()

	// If we have a legacy channel open with a peer, we downgrade static
	// remote required to optional in case the peer does not understand the
	// required feature bit. If we do not do this, the peer will reject our
	// connection because it does not understand a required feature bit, and
	// our channel will be unusable.
	if legacyChan && features.RequiresFeature(lnwire.StaticRemoteKeyRequired) {
		p.log.Infof("Legacy channel open with peer, " +
			"downgrading static remote required feature bit to " +
			"optional")

		// Unset and set in both the local and global features to
		// ensure both sets are consistent and mergeable by old and
		// new nodes.
		features.Unset(lnwire.StaticRemoteKeyRequired)
		legacyFeatures.Unset(lnwire.StaticRemoteKeyRequired)

		features.Set(lnwire.StaticRemoteKeyOptional)
		legacyFeatures.Set(lnwire.StaticRemoteKeyOptional)
	}

	msg := lnwire.NewInitMessage(
		legacyFeatures.RawFeatureVector,
		features.RawFeatureVector,
	)

	return p.writeMessage(msg)
}

// resendChanSyncMsg will attempt to find a channel sync message for the closed
// channel and resend it to our peer.
func (p *Brontide) resendChanSyncMsg(cid lnwire.ChannelID) error {
	// If we already re-sent the message for this channel, we won't do it
	// again.
	if _, ok := p.resentChanSyncMsg[cid]; ok {
		return nil
	}

	// Check if we have any channel sync messages stored for this channel.
	c, err := p.cfg.ChannelDB.FetchClosedChannelForID(cid)
	if err != nil {
		return fmt.Errorf("unable to fetch channel sync messages for "+
			"peer %v: %v", p, err)
	}

	if c.LastChanSyncMsg == nil {
		return fmt.Errorf("no chan sync message stored for channel %v",
			cid)
	}

	if !c.RemotePub.IsEqual(p.IdentityKey()) {
		return fmt.Errorf("ignoring channel reestablish from "+
			"peer=%x", p.IdentityKey().SerializeCompressed())
	}

	p.log.Debugf("Re-sending channel sync message for channel %v to "+
		"peer", cid)

	if err := p.SendMessage(true, c.LastChanSyncMsg); err != nil {
		return fmt.Errorf("failed resending channel sync "+
			"message to peer %v: %v", p, err)
	}

	p.log.Debugf("Re-sent channel sync message for channel %v to peer",
		cid)

	// Note down that we sent the message, so we won't resend it again for
	// this connection.
	p.resentChanSyncMsg[cid] = struct{}{}

	return nil
}

// SendMessage sends a variadic number of high-priority messages to the remote
// peer. The first argument denotes if the method should block until the
// messages have been sent to the remote peer or an error is returned,
// otherwise it returns immediately after queuing.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *Brontide) SendMessage(sync bool, msgs ...lnwire.Message) error {
	return p.sendMessage(sync, true, msgs...)
}

// SendMessageLazy sends a variadic number of low-priority messages to the
// remote peer. The first argument denotes if the method should block until
// the messages have been sent to the remote peer or an error is returned,
// otherwise it returns immediately after queueing.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *Brontide) SendMessageLazy(sync bool, msgs ...lnwire.Message) error {
	return p.sendMessage(sync, false, msgs...)
}

// sendMessage queues a variadic number of messages using the passed priority
// to the remote peer. If sync is true, this method will block until the
// messages have been sent to the remote peer or an error is returned, otherwise
// it returns immediately after queueing.
func (p *Brontide) sendMessage(sync, priority bool, msgs ...lnwire.Message) error {
	// Add all incoming messages to the outgoing queue. A list of error
	// chans is populated for each message if the caller requested a sync
	// send.
	var errChans []chan error
	if sync {
		errChans = make([]chan error, 0, len(msgs))
	}
	for _, msg := range msgs {
		// If a sync send was requested, create an error chan to listen
		// for an ack from the writeHandler.
		var errChan chan error
		if sync {
			errChan = make(chan error, 1)
			errChans = append(errChans, errChan)
		}

		if priority {
			p.queueMsg(msg, errChan)
		} else {
			p.queueMsgLazy(msg, errChan)
		}
	}

	// Wait for all replies from the writeHandler. For async sends, this
	// will be a NOP as the list of error chans is nil.
	for _, errChan := range errChans {
		select {
		case err := <-errChan:
			return err
		case <-p.quit:
			return lnpeer.ErrPeerExiting
		case <-p.cfg.Quit:
			return lnpeer.ErrPeerExiting
		}
	}

	return nil
}

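// Illustrative sketch, not part of the original file: a synchronous
// high-priority send blocks until the writeHandler acks the message, while a
// lazy send only queues it. The helper name exampleSendSyncThenLazy is
// hypothetical.
func (p *Brontide) exampleSendSyncThenLazy(msg lnwire.Message) error {
	// Block until the message has been handed to the wire (or the peer is
	// shutting down).
	if err := p.SendMessage(true, msg); err != nil {
		return err
	}

	// Queue a low-priority copy without waiting for an ack.
	return p.SendMessageLazy(false, msg)
}
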
// PubKey returns the pubkey of the peer in compressed serialized format.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *Brontide) PubKey() [33]byte {
	return p.cfg.PubKeyBytes
}

// IdentityKey returns the public key of the remote peer.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *Brontide) IdentityKey() *btcec.PublicKey {
	return p.cfg.Addr.IdentityKey
}

// Address returns the network address of the remote peer.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *Brontide) Address() net.Addr {
	return p.cfg.Addr.Address
}

// AddNewChannel adds a new channel to the peer. The channel should fail to be
// added if the cancel channel is closed.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *Brontide) AddNewChannel(newChan *lnpeer.NewChannel,
	cancel <-chan struct{}) error {

	errChan := make(chan error, 1)
	newChanMsg := &newChannelMsg{
		channel: newChan,
		err:     errChan,
	}

	select {
	case p.newActiveChannel <- newChanMsg:
	case <-cancel:
		return errors.New("canceled adding new channel")
	case <-p.quit:
		return lnpeer.ErrPeerExiting
	}

	// We pause here to wait for the peer to recognize the new channel
	// before we close the channel barrier corresponding to the channel.
	select {
	case err := <-errChan:
		return err
	case <-p.quit:
		return lnpeer.ErrPeerExiting
	}
}

// AddPendingChannel adds a pending open channel to the peer. The channel
// should fail to be added if the cancel channel is closed.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *Brontide) AddPendingChannel(cid lnwire.ChannelID,
	cancel <-chan struct{}) error {

	errChan := make(chan error, 1)
	newChanMsg := &newChannelMsg{
		channelID: cid,
		err:       errChan,
	}

	select {
	case p.newPendingChannel <- newChanMsg:

	case <-cancel:
		return errors.New("canceled adding pending channel")

	case <-p.quit:
		return lnpeer.ErrPeerExiting
	}

	// We pause here to wait for the peer to recognize the new pending
	// channel before we close the channel barrier corresponding to the
	// channel.
	select {
	case err := <-errChan:
		return err

	case <-cancel:
		return errors.New("canceled adding pending channel")

	case <-p.quit:
		return lnpeer.ErrPeerExiting
	}
}

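// Illustrative sketch, not part of the original file: a caller (such as a
// funding flow) can bound AddPendingChannel with its own cancel channel. The
// helper name exampleAddPendingWithTimeout and its timeout parameter are
// hypothetical.
func exampleAddPendingWithTimeout(p *Brontide, cid lnwire.ChannelID,
	timeout time.Duration) error {

	cancel := make(chan struct{})
	timer := time.AfterFunc(timeout, func() { close(cancel) })
	defer timer.Stop()

	// If the timer fires before the peer accepts the request, the call
	// returns with a cancellation error instead of blocking forever.
	return p.AddPendingChannel(cid, cancel)
}
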
// RemovePendingChannel removes a pending open channel from the peer.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *Brontide) RemovePendingChannel(cid lnwire.ChannelID) error {
	errChan := make(chan error, 1)
	newChanMsg := &newChannelMsg{
		channelID: cid,
		err:       errChan,
	}

	select {
	case p.removePendingChannel <- newChanMsg:
	case <-p.quit:
		return lnpeer.ErrPeerExiting
	}

	// We pause here to wait for the peer to respond to the cancellation of
	// the pending channel before we close the channel barrier
	// corresponding to the channel.
	select {
	case err := <-errChan:
		return err

	case <-p.quit:
		return lnpeer.ErrPeerExiting
	}
}

// StartTime returns the time at which the connection was established if the
// peer started successfully, and zero otherwise.
func (p *Brontide) StartTime() time.Time {
	return p.startTime
}

// handleCloseMsg is called when a new cooperative channel closure related
// message is received from the remote peer. We'll use this message to advance
// the chan closer state machine.
func (p *Brontide) handleCloseMsg(msg *closeMsg) {
	link := p.fetchLinkFromKeyAndCid(msg.cid)

	// We'll now fetch the matching closing state machine in order to
	// continue, or finalize the channel closure process.
	chanCloser, err := p.fetchActiveChanCloser(msg.cid)
	if err != nil {
		// If the channel is not known to us, we'll simply ignore this
		// message.
		if err == ErrChannelNotFound {
			return
		}

		p.log.Errorf("Unable to respond to remote close msg: %v", err)

		errMsg := &lnwire.Error{
			ChanID: msg.cid,
			Data:   lnwire.ErrorData(err.Error()),
		}
		p.queueMsg(errMsg, nil)
		return
	}

	handleErr := func(err error) {
		err = fmt.Errorf("unable to process close msg: %w", err)
		p.log.Error(err)

		// As the negotiations failed, we'll reset the channel state
		// machine to ensure we act on on-chain events as normal.
		chanCloser.Channel().ResetState()

		if chanCloser.CloseRequest() != nil {
			chanCloser.CloseRequest().Err <- err
		}
		delete(p.activeChanCloses, msg.cid)

		p.Disconnect(err)
	}

	// Next, we'll process the next message using the target state machine.
	// We'll either continue negotiation, or halt.
	switch typed := msg.msg.(type) {
	case *lnwire.Shutdown:
		// Disable incoming adds immediately.
		if link != nil && !link.DisableAdds(htlcswitch.Incoming) {
			p.log.Warnf("Incoming link adds already disabled: %v",
				link.ChanID())
		}

		oShutdown, err := chanCloser.ReceiveShutdown(*typed)
		if err != nil {
			handleErr(err)
			return
		}

		oShutdown.WhenSome(func(msg lnwire.Shutdown) {
			// If the link is nil it means we can immediately queue
			// the Shutdown message since we don't have to wait for
			// commitment transaction synchronization.
			if link == nil {
				p.queueMsg(&msg, nil)
				return
			}

			// Immediately disallow any new HTLC's from being added
			// in the outgoing direction.
			if !link.DisableAdds(htlcswitch.Outgoing) {
				p.log.Warnf("Outgoing link adds already "+
					"disabled: %v", link.ChanID())
			}

			// When we have a Shutdown to send, we defer it till the
			// next time we send a CommitSig to remain spec
			// compliant.
			link.OnCommitOnce(htlcswitch.Outgoing, func() {
				p.queueMsg(&msg, nil)
			})
		})

		beginNegotiation := func() {
			oClosingSigned, err := chanCloser.BeginNegotiation()
			if err != nil {
				handleErr(err)
				return
			}

			oClosingSigned.WhenSome(func(msg lnwire.ClosingSigned) {
				p.queueMsg(&msg, nil)
			})
		}

		if link == nil {
			beginNegotiation()
		} else {
			// Now we register a flush hook to advance the
			// ChanCloser and possibly send out a ClosingSigned
			// when the link finishes draining.
			link.OnFlushedOnce(func() {
				// Remove link in goroutine to prevent deadlock.
				go p.cfg.Switch.RemoveLink(msg.cid)
				beginNegotiation()
			})
		}

	case *lnwire.ClosingSigned:
		oClosingSigned, err := chanCloser.ReceiveClosingSigned(*typed)
		if err != nil {
			handleErr(err)
			return
		}

		oClosingSigned.WhenSome(func(msg lnwire.ClosingSigned) {
			p.queueMsg(&msg, nil)
		})

	default:
		panic("impossible closeMsg type")
	}

	// If we haven't finished close negotiations, then we'll continue as we
	// can't yet finalize the closure.
	if _, err := chanCloser.ClosingTx(); err != nil {
		return
	}

	// Otherwise, we've agreed on a closing fee! In this case, we'll wrap up
	// the channel closure by notifying relevant sub-systems and launching a
	// goroutine to wait for close tx conf.
	p.finalizeChanClosure(chanCloser)
}

// HandleLocalCloseChanReqs accepts a *htlcswitch.ChanClose and passes it onto
// the channelManager goroutine, which will shut down the link and possibly
// close the channel.
func (p *Brontide) HandleLocalCloseChanReqs(req *htlcswitch.ChanClose) {
	select {
	case p.localCloseChanReqs <- req:
		p.log.Info("Local close channel request is going to be " +
			"delivered to the peer")
	case <-p.quit:
		p.log.Info("Unable to deliver local close channel request " +
			"to peer")
	}
}

// NetAddress returns the network address of the remote peer as an
// lnwire.NetAddress.
func (p *Brontide) NetAddress() *lnwire.NetAddress {
	return p.cfg.Addr
}

// Inbound is a getter for the Brontide's Inbound boolean in cfg.
func (p *Brontide) Inbound() bool {
	return p.cfg.Inbound
}

// ConnReq is a getter for the Brontide's connReq in cfg.
func (p *Brontide) ConnReq() *connmgr.ConnReq {
	return p.cfg.ConnReq
}

// ErrorBuffer is a getter for the Brontide's errorBuffer in cfg.
func (p *Brontide) ErrorBuffer() *queue.CircularBuffer {
	return p.cfg.ErrorBuffer
}

// SetAddress sets the remote peer's address given an address.
func (p *Brontide) SetAddress(address net.Addr) {
	p.cfg.Addr.Address = address
}

// ActiveSignal returns the peer's active signal.
func (p *Brontide) ActiveSignal() chan struct{} {
	return p.activeSignal
}

// Conn returns a pointer to the peer's connection struct.
func (p *Brontide) Conn() net.Conn {
	return p.cfg.Conn
}

// BytesReceived returns the number of bytes received from the peer.
func (p *Brontide) BytesReceived() uint64 {
	return atomic.LoadUint64(&p.bytesReceived)
}

// BytesSent returns the number of bytes sent to the peer.
func (p *Brontide) BytesSent() uint64 {
	return atomic.LoadUint64(&p.bytesSent)
}

// LastRemotePingPayload returns the last payload the remote party sent as part
// of their ping.
func (p *Brontide) LastRemotePingPayload() []byte {
	pingPayload := p.lastPingPayload.Load()
	if pingPayload == nil {
		return []byte{}
	}

	pingBytes, ok := pingPayload.(lnwire.PingPayload)
	if !ok {
		return nil
	}

	return pingBytes
}

// attachChannelEventSubscription creates a channel event subscription and
// attaches the client to Brontide if the reenableTimeout is no greater than 1
// minute.
func (p *Brontide) attachChannelEventSubscription() error {
	// If the timeout is greater than 1 minute, it's unlikely that the link
	// hasn't yet finished its reestablishment. Return nil without creating
	// the client to specify that we don't want to retry.
	if p.cfg.ChanActiveTimeout > 1*time.Minute {
		return nil
	}

	// When the reenable timeout is less than 1 minute, it's likely the
	// channel link hasn't finished its reestablishment yet. In that case,
	// we'll give it a second chance by subscribing to the channel update
	// events. Upon receiving the `ActiveLinkEvent`, we'll then request
	// enabling the channel again.
	sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
	if err != nil {
		return fmt.Errorf("SubscribeChannelEvents failed: %w", err)
	}

	p.channelEventClient = sub

	return nil
}

// updateNextRevocation updates the existing channel's next revocation if it's
// nil.
func (p *Brontide) updateNextRevocation(c *channeldb.OpenChannel) error {
	chanPoint := c.FundingOutpoint
	chanID := lnwire.NewChanIDFromOutPoint(chanPoint)

	// Read the current channel.
	currentChan, loaded := p.activeChannels.Load(chanID)

	// currentChan should exist, but we perform a check anyway to avoid nil
	// pointer dereference.
	if !loaded {
		return fmt.Errorf("missing active channel with chanID=%v",
			chanID)
	}

	// currentChan should not be nil, but we perform a check anyway to
	// avoid nil pointer dereference.
	if currentChan == nil {
		return fmt.Errorf("found nil active channel with chanID=%v",
			chanID)
	}

	// If we're being sent a new channel, and our existing channel doesn't
	// have the next revocation, then we need to update the current
	// existing channel.
	if currentChan.RemoteNextRevocation() != nil {
		return nil
	}

	p.log.Infof("Processing retransmitted ChannelReady for "+
		"ChannelPoint(%v)", chanPoint)

	nextRevoke := c.RemoteNextRevocation

	err := currentChan.InitNextRevocation(nextRevoke)
	if err != nil {
		return fmt.Errorf("unable to init next revocation: %w", err)
	}

	return nil
}

// addActiveChannel adds a new active channel to the `activeChannels` map. It
// takes a `channeldb.OpenChannel`, creates a `lnwallet.LightningChannel` from
// it and assembles it with a channel link.
func (p *Brontide) addActiveChannel(c *lnpeer.NewChannel) error {
	chanPoint := c.FundingOutpoint
	chanID := lnwire.NewChanIDFromOutPoint(chanPoint)

	// If we've reached this point, there are two possible scenarios. If
	// the channel was in the active channels map as nil, then it was
	// loaded from disk and we need to send reestablish. Else, it was not
	// loaded from disk and we don't need to send reestablish as this is a
	// fresh channel.
	shouldReestablish := p.isLoadedFromDisk(chanID)

	chanOpts := c.ChanOpts
	if shouldReestablish {
		// If we have to do the reestablish dance for this channel,
		// ensure that we don't try to call InitRemoteMusigNonces twice
		// by calling SkipNonceInit.
		chanOpts = append(chanOpts, lnwallet.WithSkipNonceInit())
	}

	p.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
		chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
	})
	p.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
		chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
	})
	p.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
		chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
	})

	// If not already active, we'll add this channel to the set of active
	// channels, so we can look it up later easily according to its channel
	// ID.
	lnChan, err := lnwallet.NewLightningChannel(
		p.cfg.Signer, c.OpenChannel, p.cfg.SigPool, chanOpts...,
	)
	if err != nil {
		return fmt.Errorf("unable to create LightningChannel: %w", err)
	}

	// Store the channel in the activeChannels map.
	p.activeChannels.Store(chanID, lnChan)

	p.log.Infof("New channel active ChannelPoint(%v) with peer", chanPoint)

	// Next, we'll assemble a ChannelLink along with the necessary items it
	// needs to function.
	chainEvents, err := p.cfg.ChainArb.SubscribeChannelEvents(chanPoint)
	if err != nil {
		return fmt.Errorf("unable to subscribe to chain events: %w",
			err)
	}

	// We'll query the channel DB for the new channel's initial forwarding
	// policies to determine the policy we start out with.
	initialPolicy, err := p.cfg.ChannelDB.GetInitialForwardingPolicy(chanID)
	if err != nil {
		return fmt.Errorf("unable to query for initial forwarding "+
			"policy: %v", err)
	}

	// Create the link and add it to the switch.
	err = p.addLink(
		&chanPoint, lnChan, initialPolicy, chainEvents,
		shouldReestablish, fn.None[lnwire.Shutdown](),
	)
	if err != nil {
		return fmt.Errorf("can't register new channel link(%v) with "+
			"peer", chanPoint)
	}

	return nil
}

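// Illustrative sketch, not part of the original file: optional aux components
// are threaded into channel construction as functional options via the
// fn.Option WhenSome callback, as in addActiveChannel above. The helper name
// exampleCollectChanOpts is hypothetical, and the option slice type is an
// assumption based on the usage above.
func exampleCollectChanOpts(leafStore fn.Option[lnwallet.AuxLeafStore],
	reestablish bool) []lnwallet.ChannelOpt {

	var chanOpts []lnwallet.ChannelOpt

	// Skip nonce re-initialization when a reestablish will follow, just
	// as addActiveChannel does.
	if reestablish {
		chanOpts = append(chanOpts, lnwallet.WithSkipNonceInit())
	}

	// Only add the leaf store option when one was actually configured.
	leafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
		chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
	})

	return chanOpts
}
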
// handleNewActiveChannel handles a `newChannelMsg` request. Depending on
// whether we know this channel ID or not, we'll either add it to the
// `activeChannels` map or init the next revocation for it.
func (p *Brontide) handleNewActiveChannel(req *newChannelMsg) {
	newChan := req.channel
	chanPoint := newChan.FundingOutpoint
	chanID := lnwire.NewChanIDFromOutPoint(chanPoint)

	// Only update RemoteNextRevocation if the channel is in the
	// activeChannels map and if we added the link to the switch. Only
	// active channels will be added to the switch.
	if p.isActiveChannel(chanID) {
		p.log.Infof("Already have ChannelPoint(%v), ignoring",
			chanPoint)

		// Handle it and close the err chan on the request.
		close(req.err)

		// Update the next revocation point.
		err := p.updateNextRevocation(newChan.OpenChannel)
		if err != nil {
			p.log.Errorf(err.Error())
		}

		return
	}

	// This is a new channel, we now add it to the map.
	if err := p.addActiveChannel(req.channel); err != nil {
		// Log and send back the error to the request.
		p.log.Errorf(err.Error())
		req.err <- err

		return
	}

	// Close the err chan if everything went fine.
	close(req.err)
}

// handleNewPendingChannel takes a `newChannelMsg` request and adds it to the
// `activeChannels` map with a nil value. This pending channel will be saved
// as it may become active in the future. Once active, the funding manager
// will send it again via `AddNewChannel`, and we'd handle the link creation
// there.
func (p *Brontide) handleNewPendingChannel(req *newChannelMsg) {
	defer close(req.err)

	chanID := req.channelID

	// If we already have this channel, something is wrong with the funding
	// flow as it will only be marked as active after `ChannelReady` is
	// handled. In this case, we will do nothing but log an error, just in
	// case this is a legit channel.
	if p.isActiveChannel(chanID) {
		p.log.Errorf("Channel(%v) is already active, ignoring "+
			"pending channel request", chanID)

		return
	}

	// The channel has already been added, we will do nothing and return.
	if p.isPendingChannel(chanID) {
		p.log.Infof("Channel(%v) is already added, ignoring "+
			"pending channel request", chanID)

		return
	}

	// This is a new channel, we now add it to the map `activeChannels`
	// with nil value and mark it as a newly added channel in
	// `addedChannels`.
	p.activeChannels.Store(chanID, nil)
	p.addedChannels.Store(chanID, struct{}{})
}

// handleRemovePendingChannel takes a `newChannelMsg` request and removes it
// from the `activeChannels` map. The request will be ignored if the channel
// is considered active by Brontide. Noop if the channel ID cannot be found.
func (p *Brontide) handleRemovePendingChannel(req *newChannelMsg) {
	defer close(req.err)

	chanID := req.channelID

	// If we already have this channel, something is wrong with the funding
	// flow as it will only be marked as active after `ChannelReady` is
	// handled. In this case, we will log an error and exit.
	if p.isActiveChannel(chanID) {
		p.log.Errorf("Channel(%v) is active, ignoring remove request",
			chanID)
		return
	}

	// The channel has not been added yet, we will log a warning as there
	// is an unexpected call from the funding manager.
	if !p.isPendingChannel(chanID) {
		p.log.Warnf("Channel(%v) not found, removing it anyway", chanID)
	}

	// Remove the record of this pending channel.
	p.activeChannels.Delete(chanID)
	p.addedChannels.Delete(chanID)
}

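// Illustrative sketch, not part of the original file: pending channels are
// tracked as nil entries in activeChannels, so "known" and "active" are two
// separate questions. The helper name exampleIsPendingEntry is hypothetical.
func (p *Brontide) exampleIsPendingEntry(cid lnwire.ChannelID) bool {
	lnChan, ok := p.activeChannels.Load(cid)

	// A loaded but nil entry is a pending channel; a non-nil entry is an
	// active one; a miss means the channel is unknown.
	return ok && lnChan == nil
}
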
// sendLinkUpdateMsg sends a message that updates the channel to the
// channel's message stream.
func (p *Brontide) sendLinkUpdateMsg(cid lnwire.ChannelID, msg lnwire.Message) {
	p.log.Tracef("Sending link update msg=%v", msg.MsgType())

	chanStream, ok := p.activeMsgStreams[cid]
	if !ok {
		// If a stream hasn't yet been created, then we'll do so, add
		// it to the map, and finally start it.
		chanStream = newChanMsgStream(p, cid)
		p.activeMsgStreams[cid] = chanStream
		chanStream.Start()

		// Stop the stream when quit.
		go func() {
			<-p.quit
			chanStream.Stop()
		}()
	}

	// With the stream obtained, add the message to the stream so we can
	// continue processing messages.
	chanStream.AddMsg(msg)
}

// scaleTimeout multiplies the argument duration by a constant factor depending
// on various heuristics. Currently this is only used to check whether our peer
// appears to be connected over Tor and relaxes the timeout deadline. However,
// this is subject to change and should be treated as opaque.
func (p *Brontide) scaleTimeout(timeout time.Duration) time.Duration {
	if p.isTorConnection {
		return timeout * time.Duration(torTimeoutMultiplier)
	}

	return timeout
}

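// Illustrative sketch, not part of the original file: deadlines derived from
// a base timeout stretch automatically for Tor connections when routed
// through scaleTimeout. The helper name exampleDeadline and its baseTimeout
// parameter are hypothetical.
func (p *Brontide) exampleDeadline(baseTimeout time.Duration) time.Time {
	return time.Now().Add(p.scaleTimeout(baseTimeout))
}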