• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 18852986778

27 Oct 2025 07:10PM UTC coverage: 54.859% (-11.8%) from 66.648%
18852986778

Pull #10265

github

web-flow
Merge 45787b3d5 into 9a7b526c0
Pull Request #10265: multi: update close logic to handle re-orgs of depth n-1, where n is num confs - add min conf floor

529 of 828 new or added lines in 17 files covered. (63.89%)

24026 existing lines in 286 files now uncovered.

110927 of 202205 relevant lines covered (54.86%)

21658.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

36.68
/contractcourt/chain_arbitrator.go
1
package contractcourt
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcutil"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/chainio"
15
        "github.com/lightningnetwork/lnd/chainntnfs"
16
        "github.com/lightningnetwork/lnd/channeldb"
17
        "github.com/lightningnetwork/lnd/clock"
18
        "github.com/lightningnetwork/lnd/fn/v2"
19
        "github.com/lightningnetwork/lnd/graph/db/models"
20
        "github.com/lightningnetwork/lnd/input"
21
        "github.com/lightningnetwork/lnd/kvdb"
22
        "github.com/lightningnetwork/lnd/labels"
23
        "github.com/lightningnetwork/lnd/lnwallet"
24
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
25
        "github.com/lightningnetwork/lnd/lnwire"
26
)
27

28
// ErrChainArbExiting signals that the chain arbitrator is shutting down.
// Returned by operations that are interrupted by shutdown.
var ErrChainArbExiting = errors.New("ChainArbitrator exiting")
30

31
// ResolutionMsg is a message sent by resolvers to outside sub-systems once an
// outgoing contract has been fully resolved. For multi-hop contracts, if we
// resolve the outgoing contract, we'll also need to ensure that the incoming
// contract is resolved as well. We package the items required to resolve the
// incoming contracts within this message.
type ResolutionMsg struct {
	// SourceChan identifies the channel that this message is being sent
	// from. This is the channel's short channel ID.
	SourceChan lnwire.ShortChannelID

	// HtlcIndex is the index of the contract within the original
	// commitment trace.
	HtlcIndex uint64

	// Failure will be non-nil if the incoming contract should be canceled
	// all together. This can happen if the outgoing contract was dust, or
	// if the outgoing HTLC timed out.
	Failure lnwire.FailureMessage

	// PreImage will be non-nil if the incoming contract can successfully
	// be redeemed. This can happen if we learn of the preimage from the
	// outgoing HTLC on-chain.
	PreImage *[32]byte
}
55

56
// ChainArbitratorConfig is a configuration struct that contains all the
// function closures and interfaces that are required to arbitrate on-chain
// contracts for a particular chain.
type ChainArbitratorConfig struct {
	// ChainHash is the chain that this arbitrator is to operate within.
	ChainHash chainhash.Hash

	// IncomingBroadcastDelta is the delta that we'll use to decide when to
	// broadcast our commitment transaction if we have incoming htlcs. This
	// value should be set based on our current fee estimation of the
	// commitment transaction. We use this to determine when we should
	// broadcast instead of just the HTLC timeout, as we want to ensure
	// that the commitment transaction is already confirmed, by the time the
	// HTLC expires. Otherwise we may end up not settling the htlc on-chain
	// because the other party managed to time it out.
	IncomingBroadcastDelta uint32

	// OutgoingBroadcastDelta is the delta that we'll use to decide when to
	// broadcast our commitment transaction if there are active outgoing
	// htlcs. This value can be lower than the incoming broadcast delta.
	OutgoingBroadcastDelta uint32

	// NewSweepAddr is a function that returns a new address under control
	// by the wallet. We'll use this to sweep any no-delay outputs as a
	// result of unilateral channel closes.
	//
	// NOTE: This SHOULD return a p2wkh script.
	NewSweepAddr func() ([]byte, error)

	// PublishTx reliably broadcasts a transaction to the network. Once
	// this function exits without an error, then the transaction MUST
	// continually be rebroadcast if needed.
	PublishTx func(*wire.MsgTx, string) error

	// DeliverResolutionMsg is a function that will append an outgoing
	// message to the "out box" for a ChannelLink. This is used to cancel
	// backwards any HTLC's that are either dust, we're timing out, or
	// settling on-chain to the incoming link.
	DeliverResolutionMsg func(...ResolutionMsg) error

	// MarkLinkInactive is a function closure that the ChainArbitrator will
	// use to mark that active HTLC's shouldn't be attempted to be routed
	// over a particular channel. This function will be called in case a
	// ChannelArbitrator decides that it needs to go to chain in order to
	// resolve contracts.
	//
	// TODO(roasbeef): rename, routing based
	MarkLinkInactive func(wire.OutPoint) error

	// ContractBreach is a function closure that the ChainArbitrator will
	// use to notify the BreachArbitrator about a contract breach. It should
	// only return a non-nil error when the BreachArbitrator has preserved
	// the necessary breach info for this channel point. Once the breach
	// resolution is persisted in the ChannelArbitrator, it will be safe
	// to mark the channel closed.
	ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error

	// IsOurAddress is a function that returns true if the passed address
	// is known to the underlying wallet. Otherwise, false should be
	// returned.
	IsOurAddress func(btcutil.Address) bool

	// IncubateOutputs sends either an incoming HTLC, an outgoing HTLC, or
	// both to the utxo nursery. Once this function returns, the nursery
	// should have safely persisted the outputs to disk, and should start
	// the process of incubation. This is used when a resolver wishes to
	// pass off the output to the nursery as we're only waiting on an
	// absolute/relative item block.
	IncubateOutputs func(wire.OutPoint,
		fn.Option[lnwallet.OutgoingHtlcResolution],
		fn.Option[lnwallet.IncomingHtlcResolution],
		uint32, fn.Option[int32]) error

	// PreimageDB is a global store of all known pre-images. We'll use this
	// to decide if we should broadcast a commitment transaction to claim
	// an HTLC on-chain.
	PreimageDB WitnessBeacon

	// Notifier is an instance of a chain notifier we'll use to watch for
	// certain on-chain events.
	Notifier chainntnfs.ChainNotifier

	// Mempool is a mempool watcher that allows us to watch for events
	// happening in the mempool.
	Mempool chainntnfs.MempoolWatcher

	// Signer is a signer backed by the active lnd node. This should be
	// capable of producing a signature as specified by a valid
	// SignDescriptor.
	Signer input.Signer

	// FeeEstimator will be used to return fee estimates.
	FeeEstimator chainfee.Estimator

	// ChainIO allows us to query the state of the current main chain.
	ChainIO lnwallet.BlockChainIO

	// DisableChannel disables a channel, resulting in it not being able to
	// forward payments.
	DisableChannel func(wire.OutPoint) error

	// Sweeper allows resolvers to sweep their final outputs.
	Sweeper UtxoSweeper

	// Registry is the invoice database that is used by resolvers to lookup
	// preimages and settle invoices.
	Registry Registry

	// NotifyClosedChannel is a function closure that the ChainArbitrator
	// will use to notify the ChannelNotifier about a newly closed channel.
	NotifyClosedChannel func(wire.OutPoint)

	// NotifyFullyResolvedChannel is a function closure that the
	// ChainArbitrator will use to notify the ChannelNotifier about a newly
	// resolved channel. The main difference to NotifyClosedChannel is that
	// in case of a local force close the NotifyClosedChannel is called when
	// the published commitment transaction confirms while
	// NotifyFullyResolvedChannel is only called when the channel is fully
	// resolved (which includes sweeping any time locked funds).
	NotifyFullyResolvedChannel func(point wire.OutPoint)

	// OnionProcessor is used to decode onion payloads for on-chain
	// resolution.
	OnionProcessor OnionProcessor

	// PaymentsExpirationGracePeriod indicates a time window we let the
	// other node to cancel an outgoing htlc that our node has initiated and
	// has timed out.
	PaymentsExpirationGracePeriod time.Duration

	// IsForwardedHTLC checks for a given htlc, identified by channel id and
	// htlcIndex, if it is a forwarded one.
	IsForwardedHTLC func(chanID lnwire.ShortChannelID, htlcIndex uint64) bool

	// Clock is the clock implementation that ChannelArbitrator uses.
	// It is useful for testing.
	Clock clock.Clock

	// SubscribeBreachComplete is used by the breachResolver to register a
	// subscription that notifies when the breach resolution process is
	// complete.
	SubscribeBreachComplete func(op *wire.OutPoint, c chan struct{}) (
		bool, error)

	// PutFinalHtlcOutcome stores the final outcome of an htlc in the
	// database.
	PutFinalHtlcOutcome func(chanId lnwire.ShortChannelID,
		htlcId uint64, settled bool) error

	// HtlcNotifier is an interface that htlc events are sent to.
	HtlcNotifier HtlcNotifier

	// Budget is the configured budget for the arbitrator.
	Budget BudgetConfig

	// QueryIncomingCircuit is used to find the outgoing HTLC's
	// corresponding incoming HTLC circuit. It queries the circuit map for
	// a given outgoing circuit key and returns the incoming circuit key.
	//
	// TODO(yy): this is a hacky way to get around the cycling import issue
	// as we cannot import `htlcswitch` here. A proper way is to define an
	// interface here that asks for method `LookupOpenCircuit`,
	// meanwhile, turn `PaymentCircuit` into an interface or bring it to a
	// lower package.
	QueryIncomingCircuit func(circuit models.CircuitKey) *models.CircuitKey

	// AuxLeafStore is an optional store that can be used to store auxiliary
	// leaves for certain custom channel types.
	AuxLeafStore fn.Option[lnwallet.AuxLeafStore]

	// AuxSigner is an optional signer that can be used to sign auxiliary
	// leaves for certain custom channel types.
	AuxSigner fn.Option[lnwallet.AuxSigner]

	// AuxResolver is an optional interface that can be used to modify the
	// way contracts are resolved.
	AuxResolver fn.Option[lnwallet.AuxContractResolver]

	// ChannelCloseConfs is an optional override for the number of
	// confirmations required for channel closes. When set, this overrides
	// the normal capacity-based scaling. This is only available in
	// dev/integration builds for testing purposes.
	ChannelCloseConfs fn.Option[uint32]
}
240

241
// ChainArbitrator is a sub-system that oversees the on-chain resolution of all
// active channels, and channels that are in the "pending close" state. Within
// the contractcourt package, the ChainArbitrator manages a set of active
// ContractArbitrators. Each ContractArbitrator is responsible for watching
// the chain for any activity that affects the state of the channel, and also
// for monitoring each contract in order to determine if any on-chain activity is
// required. Outside sub-systems interact with the ChainArbitrator in order to
// forcibly exit a contract, update the set of live signals for each contract,
// and to receive reports on the state of contract resolution.
type ChainArbitrator struct {
	started int32 // To be used atomically.
	stopped int32 // To be used atomically.

	// Embed the blockbeat consumer struct to get access to the method
	// `NotifyBlockProcessed` and the `BlockbeatChan`.
	chainio.BeatConsumer

	sync.Mutex

	// activeChannels is a map of all the active contracts that are still
	// open, and not fully resolved.
	activeChannels map[wire.OutPoint]*ChannelArbitrator

	// activeWatchers is a map of all the active chainWatchers for channels
	// that are still considered open.
	activeWatchers map[wire.OutPoint]*chainWatcher

	// cfg is the config struct for the arbitrator that contains all
	// methods and interface it needs to operate.
	cfg ChainArbitratorConfig

	// chanSource will be used by the ChainArbitrator to fetch all the
	// active channels that it must still watch over.
	chanSource *channeldb.DB

	// beat is the current best known blockbeat.
	beat chainio.Blockbeat

	// resolvedChan is used to signal that the given channel outpoint has
	// been resolved onchain. Once received, chain arbitrator will perform
	// cleanups.
	resolvedChan chan wire.OutPoint

	// quit is closed on shutdown to signal all goroutines to exit.
	quit chan struct{}

	// wg tracks the goroutines spawned by the ChainArbitrator so Stop can
	// wait for them to exit.
	wg sync.WaitGroup
}
288

289
// NewChainArbitrator returns a new instance of the ChainArbitrator using the
290
// passed config struct, and backing persistent database.
291
func NewChainArbitrator(cfg ChainArbitratorConfig,
292
        db *channeldb.DB) *ChainArbitrator {
2✔
293

2✔
294
        c := &ChainArbitrator{
2✔
295
                cfg:            cfg,
2✔
296
                activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
2✔
297
                activeWatchers: make(map[wire.OutPoint]*chainWatcher),
2✔
298
                chanSource:     db,
2✔
299
                quit:           make(chan struct{}),
2✔
300
                resolvedChan:   make(chan wire.OutPoint),
2✔
301
        }
2✔
302

2✔
303
        // Mount the block consumer.
2✔
304
        c.BeatConsumer = chainio.NewBeatConsumer(c.quit, c.Name())
2✔
305

2✔
306
        return c
2✔
307
}
2✔
308

309
// Compile-time check for the chainio.Consumer interface.
var _ chainio.Consumer = (*ChainArbitrator)(nil)
311

312
// arbChannel is a wrapper around an open channel that channel arbitrators
// interact with.
type arbChannel struct {
	// channel is the in-memory channel state.
	channel *channeldb.OpenChannel

	// c references the chain arbitrator and is used by arbChannel
	// internally.
	c *ChainArbitrator
}
322

323
// NewAnchorResolutions returns the anchor resolutions for currently valid
324
// commitment transactions.
325
//
326
// NOTE: Part of the ArbChannel interface.
327
func (a *arbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions,
UNCOV
328
        error) {
×
UNCOV
329

×
UNCOV
330
        // Get a fresh copy of the database state to base the anchor resolutions
×
UNCOV
331
        // on. Unfortunately the channel instance that we have here isn't the
×
UNCOV
332
        // same instance that is used by the link.
×
UNCOV
333
        chanPoint := a.channel.FundingOutpoint
×
UNCOV
334

×
UNCOV
335
        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(chanPoint)
×
UNCOV
336
        if err != nil {
×
337
                return nil, err
×
338
        }
×
339

UNCOV
340
        var chanOpts []lnwallet.ChannelOpt
×
UNCOV
341
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
×
342
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
×
343
        })
×
UNCOV
344
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
×
345
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
×
346
        })
×
UNCOV
347
        a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
×
348
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
×
349
        })
×
350

UNCOV
351
        chanMachine, err := lnwallet.NewLightningChannel(
×
UNCOV
352
                a.c.cfg.Signer, channel, nil, chanOpts...,
×
UNCOV
353
        )
×
UNCOV
354
        if err != nil {
×
355
                return nil, err
×
356
        }
×
357

UNCOV
358
        return chanMachine.NewAnchorResolutions()
×
359
}
360

361
// ForceCloseChan should force close the contract that this attendant is
362
// watching over. We'll use this when we decide that we need to go to chain. It
363
// should in addition tell the switch to remove the corresponding link, such
364
// that we won't accept any new updates.
365
//
366
// NOTE: Part of the ArbChannel interface.
UNCOV
367
func (a *arbChannel) ForceCloseChan() (*wire.MsgTx, error) {
×
UNCOV
368
        // First, we mark the channel as borked, this ensure
×
UNCOV
369
        // that no new state transitions can happen, and also
×
UNCOV
370
        // that the link won't be loaded into the switch.
×
UNCOV
371
        if err := a.channel.MarkBorked(); err != nil {
×
372
                return nil, err
×
373
        }
×
374

375
        // With the channel marked as borked, we'll now remove
376
        // the link from the switch if its there. If the link
377
        // is active, then this method will block until it
378
        // exits.
UNCOV
379
        chanPoint := a.channel.FundingOutpoint
×
UNCOV
380

×
UNCOV
381
        if err := a.c.cfg.MarkLinkInactive(chanPoint); err != nil {
×
382
                log.Errorf("unable to mark link inactive: %v", err)
×
383
        }
×
384

385
        // Now that we know the link can't mutate the channel
386
        // state, we'll read the channel from disk the target
387
        // channel according to its channel point.
UNCOV
388
        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(chanPoint)
×
UNCOV
389
        if err != nil {
×
390
                return nil, err
×
391
        }
×
392

UNCOV
393
        var chanOpts []lnwallet.ChannelOpt
×
UNCOV
394
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
×
395
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
×
396
        })
×
UNCOV
397
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
×
398
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
×
399
        })
×
UNCOV
400
        a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
×
401
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
×
402
        })
×
403

404
        // Finally, we'll force close the channel completing
405
        // the force close workflow.
UNCOV
406
        chanMachine, err := lnwallet.NewLightningChannel(
×
UNCOV
407
                a.c.cfg.Signer, channel, nil, chanOpts...,
×
UNCOV
408
        )
×
UNCOV
409
        if err != nil {
×
410
                return nil, err
×
411
        }
×
412

UNCOV
413
        closeSummary, err := chanMachine.ForceClose(
×
UNCOV
414
                lnwallet.WithSkipContractResolutions(),
×
UNCOV
415
        )
×
UNCOV
416
        if err != nil {
×
417
                return nil, err
×
418
        }
×
419

UNCOV
420
        return closeSummary.CloseTx, nil
×
421
}
422

423
// newActiveChannelArbitrator creates a new instance of an active channel
// arbitrator given the state of the target channel. It wires up the
// arbitrator config with closures bound to this channel, constructs the
// persistent arbitrator log, and seeds the arbitrator with the HTLC sets of
// all currently known valid commitments.
func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
	c *ChainArbitrator, chanEvents *ChainEventSubscription) (*ChannelArbitrator, error) {

	// TODO(roasbeef): fetch best height (or pass in) so can ensure block
	// epoch delivers all the notifications to

	chanPoint := channel.FundingOutpoint

	log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", chanPoint)

	// Next we'll create the matching configuration struct that contains
	// all interfaces and methods the arbitrator needs to do its job.
	arbCfg := ChannelArbitratorConfig{
		ChanPoint:   chanPoint,
		Channel:     c.getArbChannel(channel),
		ShortChanID: channel.ShortChanID(),

		MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
		// Persist the close summary, then notify the ChannelNotifier
		// about the newly closed channel.
		MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
			statuses ...channeldb.ChannelStatus) error {

			err := channel.CloseChannel(summary, statuses...)
			if err != nil {
				return err
			}
			c.cfg.NotifyClosedChannel(summary.ChanPoint)
			return nil
		},
		IsPendingClose:        false,
		ChainArbitratorConfig: c.cfg,
		ChainEvents:           chanEvents,
		// Store resolver reports under this channel's outpoint within
		// the given database transaction.
		PutResolverReport: func(tx kvdb.RwTx,
			report *channeldb.ResolverReport) error {

			return c.chanSource.PutResolverReport(
				tx, c.cfg.ChainHash, &chanPoint, report,
			)
		},
		FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
			chanStateDB := c.chanSource.ChannelStateDB()
			return chanStateDB.FetchHistoricalChannel(&chanPoint)
		},
		FindOutgoingHTLCDeadline: func(
			htlc channeldb.HTLC) fn.Option[int32] {

			return c.FindOutgoingHTLCDeadline(
				channel.ShortChanID(), htlc,
			)
		},
		NotifyChannelResolved: func() {
			c.notifyChannelResolved(chanPoint)
		},
	}

	// The final component needed is an arbitrator log that the arbitrator
	// will use to keep track of its internal state using a backed
	// persistent log.
	//
	// TODO(roasbeef); abstraction leak...
	//  * rework: adaptor method to set log scope w/ factory func
	chanLog, err := newBoltArbitratorLog(
		c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
	)
	if err != nil {
		return nil, err
	}

	// Finally, we'll need to construct a series of htlc Sets based on all
	// currently known valid commitments.
	htlcSets := make(map[HtlcSetKey]htlcSet)
	htlcSets[LocalHtlcSet] = newHtlcSet(channel.LocalCommitment.Htlcs)
	htlcSets[RemoteHtlcSet] = newHtlcSet(channel.RemoteCommitment.Htlcs)

	// A pending (unrevoked) remote commitment may also exist; include its
	// HTLCs as well. ErrNoPendingCommit simply means there is none.
	pendingRemoteCommitment, err := channel.RemoteCommitChainTip()
	if err != nil && err != channeldb.ErrNoPendingCommit {
		return nil, err
	}
	if pendingRemoteCommitment != nil {
		htlcSets[RemotePendingHtlcSet] = newHtlcSet(
			pendingRemoteCommitment.Commitment.Htlcs,
		)
	}

	return NewChannelArbitrator(
		arbCfg, htlcSets, chanLog,
	), nil
}
512

513
// getArbChannel returns an open channel wrapper for use by channel arbitrators.
514
func (c *ChainArbitrator) getArbChannel(
515
        channel *channeldb.OpenChannel) *arbChannel {
11✔
516

11✔
517
        return &arbChannel{
11✔
518
                channel: channel,
11✔
519
                c:       c,
11✔
520
        }
11✔
521
}
11✔
522

523
// ResolveContract marks a contract as fully resolved within the database.
// This is only to be done once all contracts which were live on the channel
// before hitting the chain have been resolved.
func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error {
	log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)

	// First, we'll mark the channel as fully closed from the PoV of the
	// channel source.
	err := c.chanSource.ChannelStateDB().MarkChanFullyClosed(&chanPoint)
	if err != nil {
		log.Errorf("ChainArbitrator: unable to mark ChannelPoint(%v) "+
			"fully closed: %v", chanPoint, err)
		return err
	}

	// Now that the channel has been marked as fully closed, we'll stop
	// both the channel arbitrator and chain watcher for this channel if
	// they're still active.
	var arbLog ArbitratorLog
	c.Lock()
	chainArb := c.activeChannels[chanPoint]
	delete(c.activeChannels, chanPoint)

	chainWatcher := c.activeWatchers[chanPoint]
	delete(c.activeWatchers, chanPoint)
	c.Unlock()

	if chainArb != nil {
		arbLog = chainArb.log

		if err := chainArb.Stop(); err != nil {
			log.Warnf("unable to stop ChannelArbitrator(%v): %v",
				chanPoint, err)
		}
	}
	if chainWatcher != nil {
		if err := chainWatcher.Stop(); err != nil {
			log.Warnf("unable to stop ChainWatcher(%v): %v",
				chanPoint, err)
		}
	}

	// Once this has been marked as resolved, we'll wipe the log that the
	// channel arbitrator was using to store its persistent state. We do
	// this after marking the channel resolved, as otherwise, the
	// arbitrator would be re-created, and think it was starting from the
	// default state.
	if arbLog != nil {
		if err := arbLog.WipeHistory(); err != nil {
			return err
		}
	}

	return nil
}
578

579
// Start launches all goroutines that the ChainArbitrator needs to operate.
// It loads the open and pending-close channels, starts their chain watchers
// in parallel, then starts every channel arbitrator with a start state
// fetched in a single db transaction, and finally launches the block
// dispatcher. Idempotent: subsequent calls are no-ops.
func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
	if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
		return nil
	}

	// Set the current beat.
	c.beat = beat

	// Start the goroutine which listens for signals to mark the channel as
	// resolved.
	//
	// NOTE: We must start this goroutine here so we won't block the
	// following channel loading.
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		c.resolveContracts()
	}()

	// First, we'll fetch all the channels that are still open, in order to
	// collect them within our set of active contracts.
	if err := c.loadOpenChannels(); err != nil {
		return err
	}

	// In addition to the channels that we know to be open, we'll also
	// launch arbitrators to finish resolving any channels that are in
	// the pending close state.
	if err := c.loadPendingCloseChannels(); err != nil {
		return err
	}

	// Now, we'll start all chain watchers in parallel to shorten start up
	// duration. In neutrino mode, this allows spend registrations to take
	// advantage of batch spend reporting, instead of doing a single rescan
	// per chain watcher.
	//
	// NOTE: After this point, we Stop the chain arb to ensure that any
	// lingering goroutines are cleaned up before exiting.
	watcherErrs := make(chan error, len(c.activeWatchers))
	var wg sync.WaitGroup
	for _, watcher := range c.activeWatchers {
		wg.Add(1)
		go func(w *chainWatcher) {
			defer wg.Done()
			select {
			case watcherErrs <- w.Start():
			case <-c.quit:
				watcherErrs <- ErrChainArbExiting
			}
		}(watcher)
	}

	// Once all chain watchers have been started, seal the err chan to
	// signal the end of the err stream.
	go func() {
		wg.Wait()
		close(watcherErrs)
	}()

	// stopAndLog is a helper function which shuts down the chain arb and
	// logs errors if they occur.
	stopAndLog := func() {
		if err := c.Stop(); err != nil {
			log.Errorf("ChainArbitrator could not shutdown: %v", err)
		}
	}

	// Handle all errors returned from spawning our chain watchers. If any
	// of them failed, we will stop the chain arb to shutdown any active
	// goroutines.
	for err := range watcherErrs {
		if err != nil {
			stopAndLog()
			return err
		}
	}

	// Before we start all of our arbitrators, we do a preliminary state
	// lookup so that we can combine all of these lookups in a single db
	// transaction.
	var startStates map[wire.OutPoint]*chanArbStartState

	err := kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error {
		for _, arbitrator := range c.activeChannels {
			startState, err := arbitrator.getStartState(tx)
			if err != nil {
				return err
			}

			startStates[arbitrator.cfg.ChanPoint] = startState
		}

		return nil
	}, func() {
		// Reset closure: re-initialize the map in case the view
		// transaction is retried.
		startStates = make(
			map[wire.OutPoint]*chanArbStartState,
			len(c.activeChannels),
		)
	})
	if err != nil {
		stopAndLog()
		return err
	}

	// Launch all the goroutines for each arbitrator so they can carry out
	// their duties.
	for _, arbitrator := range c.activeChannels {
		startState, ok := startStates[arbitrator.cfg.ChanPoint]
		if !ok {
			stopAndLog()
			return fmt.Errorf("arbitrator: %v has no start state",
				arbitrator.cfg.ChanPoint)
		}

		if err := arbitrator.Start(startState, c.beat); err != nil {
			stopAndLog()
			return err
		}
	}

	// Start our goroutine which will dispatch blocks to each arbitrator.
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		c.dispatchBlocks()
	}()

	log.Infof("ChainArbitrator starting at height %d with %d chain "+
		"watchers, %d channel arbitrators, and budget config=[%v]",
		c.beat.Height(), len(c.activeWatchers), len(c.activeChannels),
		&c.cfg.Budget)

	// TODO(roasbeef): eventually move all breach watching here

	return nil
}
717

718
// resolveContracts listens to the `resolvedChan` to mark a given channel as
719
// fully resolved.
720
func (c *ChainArbitrator) resolveContracts() {
2✔
721
        for {
4✔
722
                select {
2✔
723
                // The channel arbitrator signals that a given channel has been
724
                // resolved, we now update chain arbitrator's internal state for
725
                // this channel.
UNCOV
726
                case cp := <-c.resolvedChan:
×
UNCOV
727
                        if c.cfg.NotifyFullyResolvedChannel != nil {
×
UNCOV
728
                                c.cfg.NotifyFullyResolvedChannel(cp)
×
UNCOV
729
                        }
×
730

UNCOV
731
                        err := c.ResolveContract(cp)
×
UNCOV
732
                        if err != nil {
×
733
                                log.Errorf("Failed to resolve contract for "+
×
734
                                        "channel %v", cp)
×
735
                        }
×
736

737
                // Exit if the chain arbitrator is shutting down.
738
                case <-c.quit:
2✔
739
                        return
2✔
740
                }
741
        }
742
}
743

744
// dispatchBlocks consumes a block epoch notification stream and dispatches
745
// blocks to each of the chain arb's active channel arbitrators. This function
746
// must be run in a goroutine.
747
func (c *ChainArbitrator) dispatchBlocks() {
2✔
748
        // Consume block epochs until we receive the instruction to shutdown.
2✔
749
        for {
4✔
750
                select {
2✔
751
                // Consume block epochs, exiting if our subscription is
752
                // terminated.
UNCOV
753
                case beat := <-c.BlockbeatChan:
×
UNCOV
754
                        // Set the current blockbeat.
×
UNCOV
755
                        c.beat = beat
×
UNCOV
756

×
UNCOV
757
                        // Send this blockbeat to all the active channels and
×
UNCOV
758
                        // wait for them to finish processing it.
×
UNCOV
759
                        c.handleBlockbeat(beat)
×
760

761
                // Exit if the chain arbitrator is shutting down.
762
                case <-c.quit:
2✔
763
                        return
2✔
764
                }
765
        }
766
}
767

768
// handleBlockbeat sends the blockbeat to all active channel arbitrator in
769
// parallel and wait for them to finish processing it.
UNCOV
770
func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) {
×
UNCOV
771
        // Read the active channels in a lock.
×
UNCOV
772
        c.Lock()
×
UNCOV
773

×
UNCOV
774
        // Create a slice to record active channel arbitrator.
×
UNCOV
775
        channels := make([]chainio.Consumer, 0, len(c.activeChannels))
×
UNCOV
776
        watchers := make([]chainio.Consumer, 0, len(c.activeWatchers))
×
UNCOV
777

×
UNCOV
778
        // Copy the active channels to the slice.
×
UNCOV
779
        for _, channel := range c.activeChannels {
×
UNCOV
780
                channels = append(channels, channel)
×
UNCOV
781
        }
×
782

UNCOV
783
        for _, watcher := range c.activeWatchers {
×
UNCOV
784
                watchers = append(watchers, watcher)
×
UNCOV
785
        }
×
786

UNCOV
787
        c.Unlock()
×
UNCOV
788

×
UNCOV
789
        // Iterate all the copied watchers and send the blockbeat to them.
×
UNCOV
790
        err := chainio.DispatchConcurrent(beat, watchers)
×
UNCOV
791
        if err != nil {
×
792
                log.Errorf("Notify blockbeat for chainWatcher failed: %v", err)
×
793
        }
×
794

795
        // Iterate all the copied channels and send the blockbeat to them.
796
        //
797
        // NOTE: This method will timeout if the processing of blocks of the
798
        // subsystems is too long (60s).
UNCOV
799
        err = chainio.DispatchConcurrent(beat, channels)
×
UNCOV
800
        if err != nil {
×
801
                log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v",
×
802
                        err)
×
803
        }
×
804

805
        // Notify the chain arbitrator has processed the block.
UNCOV
806
        c.NotifyBlockProcessed(beat, err)
×
807
}
808

809
// notifyChannelResolved is used by the channel arbitrator to signal that a
810
// given channel has been resolved.
UNCOV
811
func (c *ChainArbitrator) notifyChannelResolved(cp wire.OutPoint) {
×
UNCOV
812
        select {
×
UNCOV
813
        case c.resolvedChan <- cp:
×
814
        case <-c.quit:
×
815
                return
×
816
        }
817
}
818

819
// republishClosingTxs will load any stored cooperative or unilateral closing
820
// transactions and republish them. This helps ensure propagation of the
821
// transactions in the event that prior publications failed.
822
func (c *ChainArbitrator) republishClosingTxs(
823
        channel *channeldb.OpenChannel) error {
11✔
824

11✔
825
        // If the channel has had its unilateral close broadcasted already,
11✔
826
        // republish it in case it didn't propagate.
11✔
827
        if channel.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) {
16✔
828
                err := c.rebroadcast(
5✔
829
                        channel, channeldb.ChanStatusCommitBroadcasted,
5✔
830
                )
5✔
831
                if err != nil {
5✔
832
                        return err
×
833
                }
×
834
        }
835

836
        // If the channel has had its cooperative close broadcasted
837
        // already, republish it in case it didn't propagate.
838
        if channel.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) {
16✔
839
                err := c.rebroadcast(
5✔
840
                        channel, channeldb.ChanStatusCoopBroadcasted,
5✔
841
                )
5✔
842
                if err != nil {
5✔
843
                        return err
×
844
                }
×
845
        }
846

847
        return nil
11✔
848
}
849

850
// rebroadcast is a helper method which will republish the unilateral or
851
// cooperative close transaction or a channel in a particular state.
852
//
853
// NOTE: There is no risk to calling this method if the channel isn't in either
854
// CommitmentBroadcasted or CoopBroadcasted, but the logs will be misleading.
855
func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel,
856
        state channeldb.ChannelStatus) error {
10✔
857

10✔
858
        chanPoint := channel.FundingOutpoint
10✔
859

10✔
860
        var (
10✔
861
                closeTx *wire.MsgTx
10✔
862
                kind    string
10✔
863
                err     error
10✔
864
        )
10✔
865
        switch state {
10✔
866
        case channeldb.ChanStatusCommitBroadcasted:
5✔
867
                kind = "force"
5✔
868
                closeTx, err = channel.BroadcastedCommitment()
5✔
869

870
        case channeldb.ChanStatusCoopBroadcasted:
5✔
871
                kind = "coop"
5✔
872
                closeTx, err = channel.BroadcastedCooperative()
5✔
873

874
        default:
×
875
                return fmt.Errorf("unknown closing state: %v", state)
×
876
        }
877

878
        switch {
10✔
879
        // This can happen for channels that had their closing tx published
880
        // before we started storing it to disk.
881
        case err == channeldb.ErrNoCloseTx:
×
882
                log.Warnf("Channel %v is in state %v, but no %s closing tx "+
×
883
                        "to re-publish...", chanPoint, state, kind)
×
884
                return nil
×
885

886
        case err != nil:
×
887
                return err
×
888
        }
889

890
        log.Infof("Re-publishing %s close tx(%v) for channel %v",
10✔
891
                kind, closeTx.TxHash(), chanPoint)
10✔
892

10✔
893
        label := labels.MakeLabel(
10✔
894
                labels.LabelTypeChannelClose, &channel.ShortChannelID,
10✔
895
        )
10✔
896
        err = c.cfg.PublishTx(closeTx, label)
10✔
897
        if err != nil && err != lnwallet.ErrDoubleSpend {
10✔
898
                log.Warnf("Unable to broadcast %s close tx(%v): %v",
×
899
                        kind, closeTx.TxHash(), err)
×
900
        }
×
901

902
        return nil
10✔
903
}
904

905
// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active
906
// channel arbitrators will be signalled to exit, and this method will block
907
// until they've all exited.
908
func (c *ChainArbitrator) Stop() error {
2✔
909
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
2✔
910
                return nil
×
911
        }
×
912

913
        log.Info("ChainArbitrator shutting down...")
2✔
914
        defer log.Debug("ChainArbitrator shutdown complete")
2✔
915

2✔
916
        close(c.quit)
2✔
917

2✔
918
        var (
2✔
919
                activeWatchers = make(map[wire.OutPoint]*chainWatcher)
2✔
920
                activeChannels = make(map[wire.OutPoint]*ChannelArbitrator)
2✔
921
        )
2✔
922

2✔
923
        // Copy the current set of active watchers and arbitrators to shutdown.
2✔
924
        // We don't want to hold the lock when shutting down each watcher or
2✔
925
        // arbitrator individually, as they may need to acquire this mutex.
2✔
926
        c.Lock()
2✔
927
        for chanPoint, watcher := range c.activeWatchers {
12✔
928
                activeWatchers[chanPoint] = watcher
10✔
929
        }
10✔
930
        for chanPoint, arbitrator := range c.activeChannels {
12✔
931
                activeChannels[chanPoint] = arbitrator
10✔
932
        }
10✔
933
        c.Unlock()
2✔
934

2✔
935
        for chanPoint, watcher := range activeWatchers {
12✔
936
                log.Tracef("Attempting to stop ChainWatcher(%v)",
10✔
937
                        chanPoint)
10✔
938

10✔
939
                if err := watcher.Stop(); err != nil {
10✔
940
                        log.Errorf("unable to stop watcher for "+
×
941
                                "ChannelPoint(%v): %v", chanPoint, err)
×
942
                }
×
943
        }
944
        for chanPoint, arbitrator := range activeChannels {
12✔
945
                log.Tracef("Attempting to stop ChannelArbitrator(%v)",
10✔
946
                        chanPoint)
10✔
947

10✔
948
                if err := arbitrator.Stop(); err != nil {
10✔
949
                        log.Errorf("unable to stop arbitrator for "+
×
950
                                "ChannelPoint(%v): %v", chanPoint, err)
×
951
                }
×
952
        }
953

954
        c.wg.Wait()
2✔
955

2✔
956
        return nil
2✔
957
}
958

959
// ContractUpdate is a message that packages the latest set of active HTLCs on
// a commitment, and also identifies which commitment received a new set of
// HTLCs.
type ContractUpdate struct {
	// HtlcKey identifies which commitment the HTLCs below are present on.
	HtlcKey HtlcSetKey

	// Htlcs are the set of active HTLCs on the commitment identified by
	// the above HtlcKey.
	Htlcs []channeldb.HTLC
}
970

971
// ContractSignals is used by outside subsystems to notify a channel arbitrator
// of its ShortChannelID.
type ContractSignals struct {
	// ShortChanID is the up to date short channel ID for a contract. This
	// can change either if the contract didn't yet have a stable
	// identifier when it was added, or in the case of a reorg.
	ShortChanID lnwire.ShortChannelID
}
979

980
// UpdateContractSignals sends a set of active, up to date contract signals to
981
// the ChannelArbitrator which is has been assigned to the channel infield by
982
// the passed channel point.
983
func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
UNCOV
984
        signals *ContractSignals) error {
×
UNCOV
985

×
UNCOV
986
        log.Infof("Attempting to update ContractSignals for ChannelPoint(%v)",
×
UNCOV
987
                chanPoint)
×
UNCOV
988

×
UNCOV
989
        c.Lock()
×
UNCOV
990
        arbitrator, ok := c.activeChannels[chanPoint]
×
UNCOV
991
        c.Unlock()
×
UNCOV
992
        if !ok {
×
993
                return fmt.Errorf("unable to find arbitrator")
×
994
        }
×
995

UNCOV
996
        arbitrator.UpdateContractSignals(signals)
×
UNCOV
997

×
UNCOV
998
        return nil
×
999
}
1000

1001
// NotifyContractUpdate lets a channel arbitrator know that a new
1002
// ContractUpdate is available. This calls the ChannelArbitrator's internal
1003
// method NotifyContractUpdate which waits for a response on a done chan before
1004
// returning. This method will return an error if the ChannelArbitrator is not
1005
// in the activeChannels map. However, this only happens if the arbitrator is
1006
// resolved and the related link would already be shut down.
1007
func (c *ChainArbitrator) NotifyContractUpdate(chanPoint wire.OutPoint,
UNCOV
1008
        update *ContractUpdate) error {
×
UNCOV
1009

×
UNCOV
1010
        c.Lock()
×
UNCOV
1011
        arbitrator, ok := c.activeChannels[chanPoint]
×
UNCOV
1012
        c.Unlock()
×
UNCOV
1013
        if !ok {
×
1014
                return fmt.Errorf("can't find arbitrator for %v", chanPoint)
×
1015
        }
×
1016

UNCOV
1017
        arbitrator.notifyContractUpdate(update)
×
UNCOV
1018
        return nil
×
1019
}
1020

1021
// GetChannelArbitrator safely returns the channel arbitrator for a given
1022
// channel outpoint.
1023
func (c *ChainArbitrator) GetChannelArbitrator(chanPoint wire.OutPoint) (
UNCOV
1024
        *ChannelArbitrator, error) {
×
UNCOV
1025

×
UNCOV
1026
        c.Lock()
×
UNCOV
1027
        arbitrator, ok := c.activeChannels[chanPoint]
×
UNCOV
1028
        c.Unlock()
×
UNCOV
1029
        if !ok {
×
1030
                return nil, fmt.Errorf("unable to find arbitrator")
×
1031
        }
×
1032

UNCOV
1033
        return arbitrator, nil
×
1034
}
1035

1036
// forceCloseReq is a request sent from an outside sub-system to the arbitrator
// that watches a particular channel to broadcast the commitment transaction,
// and enter the resolution phase of the channel.
type forceCloseReq struct {
	// errResp is a channel that will be sent upon either in the case of
	// force close success (nil error), or in the case on an error.
	//
	// NOTE: This channel MUST be buffered.
	errResp chan error

	// closeTx is a channel that carries the transaction which ultimately
	// closed out the channel.
	closeTx chan *wire.MsgTx
}
1050

1051
// ForceCloseContract attempts to force close the channel infield by the passed
1052
// channel point. A force close will immediately terminate the contract,
1053
// causing it to enter the resolution phase. If the force close was successful,
1054
// then the force close transaction itself will be returned.
1055
//
1056
// TODO(roasbeef): just return the summary itself?
UNCOV
1057
func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.MsgTx, error) {
×
UNCOV
1058
        c.Lock()
×
UNCOV
1059
        arbitrator, ok := c.activeChannels[chanPoint]
×
UNCOV
1060
        c.Unlock()
×
UNCOV
1061
        if !ok {
×
1062
                return nil, fmt.Errorf("unable to find arbitrator")
×
1063
        }
×
1064

UNCOV
1065
        log.Infof("Attempting to force close ChannelPoint(%v)", chanPoint)
×
UNCOV
1066

×
UNCOV
1067
        // Before closing, we'll attempt to send a disable update for the
×
UNCOV
1068
        // channel. We do so before closing the channel as otherwise the current
×
UNCOV
1069
        // edge policy won't be retrievable from the graph.
×
UNCOV
1070
        if err := c.cfg.DisableChannel(chanPoint); err != nil {
×
1071
                log.Warnf("Unable to disable channel %v on "+
×
1072
                        "close: %v", chanPoint, err)
×
1073
        }
×
1074

UNCOV
1075
        errChan := make(chan error, 1)
×
UNCOV
1076
        respChan := make(chan *wire.MsgTx, 1)
×
UNCOV
1077

×
UNCOV
1078
        // With the channel found, and the request crafted, we'll send over a
×
UNCOV
1079
        // force close request to the arbitrator that watches this channel.
×
UNCOV
1080
        select {
×
1081
        case arbitrator.forceCloseReqs <- &forceCloseReq{
1082
                errResp: errChan,
1083
                closeTx: respChan,
UNCOV
1084
        }:
×
1085
        case <-c.quit:
×
1086
                return nil, ErrChainArbExiting
×
1087
        }
1088

1089
        // We'll await two responses: the error response, and the transaction
1090
        // that closed out the channel.
UNCOV
1091
        select {
×
UNCOV
1092
        case err := <-errChan:
×
UNCOV
1093
                if err != nil {
×
UNCOV
1094
                        return nil, err
×
UNCOV
1095
                }
×
1096
        case <-c.quit:
×
1097
                return nil, ErrChainArbExiting
×
1098
        }
1099

UNCOV
1100
        var closeTx *wire.MsgTx
×
UNCOV
1101
        select {
×
UNCOV
1102
        case closeTx = <-respChan:
×
1103
        case <-c.quit:
×
1104
                return nil, ErrChainArbExiting
×
1105
        }
1106

UNCOV
1107
        return closeTx, nil
×
1108
}
1109

1110
// WatchNewChannel sends the ChainArbitrator a message to create a
// ChannelArbitrator tasked with watching over a new channel. Once a new
// channel has finished its final funding flow, it should be registered with
// the ChainArbitrator so we can properly react to any on-chain events.
//
// NOTE(review): the mutex is held for the entire call, including the Start
// invocations below — confirm none of the started components re-enter the
// ChainArbitrator's lock. Also note that the watcher/arbitrator are inserted
// into the maps before their Start calls, so a failed Start leaves the
// entries in place.
func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error {
	c.Lock()
	defer c.Unlock()

	chanPoint := newChan.FundingOutpoint

	log.Infof("Creating new chainWatcher and ChannelArbitrator for "+
		"ChannelPoint(%v)", chanPoint)

	// If we're already watching this channel, then we'll ignore this
	// request.
	if _, ok := c.activeChannels[chanPoint]; ok {
		return nil
	}

	// First, also create an active chainWatcher for this channel to ensure
	// that we detect any relevant on chain events.
	chainWatcher, err := newChainWatcher(
		chainWatcherConfig{
			chanState: newChan,
			notifier:  c.cfg.Notifier,
			signer:    c.cfg.Signer,
			isOurAddr: c.cfg.IsOurAddress,
			// Bridge the watcher's breach callback to the chain
			// arbitrator's config, binding this channel's point.
			contractBreach: func(
				retInfo *lnwallet.BreachRetribution) error {

				return c.cfg.ContractBreach(
					chanPoint, retInfo,
				)
			},
			extractStateNumHint: lnwallet.GetStateNumHint,
			auxLeafStore:        c.cfg.AuxLeafStore,
			auxResolver:         c.cfg.AuxResolver,
			chanCloseConfs:      c.cfg.ChannelCloseConfs,
		},
	)
	if err != nil {
		return err
	}

	c.activeWatchers[chanPoint] = chainWatcher

	// We'll also create a new channel arbitrator instance using this new
	// channel, and our internal state.
	channelArb, err := newActiveChannelArbitrator(
		newChan, c, chainWatcher.SubscribeChannelEvents(),
	)
	if err != nil {
		return err
	}

	// With the arbitrator created, we'll add it to our set of active
	// arbitrators, then launch it.
	c.activeChannels[chanPoint] = channelArb

	// Start the arbitrator with no persisted start state, seeded with the
	// chain arbitrator's current blockbeat.
	if err := channelArb.Start(nil, c.beat); err != nil {
		return err
	}

	return chainWatcher.Start()
}
1175

1176
// SubscribeChannelEvents returns a new active subscription for the set of
1177
// possible on-chain events for a particular channel. The struct can be used by
1178
// callers to be notified whenever an event that changes the state of the
1179
// channel on-chain occurs.
1180
func (c *ChainArbitrator) SubscribeChannelEvents(
UNCOV
1181
        chanPoint wire.OutPoint) (*ChainEventSubscription, error) {
×
UNCOV
1182

×
UNCOV
1183
        // First, we'll attempt to look up the active watcher for this channel.
×
UNCOV
1184
        // If we can't find it, then we'll return an error back to the caller.
×
UNCOV
1185
        c.Lock()
×
UNCOV
1186
        watcher, ok := c.activeWatchers[chanPoint]
×
UNCOV
1187
        c.Unlock()
×
UNCOV
1188

×
UNCOV
1189
        if !ok {
×
1190
                return nil, fmt.Errorf("unable to find watcher for: %v",
×
1191
                        chanPoint)
×
1192
        }
×
1193

1194
        // With the watcher located, we'll request for it to create a new chain
1195
        // event subscription client.
UNCOV
1196
        return watcher.SubscribeChannelEvents(), nil
×
1197
}
1198

1199
// FindOutgoingHTLCDeadline returns the deadline in absolute block height for
// the specified outgoing HTLC. For an outgoing HTLC, its deadline is defined
// by the timeout height of its corresponding incoming HTLC - this is the
// expiry height at which the remote peer can spend his/her outgoing HTLC via
// the timeout path. Returns fn.None when no deadline can be determined, in
// which case the caller falls back to a default deadline.
func (c *ChainArbitrator) FindOutgoingHTLCDeadline(scid lnwire.ShortChannelID,
	outgoingHTLC channeldb.HTLC) fn.Option[int32] {

	// Find the outgoing HTLC's corresponding incoming HTLC in the circuit
	// map.
	rHash := outgoingHTLC.RHash
	circuit := models.CircuitKey{
		ChanID: scid,
		HtlcID: outgoingHTLC.HtlcIndex,
	}
	incomingCircuit := c.cfg.QueryIncomingCircuit(circuit)

	// If there's no incoming circuit found, we will use the default
	// deadline.
	if incomingCircuit == nil {
		log.Warnf("ChannelArbitrator(%v): incoming circuit key not "+
			"found for rHash=%x, using default deadline instead",
			scid, rHash)

		return fn.None[int32]()
	}

	// If this is a locally initiated HTLC, it means we are the first hop.
	// In this case, we can relax the deadline.
	if incomingCircuit.ChanID.IsDefault() {
		log.Infof("ChannelArbitrator(%v): using default deadline for "+
			"locally initiated HTLC for rHash=%x", scid, rHash)

		return fn.None[int32]()
	}

	log.Debugf("Found incoming circuit %v for rHash=%x using outgoing "+
		"circuit %v", incomingCircuit, rHash, circuit)

	// Hold the mutex for the whole scan so activeChannels cannot be
	// mutated underneath us.
	c.Lock()
	defer c.Unlock()

	// Iterate over all active channels to find the incoming HTLC specified
	// by its circuit key.
	for cp, channelArb := range c.activeChannels {
		// Skip if the SCID doesn't match.
		if channelArb.cfg.ShortChanID != incomingCircuit.ChanID {
			continue
		}

		// Make sure the channel arbitrator has the latest view of its
		// active HTLCs.
		channelArb.updateActiveHTLCs()

		// Iterate all the known HTLCs to find the targeted incoming
		// HTLC.
		for _, htlcs := range channelArb.activeHTLCs {
			for _, htlc := range htlcs.incomingHTLCs {
				// Skip if the index doesn't match.
				if htlc.HtlcIndex != incomingCircuit.HtlcID {
					continue
				}

				log.Debugf("ChannelArbitrator(%v): found "+
					"incoming HTLC in channel=%v using "+
					"rHash=%x, refundTimeout=%v", scid,
					cp, rHash, htlc.RefundTimeout)

				return fn.Some(int32(htlc.RefundTimeout))
			}
		}
	}

	// If there's no incoming HTLC found, yet we have the incoming circuit,
	// something is wrong - in this case, we return the none deadline.
	log.Errorf("ChannelArbitrator(%v): incoming HTLC not found for "+
		"rHash=%x, using default deadline instead", scid, rHash)

	return fn.None[int32]()
}
1279

1280
// TODO(roasbeef): arbitration reports
1281
//  * types: contested, waiting for success conf, etc
1282

1283
// Name returns the name of this subsystem, used to identify the chain
// arbitrator as a blockbeat consumer.
//
// NOTE: part of the `chainio.Consumer` interface.
func (c *ChainArbitrator) Name() string {
	return "ChainArbitrator"
}
1287

1288
// loadOpenChannels loads all channels that are currently open in the database
1289
// and registers them with the chainWatcher for future notification.
1290
func (c *ChainArbitrator) loadOpenChannels() error {
2✔
1291
        openChannels, err := c.chanSource.ChannelStateDB().FetchAllChannels()
2✔
1292
        if err != nil {
2✔
1293
                return err
×
1294
        }
×
1295

1296
        if len(openChannels) == 0 {
2✔
UNCOV
1297
                return nil
×
UNCOV
1298
        }
×
1299

1300
        log.Infof("Creating ChannelArbitrators for %v active channels",
2✔
1301
                len(openChannels))
2✔
1302

2✔
1303
        // For each open channel, we'll configure then launch a corresponding
2✔
1304
        // ChannelArbitrator.
2✔
1305
        for _, channel := range openChannels {
13✔
1306
                chanPoint := channel.FundingOutpoint
11✔
1307
                channel := channel
11✔
1308

11✔
1309
                // First, we'll create an active chainWatcher for this channel
11✔
1310
                // to ensure that we detect any relevant on chain events.
11✔
1311
                breachClosure := func(ret *lnwallet.BreachRetribution) error {
11✔
UNCOV
1312
                        return c.cfg.ContractBreach(chanPoint, ret)
×
UNCOV
1313
                }
×
1314

1315
                chainWatcher, err := newChainWatcher(
11✔
1316
                        chainWatcherConfig{
11✔
1317
                                chanState:           channel,
11✔
1318
                                notifier:            c.cfg.Notifier,
11✔
1319
                                signer:              c.cfg.Signer,
11✔
1320
                                isOurAddr:           c.cfg.IsOurAddress,
11✔
1321
                                contractBreach:      breachClosure,
11✔
1322
                                extractStateNumHint: lnwallet.GetStateNumHint,
11✔
1323
                                auxLeafStore:        c.cfg.AuxLeafStore,
11✔
1324
                                auxResolver:         c.cfg.AuxResolver,
11✔
1325
                                chanCloseConfs:      c.cfg.ChannelCloseConfs,
11✔
1326
                        },
11✔
1327
                )
11✔
1328
                if err != nil {
11✔
1329
                        return err
×
1330
                }
×
1331

1332
                c.activeWatchers[chanPoint] = chainWatcher
11✔
1333
                channelArb, err := newActiveChannelArbitrator(
11✔
1334
                        channel, c, chainWatcher.SubscribeChannelEvents(),
11✔
1335
                )
11✔
1336
                if err != nil {
11✔
1337
                        return err
×
1338
                }
×
1339

1340
                c.activeChannels[chanPoint] = channelArb
11✔
1341

11✔
1342
                // Republish any closing transactions for this channel.
11✔
1343
                err = c.republishClosingTxs(channel)
11✔
1344
                if err != nil {
11✔
1345
                        log.Errorf("Failed to republish closing txs for "+
×
1346
                                "channel %v", chanPoint)
×
1347
                }
×
1348
        }
1349

1350
        return nil
2✔
1351
}
1352

1353
// loadPendingCloseChannels loads all channels that are currently pending
// closure in the database and registers them with the ChannelArbitrator to
// continue the resolution process.
func (c *ChainArbitrator) loadPendingCloseChannels() error {
	chanStateDB := c.chanSource.ChannelStateDB()

	closingChannels, err := chanStateDB.FetchClosedChannels(true)
	if err != nil {
		return err
	}

	if len(closingChannels) == 0 {
		return nil
	}

	log.Infof("Creating ChannelArbitrators for %v closing channels",
		len(closingChannels))

	// Next, for each channel is the closing state, we'll launch a
	// corresponding more restricted resolver, as we don't have to watch
	// the chain any longer, only resolve the contracts on the confirmed
	// commitment.
	//nolint:ll
	for _, closeChanInfo := range closingChannels {
		// We can leave off the CloseContract and ForceCloseChan
		// methods as the channel is already closed at this point.
		//
		// Note: chanPoint is a fresh per-iteration variable so the
		// closures below each capture their own channel's point.
		chanPoint := closeChanInfo.ChanPoint
		arbCfg := ChannelArbitratorConfig{
			ChanPoint:             chanPoint,
			ShortChanID:           closeChanInfo.ShortChanID,
			ChainArbitratorConfig: c.cfg,
			ChainEvents:           &ChainEventSubscription{},
			IsPendingClose:        true,
			ClosingHeight:         closeChanInfo.CloseHeight,
			CloseType:             closeChanInfo.CloseType,
			PutResolverReport: func(tx kvdb.RwTx,
				report *channeldb.ResolverReport) error {

				return c.chanSource.PutResolverReport(
					tx, c.cfg.ChainHash, &chanPoint, report,
				)
			},
			FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
				return chanStateDB.FetchHistoricalChannel(&chanPoint)
			},
			FindOutgoingHTLCDeadline: func(
				htlc channeldb.HTLC) fn.Option[int32] {

				return c.FindOutgoingHTLCDeadline(
					closeChanInfo.ShortChanID, htlc,
				)
			},
			NotifyChannelResolved: func() {
				c.notifyChannelResolved(chanPoint)
			},
		}
		chanLog, err := newBoltArbitratorLog(
			c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
		)
		if err != nil {
			return err
		}

		// We create an empty map of HTLC's here since it's possible
		// that the channel is in StateDefault and updateActiveHTLCs is
		// called. We want to avoid writing to an empty map. Since the
		// channel is already in the process of being resolved, no new
		// HTLCs will be added.
		c.activeChannels[chanPoint] = NewChannelArbitrator(
			arbCfg, make(map[HtlcSetKey]htlcSet), chanLog,
		)
	}

	return nil
}
1428

1429
// RedispatchBlockbeat resends the current blockbeat to the channels specified
1430
// by the chanPoints. It is used when a channel is added to the chain
1431
// arbitrator after it has been started, e.g., during the channel restore
1432
// process.
UNCOV
1433
func (c *ChainArbitrator) RedispatchBlockbeat(chanPoints []wire.OutPoint) {
×
UNCOV
1434
        // Get the current blockbeat.
×
UNCOV
1435
        beat := c.beat
×
UNCOV
1436

×
UNCOV
1437
        // Prepare two sets of consumers.
×
UNCOV
1438
        channels := make([]chainio.Consumer, 0, len(chanPoints))
×
UNCOV
1439
        watchers := make([]chainio.Consumer, 0, len(chanPoints))
×
UNCOV
1440

×
UNCOV
1441
        // Read the active channels in a lock.
×
UNCOV
1442
        c.Lock()
×
UNCOV
1443
        for _, op := range chanPoints {
×
UNCOV
1444
                if channel, ok := c.activeChannels[op]; ok {
×
UNCOV
1445
                        channels = append(channels, channel)
×
UNCOV
1446
                }
×
1447

UNCOV
1448
                if watcher, ok := c.activeWatchers[op]; ok {
×
UNCOV
1449
                        watchers = append(watchers, watcher)
×
UNCOV
1450
                }
×
1451
        }
UNCOV
1452
        c.Unlock()
×
UNCOV
1453

×
UNCOV
1454
        // Iterate all the copied watchers and send the blockbeat to them.
×
UNCOV
1455
        err := chainio.DispatchConcurrent(beat, watchers)
×
UNCOV
1456
        if err != nil {
×
1457
                log.Errorf("Notify blockbeat for chainWatcher failed: %v", err)
×
1458
        }
×
1459

1460
        // Iterate all the copied channels and send the blockbeat to them.
UNCOV
1461
        err = chainio.DispatchConcurrent(beat, channels)
×
UNCOV
1462
        if err != nil {
×
1463
                // Shutdown lnd if there's an error processing the block.
×
1464
                log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v",
×
1465
                        err)
×
1466
        }
×
1467
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc