
lightningnetwork / lnd / 12986279612

27 Jan 2025 09:51AM UTC coverage: 57.652% (-1.1%) from 58.788%

Pull Request #9447 (via github): sweep: start tracking input spending status in the fee bumper

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown", as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In the fee bumper we don't have the info to distinguish the above cases,
so we leave them to be further handled by the sweeper, which has more
context.

83 of 87 new or added lines in 2 files covered (95.4%).

19578 existing lines in 256 files now uncovered.

103448 of 179434 relevant lines covered (57.65%).

24884.58 hits per line

Source File: /contractcourt/channel_arbitrator.go (74.05% covered)
1
package contractcourt
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "math"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcutil"
14
        "github.com/btcsuite/btcd/chaincfg/chainhash"
15
        "github.com/btcsuite/btcd/txscript"
16
        "github.com/btcsuite/btcd/wire"
17
        "github.com/lightningnetwork/lnd/chainio"
18
        "github.com/lightningnetwork/lnd/channeldb"
19
        "github.com/lightningnetwork/lnd/fn/v2"
20
        "github.com/lightningnetwork/lnd/graph/db/models"
21
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
22
        "github.com/lightningnetwork/lnd/input"
23
        "github.com/lightningnetwork/lnd/invoices"
24
        "github.com/lightningnetwork/lnd/kvdb"
25
        "github.com/lightningnetwork/lnd/labels"
26
        "github.com/lightningnetwork/lnd/lntypes"
27
        "github.com/lightningnetwork/lnd/lnutils"
28
        "github.com/lightningnetwork/lnd/lnwallet"
29
        "github.com/lightningnetwork/lnd/lnwire"
30
        "github.com/lightningnetwork/lnd/sweep"
31
)
32

33
var (
34
        // errAlreadyForceClosed is an error returned when we attempt to force
35
        // close a channel that's already in the process of doing so.
36
        errAlreadyForceClosed = errors.New("channel is already in the " +
37
                "process of being force closed")
38
)
39

40
const (
41
        // arbitratorBlockBufferSize is the size of the buffer we give to each
42
        // channel arbitrator.
43
        arbitratorBlockBufferSize = 20
44

45
        // AnchorOutputValue is the output value for the anchor output of an
46
        // anchor channel.
47
        // See BOLT 03 for more details:
48
        // https://github.com/lightning/bolts/blob/master/03-transactions.md
49
        AnchorOutputValue = btcutil.Amount(330)
50
)
51

52
// WitnessSubscription represents an intent to be notified once new witnesses
53
// are discovered by various active contract resolvers. A contract resolver may
54
// use this to be notified of when it can satisfy an incoming contract after we
55
// discover the witness for an outgoing contract.
56
type WitnessSubscription struct {
57
        // WitnessUpdates is a channel that newly discovered witnesses will be
58
        // sent over.
59
        //
60
        // TODO(roasbeef): couple with WitnessType?
61
        WitnessUpdates <-chan lntypes.Preimage
62

63
        // CancelSubscription is a function closure that should be used by a
64
        // client to cancel the subscription once they are no longer interested
65
        // in receiving new updates.
66
        CancelSubscription func()
67
}
68

69
// WitnessBeacon is a global beacon of witnesses. Contract resolvers will use
70
// this interface to lookup witnesses (preimages typically) of contracts
71
// they're trying to resolve, add new preimages they resolve, and finally
72
// receive new updates each time a new preimage is discovered.
73
//
74
// TODO(roasbeef): need to delete the pre-images once we've used them
75
// and have been sufficiently confirmed?
76
type WitnessBeacon interface {
77
        // SubscribeUpdates returns a channel over which each newly discovered
79
        // preimage will be sent.
79
        SubscribeUpdates(chanID lnwire.ShortChannelID, htlc *channeldb.HTLC,
80
                payload *hop.Payload,
81
                nextHopOnionBlob []byte) (*WitnessSubscription, error)
82

83
        // LookupPreimage attempts to lookup a preimage in the global cache.
84
        // True is returned for the second argument if the preimage is found.
85
        LookupPreimage(payhash lntypes.Hash) (lntypes.Preimage, bool)
86

87
        // AddPreimages adds a batch of newly discovered preimages to the global
88
        // cache, and also signals any subscribers of the newly discovered
89
        // witness.
90
        AddPreimages(preimages ...lntypes.Preimage) error
91
}
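
For illustration, a minimal in-memory WitnessBeacon could look like the sketch below. The type name memoryBeacon is hypothetical (not part of lnd), and the sketch assumes lntypes.Preimage exposes a Hash() accessor for deriving the payment hash:

// memoryBeacon is an illustrative, minimal in-memory WitnessBeacon.
type memoryBeacon struct {
        mu        sync.RWMutex
        preimages map[lntypes.Hash]lntypes.Preimage
        subs      []chan lntypes.Preimage
}

// Compile-time check that the sketch satisfies the interface.
var _ WitnessBeacon = (*memoryBeacon)(nil)

func newMemoryBeacon() *memoryBeacon {
        return &memoryBeacon{
                preimages: make(map[lntypes.Hash]lntypes.Preimage),
        }
}

func (m *memoryBeacon) SubscribeUpdates(_ lnwire.ShortChannelID,
        _ *channeldb.HTLC, _ *hop.Payload,
        _ []byte) (*WitnessSubscription, error) {

        m.mu.Lock()
        defer m.mu.Unlock()

        updates := make(chan lntypes.Preimage, 1)
        m.subs = append(m.subs, updates)

        return &WitnessSubscription{
                WitnessUpdates: updates,
                // A real implementation would also remove the subscription.
                CancelSubscription: func() {},
        }, nil
}

func (m *memoryBeacon) LookupPreimage(
        payHash lntypes.Hash) (lntypes.Preimage, bool) {

        m.mu.RLock()
        defer m.mu.RUnlock()

        preimage, ok := m.preimages[payHash]
        return preimage, ok
}

func (m *memoryBeacon) AddPreimages(preimages ...lntypes.Preimage) error {
        m.mu.Lock()
        defer m.mu.Unlock()

        for _, p := range preimages {
                // Index the preimage by its payment hash.
                m.preimages[p.Hash()] = p

                // Notify subscribers on a best-effort, non-blocking basis.
                for _, sub := range m.subs {
                        select {
                        case sub <- p:
                        default:
                        }
                }
        }

        return nil
}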
92

93
// ArbChannel is an abstraction that allows the channel arbitrator to interact
94
// with an open channel.
95
type ArbChannel interface {
96
        // ForceCloseChan should force close the contract that this attendant
97
        // is watching over. We'll use this when we decide that we need to go
98
        // to chain. It should in addition tell the switch to remove the
99
        // corresponding link, such that we won't accept any new updates. The
100
        // returned transaction is the commitment transaction to broadcast in
101
        // order to eventually resolve all outputs on chain.
102
        ForceCloseChan() (*wire.MsgTx, error)
103

104
        // NewAnchorResolutions returns the anchor resolutions for currently
105
        // valid commitment transactions.
106
        NewAnchorResolutions() (*lnwallet.AnchorResolutions, error)
107
}
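
Similarly, a trivial ArbChannel stub (hypothetical, e.g. for unit tests) only needs to satisfy the two methods above:

// stubArbChannel is an illustrative ArbChannel stub, not part of lnd.
type stubArbChannel struct {
        closeTx *wire.MsgTx
}

func (s *stubArbChannel) ForceCloseChan() (*wire.MsgTx, error) {
        // Hand back a pre-built commitment transaction.
        return s.closeTx, nil
}

func (s *stubArbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions,
        error) {

        // No anchors in this stub.
        return &lnwallet.AnchorResolutions{}, nil
}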
108

109
// ChannelArbitratorConfig contains all the functionality that the
110
// ChannelArbitrator needs in order to properly arbitrate any contract dispute
111
// on chain.
112
type ChannelArbitratorConfig struct {
113
        // ChanPoint is the channel point that uniquely identifies this
114
        // channel.
115
        ChanPoint wire.OutPoint
116

117
        // Channel is the full channel data structure. For legacy channels, this
118
        // field may not always be set after a restart.
119
        Channel ArbChannel
120

121
        // ShortChanID describes the exact location of the channel within the
122
        // chain. We'll use this to address any messages that we need to send
123
        // to the switch during contract resolution.
124
        ShortChanID lnwire.ShortChannelID
125

126
        // ChainEvents is an active subscription to the chain watcher for this
127
        // channel to be notified of any on-chain activity related to this
128
        // channel.
129
        ChainEvents *ChainEventSubscription
130

131
        // MarkCommitmentBroadcasted should mark the channel's commitment as
132
        // broadcast, indicating that we are now waiting for it to confirm.
133
        MarkCommitmentBroadcasted func(*wire.MsgTx, lntypes.ChannelParty) error
134

135
        // MarkChannelClosed marks the channel closed in the database, with the
136
        // passed close summary. After this method successfully returns we can
137
        // no longer expect to receive chain events for this channel, and must
138
        // be able to recover from a failure without getting the close event
139
        // again. It takes an optional channel status which will update the
140
        // channel status in the record that we keep of historical channels.
141
        MarkChannelClosed func(*channeldb.ChannelCloseSummary,
142
                ...channeldb.ChannelStatus) error
143

144
        // IsPendingClose is a boolean indicating whether the channel is marked
145
        // as pending close in the database.
146
        IsPendingClose bool
147

148
        // ClosingHeight is the height at which the channel was closed. Note
149
        // that this value is only valid if IsPendingClose is true.
150
        ClosingHeight uint32
151

152
        // CloseType is the type of the close event in case IsPendingClose is
153
        // true. Otherwise this value is unset.
154
        CloseType channeldb.ClosureType
155

156
        // MarkChannelResolved is a function closure that serves to mark a
157
        // channel as "fully resolved". A channel itself can be considered
158
        // fully resolved once all active contracts have individually been
159
        // fully resolved.
160
        //
161
        // TODO(roasbeef): need RPC's to combine for pendingchannels RPC
162
        MarkChannelResolved func() error
163

164
        // PutResolverReport records a resolver report for the channel. If the
165
        // transaction provided is nil, the function should write the report
166
        // in a new transaction.
167
        PutResolverReport func(tx kvdb.RwTx,
168
                report *channeldb.ResolverReport) error
169

170
        // FetchHistoricalChannel retrieves the historical state of a channel.
171
        // This is mostly used to supplement the ContractResolvers with
172
        // additional information required for proper contract resolution.
173
        FetchHistoricalChannel func() (*channeldb.OpenChannel, error)
174

175
        // FindOutgoingHTLCDeadline returns the deadline in absolute block
176
        // height for the specified outgoing HTLC. For an outgoing HTLC, its
177
        // deadline is defined by the timeout height of its corresponding
178
        // incoming HTLC - this is the expiry height at which the remote peer can
179
        // spend his/her outgoing HTLC via the timeout path.
180
        FindOutgoingHTLCDeadline func(htlc channeldb.HTLC) fn.Option[int32]
181

182
        ChainArbitratorConfig
183
}
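
As a rough sketch, a caller (in lnd this is the ChainArbitrator) populates this config along the following lines; chanPoint, shortChanID, arbChannel and chainEvents are assumed to already be in scope, and the embedded ChainArbitratorConfig plus the remaining closures are elided:

cfg := ChannelArbitratorConfig{
        ChanPoint:   chanPoint,
        ShortChanID: shortChanID,
        Channel:     arbChannel,
        ChainEvents: chainEvents,
        MarkChannelResolved: func() error {
                // Persist the "fully resolved" marker for chanPoint here.
                return nil
        },
        // The embedded ChainArbitratorConfig and the remaining closures
        // (MarkCommitmentBroadcasted, MarkChannelClosed, etc.) are elided.
}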
184

185
// ReportOutputType describes the type of output that is being reported
186
// on.
187
type ReportOutputType uint8
188

189
const (
190
        // ReportOutputIncomingHtlc is an incoming hash time locked contract on
191
        // the commitment tx.
192
        ReportOutputIncomingHtlc ReportOutputType = iota
193

194
        // ReportOutputOutgoingHtlc is an outgoing hash time locked contract on
195
        // the commitment tx.
196
        ReportOutputOutgoingHtlc
197

198
        // ReportOutputUnencumbered is an uncontested output on the commitment
199
        // transaction paying to us directly.
200
        ReportOutputUnencumbered
201

202
        // ReportOutputAnchor is an anchor output on the commitment tx.
203
        ReportOutputAnchor
204
)
205

206
// ContractReport provides a summary of a commitment tx output.
207
type ContractReport struct {
208
        // Outpoint is the final output that will be swept back to the wallet.
209
        Outpoint wire.OutPoint
210

211
        // Type indicates the type of the reported output.
212
        Type ReportOutputType
213

214
        // Amount is the final value that will be swept in back to the wallet.
215
        Amount btcutil.Amount
216

217
        // MaturityHeight is the absolute block height that this output will
218
        // mature at.
219
        MaturityHeight uint32
220

221
        // Stage indicates whether the htlc is in the CLTV-timeout stage (1) or
222
        // the CSV-delay stage (2). A stage 1 htlc's maturity height will be set
223
        // to its expiry height, while a stage 2 htlc's maturity height will be
224
        // set to its confirmation height plus the maturity requirement.
225
        Stage uint32
226

227
        // LimboBalance is the total number of frozen coins within this
228
        // contract.
229
        LimboBalance btcutil.Amount
230

231
        // RecoveredBalance is the total value that has been successfully swept
232
        // back to the user's wallet.
233
        RecoveredBalance btcutil.Amount
234
}
235

236
// resolverReport creates a resolve report using some of the information in the
237
// contract report.
238
func (c *ContractReport) resolverReport(spendTx *chainhash.Hash,
239
        resolverType channeldb.ResolverType,
240
        outcome channeldb.ResolverOutcome) *channeldb.ResolverReport {
10✔
241

10✔
242
        return &channeldb.ResolverReport{
10✔
243
                OutPoint:        c.Outpoint,
10✔
244
                Amount:          c.Amount,
10✔
245
                ResolverType:    resolverType,
10✔
246
                ResolverOutcome: outcome,
10✔
247
                SpendTxID:       spendTx,
10✔
248
        }
10✔
249
}
10✔
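
A short usage sketch: an anchor resolver, for example, can persist its outcome once its sweep confirms. Here sweepTx, contractReport and a reachable PutResolverReport closure are assumed to be in scope, and the resolver type/outcome constants are taken from the channeldb package:

// Record that the anchor output was claimed by the confirmed sweep.
spendTxID := sweepTx.TxHash()
report := contractReport.resolverReport(
        &spendTxID,
        channeldb.ResolverTypeAnchor,
        channeldb.ResolverOutcomeClaimed,
)
if err := cfg.PutResolverReport(nil, report); err != nil {
        return err
}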
250

251
// htlcSet represents the set of active HTLCs on a given commitment
252
// transaction.
253
type htlcSet struct {
254
        // incomingHTLCs is a map of all incoming HTLCs on the target
255
        // commitment transaction. We may potentially go onchain to claim the
256
        // funds sent to us within this set.
257
        incomingHTLCs map[uint64]channeldb.HTLC
258

259
        // outgoingHTLCs is a map of all outgoing HTLCs on the target
260
        // commitment transaction. We may potentially go onchain to reclaim the
261
        // funds that are currently in limbo.
262
        outgoingHTLCs map[uint64]channeldb.HTLC
263
}
264

265
// newHtlcSet constructs a new HTLC set from a slice of HTLC's.
266
func newHtlcSet(htlcs []channeldb.HTLC) htlcSet {
46✔
267
        outHTLCs := make(map[uint64]channeldb.HTLC)
46✔
268
        inHTLCs := make(map[uint64]channeldb.HTLC)
46✔
269
        for _, htlc := range htlcs {
77✔
270
                if htlc.Incoming {
37✔
271
                        inHTLCs[htlc.HtlcIndex] = htlc
6✔
272
                        continue
6✔
273
                }
274

275
                outHTLCs[htlc.HtlcIndex] = htlc
25✔
276
        }
277

278
        return htlcSet{
46✔
279
                incomingHTLCs: inHTLCs,
46✔
280
                outgoingHTLCs: outHTLCs,
46✔
281
        }
46✔
282
}
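
For clarity, a small usage sketch of newHtlcSet with a hand-built slice of channeldb.HTLC values:

htlcs := []channeldb.HTLC{
        {HtlcIndex: 0, Incoming: true},
        {HtlcIndex: 1, Incoming: false},
        {HtlcIndex: 2, Incoming: false},
}

set := newHtlcSet(htlcs)

// set.incomingHTLCs now contains index 0, while set.outgoingHTLCs
// contains indices 1 and 2.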
283

284
// HtlcSetKey is a two-tuple that uniquely identifies a set of HTLCs on a
285
// commitment transaction.
286
type HtlcSetKey struct {
287
        // IsRemote denotes if the HTLCs are on the remote commitment
288
        // transaction.
289
        IsRemote bool
290

291
        // IsPending denotes if the commitment transaction that the HTLCs are on
292
        // is pending (the higher of two unrevoked commitments).
293
        IsPending bool
294
}
295

296
var (
297
        // LocalHtlcSet is the HtlcSetKey used for local commitments.
298
        LocalHtlcSet = HtlcSetKey{IsRemote: false, IsPending: false}
299

300
        // RemoteHtlcSet is the HtlcSetKey used for remote commitments.
301
        RemoteHtlcSet = HtlcSetKey{IsRemote: true, IsPending: false}
302

303
        // RemotePendingHtlcSet is the HtlcSetKey used for dangling remote
304
        // commitment transactions.
305
        RemotePendingHtlcSet = HtlcSetKey{IsRemote: true, IsPending: true}
306
)
307

308
// String returns a human readable string describing the target HtlcSetKey.
309
func (h HtlcSetKey) String() string {
8✔
310
        switch h {
8✔
311
        case LocalHtlcSet:
4✔
312
                return "LocalHtlcSet"
4✔
313
        case RemoteHtlcSet:
2✔
314
                return "RemoteHtlcSet"
2✔
315
        case RemotePendingHtlcSet:
2✔
316
                return "RemotePendingHtlcSet"
2✔
317
        default:
×
318
                return "unknown HtlcSetKey"
×
319
        }
320
}
321

322
// ChannelArbitrator is the on-chain arbitrator for a particular channel. The
323
// struct will keep in sync with the current set of HTLCs on the commitment
324
// transaction. The job of the attendant is to go on-chain to either settle or
325
// cancel an HTLC as necessary iff: an HTLC times out, or we know the
326
// pre-image to an HTLC, but it wasn't settled by the link off-chain. The
327
// ChannelArbitrator will factor in an expected confirmation delta when
328
// broadcasting to ensure that we avoid any possibility of race conditions, and
329
// sweep the output(s) without contest.
330
type ChannelArbitrator struct {
331
        started int32 // To be used atomically.
332
        stopped int32 // To be used atomically.
333

334
        // Embed the blockbeat consumer struct to get access to the method
335
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
336
        chainio.BeatConsumer
337

338
        // startTimestamp is the time when this ChannelArbitrator was started.
339
        startTimestamp time.Time
340

341
        // log is a persistent log that the attendant will use to checkpoint
342
        // its next action, and the state of any unresolved contracts.
343
        log ArbitratorLog
344

345
        // activeHTLCs is the set of active incoming/outgoing HTLC's on all
346
        // currently valid commitment transactions.
347
        activeHTLCs map[HtlcSetKey]htlcSet
348

349
        // unmergedSet is used to update the activeHTLCs map in two callsites:
350
        // checkLocalChainActions and sweepAnchors. It contains the latest
351
        // updates from the link. It is not deleted from, its entries may be
352
        // replaced on subsequent calls to notifyContractUpdate.
353
        unmergedSet map[HtlcSetKey]htlcSet
354
        unmergedMtx sync.RWMutex
355

356
        // cfg contains all the functionality that the ChannelArbitrator requires
357
        // to do its duty.
358
        cfg ChannelArbitratorConfig
359

360
        // signalUpdates is a channel on which any new live signals for the channel
361
        // we're watching over will be sent.
362
        signalUpdates chan *signalUpdateMsg
363

364
        // activeResolvers is a slice of any active resolvers. This is used to
365
        // be able to signal them for shutdown in the case that we shutdown.
366
        activeResolvers []ContractResolver
367

368
        // activeResolversLock prevents simultaneous read and write to the
369
        // resolvers slice.
370
        activeResolversLock sync.RWMutex
371

372
        // resolutionSignal is a channel that will be sent upon by contract
373
        // resolvers once their contract has been fully resolved. With each
374
        // send, we'll check to see if the contract is fully resolved.
375
        resolutionSignal chan struct{}
376

377
        // forceCloseReqs is a channel over which requests to forcibly close the
378
        // contract will be sent.
379
        forceCloseReqs chan *forceCloseReq
380

381
        // state is the current state of the arbitrator. This state is examined
382
        // upon start up to decide which actions to take.
383
        state ArbitratorState
384

385
        wg   sync.WaitGroup
386
        quit chan struct{}
387
}
388

389
// NewChannelArbitrator returns a new instance of a ChannelArbitrator backed by
390
// the passed config struct.
391
func NewChannelArbitrator(cfg ChannelArbitratorConfig,
392
        htlcSets map[HtlcSetKey]htlcSet, log ArbitratorLog) *ChannelArbitrator {
51✔
393

51✔
394
        // Create a new map for unmerged HTLC's as we will overwrite the values
51✔
395
        // and want to avoid modifying activeHTLCs directly. This soft copying
51✔
396
        // is done to ensure that activeHTLCs isn't reset as an empty map later
51✔
397
        // on.
51✔
398
        unmerged := make(map[HtlcSetKey]htlcSet)
51✔
399
        unmerged[LocalHtlcSet] = htlcSets[LocalHtlcSet]
51✔
400
        unmerged[RemoteHtlcSet] = htlcSets[RemoteHtlcSet]
51✔
401

51✔
402
        // If the pending set exists, write that as well.
51✔
403
        if _, ok := htlcSets[RemotePendingHtlcSet]; ok {
51✔
404
                unmerged[RemotePendingHtlcSet] = htlcSets[RemotePendingHtlcSet]
×
405
        }
×
406

407
        c := &ChannelArbitrator{
51✔
408
                log:              log,
51✔
409
                signalUpdates:    make(chan *signalUpdateMsg),
51✔
410
                resolutionSignal: make(chan struct{}),
51✔
411
                forceCloseReqs:   make(chan *forceCloseReq),
51✔
412
                activeHTLCs:      htlcSets,
51✔
413
                unmergedSet:      unmerged,
51✔
414
                cfg:              cfg,
51✔
415
                quit:             make(chan struct{}),
51✔
416
        }
51✔
417

51✔
418
        // Mount the block consumer.
51✔
419
        c.BeatConsumer = chainio.NewBeatConsumer(c.quit, c.Name())
51✔
420

51✔
421
        return c
51✔
422
}
423

424
// Compile-time check for the chainio.Consumer interface.
425
var _ chainio.Consumer = (*ChannelArbitrator)(nil)
426

427
// chanArbStartState contains the information from disk that we need to start
428
// up a channel arbitrator.
429
type chanArbStartState struct {
430
        currentState ArbitratorState
431
        commitSet    *CommitSet
432
}
433

434
// getStartState retrieves the information from disk that our channel arbitrator
435
// requires to start.
436
func (c *ChannelArbitrator) getStartState(tx kvdb.RTx) (*chanArbStartState,
437
        error) {
48✔
438

48✔
439
        // First, we'll read our last state from disk, so our internal state
48✔
440
        // machine can act accordingly.
48✔
441
        state, err := c.log.CurrentState(tx)
48✔
442
        if err != nil {
48✔
443
                return nil, err
×
444
        }
×
445

446
        // Next we'll fetch our confirmed commitment set. This will only exist
447
        // if the channel has been closed out on chain for modern nodes. For
448
        // older nodes, this won't be found at all, and will rely on the
449
        // existing written chain actions. Additionally, if this channel hasn't
450
        // logged any actions in the log, then this field won't be present.
451
        commitSet, err := c.log.FetchConfirmedCommitSet(tx)
48✔
452
        if err != nil && err != errNoCommitSet && err != errScopeBucketNoExist {
48✔
453
                return nil, err
×
454
        }
×
455

456
        return &chanArbStartState{
48✔
457
                currentState: state,
48✔
458
                commitSet:    commitSet,
48✔
459
        }, nil
48✔
460
}
461

462
// Start starts all the goroutines that the ChannelArbitrator needs to operate.
463
// It takes a start state, which will be looked up on disk if it is not
464
// provided.
465
func (c *ChannelArbitrator) Start(state *chanArbStartState,
466
        beat chainio.Blockbeat) error {
48✔
467

48✔
468
        if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
48✔
469
                return nil
×
470
        }
×
471
        c.startTimestamp = c.cfg.Clock.Now()
48✔
472

48✔
473
        // If the state passed in is nil, we look it up now.
48✔
474
        if state == nil {
85✔
475
                var err error
37✔
476
                state, err = c.getStartState(nil)
37✔
477
                if err != nil {
37✔
478
                        return err
×
479
                }
×
480
        }
481

482
        log.Tracef("Starting ChannelArbitrator(%v), htlc_set=%v, state=%v",
48✔
483
                c.cfg.ChanPoint, lnutils.SpewLogClosure(c.activeHTLCs),
48✔
484
                state.currentState)
48✔
485

48✔
486
        // Set our state from our starting state.
48✔
487
        c.state = state.currentState
48✔
488

48✔
489
        // Get the starting height.
48✔
490
        bestHeight := beat.Height()
48✔
491

48✔
492
        c.wg.Add(1)
48✔
493
        go c.channelAttendant(bestHeight, state.commitSet)
48✔
494

48✔
495
        return nil
48✔
496
}
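
A minimal lifecycle sketch follows; cfg, htlcSets, arbLog and beat are assumed to be prepared by the caller, as the ChainArbitrator does in lnd. Passing a nil start state makes Start look the state up on disk via getStartState:

arb := NewChannelArbitrator(cfg, htlcSets, arbLog)
if err := arb.Start(nil, beat); err != nil {
        return err
}
defer func() { _ = arb.Stop() }()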
497

498
// progressStateMachineAfterRestart attempts to progress the state machine
499
// after a restart. This makes sure that if the state transition failed, we
500
// will try to progress the state machine again. Moreover it will relaunch
501
// resolvers if the channel is still in the pending close state and has not
502
// been fully resolved yet.
503
func (c *ChannelArbitrator) progressStateMachineAfterRestart(bestHeight int32,
504
        commitSet *CommitSet) error {
48✔
505

48✔
506
        // If the channel has been marked pending close in the database, and we
48✔
507
        // haven't transitioned the state machine to StateContractClosed (or a
48✔
508
        // succeeding state), then a state transition most likely failed. We'll
48✔
509
        // try to recover from this by manually advancing the state by setting
48✔
510
        // the corresponding close trigger.
48✔
511
        trigger := chainTrigger
48✔
512
        triggerHeight := uint32(bestHeight)
48✔
513
        if c.cfg.IsPendingClose {
53✔
514
                switch c.state {
5✔
515
                case StateDefault:
4✔
516
                        fallthrough
4✔
517
                case StateBroadcastCommit:
5✔
518
                        fallthrough
5✔
519
                case StateCommitmentBroadcasted:
5✔
520
                        switch c.cfg.CloseType {
5✔
521

522
                        case channeldb.CooperativeClose:
1✔
523
                                trigger = coopCloseTrigger
1✔
524

525
                        case channeldb.BreachClose:
1✔
526
                                trigger = breachCloseTrigger
1✔
527

528
                        case channeldb.LocalForceClose:
1✔
529
                                trigger = localCloseTrigger
1✔
530

531
                        case channeldb.RemoteForceClose:
2✔
532
                                trigger = remoteCloseTrigger
2✔
533
                        }
534

535
                        log.Warnf("ChannelArbitrator(%v): detected stalled "+
5✔
536
                                "state=%v for closed channel",
5✔
537
                                c.cfg.ChanPoint, c.state)
5✔
538
                }
539

540
                triggerHeight = c.cfg.ClosingHeight
5✔
541
        }
542

543
        log.Infof("ChannelArbitrator(%v): starting state=%v, trigger=%v, "+
48✔
544
                "triggerHeight=%v", c.cfg.ChanPoint, c.state, trigger,
48✔
545
                triggerHeight)
48✔
546

48✔
547
        // We'll now attempt to advance our state forward based on the current
48✔
548
        // on-chain state, and our set of active contracts.
48✔
549
        startingState := c.state
48✔
550
        nextState, _, err := c.advanceState(
48✔
551
                triggerHeight, trigger, commitSet,
48✔
552
        )
48✔
553
        if err != nil {
50✔
554
                switch err {
2✔
555

556
                // If we detect that we tried to fetch resolutions, but failed,
557
                // this channel was marked closed in the database before
558
                // resolutions successfully written. In this case there is not
559
                // much we can do, so we don't return the error.
560
                case errScopeBucketNoExist:
×
561
                        fallthrough
×
562
                case errNoResolutions:
1✔
563
                        log.Warnf("ChannelArbitrator(%v): detected closed"+
1✔
564
                                "channel with no contract resolutions written.",
1✔
565
                                c.cfg.ChanPoint)
1✔
566

567
                default:
1✔
568
                        return err
1✔
569
                }
570
        }
571

572
        // If we started and ended at the awaiting full resolution state, then
573
        // we'll relaunch our set of unresolved contracts.
574
        if startingState == StateWaitingFullResolution &&
47✔
575
                nextState == StateWaitingFullResolution {
48✔
576

1✔
577
                // In order to relaunch the resolvers, we'll need to fetch the
1✔
578
                // set of HTLCs that were present in the commitment transaction
1✔
579
                // at the time it was confirmed. commitSet.ConfCommitKey can't
1✔
580
                // be nil at this point since we're in
1✔
581
                // StateWaitingFullResolution. We can only be in
1✔
582
                // StateWaitingFullResolution after we've transitioned from
1✔
583
                // StateContractClosed which can only be triggered by the local
1✔
584
                // or remote close trigger. This trigger is only fired when we
1✔
585
                // receive a chain event from the chain watcher that the
1✔
586
                // commitment has been confirmed on chain, and before we
1✔
587
                // advance our state step, we call InsertConfirmedCommitSet.
1✔
588
                err := c.relaunchResolvers(commitSet, triggerHeight)
1✔
589
                if err != nil {
1✔
590
                        return err
×
591
                }
×
592
        }
593

594
        return nil
47✔
595
}
596

597
// maybeAugmentTaprootResolvers will update the contract resolution information
598
// for taproot channels. This ensures that all the resolvers have the latest
599
// resolution, which may also include data such as the control block and tap
600
// tweaks.
601
func maybeAugmentTaprootResolvers(chanType channeldb.ChannelType,
602
        resolver ContractResolver,
603
        contractResolutions *ContractResolutions) {
1✔
604

1✔
605
        if !chanType.IsTaproot() {
2✔
606
                return
1✔
607
        }
1✔
608

609
        // The on disk resolutions contains all the ctrl block
610
        // information, so we'll set that now for the relevant
611
        // resolvers.
UNCOV
612
        switch r := resolver.(type) {
×
UNCOV
613
        case *commitSweepResolver:
×
UNCOV
614
                if contractResolutions.CommitResolution != nil {
×
UNCOV
615
                        //nolint:ll
×
UNCOV
616
                        r.commitResolution = *contractResolutions.CommitResolution
×
UNCOV
617
                }
×
UNCOV
618
        case *htlcOutgoingContestResolver:
×
UNCOV
619
                //nolint:ll
×
UNCOV
620
                htlcResolutions := contractResolutions.HtlcResolutions.OutgoingHTLCs
×
UNCOV
621
                for _, htlcRes := range htlcResolutions {
×
UNCOV
622
                        htlcRes := htlcRes
×
UNCOV
623

×
UNCOV
624
                        if r.htlcResolution.ClaimOutpoint ==
×
UNCOV
625
                                htlcRes.ClaimOutpoint {
×
UNCOV
626

×
UNCOV
627
                                r.htlcResolution = htlcRes
×
UNCOV
628
                        }
×
629
                }
630

UNCOV
631
        case *htlcTimeoutResolver:
×
UNCOV
632
                //nolint:ll
×
UNCOV
633
                htlcResolutions := contractResolutions.HtlcResolutions.OutgoingHTLCs
×
UNCOV
634
                for _, htlcRes := range htlcResolutions {
×
UNCOV
635
                        htlcRes := htlcRes
×
UNCOV
636

×
UNCOV
637
                        if r.htlcResolution.ClaimOutpoint ==
×
UNCOV
638
                                htlcRes.ClaimOutpoint {
×
UNCOV
639

×
UNCOV
640
                                r.htlcResolution = htlcRes
×
UNCOV
641
                        }
×
642
                }
643

UNCOV
644
        case *htlcIncomingContestResolver:
×
UNCOV
645
                //nolint:ll
×
UNCOV
646
                htlcResolutions := contractResolutions.HtlcResolutions.IncomingHTLCs
×
UNCOV
647
                for _, htlcRes := range htlcResolutions {
×
UNCOV
648
                        htlcRes := htlcRes
×
UNCOV
649

×
UNCOV
650
                        if r.htlcResolution.ClaimOutpoint ==
×
UNCOV
651
                                htlcRes.ClaimOutpoint {
×
UNCOV
652

×
UNCOV
653
                                r.htlcResolution = htlcRes
×
UNCOV
654
                        }
×
655
                }
656
        case *htlcSuccessResolver:
×
657
                //nolint:ll
×
658
                htlcResolutions := contractResolutions.HtlcResolutions.IncomingHTLCs
×
659
                for _, htlcRes := range htlcResolutions {
×
660
                        htlcRes := htlcRes
×
661

×
662
                        if r.htlcResolution.ClaimOutpoint ==
×
663
                                htlcRes.ClaimOutpoint {
×
664

×
665
                                r.htlcResolution = htlcRes
×
666
                        }
×
667
                }
668
        }
669
}
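
The four cases above repeat the same ClaimOutpoint match against a slice of resolutions. As a sketch, a small generic helper (hypothetical, not in lnd) could express that lookup once, taking a closure that reads the claim outpoint of each concrete resolution type:

// matchByClaimOutpoint returns the first resolution whose claim outpoint
// matches the target, if any.
func matchByClaimOutpoint[R any](resolutions []R,
        outpointOf func(R) wire.OutPoint,
        target wire.OutPoint) (R, bool) {

        var zero R
        for _, res := range resolutions {
                if outpointOf(res) == target {
                        return res, true
                }
        }

        return zero, false
}

The htlcTimeoutResolver case, for instance, would then reduce to a single call that scans HtlcResolutions.OutgoingHTLCs for r.htlcResolution.ClaimOutpoint.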
670

671
// relaunchResolvers relaunches the set of resolvers for unresolved contracts in
672
// order to provide them with information that's not immediately available upon
673
// starting the ChannelArbitrator. This information should ideally be stored in
674
// the database, so this only serves as an intermediate work-around to prevent a
675
// migration.
676
func (c *ChannelArbitrator) relaunchResolvers(commitSet *CommitSet,
677
        heightHint uint32) error {
1✔
678

1✔
679
        // We'll now query our log to see if there are any active unresolved
1✔
680
        // contracts. If this is the case, then we'll relaunch all contract
1✔
681
        // resolvers.
1✔
682
        unresolvedContracts, err := c.log.FetchUnresolvedContracts()
1✔
683
        if err != nil {
1✔
684
                return err
×
685
        }
×
686

687
        // Retrieve the commitment tx hash from the log.
688
        contractResolutions, err := c.log.FetchContractResolutions()
1✔
689
        if err != nil {
1✔
690
                log.Errorf("unable to fetch contract resolutions: %v",
×
691
                        err)
×
692
                return err
×
693
        }
×
694
        commitHash := contractResolutions.CommitHash
1✔
695

1✔
696
        // In prior versions of lnd, the information needed to supplement the
1✔
697
        // resolvers (in most cases, the full amount of the HTLC) was found in
1✔
698
        // the chain action map, which is now deprecated.  As a result, if the
1✔
699
        // commitSet is nil (an older node with unresolved HTLCs at time of
1✔
700
        // upgrade), then we'll use the chain action information in place. The
1✔
701
        // chain actions may exclude some information, but we cannot recover it
1✔
702
        // for these older nodes at the moment.
1✔
703
        var confirmedHTLCs []channeldb.HTLC
1✔
704
        if commitSet != nil && commitSet.ConfCommitKey.IsSome() {
2✔
705
                confCommitKey, err := commitSet.ConfCommitKey.UnwrapOrErr(
1✔
706
                        fmt.Errorf("no commitKey available"),
1✔
707
                )
1✔
708
                if err != nil {
1✔
709
                        return err
×
710
                }
×
711
                confirmedHTLCs = commitSet.HtlcSets[confCommitKey]
1✔
712
        } else {
×
713
                chainActions, err := c.log.FetchChainActions()
×
714
                if err != nil {
×
715
                        log.Errorf("unable to fetch chain actions: %v", err)
×
716
                        return err
×
717
                }
×
718
                for _, htlcs := range chainActions {
×
719
                        confirmedHTLCs = append(confirmedHTLCs, htlcs...)
×
720
                }
×
721
        }
722

723
        // Reconstruct the htlc outpoints and data from the chain action log.
724
        // The purpose of the constructed htlc map is to supplement the
725
        // resolvers restored from database with extra data. Ideally this data
726
        // is stored as part of the resolver in the log. This is a workaround
727
        // to prevent a db migration. We use all available htlc sets here in
728
        // order to ensure we have complete coverage.
729
        htlcMap := make(map[wire.OutPoint]*channeldb.HTLC)
1✔
730
        for _, htlc := range confirmedHTLCs {
4✔
731
                htlc := htlc
3✔
732
                outpoint := wire.OutPoint{
3✔
733
                        Hash:  commitHash,
3✔
734
                        Index: uint32(htlc.OutputIndex),
3✔
735
                }
3✔
736
                htlcMap[outpoint] = &htlc
3✔
737
        }
3✔
738

739
        // We'll also fetch the historical state of this channel, as it should
740
        // have been marked as closed by now, and supplement it to each resolver
741
        // such that we can properly resolve our pending contracts.
742
        var chanState *channeldb.OpenChannel
1✔
743
        chanState, err = c.cfg.FetchHistoricalChannel()
1✔
744
        switch {
1✔
745
        // If we don't find this channel, then it may be the case that it
746
        // was closed before we started to retain the final state
747
        // information for open channels.
748
        case err == channeldb.ErrNoHistoricalBucket:
×
749
                fallthrough
×
750
        case err == channeldb.ErrChannelNotFound:
×
751
                log.Warnf("ChannelArbitrator(%v): unable to fetch historical "+
×
752
                        "state", c.cfg.ChanPoint)
×
753

754
        case err != nil:
×
755
                return err
×
756
        }
757

758
        log.Infof("ChannelArbitrator(%v): relaunching %v contract "+
1✔
759
                "resolvers", c.cfg.ChanPoint, len(unresolvedContracts))
1✔
760

1✔
761
        for i := range unresolvedContracts {
2✔
762
                resolver := unresolvedContracts[i]
1✔
763

1✔
764
                if chanState != nil {
2✔
765
                        resolver.SupplementState(chanState)
1✔
766

1✔
767
                        // For taproot channels, we'll need to also make sure
1✔
768
                        // the control block information was set properly.
1✔
769
                        maybeAugmentTaprootResolvers(
1✔
770
                                chanState.ChanType, resolver,
1✔
771
                                contractResolutions,
1✔
772
                        )
1✔
773
                }
1✔
774

775
                unresolvedContracts[i] = resolver
1✔
776

1✔
777
                htlcResolver, ok := resolver.(htlcContractResolver)
1✔
778
                if !ok {
1✔
UNCOV
779
                        continue
×
780
                }
781

782
                htlcPoint := htlcResolver.HtlcPoint()
1✔
783
                htlc, ok := htlcMap[htlcPoint]
1✔
784
                if !ok {
1✔
785
                        return fmt.Errorf(
×
786
                                "htlc resolver %T unavailable", resolver,
×
787
                        )
×
788
                }
×
789

790
                htlcResolver.Supplement(*htlc)
1✔
791

1✔
792
                // If this is an outgoing HTLC, we will also need to supplement
1✔
793
                // the resolver with the expiry block height of its
1✔
794
                // corresponding incoming HTLC.
1✔
795
                if !htlc.Incoming {
2✔
796
                        deadline := c.cfg.FindOutgoingHTLCDeadline(*htlc)
1✔
797
                        htlcResolver.SupplementDeadline(deadline)
1✔
798
                }
1✔
799
        }
800

801
        // The anchor resolver is stateless and can always be re-instantiated.
802
        if contractResolutions.AnchorResolution != nil {
1✔
UNCOV
803
                anchorResolver := newAnchorResolver(
×
UNCOV
804
                        contractResolutions.AnchorResolution.AnchorSignDescriptor,
×
UNCOV
805
                        contractResolutions.AnchorResolution.CommitAnchor,
×
UNCOV
806
                        heightHint, c.cfg.ChanPoint,
×
UNCOV
807
                        ResolverConfig{
×
UNCOV
808
                                ChannelArbitratorConfig: c.cfg,
×
UNCOV
809
                        },
×
UNCOV
810
                )
×
UNCOV
811

×
UNCOV
812
                anchorResolver.SupplementState(chanState)
×
UNCOV
813

×
UNCOV
814
                unresolvedContracts = append(unresolvedContracts, anchorResolver)
×
UNCOV
815

×
UNCOV
816
                // TODO(roasbeef): this isn't re-launched?
×
UNCOV
817
        }
×
818

819
        c.resolveContracts(unresolvedContracts)
1✔
820

1✔
821
        return nil
1✔
822
}
823

824
// Report returns htlc reports for the active resolvers.
UNCOV
825
func (c *ChannelArbitrator) Report() []*ContractReport {
×
UNCOV
826
        c.activeResolversLock.RLock()
×
UNCOV
827
        defer c.activeResolversLock.RUnlock()
×
UNCOV
828

×
UNCOV
829
        var reports []*ContractReport
×
UNCOV
830
        for _, resolver := range c.activeResolvers {
×
UNCOV
831
                r, ok := resolver.(reportingContractResolver)
×
UNCOV
832
                if !ok {
×
833
                        continue
×
834
                }
835

UNCOV
836
                report := r.report()
×
UNCOV
837
                if report == nil {
×
UNCOV
838
                        continue
×
839
                }
840

UNCOV
841
                reports = append(reports, report)
×
842
        }
843

UNCOV
844
        return reports
×
845
}
846

847
// Stop signals the ChannelArbitrator for a graceful shutdown.
848
func (c *ChannelArbitrator) Stop() error {
51✔
849
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
57✔
850
                return nil
6✔
851
        }
6✔
852

853
        log.Debugf("Stopping ChannelArbitrator(%v)", c.cfg.ChanPoint)
45✔
854

45✔
855
        if c.cfg.ChainEvents.Cancel != nil {
56✔
856
                go c.cfg.ChainEvents.Cancel()
11✔
857
        }
11✔
858

859
        c.activeResolversLock.RLock()
45✔
860
        for _, activeResolver := range c.activeResolvers {
51✔
861
                activeResolver.Stop()
6✔
862
        }
6✔
863
        c.activeResolversLock.RUnlock()
45✔
864

45✔
865
        close(c.quit)
45✔
866
        c.wg.Wait()
45✔
867

45✔
868
        return nil
45✔
869
}
870

871
// transitionTrigger is an enum that denotes exactly *why* a state transition
872
// was initiated. This is useful as depending on the initial trigger, we may
873
// skip certain states as those actions are expected to have already taken
874
// place as a result of the external trigger.
875
type transitionTrigger uint8
876

877
const (
878
        // chainTrigger is a transition trigger attempted due to changing
879
        // on-chain conditions, such as a block being attached which times out
880
        // HTLC's.
881
        chainTrigger transitionTrigger = iota
882

883
        // userTrigger is a transition trigger driven by user action. Examples
884
        // of such a trigger include a user requesting a force closure of the
885
        // channel.
886
        userTrigger
887

888
        // remoteCloseTrigger is a transition trigger driven by the remote
889
        // peer's commitment being confirmed.
890
        remoteCloseTrigger
891

892
        // localCloseTrigger is a transition trigger driven by our commitment
893
        // being confirmed.
894
        localCloseTrigger
895

896
        // coopCloseTrigger is a transition trigger driven by a cooperative
897
        // close transaction being confirmed.
898
        coopCloseTrigger
899

900
        // breachCloseTrigger is a transition trigger driven by a remote breach
901
        // being confirmed. In this case the channel arbitrator will wait for
902
        // the BreachArbitrator to finish and then clean up gracefully.
903
        breachCloseTrigger
904
)
905

906
// String returns a human readable string describing the passed
907
// transitionTrigger.
UNCOV
908
func (t transitionTrigger) String() string {
×
UNCOV
909
        switch t {
×
UNCOV
910
        case chainTrigger:
×
UNCOV
911
                return "chainTrigger"
×
912

UNCOV
913
        case remoteCloseTrigger:
×
UNCOV
914
                return "remoteCloseTrigger"
×
915

UNCOV
916
        case userTrigger:
×
UNCOV
917
                return "userTrigger"
×
918

UNCOV
919
        case localCloseTrigger:
×
UNCOV
920
                return "localCloseTrigger"
×
921

UNCOV
922
        case coopCloseTrigger:
×
UNCOV
923
                return "coopCloseTrigger"
×
924

UNCOV
925
        case breachCloseTrigger:
×
UNCOV
926
                return "breachCloseTrigger"
×
927

928
        default:
×
929
                return "unknown trigger"
×
930
        }
931
}
932

933
// stateStep is a helper method that examines our internal state, and attempts
934
// the appropriate state transition if necessary. The next state we transition
935
// to is returned. Additionally, if the next transition results in a commitment
936
// broadcast, the commitment transaction itself is returned.
937
func (c *ChannelArbitrator) stateStep(
938
        triggerHeight uint32, trigger transitionTrigger,
939
        confCommitSet *CommitSet) (ArbitratorState, *wire.MsgTx, error) {
174✔
940

174✔
941
        var (
174✔
942
                nextState ArbitratorState
174✔
943
                closeTx   *wire.MsgTx
174✔
944
        )
174✔
945
        switch c.state {
174✔
946

947
        // If we're in the default state, then we'll check our set of actions
948
        // to see if while we were down, conditions have changed.
949
        case StateDefault:
63✔
950
                log.Debugf("ChannelArbitrator(%v): new block (height=%v) "+
63✔
951
                        "examining active HTLC's", c.cfg.ChanPoint,
63✔
952
                        triggerHeight)
63✔
953

63✔
954
                // As a new block has been connected to the end of the main
63✔
955
                // chain, we'll check to see if we need to make any on-chain
63✔
956
                // claims on behalf of the channel contract that we're
63✔
957
                // arbitrating for. If a commitment has confirmed, then we'll
63✔
958
                // use the set snapshot from the chain, otherwise we'll use our
63✔
959
                // current set.
63✔
960
                var (
63✔
961
                        chainActions ChainActionMap
63✔
962
                        err          error
63✔
963
                )
63✔
964

63✔
965
                // Normally if we force close the channel locally we will have
63✔
966
                // no confCommitSet. However when the remote commitment confirms
63✔
967
                // without us ever broadcasting our local commitment we need to
63✔
968
                // make sure we cancel all upstream HTLCs for outgoing dust
63✔
969
                // HTLCs as well, hence we need to fetch the chain actions here
63✔
970
                // as well.
63✔
971
                if confCommitSet == nil {
117✔
972
                        // Update the set of activeHTLCs so
54✔
973
                        // checkLocalChainActions has an up-to-date view of the
54✔
974
                        // commitments.
54✔
975
                        c.updateActiveHTLCs()
54✔
976
                        htlcs := c.activeHTLCs
54✔
977
                        chainActions, err = c.checkLocalChainActions(
54✔
978
                                triggerHeight, trigger, htlcs, false,
54✔
979
                        )
54✔
980
                        if err != nil {
54✔
981
                                return StateDefault, nil, err
×
982
                        }
×
983
                } else {
9✔
984
                        chainActions, err = c.constructChainActions(
9✔
985
                                confCommitSet, triggerHeight, trigger,
9✔
986
                        )
9✔
987
                        if err != nil {
9✔
988
                                return StateDefault, nil, err
×
989
                        }
×
990
                }
991

992
                // If there are no actions to be made, then we'll remain in the
993
                // default state. If this isn't a self initiated event (we're
994
                // checking due to a chain update), then we'll exit now.
995
                if len(chainActions) == 0 && trigger == chainTrigger {
99✔
996
                        log.Debugf("ChannelArbitrator(%v): no actions for "+
36✔
997
                                "chain trigger, terminating", c.cfg.ChanPoint)
36✔
998

36✔
999
                        return StateDefault, closeTx, nil
36✔
1000
                }
36✔
1001

1002
                // Otherwise, we'll log that we checked the HTLC actions as the
1003
                // commitment transaction has already been broadcast.
1004
                log.Tracef("ChannelArbitrator(%v): logging chain_actions=%v",
27✔
1005
                        c.cfg.ChanPoint, lnutils.SpewLogClosure(chainActions))
27✔
1006

27✔
1007
                // Cancel upstream HTLCs for all outgoing dust HTLCs available
27✔
1008
                // either on the local or the remote/remote pending commitment
27✔
1009
                // transaction.
27✔
1010
                dustHTLCs := chainActions[HtlcFailDustAction]
27✔
1011
                if len(dustHTLCs) > 0 {
28✔
1012
                        log.Debugf("ChannelArbitrator(%v): canceling %v dust "+
1✔
1013
                                "HTLCs backwards", c.cfg.ChanPoint,
1✔
1014
                                len(dustHTLCs))
1✔
1015

1✔
1016
                        getIdx := func(htlc channeldb.HTLC) uint64 {
2✔
1017
                                return htlc.HtlcIndex
1✔
1018
                        }
1✔
1019
                        dustHTLCSet := fn.NewSet(fn.Map(dustHTLCs, getIdx)...)
1✔
1020
                        err = c.abandonForwards(dustHTLCSet)
1✔
1021
                        if err != nil {
1✔
1022
                                return StateError, closeTx, err
×
1023
                        }
×
1024
                }
1025

1026
                // Depending on the type of trigger, we'll either "tunnel"
1027
                // through to a farther state, or just proceed linearly to the
1028
                // next state.
1029
                switch trigger {
27✔
1030

1031
                // If this is a chain trigger, then we'll go straight to the
1032
                // next state, as we still need to broadcast the commitment
1033
                // transaction.
1034
                case chainTrigger:
5✔
1035
                        fallthrough
5✔
1036
                case userTrigger:
15✔
1037
                        nextState = StateBroadcastCommit
15✔
1038

1039
                // If the trigger is a cooperative close being confirmed, then
1040
                // we can go straight to StateFullyResolved, as there won't be
1041
                // any contracts to resolve.
1042
                case coopCloseTrigger:
3✔
1043
                        nextState = StateFullyResolved
3✔
1044

1045
                // Otherwise, if this state advance was triggered by a
1046
                // commitment being confirmed on chain, then we'll jump
1047
                // straight to the state where the contract has already been
1048
                // closed, and we will inspect the set of unresolved contracts.
1049
                case localCloseTrigger:
2✔
1050
                        log.Errorf("ChannelArbitrator(%v): unexpected local "+
2✔
1051
                                "commitment confirmed while in StateDefault",
2✔
1052
                                c.cfg.ChanPoint)
2✔
1053
                        fallthrough
2✔
1054
                case remoteCloseTrigger:
8✔
1055
                        nextState = StateContractClosed
8✔
1056

1057
                case breachCloseTrigger:
1✔
1058
                        nextContractState, err := c.checkLegacyBreach()
1✔
1059
                        if nextContractState == StateError {
1✔
1060
                                return nextContractState, nil, err
×
1061
                        }
×
1062

1063
                        nextState = nextContractState
1✔
1064
                }
1065

1066
        // If we're in this state, then we've decided to broadcast the
1067
        // commitment transaction. We enter this state either due to an outside
1068
        // sub-system, or because an on-chain action has been triggered.
1069
        case StateBroadcastCommit:
20✔
1070
                // Under normal operation, we can only enter
20✔
1071
                // StateBroadcastCommit via a user or chain trigger. On restart,
20✔
1072
                // this state may be reexecuted after closing the channel, but
20✔
1073
                // failing to commit to StateContractClosed or
20✔
1074
                // StateFullyResolved. In that case, one of the four close
20✔
1075
                // triggers will be presented, signifying that we should skip
20✔
1076
                // rebroadcasting, and go straight to resolving the on-chain
20✔
1077
                // contract or marking the channel resolved.
20✔
1078
                switch trigger {
20✔
1079
                case localCloseTrigger, remoteCloseTrigger:
×
1080
                        log.Infof("ChannelArbitrator(%v): detected %s "+
×
1081
                                "close after closing channel, fast-forwarding "+
×
1082
                                "to %s to resolve contract",
×
1083
                                c.cfg.ChanPoint, trigger, StateContractClosed)
×
1084
                        return StateContractClosed, closeTx, nil
×
1085

1086
                case breachCloseTrigger:
1✔
1087
                        nextContractState, err := c.checkLegacyBreach()
1✔
1088
                        if nextContractState == StateError {
1✔
1089
                                log.Infof("ChannelArbitrator(%v): unable to "+
×
1090
                                        "advance breach close resolution: %v",
×
1091
                                        c.cfg.ChanPoint, nextContractState)
×
1092
                                return StateError, closeTx, err
×
1093
                        }
×
1094

1095
                        log.Infof("ChannelArbitrator(%v): detected %s close "+
1✔
1096
                                "after closing channel, fast-forwarding to %s"+
1✔
1097
                                " to resolve contract", c.cfg.ChanPoint,
1✔
1098
                                trigger, nextContractState)
1✔
1099

1✔
1100
                        return nextContractState, closeTx, nil
1✔
1101

1102
                case coopCloseTrigger:
×
1103
                        log.Infof("ChannelArbitrator(%v): detected %s "+
×
1104
                                "close after closing channel, fast-forwarding "+
×
1105
                                "to %s to resolve contract",
×
1106
                                c.cfg.ChanPoint, trigger, StateFullyResolved)
×
1107
                        return StateFullyResolved, closeTx, nil
×
1108
                }
1109

1110
                log.Infof("ChannelArbitrator(%v): force closing "+
19✔
1111
                        "chan", c.cfg.ChanPoint)
19✔
1112

19✔
1113
                // Now that we have all the actions decided for the set of
19✔
1114
                // HTLC's, we'll broadcast the commitment transaction, and
19✔
1115
                // signal the link to exit.
19✔
1116

19✔
1117
                // We'll tell the switch that it should remove the link for
19✔
1118
                // this channel, in addition to fetching the force close
19✔
1119
                // summary needed to close this channel on chain.
19✔
1120
                forceCloseTx, err := c.cfg.Channel.ForceCloseChan()
19✔
1121
                if err != nil {
20✔
1122
                        log.Errorf("ChannelArbitrator(%v): unable to "+
1✔
1123
                                "force close: %v", c.cfg.ChanPoint, err)
1✔
1124

1✔
1125
                        // We tried to force close (HTLC may be expiring from
1✔
1126
                        // our PoV, etc), but we think we've lost data. In this
1✔
1127
                        // case, we'll not force close, but terminate the state
1✔
1128
                        // machine here to wait to see what confirms on chain.
1✔
1129
                        if errors.Is(err, lnwallet.ErrForceCloseLocalDataLoss) {
2✔
1130
                                log.Error("ChannelArbitrator(%v): broadcast "+
1✔
1131
                                        "failed due to local data loss, "+
1✔
1132
                                        "waiting for on chain confimation...",
1✔
1133
                                        c.cfg.ChanPoint)
1✔
1134

1✔
1135
                                return StateBroadcastCommit, nil, nil
1✔
1136
                        }
1✔
1137

1138
                        return StateError, closeTx, err
×
1139
                }
1140
                closeTx = forceCloseTx
18✔
1141

18✔
1142
                // Before publishing the transaction, we store it to the
18✔
1143
                // database, such that we can re-publish later in case it
18✔
1144
                // didn't propagate. We initiated the force close, so we
18✔
1145
                // mark broadcast with local initiator set to true.
18✔
1146
                err = c.cfg.MarkCommitmentBroadcasted(closeTx, lntypes.Local)
18✔
1147
                if err != nil {
18✔
1148
                        log.Errorf("ChannelArbitrator(%v): unable to "+
×
1149
                                "mark commitment broadcasted: %v",
×
1150
                                c.cfg.ChanPoint, err)
×
1151
                        return StateError, closeTx, err
×
1152
                }
×
1153

1154
                // With the close transaction in hand, broadcast the
1155
                // transaction to the network, thereby entering the post
1156
                // channel resolution state.
1157
                log.Infof("Broadcasting force close transaction %v, "+
18✔
1158
                        "ChannelPoint(%v): %v", closeTx.TxHash(),
18✔
1159
                        c.cfg.ChanPoint, lnutils.SpewLogClosure(closeTx))
18✔
1160

18✔
1161
                // At this point, we'll now broadcast the commitment
18✔
1162
                // transaction itself.
18✔
1163
                label := labels.MakeLabel(
18✔
1164
                        labels.LabelTypeChannelClose, &c.cfg.ShortChanID,
18✔
1165
                )
18✔
1166
                if err := c.cfg.PublishTx(closeTx, label); err != nil {
23✔
1167
                        log.Errorf("ChannelArbitrator(%v): unable to broadcast "+
5✔
1168
                                "close tx: %v", c.cfg.ChanPoint, err)
5✔
1169

5✔
1170
                        // This makes sure we don't fail at startup if the
5✔
1171
                        // commitment transaction has too low fees to make it
5✔
1172
                        // into mempool. The rebroadcaster makes sure this
5✔
1173
                        // transaction is republished regularly until confirmed
5✔
1174
                        // or replaced.
5✔
1175
                        if !errors.Is(err, lnwallet.ErrDoubleSpend) &&
5✔
1176
                                !errors.Is(err, lnwallet.ErrMempoolFee) {
7✔
1177

2✔
1178
                                return StateError, closeTx, err
2✔
1179
                        }
2✔
1180
                }
1181

1182
                // We go to the StateCommitmentBroadcasted state, where we'll
1183
                // be waiting for the commitment to be confirmed.
1184
                nextState = StateCommitmentBroadcasted
16✔
1185

1186
        // In this state we have broadcasted our own commitment, and will need
1187
        // to wait for a commitment (not necessarily the one we broadcasted!)
1188
        // to be confirmed.
1189
        case StateCommitmentBroadcasted:
30✔
1190
                switch trigger {
30✔
1191

1192
                // We are waiting for a commitment to be confirmed.
1193
                case chainTrigger, userTrigger:
17✔
1194
                        // The commitment transaction has been broadcast, but it
17✔
1195
                        // doesn't necessarily need to be the commitment
17✔
1196
                        // transaction version that is going to be confirmed. To
17✔
1197
                        // be sure that any of those versions can be anchored
17✔
1198
                        // down, we now submit all anchor resolutions to the
17✔
1199
                        // sweeper. The sweeper will keep trying to sweep all of
17✔
1200
                        // them.
17✔
1201
                        //
17✔
1202
                        // Note that the sweeper is idempotent. If we ever
17✔
1203
                        // happen to end up at this point in the code again, no
17✔
1204
                        // harm is done by re-offering the anchors to the
17✔
1205
                        // sweeper.
17✔
1206
                        anchors, err := c.cfg.Channel.NewAnchorResolutions()
17✔
1207
                        if err != nil {
17✔
1208
                                return StateError, closeTx, err
×
1209
                        }
×
1210

1211
                        err = c.sweepAnchors(anchors, triggerHeight)
17✔
1212
                        if err != nil {
17✔
1213
                                return StateError, closeTx, err
×
1214
                        }
×
1215

1216
                        nextState = StateCommitmentBroadcasted
17✔
1217

1218
                // If this state advance was triggered by any of the
1219
                // commitments being confirmed, then we'll jump to the state
1220
                // where the contract has been closed.
1221
                case localCloseTrigger, remoteCloseTrigger:
13✔
1222
                        nextState = StateContractClosed
13✔
1223

1224
                // If a coop close was confirmed, jump straight to the fully
1225
                // resolved state.
1226
                case coopCloseTrigger:
×
1227
                        nextState = StateFullyResolved
×
1228

1229
                case breachCloseTrigger:
×
1230
                        nextContractState, err := c.checkLegacyBreach()
×
1231
                        if nextContractState == StateError {
×
1232
                                return nextContractState, closeTx, err
×
1233
                        }
×
1234

1235
                        nextState = nextContractState
×
1236
                }
1237

1238
                log.Infof("ChannelArbitrator(%v): trigger %v moving from "+
30✔
1239
                        "state %v to %v", c.cfg.ChanPoint, trigger, c.state,
30✔
1240
                        nextState)
30✔
1241

1242
        // If we're in this state, then the contract has been fully closed to
1243
        // outside sub-systems, so we'll process the prior set of on-chain
1244
        // contract actions and launch a set of resolvers.
1245
        case StateContractClosed:
22✔
1246
                // First, we'll fetch our chain actions, and both sets of
22✔
1247
                // resolutions so we can process them.
22✔
1248
                contractResolutions, err := c.log.FetchContractResolutions()
22✔
1249
                if err != nil {
24✔
1250
                        log.Errorf("unable to fetch contract resolutions: %v",
2✔
1251
                                err)
2✔
1252
                        return StateError, closeTx, err
2✔
1253
                }
2✔
1254

1255
                // If the resolution is empty, and we have no HTLCs at all to
1256
                // send to, then we're done here. We don't need to launch any
1257
                // resolvers, and can go straight to our final state.
1258
                if contractResolutions.IsEmpty() && confCommitSet.IsEmpty() {
28✔
1259
                        log.Infof("ChannelArbitrator(%v): contract "+
8✔
1260
                                "resolutions empty, marking channel as fully resolved!",
8✔
1261
                                c.cfg.ChanPoint)
8✔
1262
                        nextState = StateFullyResolved
8✔
1263
                        break
8✔
1264
                }
1265

1266
                // First, we'll reconstruct a fresh set of chain actions as the
1267
                // set of actions we need to act on may differ based on if it
1268
                // was our commitment, or their commitment that hit the chain.
1269
                htlcActions, err := c.constructChainActions(
12✔
1270
                        confCommitSet, triggerHeight, trigger,
12✔
1271
                )
12✔
1272
                if err != nil {
12✔
1273
                        return StateError, closeTx, err
×
1274
                }
×
1275

1276
                // In case it's a breach transaction, we fail back all upstream
1277
                // HTLCs for their corresponding outgoing HTLCs on the remote
1278
                // commitment set (remote and remote pending set).
1279
                if contractResolutions.BreachResolution != nil {
14✔
1280
                        // cancelBreachedHTLCs is a set which holds HTLCs whose
2✔
1281
                        // corresponding incoming HTLCs will be failed back
2✔
1282
                        // because the peer broadcasted an old state.
2✔
1283
                        cancelBreachedHTLCs := fn.NewSet[uint64]()
2✔
1284

2✔
1285
                        // Using the CommitSet, we'll fail back all
2✔
1286
                        // upstream HTLCs for their corresponding outgoing
2✔
1287
                        // HTLC that exist on either of the remote commitments.
2✔
1288
                        // The map is used to deduplicate any shared HTLC's.
2✔
1289
                        for htlcSetKey, htlcs := range confCommitSet.HtlcSets {
4✔
1290
                                if !htlcSetKey.IsRemote {
2✔
UNCOV
1291
                                        continue
×
1292
                                }
1293

1294
                                for _, htlc := range htlcs {
4✔
1295
                                        // Only outgoing HTLCs have a
2✔
1296
                                        // corresponding incoming HTLC.
2✔
1297
                                        if htlc.Incoming {
3✔
1298
                                                continue
1✔
1299
                                        }
1300

1301
                                        cancelBreachedHTLCs.Add(htlc.HtlcIndex)
1✔
1302
                                }
1303
                        }
1304

1305
                        err := c.abandonForwards(cancelBreachedHTLCs)
2✔
1306
                        if err != nil {
2✔
1307
                                return StateError, closeTx, err
×
1308
                        }
×
1309
                } else {
10✔
1310
                        // If it's not a breach, we resolve all incoming dust
10✔
1311
                        // HTLCs immediately after the commitment is confirmed.
10✔
1312
                        err = c.failIncomingDust(
10✔
1313
                                htlcActions[HtlcIncomingDustFinalAction],
10✔
1314
                        )
10✔
1315
                        if err != nil {
10✔
1316
                                return StateError, closeTx, err
×
1317
                        }
×
1318

1319
                        // We fail the upstream HTLCs for all remote pending
1320
                        // outgoing HTLCs as soon as the commitment is
1321
                        // confirmed. The upstream HTLCs for outgoing dust
1322
                        // HTLCs have already been resolved before we reach
1323
                        // this point.
1324
                        getIdx := func(htlc channeldb.HTLC) uint64 {
18✔
1325
                                return htlc.HtlcIndex
8✔
1326
                        }
8✔
1327
                        remoteDangling := fn.NewSet(fn.Map(
10✔
1328
                                htlcActions[HtlcFailDanglingAction], getIdx,
10✔
1329
                        )...)
10✔
1330
                        err := c.abandonForwards(remoteDangling)
10✔
1331
                        if err != nil {
10✔
1332
                                return StateError, closeTx, err
×
1333
                        }
×
1334
                }
1335

1336
                // Now that we know we'll need to act, we'll process all the
1337
                // resolvers, then create the structures we need to resolve all
1338
                // outstanding contracts.
1339
                resolvers, err := c.prepContractResolutions(
12✔
1340
                        contractResolutions, triggerHeight, htlcActions,
12✔
1341
                )
12✔
1342
                if err != nil {
12✔
1343
                        log.Errorf("ChannelArbitrator(%v): unable to "+
×
1344
                                "resolve contracts: %v", c.cfg.ChanPoint, err)
×
1345
                        return StateError, closeTx, err
×
1346
                }
×
1347

1348
                log.Debugf("ChannelArbitrator(%v): inserting %v contract "+
12✔
1349
                        "resolvers", c.cfg.ChanPoint, len(resolvers))
12✔
1350

12✔
1351
                err = c.log.InsertUnresolvedContracts(nil, resolvers...)
12✔
1352
                if err != nil {
12✔
1353
                        return StateError, closeTx, err
×
1354
                }
×
1355

1356
                // Finally, we'll launch all the required contract resolvers.
1357
                // Once they're all resolved, we're no longer needed.
1358
                c.resolveContracts(resolvers)
12✔
1359

12✔
1360
                nextState = StateWaitingFullResolution
12✔
1361

1362
        // This is our terminal state. We'll keep returning this state until
1363
        // all contracts are fully resolved.
1364
        case StateWaitingFullResolution:
17✔
1365
                log.Infof("ChannelArbitrator(%v): still awaiting contract "+
17✔
1366
                        "resolution", c.cfg.ChanPoint)
17✔
1367

17✔
1368
                unresolved, err := c.log.FetchUnresolvedContracts()
17✔
1369
                if err != nil {
17✔
1370
                        return StateError, closeTx, err
×
1371
                }
×
1372

1373
                // If we have no unresolved contracts, then we can move to the
1374
                // final state.
1375
                if len(unresolved) == 0 {
29✔
1376
                        nextState = StateFullyResolved
12✔
1377
                        break
12✔
1378
                }
1379

1380
                // Otherwise, we still have unresolved contracts, so we'll
1381
                // stay alive to oversee their resolution.
1382
                nextState = StateWaitingFullResolution
5✔
1383

5✔
1384
                // Add debug logs.
5✔
1385
                for _, r := range unresolved {
10✔
1386
                        log.Debugf("ChannelArbitrator(%v): still have "+
5✔
1387
                                "unresolved contract: %T", c.cfg.ChanPoint, r)
5✔
1388
                }
5✔
1389

1390
        // If we start as fully resolved, then we'll end as fully resolved.
1391
        case StateFullyResolved:
22✔
1392
                // To ensure that the state of the contract in persistent
22✔
1393
                // storage is properly reflected, we'll mark the contract as
22✔
1394
                // fully resolved now.
22✔
1395
                nextState = StateFullyResolved
22✔
1396

22✔
1397
                log.Infof("ChannelPoint(%v) has been fully resolved "+
22✔
1398
                        "on-chain at height=%v", c.cfg.ChanPoint, triggerHeight)
22✔
1399

22✔
1400
                if err := c.cfg.MarkChannelResolved(); err != nil {
22✔
1401
                        log.Errorf("unable to mark channel resolved: %v", err)
×
1402
                        return StateError, closeTx, err
×
1403
                }
×
1404
        }
1405

1406
        log.Tracef("ChannelArbitrator(%v): next_state=%v", c.cfg.ChanPoint,
132✔
1407
                nextState)
132✔
1408

132✔
1409
        return nextState, closeTx, nil
132✔
1410
}
1411

1412
// sweepAnchors offers all given anchor resolutions to the sweeper. It requests
1413
// sweeping at the minimum fee rate. This fee rate can be upped manually by the
1414
// user via the BumpFee rpc.
1415
func (c *ChannelArbitrator) sweepAnchors(anchors *lnwallet.AnchorResolutions,
1416
        heightHint uint32) error {
19✔
1417

19✔
1418
        // Update the set of activeHTLCs so that the sweeping routine has an
19✔
1419
        // up-to-date view of the set of commitments.
19✔
1420
        c.updateActiveHTLCs()
19✔
1421

19✔
1422
        // Prepare the sweeping requests for all possible versions of
19✔
1423
        // commitments.
19✔
1424
        sweepReqs, err := c.prepareAnchorSweeps(heightHint, anchors)
19✔
1425
        if err != nil {
19✔
1426
                return err
×
1427
        }
×
1428

1429
        // Send out the sweeping requests to the sweeper.
1430
        for _, req := range sweepReqs {
27✔
1431
                _, err = c.cfg.Sweeper.SweepInput(req.input, req.params)
8✔
1432
                if err != nil {
8✔
1433
                        return err
×
1434
                }
×
1435
        }
1436

1437
        return nil
19✔
1438
}
1439

1440
// findCommitmentDeadlineAndValue finds the deadline (relative block height)
1441
// for a commitment transaction by extracting the minimum CLTV from its HTLCs.
1442
// From our PoV, the deadline delta is defined to be the smaller of,
1443
//   - half of the least CLTV from outgoing HTLCs' corresponding incoming
1444
//     HTLCs,  or,
1445
//   - half of the least CLTV from incoming HTLCs if the preimage is available.
1446
//
1447
// We use half of the CLTV value to ensure that we have enough time to sweep
1448
// the second-level HTLCs.
1449
//
1450
// It also finds the total value that is time-sensitive, which is the sum of
1451
// all the outgoing HTLCs plus incoming HTLCs whose preimages are known. It
1452
// then returns the value left after subtracting the budget used for sweeping
1453
// the time-sensitive HTLCs.
1454
//
1455
// NOTE: when the deadline turns out to be 0 blocks, we will replace it with 1
1456
// block because our fee estimator doesn't allow a 0 conf target. This also
1457
// means we've fallen behind and should increase our fee to get the transaction
1458
// confirmed asap.
1459
func (c *ChannelArbitrator) findCommitmentDeadlineAndValue(heightHint uint32,
1460
        htlcs htlcSet) (fn.Option[int32], btcutil.Amount, error) {
13✔
1461

13✔
1462
        deadlineMinHeight := uint32(math.MaxUint32)
13✔
1463
        totalValue := btcutil.Amount(0)
13✔
1464

13✔
1465
        // First, iterate through the outgoingHTLCs to find the lowest CLTV
13✔
1466
        // value.
13✔
1467
        for _, htlc := range htlcs.outgoingHTLCs {
23✔
1468
                // Skip if the HTLC is dust.
10✔
1469
                if htlc.OutputIndex < 0 {
13✔
1470
                        log.Debugf("ChannelArbitrator(%v): skipped deadline "+
3✔
1471
                                "for dust htlc=%x",
3✔
1472
                                c.cfg.ChanPoint, htlc.RHash[:])
3✔
1473

3✔
1474
                        continue
3✔
1475
                }
1476

1477
                value := htlc.Amt.ToSatoshis()
7✔
1478

7✔
1479
                // Find the expiry height for this outgoing HTLC's incoming
7✔
1480
                // HTLC.
7✔
1481
                deadlineOpt := c.cfg.FindOutgoingHTLCDeadline(htlc)
7✔
1482

7✔
1483
                // The deadline is default to the current deadlineMinHeight,
7✔
1484
                // and it's overwritten when it's not none.
7✔
1485
                deadline := deadlineMinHeight
7✔
1486
                deadlineOpt.WhenSome(func(d int32) {
12✔
1487
                        deadline = uint32(d)
5✔
1488

5✔
1489
                        // We only consider the value is under protection when
5✔
1490
                        // it's time-sensitive.
5✔
1491
                        totalValue += value
5✔
1492
                })
5✔
1493

1494
                if deadline < deadlineMinHeight {
12✔
1495
                        deadlineMinHeight = deadline
5✔
1496

5✔
1497
                        log.Tracef("ChannelArbitrator(%v): outgoing HTLC has "+
5✔
1498
                                "deadline=%v, value=%v", c.cfg.ChanPoint,
5✔
1499
                                deadlineMinHeight, value)
5✔
1500
                }
5✔
1501
        }
1502

1503
        // Then go through the incomingHTLCs and update the minHeight when the
1504
        // conditions are met.
1505
        for _, htlc := range htlcs.incomingHTLCs {
22✔
1506
                // Skip if the HTLC is dust.
9✔
1507
                if htlc.OutputIndex < 0 {
10✔
1508
                        log.Debugf("ChannelArbitrator(%v): skipped deadline "+
1✔
1509
                                "for dust htlc=%x",
1✔
1510
                                c.cfg.ChanPoint, htlc.RHash[:])
1✔
1511

1✔
1512
                        continue
1✔
1513
                }
1514

1515
                // Since it's an HTLC sent to us, check if we have preimage for
1516
                // this HTLC.
1517
                preimageAvailable, err := c.isPreimageAvailable(htlc.RHash)
8✔
1518
                if err != nil {
8✔
1519
                        return fn.None[int32](), 0, err
×
1520
                }
×
1521

1522
                if !preimageAvailable {
10✔
1523
                        continue
2✔
1524
                }
1525

1526
                value := htlc.Amt.ToSatoshis()
6✔
1527
                totalValue += value
6✔
1528

6✔
1529
                if htlc.RefundTimeout < deadlineMinHeight {
11✔
1530
                        deadlineMinHeight = htlc.RefundTimeout
5✔
1531

5✔
1532
                        log.Tracef("ChannelArbitrator(%v): incoming HTLC has "+
5✔
1533
                                "deadline=%v, amt=%v", c.cfg.ChanPoint,
5✔
1534
                                deadlineMinHeight, value)
5✔
1535
                }
5✔
1536
        }
1537

1538
        // Calculate the deadline. There are two cases to be handled here,
1539
        //   - when the deadlineMinHeight never gets updated, which could
1540
        //     happen when we have no outgoing HTLCs, and, for incoming HTLCs,
1541
        //       * either we have none, or,
1542
        //       * none of the HTLCs are preimageAvailable.
1543
        //   - when our deadlineMinHeight is no greater than the heightHint,
1544
        //     which means we are behind schedule.
1545
        var deadline uint32
13✔
1546
        switch {
13✔
1547
        // When we couldn't find a deadline height from our HTLCs, we will fall
1548
        // back to the default value as there's no time pressure here.
1549
        case deadlineMinHeight == math.MaxUint32:
4✔
1550
                return fn.None[int32](), 0, nil
4✔
1551

1552
        // When the deadline is passed, we will fall back to the smallest conf
1553
        // target (1 block).
1554
        case deadlineMinHeight <= heightHint:
1✔
1555
                log.Warnf("ChannelArbitrator(%v): deadline is passed with "+
1✔
1556
                        "deadlineMinHeight=%d, heightHint=%d",
1✔
1557
                        c.cfg.ChanPoint, deadlineMinHeight, heightHint)
1✔
1558
                deadline = 1
1✔
1559

1560
        // Use half of the deadline delta, and leave the other half to be used
1561
        // to sweep the HTLCs.
1562
        default:
8✔
1563
                deadline = (deadlineMinHeight - heightHint) / 2
8✔
1564
        }
1565

1566
        // Calculate the value left after subtracting the budget used for
1567
        // sweeping the time-sensitive HTLCs.
1568
        valueLeft := totalValue - calculateBudget(
9✔
1569
                totalValue, c.cfg.Budget.DeadlineHTLCRatio,
9✔
1570
                c.cfg.Budget.DeadlineHTLC,
9✔
1571
        )
9✔
1572

9✔
1573
        log.Debugf("ChannelArbitrator(%v): calculated valueLeft=%v, "+
9✔
1574
                "deadline=%d, using deadlineMinHeight=%d, heightHint=%d",
9✔
1575
                c.cfg.ChanPoint, valueLeft, deadline, deadlineMinHeight,
9✔
1576
                heightHint)
9✔
1577

9✔
1578
        return fn.Some(int32(deadline)), valueLeft, nil
9✔
1579
}
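The deadline rule above reduces to three cases: no time-sensitive HTLCs, already past the earliest expiry, or half of the remaining delta. A standalone sketch of just that rule; pickDeadline and the sample heights are illustrative, not part of lnd:

package main

import (
	"fmt"
	"math"
)

// pickDeadline mirrors the three cases above: no time-sensitive HTLCs,
// already past the earliest expiry, or half of the remaining delta.
func pickDeadline(deadlineMinHeight, heightHint uint32) (uint32, bool) {
	switch {
	// No outgoing HTLCs and no incoming HTLCs with a known preimage:
	// there is no deadline at all.
	case deadlineMinHeight == math.MaxUint32:
		return 0, false

	// We're already at or past the earliest expiry, so use the
	// smallest possible conf target.
	case deadlineMinHeight <= heightHint:
		return 1, true

	// Otherwise keep half of the delta for the commitment and leave
	// the other half for sweeping the second-level HTLCs.
	default:
		return (deadlineMinHeight - heightHint) / 2, true
	}
}

func main() {
	deadline, ok := pickDeadline(1000, 900)
	fmt.Println(deadline, ok) // 50 true: 100 blocks left, half for the commit

	deadline, ok = pickDeadline(900, 905)
	fmt.Println(deadline, ok) // 1 true: behind schedule, confirm asap

	_, ok = pickDeadline(math.MaxUint32, 905)
	fmt.Println(ok) // false: nothing time-sensitive
}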
1580

1581
// resolveContracts updates the activeResolvers list, launches all resolvers,
1582
// and then resolves each contract concurrently.
1583
func (c *ChannelArbitrator) resolveContracts(resolvers []ContractResolver) {
13✔
1584
        c.activeResolversLock.Lock()
13✔
1585
        c.activeResolvers = resolvers
13✔
1586
        c.activeResolversLock.Unlock()
13✔
1587

13✔
1588
        // Launch all resolvers.
13✔
1589
        c.launchResolvers()
13✔
1590

13✔
1591
        for _, contract := range resolvers {
19✔
1592
                c.wg.Add(1)
6✔
1593
                go c.resolveContract(contract)
6✔
1594
        }
6✔
1595
}
1596

1597
// launchResolvers launches all the active resolvers concurrently.
1598
func (c *ChannelArbitrator) launchResolvers() {
19✔
1599
        c.activeResolversLock.Lock()
19✔
1600
        resolvers := c.activeResolvers
19✔
1601
        c.activeResolversLock.Unlock()
19✔
1602

19✔
1603
        // errChans is a map of channels that will be used to receive errors
19✔
1604
        // returned from launching the resolvers.
19✔
1605
        errChans := make(map[ContractResolver]chan error, len(resolvers))
19✔
1606

19✔
1607
        // Launch each resolver in goroutines.
19✔
1608
        for _, r := range resolvers {
25✔
1609
                // If the contract is already resolved, there's no need to
6✔
1610
                // launch it again.
6✔
1611
                if r.IsResolved() {
6✔
UNCOV
1612
                        log.Debugf("ChannelArbitrator(%v): skipping resolver "+
×
UNCOV
1613
                                "%T as it's already resolved", c.cfg.ChanPoint,
×
UNCOV
1614
                                r)
×
UNCOV
1615

×
UNCOV
1616
                        continue
×
1617
                }
1618

1619
                // Create a signal chan.
1620
                errChan := make(chan error, 1)
6✔
1621
                errChans[r] = errChan
6✔
1622

6✔
1623
                go func() {
12✔
1624
                        err := r.Launch()
6✔
1625
                        errChan <- err
6✔
1626
                }()
6✔
1627
        }
1628

1629
        // Wait for all resolvers to finish launching.
1630
        for r, errChan := range errChans {
25✔
1631
                select {
6✔
1632
                case err := <-errChan:
6✔
1633
                        if err == nil {
12✔
1634
                                continue
6✔
1635
                        }
1636

1637
                        log.Errorf("ChannelArbitrator(%v): unable to launch "+
×
1638
                                "contract resolver(%T): %v", c.cfg.ChanPoint, r,
×
1639
                                err)
×
1640

1641
                case <-c.quit:
×
1642
                        log.Debugf("ChannelArbitrator quit signal received, " +
×
1643
                                "exit launchResolvers")
×
1644

×
1645
                        return
×
1646
                }
1647
        }
1648
}
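The launch step fans out one goroutine per resolver and collects a single result per resolver over buffered channels. A generic, self-contained version of that fan-out/collect pattern, with a hypothetical task type and simplified error handling (it returns the first error rather than logging and continuing):

package main

import (
	"errors"
	"fmt"
)

// task is a hypothetical stand-in for a contract resolver's Launch method.
type task func() error

// launchAll starts every task in its own goroutine and waits for each one
// to report back on a buffered channel, returning the first error seen.
func launchAll(tasks []task) error {
	errChans := make([]chan error, len(tasks))
	for i, t := range tasks {
		errChans[i] = make(chan error, 1)
		go func(t task, ch chan error) {
			ch <- t()
		}(t, errChans[i])
	}

	for _, ch := range errChans {
		if err := <-ch; err != nil {
			return err
		}
	}

	return nil
}

func main() {
	ok := func() error { return nil }
	bad := func() error { return errors.New("launch failed") }

	fmt.Println(launchAll([]task{ok, ok}))  // <nil>
	fmt.Println(launchAll([]task{ok, bad})) // launch failed
}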
1649

1650
// advanceState is the main driver of our state machine. This method is an
1651
// iterative function which repeatedly attempts to advance the internal state
1652
// of the channel arbitrator. The state will be advanced until we reach a
1653
// redundant transition, meaning that the state transition is a noop. The final
1654
// param is a callback that allows the caller to execute an arbitrary action
1655
// after each state transition.
1656
func (c *ChannelArbitrator) advanceState(
1657
        triggerHeight uint32, trigger transitionTrigger,
1658
        confCommitSet *CommitSet) (ArbitratorState, *wire.MsgTx, error) {
88✔
1659

88✔
1660
        var (
88✔
1661
                priorState   ArbitratorState
88✔
1662
                forceCloseTx *wire.MsgTx
88✔
1663
        )
88✔
1664

88✔
1665
        // We'll continue to advance our state forward until the state we
88✔
1666
        // transition to is that same state that we started at.
88✔
1667
        for {
262✔
1668
                priorState = c.state
174✔
1669
                log.Debugf("ChannelArbitrator(%v): attempting state step with "+
174✔
1670
                        "trigger=%v from state=%v at height=%v",
174✔
1671
                        c.cfg.ChanPoint, trigger, priorState, triggerHeight)
174✔
1672

174✔
1673
                nextState, closeTx, err := c.stateStep(
174✔
1674
                        triggerHeight, trigger, confCommitSet,
174✔
1675
                )
174✔
1676
                if err != nil {
178✔
1677
                        log.Errorf("ChannelArbitrator(%v): unable to advance "+
4✔
1678
                                "state: %v", c.cfg.ChanPoint, err)
4✔
1679
                        return priorState, nil, err
4✔
1680
                }
4✔
1681

1682
                if forceCloseTx == nil && closeTx != nil {
186✔
1683
                        forceCloseTx = closeTx
16✔
1684
                }
16✔
1685

1686
                // Our termination transition is a noop transition. If we get
1687
                // our prior state back as the next state, then we'll
1688
                // terminate.
1689
                if nextState == priorState {
251✔
1690
                        log.Debugf("ChannelArbitrator(%v): terminating at "+
81✔
1691
                                "state=%v", c.cfg.ChanPoint, nextState)
81✔
1692
                        return nextState, forceCloseTx, nil
81✔
1693
                }
81✔
1694

1695
                // As the prior state was successfully executed, we can now
1696
                // commit the next state. This ensures that we will re-execute
1697
                // the prior state if anything fails.
1698
                if err := c.log.CommitState(nextState); err != nil {
92✔
1699
                        log.Errorf("ChannelArbitrator(%v): unable to commit "+
3✔
1700
                                "next state(%v): %v", c.cfg.ChanPoint,
3✔
1701
                                nextState, err)
3✔
1702
                        return priorState, nil, err
3✔
1703
                }
3✔
1704
                c.state = nextState
86✔
1705
        }
1706
}
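advanceState is a drive-until-noop loop: step, commit the new state, repeat, and stop when a step returns the state it was given. A minimal sketch of that loop with integer states and a commit callback standing in for log.CommitState:

package main

import "fmt"

type state int

// drive repeatedly applies step until it returns the same state it was
// given, committing each successfully reached state along the way.
func drive(start state, step func(state) (state, error),
	commit func(state) error) (state, error) {

	cur := start
	for {
		next, err := step(cur)
		if err != nil {
			return cur, err
		}

		// A noop transition terminates the loop.
		if next == cur {
			return cur, nil
		}

		// Persist before adopting the new state so a restart replays
		// the prior state rather than skipping it.
		if err := commit(next); err != nil {
			return cur, err
		}
		cur = next
	}
}

func main() {
	// A toy machine that walks 0 -> 1 -> 2 and then stays at 2.
	step := func(s state) (state, error) {
		if s < 2 {
			return s + 1, nil
		}
		return s, nil
	}
	commit := func(s state) error {
		fmt.Println("committed state", s)
		return nil
	}

	final, err := drive(0, step, commit)
	fmt.Println(final, err) // 2 <nil>
}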
1707

1708
// ChainAction is an enum that encompasses all possible on-chain actions
1709
// we'll take for a set of HTLC's.
1710
type ChainAction uint8
1711

1712
const (
1713
        // NoAction is the min chainAction type, indicating that no action
1714
        // needs to be taken for a given HTLC.
1715
        NoAction ChainAction = 0
1716

1717
        // HtlcTimeoutAction indicates that the HTLC will timeout soon. As a
1718
        // result, we should get ready to sweep it on chain after the timeout.
1719
        HtlcTimeoutAction = 1
1720

1721
        // HtlcClaimAction indicates that we should claim the HTLC on chain
1722
        // before its timeout period.
1723
        HtlcClaimAction = 2
1724

1725
        // HtlcFailDustAction indicates that we should fail the upstream HTLC
1726
        // for an outgoing dust HTLC immediately (even before the commitment
1727
        // transaction is confirmed) because it has no output on the commitment
1728
        // transaction. This also includes remote pending outgoing dust HTLCs.
1729
        HtlcFailDustAction = 3
1730

1731
        // HtlcOutgoingWatchAction indicates that we can't yet timeout this
1732
        // HTLC, but we had to go to chain in order to resolve an existing
1733
        // HTLC.  In this case, we'll either: time it out once it expires, or
1734
        // will learn the pre-image if the remote party claims the output. In
1735
        // this case, we'll add the pre-image to our global store.
1736
        HtlcOutgoingWatchAction = 4
1737

1738
        // HtlcIncomingWatchAction indicates that we don't yet have the
1739
        // pre-image to claim the incoming HTLC, but we had to go to chain in
1740
        // order to resolve an existing HTLC. In this case, we'll either: let the
1741
        // other party time it out, or eventually learn of the pre-image, in
1742
        // which case we'll claim on chain.
1743
        HtlcIncomingWatchAction = 5
1744

1745
        // HtlcIncomingDustFinalAction indicates that we should mark an incoming
1746
        // dust htlc as final because it can't be claimed on-chain.
1747
        HtlcIncomingDustFinalAction = 6
1748

1749
        // HtlcFailDanglingAction indicates that we should fail the upstream
1750
        // HTLC for an outgoing HTLC immediately after the commitment
1751
        // transaction has confirmed because it has no corresponding output on
1752
        // the commitment transaction. This category does NOT include any dust
1753
        // HTLCs which are mapped in the "HtlcFailDustAction" category.
1754
        HtlcFailDanglingAction = 7
1755
)
1756

1757
// String returns a human readable string describing a chain action.
1758
func (c ChainAction) String() string {
×
1759
        switch c {
×
1760
        case NoAction:
×
1761
                return "NoAction"
×
1762

1763
        case HtlcTimeoutAction:
×
1764
                return "HtlcTimeoutAction"
×
1765

1766
        case HtlcClaimAction:
×
1767
                return "HtlcClaimAction"
×
1768

1769
        case HtlcFailDustAction:
×
1770
                return "HtlcFailDustAction"
×
1771

1772
        case HtlcOutgoingWatchAction:
×
1773
                return "HtlcOutgoingWatchAction"
×
1774

1775
        case HtlcIncomingWatchAction:
×
1776
                return "HtlcIncomingWatchAction"
×
1777

1778
        case HtlcIncomingDustFinalAction:
×
1779
                return "HtlcIncomingDustFinalAction"
×
1780

1781
        case HtlcFailDanglingAction:
×
1782
                return "HtlcFailDanglingAction"
×
1783

1784
        default:
×
1785
                return "<unknown action>"
×
1786
        }
1787
}
1788

1789
// ChainActionMap is a map of a chain action, to the set of HTLC's that need to
1790
// be acted upon for a given action type.
1791
type ChainActionMap map[ChainAction][]channeldb.HTLC
1792

1793
// Merge merges the passed chain actions with the target chain action map.
1794
func (c ChainActionMap) Merge(actions ChainActionMap) {
68✔
1795
        for chainAction, htlcs := range actions {
81✔
1796
                c[chainAction] = append(c[chainAction], htlcs...)
13✔
1797
        }
13✔
1798
}
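Merge appends per-action slices rather than replacing them, so HTLCs gathered from different commitments accumulate under the same action. A tiny illustration with placeholder string keys and int values instead of real channeldb.HTLC records:

package main

import "fmt"

// actionMap mirrors the append-on-merge semantics of ChainActionMap using
// simple placeholder types.
type actionMap map[string][]int

func (m actionMap) merge(other actionMap) {
	for action, items := range other {
		m[action] = append(m[action], items...)
	}
}

func main() {
	local := actionMap{"fail-dust": {1}, "watch": {2}}
	remote := actionMap{"fail-dust": {3}, "fail-dangling": {4}}

	local.merge(remote)
	fmt.Println(local["fail-dust"])     // [1 3]
	fmt.Println(local["fail-dangling"]) // [4]
}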
1799

1800
// shouldGoOnChain takes into account the absolute timeout of the HTLC, if the
1801
// confirmation delta that we need is close, and returns a bool indicating if
1802
// we should go on chain to claim.  We do this rather than waiting up until the
1803
// last minute as we want to ensure that when we *need* (HTLC is timed out) to
1804
// sweep, the commitment is already confirmed.
1805
func (c *ChannelArbitrator) shouldGoOnChain(htlc channeldb.HTLC,
1806
        broadcastDelta, currentHeight uint32) bool {
26✔
1807

26✔
1808
        // We'll calculate the broadcast cut off for this HTLC. This is the
26✔
1809
        // height that (based on our current fee estimation) we should
26✔
1810
        // broadcast in order to ensure the commitment transaction is confirmed
26✔
1811
        // before the HTLC fully expires.
26✔
1812
        broadcastCutOff := htlc.RefundTimeout - broadcastDelta
26✔
1813

26✔
1814
        log.Tracef("ChannelArbitrator(%v): examining outgoing contract: "+
26✔
1815
                "expiry=%v, cutoff=%v, height=%v", c.cfg.ChanPoint, htlc.RefundTimeout,
26✔
1816
                broadcastCutOff, currentHeight)
26✔
1817

26✔
1818
        // TODO(roasbeef): take into account default HTLC delta, don't need to
26✔
1819
        // broadcast immediately
26✔
1820
        //  * can then batch with SINGLE | ANYONECANPAY
26✔
1821

26✔
1822
        // We should go on-chain for this HTLC iff we're within our broadcast
26✔
1823
        // cutoff window.
26✔
1824
        if currentHeight < broadcastCutOff {
45✔
1825
                return false
19✔
1826
        }
19✔
1827

1828
        // In case of incoming htlc we should go to chain.
1829
        if htlc.Incoming {
7✔
UNCOV
1830
                return true
×
UNCOV
1831
        }
×
1832

1833
        // For htlcs that are the result of our initiated payments, we give a grace
1834
        // period before force closing the channel. During this time we expect
1835
        // both nodes to connect and give a chance to the other node to send its
1836
        // updates and cancel the htlc.
1837
        // This shouldn't add any security risk as there is no incoming htlc to
1838
        // fulfill in this case, and the expectation is that when the channel is
1839
        // active the other node will send update_fail_htlc to remove the htlc
1840
        // without closing the channel. It is up to the user to force close the
1841
        // channel if the peer misbehaves and doesn't send the update_fail_htlc.
1842
        // It is useful when this node is most of the time not online and is
1843
        // likely to miss the time slot where the htlc may be cancelled.
1844
        isForwarded := c.cfg.IsForwardedHTLC(c.cfg.ShortChanID, htlc.HtlcIndex)
7✔
1845
        upTime := c.cfg.Clock.Now().Sub(c.startTimestamp)
7✔
1846
        return isForwarded || upTime > c.cfg.PaymentsExpirationGracePeriod
7✔
1847
}
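The core of the check above is the broadcast cutoff: go to chain once the current height reaches RefundTimeout minus the broadcast delta. A worked, self-contained version of just that comparison (the grace period for locally initiated payments is omitted); the numbers are illustrative:

package main

import "fmt"

// pastBroadcastCutoff reports whether we are at or past the height where
// the commitment must be broadcast so it confirms before the HTLC expires.
func pastBroadcastCutoff(refundTimeout, broadcastDelta, height uint32) bool {
	cutoff := refundTimeout - broadcastDelta
	return height >= cutoff
}

func main() {
	// The HTLC expires at height 500 and we want the commitment confirmed
	// at least 10 blocks before that, so the cutoff is height 490.
	fmt.Println(pastBroadcastCutoff(500, 10, 489)) // false: still early
	fmt.Println(pastBroadcastCutoff(500, 10, 490)) // true: broadcast now
}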
1848

1849
// checkCommitChainActions is called for each new block connected to the end of
1850
// the main chain. Given the new block height, this new method will examine all
1851
// active HTLC's, and determine if we need to go on-chain to claim any of them.
1852
// A map of action -> []htlc is returned, detailing what action (if any) should
1853
// be performed for each HTLC. For timed out HTLC's, once the commitment has
1854
// been sufficiently confirmed, the HTLC's should be canceled backwards. For
1855
// redeemed HTLC's, we should send the pre-image back to the incoming link.
1856
func (c *ChannelArbitrator) checkCommitChainActions(height uint32,
1857
        trigger transitionTrigger, htlcs htlcSet) (ChainActionMap, error) {
68✔
1858

68✔
1859
        // TODO(roasbeef): would need to lock channel? channel totem?
68✔
1860
        //  * race condition if adding and we broadcast, etc
68✔
1861
        //  * or would make each instance sync?
68✔
1862

68✔
1863
        log.Debugf("ChannelArbitrator(%v): checking commit chain actions at "+
68✔
1864
                "height=%v, in_htlc_count=%v, out_htlc_count=%v",
68✔
1865
                c.cfg.ChanPoint, height,
68✔
1866
                len(htlcs.incomingHTLCs), len(htlcs.outgoingHTLCs))
68✔
1867

68✔
1868
        actionMap := make(ChainActionMap)
68✔
1869

68✔
1870
        // First, we'll make an initial pass over the set of incoming and
68✔
1871
        // outgoing HTLC's to decide if we need to go on chain at all.
68✔
1872
        haveChainActions := false
68✔
1873
        for _, htlc := range htlcs.outgoingHTLCs {
75✔
1874
                // We'll need to go on-chain for an outgoing HTLC if it was
7✔
1875
                // never resolved downstream, and it's "close" to timing out.
7✔
1876
                //
7✔
1877
                // TODO(yy): If there's no corresponding incoming HTLC, it
7✔
1878
                // means we are the first hop, hence the payer. This is a
7✔
1879
                // tricky case - unlike a forwarding hop, we don't have an
7✔
1880
                // incoming HTLC that will time out, which means as long as we
7✔
1881
                // can learn the preimage, we can settle the invoice (before it
7✔
1882
                // expires?).
7✔
1883
                toChain := c.shouldGoOnChain(
7✔
1884
                        htlc, c.cfg.OutgoingBroadcastDelta, height,
7✔
1885
                )
7✔
1886

7✔
1887
                if toChain {
7✔
UNCOV
1888
                        // Convert to int64 in case of overflow.
×
UNCOV
1889
                        remainingBlocks := int64(htlc.RefundTimeout) -
×
UNCOV
1890
                                int64(height)
×
UNCOV
1891

×
UNCOV
1892
                        log.Infof("ChannelArbitrator(%v): go to chain for "+
×
UNCOV
1893
                                "outgoing htlc %x: timeout=%v, amount=%v, "+
×
UNCOV
1894
                                "blocks_until_expiry=%v, broadcast_delta=%v",
×
UNCOV
1895
                                c.cfg.ChanPoint, htlc.RHash[:],
×
UNCOV
1896
                                htlc.RefundTimeout, htlc.Amt, remainingBlocks,
×
UNCOV
1897
                                c.cfg.OutgoingBroadcastDelta,
×
UNCOV
1898
                        )
×
UNCOV
1899
                }
×
1900

1901
                haveChainActions = haveChainActions || toChain
7✔
1902
        }
1903

1904
        for _, htlc := range htlcs.incomingHTLCs {
73✔
1905
                // We'll need to go on-chain to pull an incoming HTLC iff we
5✔
1906
                // know the pre-image and it's close to timing out. We need to
5✔
1907
                // ensure that we claim the funds that are rightfully ours
5✔
1908
                // on-chain.
5✔
1909
                preimageAvailable, err := c.isPreimageAvailable(htlc.RHash)
5✔
1910
                if err != nil {
5✔
1911
                        return nil, err
×
1912
                }
×
1913

1914
                if !preimageAvailable {
9✔
1915
                        continue
4✔
1916
                }
1917

1918
                toChain := c.shouldGoOnChain(
1✔
1919
                        htlc, c.cfg.IncomingBroadcastDelta, height,
1✔
1920
                )
1✔
1921

1✔
1922
                if toChain {
1✔
UNCOV
1923
                        // Convert to int64 in case of overflow.
×
UNCOV
1924
                        remainingBlocks := int64(htlc.RefundTimeout) -
×
UNCOV
1925
                                int64(height)
×
UNCOV
1926

×
UNCOV
1927
                        log.Infof("ChannelArbitrator(%v): go to chain for "+
×
UNCOV
1928
                                "incoming htlc %x: timeout=%v, amount=%v, "+
×
UNCOV
1929
                                "blocks_until_expiry=%v, broadcast_delta=%v",
×
UNCOV
1930
                                c.cfg.ChanPoint, htlc.RHash[:],
×
UNCOV
1931
                                htlc.RefundTimeout, htlc.Amt, remainingBlocks,
×
UNCOV
1932
                                c.cfg.IncomingBroadcastDelta,
×
UNCOV
1933
                        )
×
UNCOV
1934
                }
×
1935

1936
                haveChainActions = haveChainActions || toChain
1✔
1937
        }
1938

1939
        // If we don't have any actions to make, then we'll return an empty
1940
        // action map. We only do this if this was a chain trigger though, as
1941
        // if we're going to broadcast the commitment (or the remote party did)
1942
        // we're *forced* to act on each HTLC.
1943
        if !haveChainActions && trigger == chainTrigger {
108✔
1944
                log.Tracef("ChannelArbitrator(%v): no actions to take at "+
40✔
1945
                        "height=%v", c.cfg.ChanPoint, height)
40✔
1946
                return actionMap, nil
40✔
1947
        }
40✔
1948

1949
        // Now that we know we'll need to go on-chain, we'll examine all of our
1950
        // active outgoing HTLC's to see if we either need to: sweep them after
1951
        // a timeout (then cancel backwards), cancel them backwards
1952
        // immediately, or watch them as they're still active contracts.
1953
        for _, htlc := range htlcs.outgoingHTLCs {
35✔
1954
                switch {
7✔
1955
                // If the HTLC is dust, then we can cancel it backwards
1956
                // immediately as there's no matching contract to arbitrate
1957
                // on-chain. We know the HTLC is dust, if the OutputIndex
1958
                // negative.
1959
                case htlc.OutputIndex < 0:
2✔
1960
                        log.Tracef("ChannelArbitrator(%v): immediately "+
2✔
1961
                                "failing dust htlc=%x", c.cfg.ChanPoint,
2✔
1962
                                htlc.RHash[:])
2✔
1963

2✔
1964
                        actionMap[HtlcFailDustAction] = append(
2✔
1965
                                actionMap[HtlcFailDustAction], htlc,
2✔
1966
                        )
2✔
1967

1968
                // If we don't need to immediately act on this HTLC, then we'll
1969
                // mark it still "live". After we broadcast, we'll monitor it
1970
                // until the HTLC times out to see if we can also redeem it
1971
                // on-chain.
1972
                case !c.shouldGoOnChain(htlc, c.cfg.OutgoingBroadcastDelta,
1973
                        height,
1974
                ):
5✔
1975
                        // TODO(roasbeef): also need to be able to query
5✔
1976
                        // circuit map to see if HTLC hasn't been fully
5✔
1977
                        // resolved
5✔
1978
                        //
5✔
1979
                        //  * can't fail incoming until if outgoing not yet
5✔
1980
                        //  failed
5✔
1981

5✔
1982
                        log.Tracef("ChannelArbitrator(%v): watching chain to "+
5✔
1983
                                "decide action for outgoing htlc=%x",
5✔
1984
                                c.cfg.ChanPoint, htlc.RHash[:])
5✔
1985

5✔
1986
                        actionMap[HtlcOutgoingWatchAction] = append(
5✔
1987
                                actionMap[HtlcOutgoingWatchAction], htlc,
5✔
1988
                        )
5✔
1989

1990
                // Otherwise, we'll update our actionMap to mark that we need
1991
                // to sweep this HTLC on-chain
UNCOV
1992
                default:
×
UNCOV
1993
                        log.Tracef("ChannelArbitrator(%v): going on-chain to "+
×
UNCOV
1994
                                "timeout htlc=%x", c.cfg.ChanPoint, htlc.RHash[:])
×
UNCOV
1995

×
UNCOV
1996
                        actionMap[HtlcTimeoutAction] = append(
×
UNCOV
1997
                                actionMap[HtlcTimeoutAction], htlc,
×
UNCOV
1998
                        )
×
1999
                }
2000
        }
2001

2002
        // Similarly, for each incoming HTLC, now that we need to go on-chain,
2003
        // we'll either: sweep it immediately if we know the pre-image, or
2004
        // observe the output on-chain if we don't. In this last case, we'll
2005
        // either learn of it eventually from the outgoing HTLC, or the sender
2006
        // will time out the HTLC.
2007
        for _, htlc := range htlcs.incomingHTLCs {
33✔
2008
                // If the HTLC is dust, there is no action to be taken.
5✔
2009
                if htlc.OutputIndex < 0 {
7✔
2010
                        log.Debugf("ChannelArbitrator(%v): no resolution "+
2✔
2011
                                "needed for incoming dust htlc=%x",
2✔
2012
                                c.cfg.ChanPoint, htlc.RHash[:])
2✔
2013

2✔
2014
                        actionMap[HtlcIncomingDustFinalAction] = append(
2✔
2015
                                actionMap[HtlcIncomingDustFinalAction], htlc,
2✔
2016
                        )
2✔
2017

2✔
2018
                        continue
2✔
2019
                }
2020

2021
                log.Tracef("ChannelArbitrator(%v): watching chain to decide "+
3✔
2022
                        "action for incoming htlc=%x", c.cfg.ChanPoint,
3✔
2023
                        htlc.RHash[:])
3✔
2024

3✔
2025
                actionMap[HtlcIncomingWatchAction] = append(
3✔
2026
                        actionMap[HtlcIncomingWatchAction], htlc,
3✔
2027
                )
3✔
2028
        }
2029

2030
        return actionMap, nil
28✔
2031
}
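For outgoing HTLCs the logic above sorts each one into one of three buckets: fail dust immediately, watch HTLCs that are not yet near expiry, or time out the rest on chain. A compact sketch with a simplified htlc record standing in for channeldb.HTLC:

package main

import "fmt"

// htlc is a simplified stand-in for channeldb.HTLC.
type htlc struct {
	outputIndex   int32
	refundTimeout uint32
}

// classifyOutgoing mirrors the three outgoing cases: dust is failed back
// immediately, HTLCs not yet near expiry are watched, and the rest are
// timed out on chain.
func classifyOutgoing(h htlc, broadcastDelta, height uint32) string {
	switch {
	case h.outputIndex < 0:
		return "fail-dust"
	case height < h.refundTimeout-broadcastDelta:
		return "watch"
	default:
		return "timeout"
	}
}

func main() {
	fmt.Println(classifyOutgoing(htlc{-1, 500}, 10, 480)) // fail-dust
	fmt.Println(classifyOutgoing(htlc{0, 500}, 10, 480))  // watch
	fmt.Println(classifyOutgoing(htlc{0, 500}, 10, 495))  // timeout
}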
2032

2033
// isPreimageAvailable returns whether the hash preimage is available in either
2034
// the preimage cache or the invoice database.
2035
func (c *ChannelArbitrator) isPreimageAvailable(hash lntypes.Hash) (bool,
2036
        error) {
17✔
2037

17✔
2038
        // Start by checking the preimage cache for preimages of
17✔
2039
        // forwarded HTLCs.
17✔
2040
        _, preimageAvailable := c.cfg.PreimageDB.LookupPreimage(
17✔
2041
                hash,
17✔
2042
        )
17✔
2043
        if preimageAvailable {
24✔
2044
                return true, nil
7✔
2045
        }
7✔
2046

2047
        // Then check if we have an invoice that can be settled by this HTLC.
2048
        //
2049
        // TODO(joostjager): Check that there are still more blocks remaining
2050
        // than the invoice cltv delta. We don't want to go to chain only to
2051
        // have the incoming contest resolver decide that we don't want to
2052
        // settle this invoice.
2053
        invoice, err := c.cfg.Registry.LookupInvoice(context.Background(), hash)
10✔
2054
        switch {
10✔
UNCOV
2055
        case err == nil:
×
2056
        case errors.Is(err, invoices.ErrInvoiceNotFound) ||
2057
                errors.Is(err, invoices.ErrNoInvoicesCreated):
10✔
2058

10✔
2059
                return false, nil
10✔
2060
        default:
×
2061
                return false, err
×
2062
        }
2063

UNCOV
2064
        preimageAvailable = invoice.Terms.PaymentPreimage != nil
×
UNCOV
2065

×
UNCOV
2066
        return preimageAvailable, nil
×
2067
}
2068

2069
// checkLocalChainActions is similar to checkCommitChainActions, but it also
2070
// examines the set of HTLCs on the remote party's commitment. This allows us
2071
// to ensure we're able to satisfy the HTLC timeout constraints for incoming vs
2072
// outgoing HTLCs.
2073
func (c *ChannelArbitrator) checkLocalChainActions(
2074
        height uint32, trigger transitionTrigger,
2075
        activeHTLCs map[HtlcSetKey]htlcSet,
2076
        commitsConfirmed bool) (ChainActionMap, error) {
60✔
2077

60✔
2078
        // First, we'll check our local chain actions as normal. This will only
60✔
2079
        // examine HTLCs on our local commitment (timeout or settle).
60✔
2080
        localCommitActions, err := c.checkCommitChainActions(
60✔
2081
                height, trigger, activeHTLCs[LocalHtlcSet],
60✔
2082
        )
60✔
2083
        if err != nil {
60✔
2084
                return nil, err
×
2085
        }
×
2086

2087
        // Next, we'll examine the remote commitment (and maybe a dangling one)
2088
        // to see if the set difference of our HTLCs is non-empty. If so, then
2089
        // we may need to cancel back some HTLCs if we decide go to chain.
2090
        remoteDanglingActions := c.checkRemoteDanglingActions(
60✔
2091
                height, activeHTLCs, commitsConfirmed,
60✔
2092
        )
60✔
2093

60✔
2094
        // Finally, we'll merge the two set of chain actions.
60✔
2095
        localCommitActions.Merge(remoteDanglingActions)
60✔
2096

60✔
2097
        return localCommitActions, nil
60✔
2098
}
2099

2100
// checkRemoteDanglingActions examines the set of remote commitments for any
2101
// HTLCs that are close to timing out. If we find any, then we'll return a set
2102
// of chain actions for HTLCs that are on our commitment, but not theirs to
2103
// cancel immediately.
2104
func (c *ChannelArbitrator) checkRemoteDanglingActions(
2105
        height uint32, activeHTLCs map[HtlcSetKey]htlcSet,
2106
        commitsConfirmed bool) ChainActionMap {
60✔
2107

60✔
2108
        var (
60✔
2109
                pendingRemoteHTLCs []channeldb.HTLC
60✔
2110
                localHTLCs         = make(map[uint64]struct{})
60✔
2111
                remoteHTLCs        = make(map[uint64]channeldb.HTLC)
60✔
2112
                actionMap          = make(ChainActionMap)
60✔
2113
        )
60✔
2114

60✔
2115
        // First, we'll construct two sets of the outgoing HTLCs: those on our
60✔
2116
        // local commitment, and those that are on the remote commitment(s).
60✔
2117
        for htlcSetKey, htlcs := range activeHTLCs {
177✔
2118
                if htlcSetKey.IsRemote {
179✔
2119
                        for _, htlc := range htlcs.outgoingHTLCs {
76✔
2120
                                remoteHTLCs[htlc.HtlcIndex] = htlc
14✔
2121
                        }
14✔
2122
                } else {
55✔
2123
                        for _, htlc := range htlcs.outgoingHTLCs {
60✔
2124
                                localHTLCs[htlc.HtlcIndex] = struct{}{}
5✔
2125
                        }
5✔
2126
                }
2127
        }
2128

2129
        // With both sets constructed, we'll now compute the set difference of
2130
        // our two sets of HTLCs. This'll give us the HTLCs that exist on the
2131
        // remote commitment transaction, but not on ours.
2132
        for htlcIndex, htlc := range remoteHTLCs {
74✔
2133
                if _, ok := localHTLCs[htlcIndex]; ok {
15✔
2134
                        continue
1✔
2135
                }
2136

2137
                pendingRemoteHTLCs = append(pendingRemoteHTLCs, htlc)
13✔
2138
        }
2139

2140
        // Finally, we'll examine all the pending remote HTLCs for those that
2141
        // have expired. If we find any, then we'll recommend that they be
2142
        // failed now so we can free up the incoming HTLC.
2143
        for _, htlc := range pendingRemoteHTLCs {
73✔
2144
                // We'll now check if we need to go to chain in order to cancel
13✔
2145
                // the incoming HTLC.
13✔
2146
                goToChain := c.shouldGoOnChain(htlc, c.cfg.OutgoingBroadcastDelta,
13✔
2147
                        height,
13✔
2148
                )
13✔
2149

13✔
2150
                // If we don't need to go to chain, and no commitments have
13✔
2151
                // been confirmed, then we can move on. Otherwise, if
13✔
2152
                // commitments have been confirmed, then we need to cancel back
13✔
2153
                // *all* of the pending remote HTLCS.
13✔
2154
                if !goToChain && !commitsConfirmed {
17✔
2155
                        continue
4✔
2156
                }
2157

2158
                // Dust htlcs can be canceled back even before the commitment
2159
                // transaction confirms. Dust htlcs are not enforceable onchain.
2160
                // If another version of the commit tx would confirm we either
2161
                // gain or lose those dust amounts, but there is no other way
2162
                // than cancelling the incoming HTLC back, because we will never
2163
                // the preimage.
2164
                if htlc.OutputIndex < 0 {
9✔
2165
                        log.Infof("ChannelArbitrator(%v): fail dangling dust "+
×
2166
                                "htlc=%x from local/remote commitments diff",
×
2167
                                c.cfg.ChanPoint, htlc.RHash[:])
×
2168

×
2169
                        actionMap[HtlcFailDustAction] = append(
×
2170
                                actionMap[HtlcFailDustAction], htlc,
×
2171
                        )
×
2172

×
2173
                        continue
×
2174
                }
2175

2176
                log.Infof("ChannelArbitrator(%v): fail dangling htlc=%x from "+
9✔
2177
                        "local/remote commitments diff",
9✔
2178
                        c.cfg.ChanPoint, htlc.RHash[:])
9✔
2179

9✔
2180
                actionMap[HtlcFailDanglingAction] = append(
9✔
2181
                        actionMap[HtlcFailDanglingAction], htlc,
9✔
2182
                )
9✔
2183
        }
2184

2185
        return actionMap
60✔
2186
}
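The dangling-HTLC detection is a set difference on HTLC indices: outgoing HTLCs present on a remote commitment but absent from our local one. A minimal standalone version of that difference:

package main

import "fmt"

// danglingIndices returns the indices present in remote but absent from
// local, i.e. outgoing HTLCs the remote commitment still carries while
// ours does not.
func danglingIndices(local, remote []uint64) []uint64 {
	onLocal := make(map[uint64]struct{}, len(local))
	for _, idx := range local {
		onLocal[idx] = struct{}{}
	}

	var dangling []uint64
	for _, idx := range remote {
		if _, ok := onLocal[idx]; ok {
			continue
		}
		dangling = append(dangling, idx)
	}

	return dangling
}

func main() {
	local := []uint64{1, 2, 3}
	remote := []uint64{2, 3, 4, 5}

	fmt.Println(danglingIndices(local, remote)) // [4 5]
}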
2187

2188
// checkRemoteChainActions examines the two possible remote commitment chains
2189
// and returns the set of chain actions we need to carry out if the remote
2190
// commitment (non pending) confirms. The pendingConf indicates if the pending
2191
// remote commitment confirmed. This is similar to checkCommitChainActions, but
2192
// we'll immediately fail any HTLCs on the pending remote commit, but not the
2193
// remote commit (or the other way around).
2194
func (c *ChannelArbitrator) checkRemoteChainActions(
2195
        height uint32, trigger transitionTrigger,
2196
        activeHTLCs map[HtlcSetKey]htlcSet,
2197
        pendingConf bool) (ChainActionMap, error) {
8✔
2198

8✔
2199
        // First, we'll examine all the normal chain actions on the remote
8✔
2200
        // commitment that confirmed.
8✔
2201
        confHTLCs := activeHTLCs[RemoteHtlcSet]
8✔
2202
        if pendingConf {
10✔
2203
                confHTLCs = activeHTLCs[RemotePendingHtlcSet]
2✔
2204
        }
2✔
2205
        remoteCommitActions, err := c.checkCommitChainActions(
8✔
2206
                height, trigger, confHTLCs,
8✔
2207
        )
8✔
2208
        if err != nil {
8✔
2209
                return nil, err
×
2210
        }
×
2211

2212
        // With these actions computed, we'll now check the diff of the HTLCs on
2213
        // the commitments, and cancel back any that are on the pending but not
2214
        // the non-pending.
2215
        remoteDiffActions := c.checkRemoteDiffActions(
8✔
2216
                activeHTLCs, pendingConf,
8✔
2217
        )
8✔
2218

8✔
2219
        // Finally, we'll merge the two sets of actions into the final set of
8✔
2220
        // chain actions.
8✔
2221
        remoteCommitActions.Merge(remoteDiffActions)
8✔
2222
        return remoteCommitActions, nil
8✔
2223
}
2224

2225
// checkRemoteDiffActions checks the set difference of the HTLCs on the remote
2226
// confirmed commit and remote pending commit for HTLCs that we need to cancel
2227
// back. If we find any HTLCs on the remote pending but not the remote, then
2228
// we'll mark them to be failed immediately.
2229
func (c *ChannelArbitrator) checkRemoteDiffActions(
2230
        activeHTLCs map[HtlcSetKey]htlcSet,
2231
        pendingConf bool) ChainActionMap {
8✔
2232

8✔
2233
        // First, we'll partition the HTLCs into those that are present on the
8✔
2234
        // confirmed commitment, and those on the dangling commitment.
8✔
2235
        confHTLCs := activeHTLCs[RemoteHtlcSet]
8✔
2236
        danglingHTLCs := activeHTLCs[RemotePendingHtlcSet]
8✔
2237
        if pendingConf {
10✔
2238
                confHTLCs = activeHTLCs[RemotePendingHtlcSet]
2✔
2239
                danglingHTLCs = activeHTLCs[RemoteHtlcSet]
2✔
2240
        }
2✔
2241

2242
        // Next, we'll create a set of all the HTLCs on the confirmed commitment.
2243
        remoteHtlcs := make(map[uint64]struct{})
8✔
2244
        for _, htlc := range confHTLCs.outgoingHTLCs {
10✔
2245
                remoteHtlcs[htlc.HtlcIndex] = struct{}{}
2✔
2246
        }
2✔
2247

2248
        // With the remote HTLCs assembled, we'll mark any HTLCs only on the
2249
        // remote pending commitment to be failed asap.
2250
        actionMap := make(ChainActionMap)
8✔
2251
        for _, htlc := range danglingHTLCs.outgoingHTLCs {
12✔
2252
                if _, ok := remoteHtlcs[htlc.HtlcIndex]; ok {
4✔
UNCOV
2253
                        continue
×
2254
                }
2255

2256
                preimageAvailable, err := c.isPreimageAvailable(htlc.RHash)
4✔
2257
                if err != nil {
4✔
2258
                        log.Errorf("ChannelArbitrator(%v): failed to query "+
×
2259
                                "preimage for dangling htlc=%x from remote "+
×
2260
                                "commitments diff", c.cfg.ChanPoint,
×
2261
                                htlc.RHash[:])
×
2262

×
2263
                        continue
×
2264
                }
2265

2266
                if preimageAvailable {
4✔
2267
                        continue
×
2268
                }
2269

2270
                // Dust HTLCs on the remote commitment can be failed back.
2271
                if htlc.OutputIndex < 0 {
4✔
2272
                        log.Infof("ChannelArbitrator(%v): fail dangling dust "+
×
2273
                                "htlc=%x from remote commitments diff",
×
2274
                                c.cfg.ChanPoint, htlc.RHash[:])
×
2275

×
2276
                        actionMap[HtlcFailDustAction] = append(
×
2277
                                actionMap[HtlcFailDustAction], htlc,
×
2278
                        )
×
2279

×
2280
                        continue
×
2281
                }
2282

2283
                actionMap[HtlcFailDanglingAction] = append(
4✔
2284
                        actionMap[HtlcFailDanglingAction], htlc,
4✔
2285
                )
4✔
2286

4✔
2287
                log.Infof("ChannelArbitrator(%v): fail dangling htlc=%x from "+
4✔
2288
                        "remote commitments diff",
4✔
2289
                        c.cfg.ChanPoint, htlc.RHash[:])
4✔
2290
        }
2291

2292
        return actionMap
8✔
2293
}
2294

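// Illustrative sketch, not part of channel_arbitrator.go: the diff above is a
// plain set difference keyed by HTLC index: any HTLC present on the dangling
// commitment but absent from the confirmed one is a candidate to be failed
// back. Assuming HTLCs are identified by their uint64 index:
func danglingOnlyExample(confirmed, dangling []uint64) []uint64 {
        seen := make(map[uint64]struct{}, len(confirmed))
        for _, idx := range confirmed {
                seen[idx] = struct{}{}
        }

        var diff []uint64
        for _, idx := range dangling {
                if _, ok := seen[idx]; !ok {
                        diff = append(diff, idx)
                }
        }

        return diff
}
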
2295
// constructChainActions returns the set of actions that should be taken for
2296
// confirmed HTLCs at the specified height. Our actions will depend on the set
2297
// of HTLCs that were active across all channels at the time of channel
2298
// closure.
2299
func (c *ChannelArbitrator) constructChainActions(confCommitSet *CommitSet,
2300
        height uint32, trigger transitionTrigger) (ChainActionMap, error) {
21✔
2301

21✔
2302
        // If we've reached this point and have no confirmed commitment set,
21✔
2303
        // then this is an older node that had a pending channel close before
21✔
2304
        // the CommitSet was introduced. In this case, we'll just return the
21✔
2305
        // existing ChainActionMap they had on disk.
21✔
2306
        if confCommitSet == nil || confCommitSet.ConfCommitKey.IsNone() {
28✔
2307
                return c.log.FetchChainActions()
7✔
2308
        }
7✔
2309

2310
        // Otherwise, we have the full commitment set written to disk, and can
2311
        // proceed as normal.
2312
        htlcSets := confCommitSet.toActiveHTLCSets()
14✔
2313
        confCommitKey, err := confCommitSet.ConfCommitKey.UnwrapOrErr(
14✔
2314
                fmt.Errorf("no commitKey available"),
14✔
2315
        )
14✔
2316
        if err != nil {
14✔
2317
                return nil, err
×
2318
        }
×
2319

2320
        switch confCommitKey {
14✔
2321
        // If the local commitment transaction confirmed, then we'll examine
2322
        // it, as well as their commitments, to derive the set of chain actions.
2323
        case LocalHtlcSet:
6✔
2324
                return c.checkLocalChainActions(
6✔
2325
                        height, trigger, htlcSets, true,
6✔
2326
                )
6✔
2327

2328
        // If the remote commitment confirmed, then we'll grab all the chain
2329
        // actions for the remote commit, and check the pending commit for any
2330
        // HTLCs we need to handle immediately (dust).
2331
        case RemoteHtlcSet:
6✔
2332
                return c.checkRemoteChainActions(
6✔
2333
                        height, trigger, htlcSets, false,
6✔
2334
                )
6✔
2335

2336
        // Otherwise, the remote pending commitment confirmed, so we'll examine
2337
        // the HTLCs on that unrevoked dangling commitment.
2338
        case RemotePendingHtlcSet:
2✔
2339
                return c.checkRemoteChainActions(
2✔
2340
                        height, trigger, htlcSets, true,
2✔
2341
                )
2✔
2342
        }
2343

2344
        return nil, fmt.Errorf("unable to locate chain actions")
×
2345
}
2346

2347
// prepContractResolutions is called either in the case that we decide we need
2348
// to go to chain, or the remote party goes to chain. Given a set of actions we
2349
// need to take for each HTLC, this method will return a set of contract
2350
// resolvers that will resolve the contracts on-chain if needed, and also a set
2351
// of packets to send to the htlcswitch in order to ensure all incoming HTLCs
2352
// are properly resolved.
2353
func (c *ChannelArbitrator) prepContractResolutions(
2354
        contractResolutions *ContractResolutions, height uint32,
2355
        htlcActions ChainActionMap) ([]ContractResolver, error) {
12✔
2356

12✔
2357
        // We'll also fetch the historical state of this channel, as it should
12✔
2358
        // have been marked as closed by now, and supplement it to each resolver
12✔
2359
        // such that we can properly resolve our pending contracts.
12✔
2360
        var chanState *channeldb.OpenChannel
12✔
2361
        chanState, err := c.cfg.FetchHistoricalChannel()
12✔
2362
        switch {
12✔
2363
        // If we don't find this channel, then it may be the case that it
2364
        // was closed before we started to retain the final state
2365
        // information for open channels.
2366
        case err == channeldb.ErrNoHistoricalBucket:
×
2367
                fallthrough
×
2368
        case err == channeldb.ErrChannelNotFound:
×
2369
                log.Warnf("ChannelArbitrator(%v): unable to fetch historical "+
×
2370
                        "state", c.cfg.ChanPoint)
×
2371

2372
        case err != nil:
×
2373
                return nil, err
×
2374
        }
2375

2376
        incomingResolutions := contractResolutions.HtlcResolutions.IncomingHTLCs
12✔
2377
        outgoingResolutions := contractResolutions.HtlcResolutions.OutgoingHTLCs
12✔
2378

12✔
2379
        // We'll use these two maps to quickly look up an active HTLC with its
12✔
2380
        // matching HTLC resolution.
12✔
2381
        outResolutionMap := make(map[wire.OutPoint]lnwallet.OutgoingHtlcResolution)
12✔
2382
        inResolutionMap := make(map[wire.OutPoint]lnwallet.IncomingHtlcResolution)
12✔
2383
        for i := 0; i < len(incomingResolutions); i++ {
12✔
UNCOV
2384
                inRes := incomingResolutions[i]
×
UNCOV
2385
                inResolutionMap[inRes.HtlcPoint()] = inRes
×
UNCOV
2386
        }
×
2387
        for i := 0; i < len(outgoingResolutions); i++ {
13✔
2388
                outRes := outgoingResolutions[i]
1✔
2389
                outResolutionMap[outRes.HtlcPoint()] = outRes
1✔
2390
        }
1✔
2391

2392
        // We'll create the resolver kit that we'll be cloning for each
2393
        // resolver so they each can do their duty.
2394
        resolverCfg := ResolverConfig{
12✔
2395
                ChannelArbitratorConfig: c.cfg,
12✔
2396
                Checkpoint: func(res ContractResolver,
12✔
2397
                        reports ...*channeldb.ResolverReport) error {
14✔
2398

2✔
2399
                        return c.log.InsertUnresolvedContracts(reports, res)
2✔
2400
                },
2✔
2401
        }
2402

2403
        commitHash := contractResolutions.CommitHash
12✔
2404

12✔
2405
        var htlcResolvers []ContractResolver
12✔
2406

12✔
2407
        // We instantiate an anchor resolver if the commitment tx has an
12✔
2408
        // anchor.
12✔
2409
        if contractResolutions.AnchorResolution != nil {
14✔
2410
                anchorResolver := newAnchorResolver(
2✔
2411
                        contractResolutions.AnchorResolution.AnchorSignDescriptor,
2✔
2412
                        contractResolutions.AnchorResolution.CommitAnchor,
2✔
2413
                        height, c.cfg.ChanPoint, resolverCfg,
2✔
2414
                )
2✔
2415
                anchorResolver.SupplementState(chanState)
2✔
2416

2✔
2417
                htlcResolvers = append(htlcResolvers, anchorResolver)
2✔
2418
        }
2✔
2419

2420
        // If this is a breach close, we'll create a breach resolver, determine
2421
        // the HTLCs to fail back, and exit. This is done because the other
2422
        // steps taken for non-breach-closes do not matter for breach-closes.
2423
        if contractResolutions.BreachResolution != nil {
14✔
2424
                breachResolver := newBreachResolver(resolverCfg)
2✔
2425
                htlcResolvers = append(htlcResolvers, breachResolver)
2✔
2426

2✔
2427
                return htlcResolvers, nil
2✔
2428
        }
2✔
2429

2430
        // For each HTLC, we'll either act immediately, meaning we'll instantly
2431
        // fail the HTLC, or we'll act only once the transaction has been
2432
        // confirmed, in which case we'll need an HTLC resolver.
2433
        for htlcAction, htlcs := range htlcActions {
21✔
2434
                switch htlcAction {
11✔
2435
                // If we can claim this HTLC, we'll create an HTLC resolver to
2436
                // claim the HTLC (second-level or directly), then add the pre
2437
                case HtlcClaimAction:
×
2438
                        for _, htlc := range htlcs {
×
2439
                                htlc := htlc
×
2440

×
2441
                                htlcOp := wire.OutPoint{
×
2442
                                        Hash:  commitHash,
×
2443
                                        Index: uint32(htlc.OutputIndex),
×
2444
                                }
×
2445

×
2446
                                resolution, ok := inResolutionMap[htlcOp]
×
2447
                                if !ok {
×
2448
                                        // TODO(roasbeef): panic?
×
2449
                                        log.Errorf("ChannelArbitrator(%v) unable to find "+
×
2450
                                                "incoming resolution: %v",
×
2451
                                                c.cfg.ChanPoint, htlcOp)
×
2452
                                        continue
×
2453
                                }
2454

2455
                                resolver := newSuccessResolver(
×
2456
                                        resolution, height, htlc, resolverCfg,
×
2457
                                )
×
2458
                                if chanState != nil {
×
2459
                                        resolver.SupplementState(chanState)
×
2460
                                }
×
2461
                                htlcResolvers = append(htlcResolvers, resolver)
×
2462
                        }
2463

2464
                // If we can timeout the HTLC directly, then we'll create the
2465
                // proper resolver to do so, who will then cancel the packet
2466
                // backwards.
UNCOV
2467
                case HtlcTimeoutAction:
×
UNCOV
2468
                        for _, htlc := range htlcs {
×
UNCOV
2469
                                htlc := htlc
×
UNCOV
2470

×
UNCOV
2471
                                htlcOp := wire.OutPoint{
×
UNCOV
2472
                                        Hash:  commitHash,
×
UNCOV
2473
                                        Index: uint32(htlc.OutputIndex),
×
UNCOV
2474
                                }
×
UNCOV
2475

×
UNCOV
2476
                                resolution, ok := outResolutionMap[htlcOp]
×
UNCOV
2477
                                if !ok {
×
2478
                                        log.Errorf("ChannelArbitrator(%v) unable to find "+
×
2479
                                                "outgoing resolution: %v", c.cfg.ChanPoint, htlcOp)
×
2480
                                        continue
×
2481
                                }
2482

UNCOV
2483
                                resolver := newTimeoutResolver(
×
UNCOV
2484
                                        resolution, height, htlc, resolverCfg,
×
UNCOV
2485
                                )
×
UNCOV
2486
                                if chanState != nil {
×
UNCOV
2487
                                        resolver.SupplementState(chanState)
×
UNCOV
2488
                                }
×
2489

2490
                                // For outgoing HTLCs, we will also need to
2491
                                // supplement the resolver with the expiry
2492
                                // block height of its corresponding incoming
2493
                                // HTLC.
UNCOV
2494
                                deadline := c.cfg.FindOutgoingHTLCDeadline(htlc)
×
UNCOV
2495
                                resolver.SupplementDeadline(deadline)
×
UNCOV
2496

×
UNCOV
2497
                                htlcResolvers = append(htlcResolvers, resolver)
×
2498
                        }
2499

2500
                // If this is an incoming HTLC, but we can't act yet, then
2501
                // we'll create an incoming resolver to redeem the HTLC if we
2502
                // learn of the pre-image, or let the remote party time out.
UNCOV
2503
                case HtlcIncomingWatchAction:
×
UNCOV
2504
                        for _, htlc := range htlcs {
×
UNCOV
2505
                                htlc := htlc
×
UNCOV
2506

×
UNCOV
2507
                                htlcOp := wire.OutPoint{
×
UNCOV
2508
                                        Hash:  commitHash,
×
UNCOV
2509
                                        Index: uint32(htlc.OutputIndex),
×
UNCOV
2510
                                }
×
UNCOV
2511

×
UNCOV
2512
                                // TODO(roasbeef): need to handle incoming dust...
×
UNCOV
2513

×
UNCOV
2514
                                // TODO(roasbeef): can't be negative!!!
×
UNCOV
2515
                                resolution, ok := inResolutionMap[htlcOp]
×
UNCOV
2516
                                if !ok {
×
2517
                                        log.Errorf("ChannelArbitrator(%v) unable to find "+
×
2518
                                                "incoming resolution: %v",
×
2519
                                                c.cfg.ChanPoint, htlcOp)
×
2520
                                        continue
×
2521
                                }
2522

UNCOV
2523
                                resolver := newIncomingContestResolver(
×
UNCOV
2524
                                        resolution, height, htlc,
×
UNCOV
2525
                                        resolverCfg,
×
UNCOV
2526
                                )
×
UNCOV
2527
                                if chanState != nil {
×
UNCOV
2528
                                        resolver.SupplementState(chanState)
×
UNCOV
2529
                                }
×
UNCOV
2530
                                htlcResolvers = append(htlcResolvers, resolver)
×
2531
                        }
2532

2533
                // Finally, if this is an outgoing HTLC we've sent, then we'll
2534
                // launch a resolver to watch for the pre-image (and settle
2535
                // backwards), or just timeout.
2536
                case HtlcOutgoingWatchAction:
1✔
2537
                        for _, htlc := range htlcs {
2✔
2538
                                htlc := htlc
1✔
2539

1✔
2540
                                htlcOp := wire.OutPoint{
1✔
2541
                                        Hash:  commitHash,
1✔
2542
                                        Index: uint32(htlc.OutputIndex),
1✔
2543
                                }
1✔
2544

1✔
2545
                                resolution, ok := outResolutionMap[htlcOp]
1✔
2546
                                if !ok {
1✔
2547
                                        log.Errorf("ChannelArbitrator(%v) "+
×
2548
                                                "unable to find outgoing "+
×
2549
                                                "resolution: %v",
×
2550
                                                c.cfg.ChanPoint, htlcOp)
×
2551

×
2552
                                        continue
×
2553
                                }
2554

2555
                                resolver := newOutgoingContestResolver(
1✔
2556
                                        resolution, height, htlc, resolverCfg,
1✔
2557
                                )
1✔
2558
                                if chanState != nil {
2✔
2559
                                        resolver.SupplementState(chanState)
1✔
2560
                                }
1✔
2561

2562
                                // For outgoing HTLCs, we will also need to
2563
                                // supplement the resolver with the expiry
2564
                                // block height of its corresponding incoming
2565
                                // HTLC.
2566
                                deadline := c.cfg.FindOutgoingHTLCDeadline(htlc)
1✔
2567
                                resolver.SupplementDeadline(deadline)
1✔
2568

1✔
2569
                                htlcResolvers = append(htlcResolvers, resolver)
1✔
2570
                        }
2571
                }
2572
        }
2573

2574
        // If this was a unilateral closure, then we'll also create a
2575
        // resolver to sweep our commitment output (but only if it wasn't
2576
        // trimmed).
2577
        if contractResolutions.CommitResolution != nil {
10✔
UNCOV
2578
                resolver := newCommitSweepResolver(
×
UNCOV
2579
                        *contractResolutions.CommitResolution, height,
×
UNCOV
2580
                        c.cfg.ChanPoint, resolverCfg,
×
UNCOV
2581
                )
×
UNCOV
2582
                if chanState != nil {
×
UNCOV
2583
                        resolver.SupplementState(chanState)
×
UNCOV
2584
                }
×
UNCOV
2585
                htlcResolvers = append(htlcResolvers, resolver)
×
2586
        }
2587

2588
        return htlcResolvers, nil
10✔
2589
}
2590

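// Illustrative sketch, not part of channel_arbitrator.go: the lookups above
// match each HTLC to its resolution by the outpoint it occupies on the
// confirmed commitment, i.e. the commitment txid plus the HTLC's output
// index. A minimal helper showing that keying (chainhash and wire are already
// imported by this file):
func htlcOutPointExample(commitHash chainhash.Hash,
        outputIndex int32) wire.OutPoint {

        return wire.OutPoint{
                Hash:  commitHash,
                Index: uint32(outputIndex),
        }
}
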
2591
// replaceResolver replaces a resolver in the list of active resolvers. If the
2592
// resolver to be replaced is not found, it returns an error.
2593
func (c *ChannelArbitrator) replaceResolver(oldResolver,
2594
        newResolver ContractResolver) error {
1✔
2595

1✔
2596
        c.activeResolversLock.Lock()
1✔
2597
        defer c.activeResolversLock.Unlock()
1✔
2598

1✔
2599
        oldKey := oldResolver.ResolverKey()
1✔
2600
        for i, r := range c.activeResolvers {
2✔
2601
                if bytes.Equal(r.ResolverKey(), oldKey) {
2✔
2602
                        c.activeResolvers[i] = newResolver
1✔
2603
                        return nil
1✔
2604
                }
1✔
2605
        }
2606

2607
        return errors.New("resolver to be replaced not found")
×
2608
}
2609

2610
// resolveContract is a goroutine tasked with fully resolving an unresolved
2611
// contract. Either the initial contract will be resolved after a single step,
2612
// or the contract will itself create another contract to be resolved. In
2613
// either case, once the contract has been fully resolved, we'll signal back to
2614
// the main goroutine so it can properly keep track of the set of unresolved
2615
// contracts.
2616
//
2617
// NOTE: This MUST be run as a goroutine.
2618
func (c *ChannelArbitrator) resolveContract(currentContract ContractResolver) {
6✔
2619
        defer c.wg.Done()
6✔
2620

6✔
2621
        log.Tracef("ChannelArbitrator(%v): attempting to resolve %T",
6✔
2622
                c.cfg.ChanPoint, currentContract)
6✔
2623

6✔
2624
        // Until the contract is fully resolved, we'll continue to iteratively
6✔
2625
        // resolve the contract one step at a time.
6✔
2626
        for !currentContract.IsResolved() {
13✔
2627
                log.Tracef("ChannelArbitrator(%v): contract %T not yet "+
7✔
2628
                        "resolved", c.cfg.ChanPoint, currentContract)
7✔
2629

7✔
2630
                select {
7✔
2631

2632
                // If we've been signalled to quit, then we'll exit early.
2633
                case <-c.quit:
×
2634
                        return
×
2635

2636
                default:
7✔
2637
                        // Otherwise, we'll attempt to resolve the current
7✔
2638
                        // contract.
7✔
2639
                        nextContract, err := currentContract.Resolve()
7✔
2640
                        if err != nil {
8✔
2641
                                if err == errResolverShuttingDown {
2✔
2642
                                        return
1✔
2643
                                }
1✔
2644

UNCOV
2645
                                log.Errorf("ChannelArbitrator(%v): unable to "+
×
UNCOV
2646
                                        "progress %T: %v",
×
UNCOV
2647
                                        c.cfg.ChanPoint, currentContract, err)
×
UNCOV
2648
                                return
×
2649
                        }
2650

2651
                        switch {
6✔
2652
                        // If this contract produced another, then this means
2653
                        // the current contract was only able to be partially
2654
                        // resolved in this step. So we'll do a contract swap
2655
                        // within our logs: the new contract will take the
2656
                        // place of the old one.
2657
                        case nextContract != nil:
1✔
2658
                                log.Debugf("ChannelArbitrator(%v): swapping "+
1✔
2659
                                        "out contract %T for %T ",
1✔
2660
                                        c.cfg.ChanPoint, currentContract,
1✔
2661
                                        nextContract)
1✔
2662

1✔
2663
                                // Swap contract in log.
1✔
2664
                                err := c.log.SwapContract(
1✔
2665
                                        currentContract, nextContract,
1✔
2666
                                )
1✔
2667
                                if err != nil {
1✔
2668
                                        log.Errorf("unable to add recurse "+
×
2669
                                                "contract: %v", err)
×
2670
                                }
×
2671

2672
                                // Swap contract in resolvers list. This is to
2673
                                // make sure that reports are queried from the
2674
                                // new resolver.
2675
                                err = c.replaceResolver(
1✔
2676
                                        currentContract, nextContract,
1✔
2677
                                )
1✔
2678
                                if err != nil {
1✔
2679
                                        log.Errorf("unable to replace "+
×
2680
                                                "contract: %v", err)
×
2681
                                }
×
2682

2683
                                // As this contract produced another, we'll
2684
                                // re-assign, so we can continue our resolution
2685
                                // loop.
2686
                                currentContract = nextContract
1✔
2687

1✔
2688
                                // Launch the new contract.
1✔
2689
                                err = currentContract.Launch()
1✔
2690
                                if err != nil {
1✔
2691
                                        log.Errorf("Failed to launch %T: %v",
×
2692
                                                currentContract, err)
×
2693
                                }
×
2694

2695
                        // If this contract is actually fully resolved, then
2696
                        // we'll mark it as such within the database.
2697
                        case currentContract.IsResolved():
5✔
2698
                                log.Debugf("ChannelArbitrator(%v): marking "+
5✔
2699
                                        "contract %T fully resolved",
5✔
2700
                                        c.cfg.ChanPoint, currentContract)
5✔
2701

5✔
2702
                                err := c.log.ResolveContract(currentContract)
5✔
2703
                                if err != nil {
5✔
2704
                                        log.Errorf("unable to resolve contract: %v",
×
2705
                                                err)
×
2706
                                }
×
2707

2708
                                // Now that the contract has been resolved,
2709
                                // well signal to the main goroutine.
2710
                                select {
5✔
2711
                                case c.resolutionSignal <- struct{}{}:
4✔
2712
                                case <-c.quit:
1✔
2713
                                        return
1✔
2714
                                }
2715
                        }
2716

2717
                }
2718
        }
2719
}
2720

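// Illustrative sketch, not part of channel_arbitrator.go: the loop above keeps
// calling Resolve until the contract reports itself resolved, swapping in any
// successor contract a step produces. A minimal version of that pattern,
// assuming a stripped-down resolver interface:
type miniResolver interface {
        Resolve() (miniResolver, error)
        IsResolved() bool
}

func resolveFullyExample(r miniResolver) error {
        for !r.IsResolved() {
                next, err := r.Resolve()
                if err != nil {
                        return err
                }

                // A non-nil successor replaces the current contract.
                if next != nil {
                        r = next
                }
        }

        return nil
}
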
2721
// signalUpdateMsg is a struct that carries fresh signals to the
2722
// ChannelArbitrator. We need to receive a message like this each time the
2723
// channel becomes active, as its internal state may change.
2724
type signalUpdateMsg struct {
2725
        // newSignals is the set of new active signals to be sent to the
2726
        // arbitrator.
2727
        newSignals *ContractSignals
2728

2729
        // doneChan is a channel that will be closed once the arbitrator has
2730
        // attached the new signals.
2731
        doneChan chan struct{}
2732
}
2733

2734
// UpdateContractSignals updates the set of signals the ChannelArbitrator needs
2735
// to receive from a channel in real-time in order to keep in sync with the
2736
// latest state of the contract.
2737
func (c *ChannelArbitrator) UpdateContractSignals(newSignals *ContractSignals) {
11✔
2738
        done := make(chan struct{})
11✔
2739

11✔
2740
        select {
11✔
2741
        case c.signalUpdates <- &signalUpdateMsg{
2742
                newSignals: newSignals,
2743
                doneChan:   done,
2744
        }:
11✔
2745
        case <-c.quit:
×
2746
        }
2747

2748
        select {
11✔
2749
        case <-done:
11✔
2750
        case <-c.quit:
×
2751
        }
2752
}
2753

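// Illustrative sketch, not part of channel_arbitrator.go: UpdateContractSignals
// uses a request/acknowledge handshake. The caller hands the arbitrator a done
// channel and then waits for it to be closed, selecting on quit in both steps
// so a shutdown can never deadlock the caller. The pattern in isolation:
func sendAndWaitExample(requests chan<- chan struct{},
        quit <-chan struct{}) {

        done := make(chan struct{})

        select {
        case requests <- done:
        case <-quit:
                return
        }

        select {
        case <-done:
        case <-quit:
        }
}
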
2754
// notifyContractUpdate updates the ChannelArbitrator's unmerged mappings such
2755
// that it can later be merged with activeHTLCs when calling
2756
// checkLocalChainActions or sweepAnchors. These are the only two places that
2757
// activeHTLCs is used.
2758
func (c *ChannelArbitrator) notifyContractUpdate(upd *ContractUpdate) {
12✔
2759
        c.unmergedMtx.Lock()
12✔
2760
        defer c.unmergedMtx.Unlock()
12✔
2761

12✔
2762
        // Update the mapping.
12✔
2763
        c.unmergedSet[upd.HtlcKey] = newHtlcSet(upd.Htlcs)
12✔
2764

12✔
2765
        log.Tracef("ChannelArbitrator(%v): fresh set of htlcs=%v",
12✔
2766
                c.cfg.ChanPoint, lnutils.SpewLogClosure(upd))
12✔
2767
}
12✔
2768

2769
// updateActiveHTLCs merges the unmerged set of HTLCs from the link with
2770
// activeHTLCs.
2771
func (c *ChannelArbitrator) updateActiveHTLCs() {
73✔
2772
        c.unmergedMtx.RLock()
73✔
2773
        defer c.unmergedMtx.RUnlock()
73✔
2774

73✔
2775
        // Update the mapping.
73✔
2776
        c.activeHTLCs[LocalHtlcSet] = c.unmergedSet[LocalHtlcSet]
73✔
2777
        c.activeHTLCs[RemoteHtlcSet] = c.unmergedSet[RemoteHtlcSet]
73✔
2778

73✔
2779
        // If the pending set exists, update that as well.
73✔
2780
        if _, ok := c.unmergedSet[RemotePendingHtlcSet]; ok {
82✔
2781
                pendingSet := c.unmergedSet[RemotePendingHtlcSet]
9✔
2782
                c.activeHTLCs[RemotePendingHtlcSet] = pendingSet
9✔
2783
        }
9✔
2784
}
2785

2786
// channelAttendant is the primary goroutine that acts at the judicial
2787
// arbitrator between our channel state, the remote channel peer, and the
2788
// blockchain (Our judge). This goroutine will ensure that we faithfully execute
2789
// all clauses of our contract in the case that we need to go on-chain for a
2790
// dispute. Currently, two such conditions warrant our intervention: when an
2791
// outgoing HTLC is about to timeout, and when we know the pre-image for an
2792
// incoming HTLC, but it hasn't yet been settled off-chain. In these cases,
2793
// we'll: broadcast our commitment, cancel/settle any HTLC's backwards after
2794
// sufficient confirmation, and finally send our set of outputs to the UTXO
2795
// Nursery for incubation, and ultimate sweeping.
2796
//
2797
// NOTE: This MUST be run as a goroutine.
2798
func (c *ChannelArbitrator) channelAttendant(bestHeight int32,
2799
        commitSet *CommitSet) {
48✔
2800

48✔
2801
        // TODO(roasbeef): tell top chain arb we're done
48✔
2802
        defer func() {
93✔
2803
                c.wg.Done()
45✔
2804
        }()
45✔
2805

2806
        err := c.progressStateMachineAfterRestart(bestHeight, commitSet)
48✔
2807
        if err != nil {
49✔
2808
                // In case of an error, we return early but we do not shut down
1✔
2809
                // LND, because there might be other channels that still can be
1✔
2810
                // resolved and we don't want to interfere with that.
1✔
2811
                // We continue to run the channel attendant in case the channel
1✔
2812
                // closes via other means for example the remote pary force
1✔
2813
                // closes the channel. So we log the error and continue.
1✔
2814
                log.Errorf("Unable to progress state machine after "+
1✔
2815
                        "restart: %v", err)
1✔
2816
        }
1✔
2817

2818
        for {
146✔
2819
                select {
98✔
2820

2821
                // A new block has arrived, we'll examine all the active HTLC's
2822
                // to see if any of them have expired, and also update our
2823
                // track of the best current height.
2824
                case beat := <-c.BlockbeatChan:
6✔
2825
                        bestHeight = beat.Height()
6✔
2826

6✔
2827
                        log.Debugf("ChannelArbitrator(%v): new block height=%v",
6✔
2828
                                c.cfg.ChanPoint, bestHeight)
6✔
2829

6✔
2830
                        err := c.handleBlockbeat(beat)
6✔
2831
                        if err != nil {
6✔
2832
                                log.Errorf("Handle block=%v got err: %v",
×
2833
                                        bestHeight, err)
×
2834
                        }
×
2835

2836
                        // If as a result of this trigger, the contract is
2837
                        // fully resolved, then well exit.
2838
                        if c.state == StateFullyResolved {
6✔
2839
                                return
×
2840
                        }
×
2841

2842
                // A new signal update was just sent. This indicates that the
2843
                // channel under watch is now live, and may modify its internal
2844
                // state, so we'll get the most up to date signals to we can
2845
                // properly do our job.
2846
                case signalUpdate := <-c.signalUpdates:
11✔
2847
                        log.Tracef("ChannelArbitrator(%v): got new signal "+
11✔
2848
                                "update!", c.cfg.ChanPoint)
11✔
2849

11✔
2850
                        // We'll update the ShortChannelID.
11✔
2851
                        c.cfg.ShortChanID = signalUpdate.newSignals.ShortChanID
11✔
2852

11✔
2853
                        // Now that the signal has been updated, we'll now
11✔
2854
                        // close the done channel to signal to the caller we've
11✔
2855
                        // registered the new ShortChannelID.
11✔
2856
                        close(signalUpdate.doneChan)
11✔
2857

2858
                // We've cooperatively closed the channel, so we're no longer
2859
                // needed. We'll mark the channel as resolved and exit.
2860
                case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure:
2✔
2861
                        err := c.handleCoopCloseEvent(closeInfo)
2✔
2862
                        if err != nil {
2✔
2863
                                log.Errorf("Failed to handle coop close: %v",
×
2864
                                        err)
×
2865

×
2866
                                return
×
2867
                        }
×
2868

2869
                // We have broadcasted our commitment, and it is now confirmed
2870
                // on-chain.
2871
                case closeInfo := <-c.cfg.ChainEvents.LocalUnilateralClosure:
12✔
2872
                        if c.state != StateCommitmentBroadcasted {
13✔
2873
                                log.Errorf("ChannelArbitrator(%v): unexpected "+
1✔
2874
                                        "local on-chain channel close",
1✔
2875
                                        c.cfg.ChanPoint)
1✔
2876
                        }
1✔
2877

2878
                        err := c.handleLocalForceCloseEvent(closeInfo)
12✔
2879
                        if err != nil {
12✔
2880
                                log.Errorf("Failed to handle local force "+
×
2881
                                        "close: %v", err)
×
2882

×
2883
                                return
×
2884
                        }
×
2885

2886
                // The remote party has broadcast the commitment on-chain.
2887
                // We'll examine our state to determine if we need to act at
2888
                // all.
2889
                case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure:
8✔
2890
                        err := c.handleRemoteForceCloseEvent(uniClosure)
8✔
2891
                        if err != nil {
10✔
2892
                                log.Errorf("Failed to handle remote force "+
2✔
2893
                                        "close: %v", err)
2✔
2894

2✔
2895
                                return
2✔
2896
                        }
2✔
2897

2898
                // The remote has breached the channel. As this is handled by
2899
                // the ChainWatcher and BreachArbitrator, we don't have to do
2900
                // anything in particular, so just advance our state and
2901
                // gracefully exit.
2902
                case breachInfo := <-c.cfg.ChainEvents.ContractBreach:
1✔
2903
                        err := c.handleContractBreach(breachInfo)
1✔
2904
                        if err != nil {
1✔
2905
                                log.Errorf("Failed to handle contract breach: "+
×
2906
                                        "%v", err)
×
2907

×
2908
                                return
×
2909
                        }
×
2910

2911
                // A new contract has just been resolved, we'll now check our
2912
                // log to see if all contracts have been resolved. If so, then
2913
                // we can exit as the contract is fully resolved.
2914
                case <-c.resolutionSignal:
4✔
2915
                        log.Infof("ChannelArbitrator(%v): a contract has been "+
4✔
2916
                                "fully resolved!", c.cfg.ChanPoint)
4✔
2917

4✔
2918
                        nextState, _, err := c.advanceState(
4✔
2919
                                uint32(bestHeight), chainTrigger, nil,
4✔
2920
                        )
4✔
2921
                        if err != nil {
4✔
2922
                                log.Errorf("Unable to advance state: %v", err)
×
2923
                        }
×
2924

2925
                        // If we don't have anything further to do after
2926
                        // advancing our state, then we'll exit.
2927
                        if nextState == StateFullyResolved {
7✔
2928
                                log.Infof("ChannelArbitrator(%v): all "+
3✔
2929
                                        "contracts fully resolved, exiting",
3✔
2930
                                        c.cfg.ChanPoint)
3✔
2931

3✔
2932
                                return
3✔
2933
                        }
3✔
2934

2935
                // We've just received a request to forcibly close out the
2936
                // channel. We'll advance our state machine with the user trigger.
2937
                case closeReq := <-c.forceCloseReqs:
11✔
2938
                        log.Infof("ChannelArbitrator(%v): received force "+
11✔
2939
                                "close request", c.cfg.ChanPoint)
11✔
2940

11✔
2941
                        if c.state != StateDefault {
12✔
2942
                                select {
1✔
2943
                                case closeReq.closeTx <- nil:
1✔
2944
                                case <-c.quit:
×
2945
                                }
2946

2947
                                select {
1✔
2948
                                case closeReq.errResp <- errAlreadyForceClosed:
1✔
2949
                                case <-c.quit:
×
2950
                                }
2951

2952
                                continue
1✔
2953
                        }
2954

2955
                        nextState, closeTx, err := c.advanceState(
10✔
2956
                                uint32(bestHeight), userTrigger, nil,
10✔
2957
                        )
10✔
2958
                        if err != nil {
11✔
2959
                                log.Errorf("Unable to advance state: %v", err)
1✔
2960
                        }
1✔
2961

2962
                        select {
10✔
2963
                        case closeReq.closeTx <- closeTx:
10✔
2964
                        case <-c.quit:
×
2965
                                return
×
2966
                        }
2967

2968
                        select {
10✔
2969
                        case closeReq.errResp <- err:
10✔
2970
                        case <-c.quit:
×
2971
                                return
×
2972
                        }
2973

2974
                        // If we don't have anything further to do after
2975
                        // advancing our state, then we'll exit.
2976
                        if nextState == StateFullyResolved {
10✔
2977
                                log.Infof("ChannelArbitrator(%v): all "+
×
2978
                                        "contracts resolved, exiting",
×
2979
                                        c.cfg.ChanPoint)
×
2980
                                return
×
2981
                        }
×
2982

2983
                case <-c.quit:
40✔
2984
                        return
40✔
2985
                }
2986
        }
2987
}
2988

2989
// handleBlockbeat processes a newly received blockbeat by advancing the
2990
// arbitrator's internal state using the received block height.
2991
func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error {
6✔
2992
        // Notify we've processed the block.
6✔
2993
        defer c.NotifyBlockProcessed(beat, nil)
6✔
2994

6✔
2995
        // If the state is StateContractClosed, StateWaitingFullResolution, or
6✔
2996
        // StateFullyResolved, there's no need to read the close event channel
6✔
2997
        // since the arbitrator can only get to this state after processing a
6✔
2998
        // previous close event and launched all its resolvers.
6✔
2999
        if c.state.IsContractClosed() {
6✔
UNCOV
3000
                log.Infof("ChannelArbitrator(%v): skipping reading close "+
×
UNCOV
3001
                        "events in state=%v", c.cfg.ChanPoint, c.state)
×
UNCOV
3002

×
UNCOV
3003
                // Launch all active resolvers when a new blockbeat is
×
UNCOV
3004
                // received, even when the contract is closed, we still need
×
UNCOV
3005
                // this as the resolvers may transform into new ones. For
×
UNCOV
3006
                // already launched resolvers this will be NOOP as they track
×
UNCOV
3007
                // their own `launched` states.
×
UNCOV
3008
                c.launchResolvers()
×
UNCOV
3009

×
UNCOV
3010
                return nil
×
UNCOV
3011
        }
×
3012

3013
        // Perform a non-blocking read on the close events in case the channel
3014
        // is closed in this blockbeat.
3015
        c.receiveAndProcessCloseEvent()
6✔
3016

6✔
3017
        // Try to advance the state if we are in StateDefault.
6✔
3018
        if c.state == StateDefault {
11✔
3019
                // Now that a new block has arrived, we'll attempt to advance
5✔
3020
                // our state forward.
5✔
3021
                _, _, err := c.advanceState(
5✔
3022
                        uint32(beat.Height()), chainTrigger, nil,
5✔
3023
                )
5✔
3024
                if err != nil {
5✔
3025
                        return fmt.Errorf("unable to advance state: %w", err)
×
3026
                }
×
3027
        }
3028

3029
        // Launch all active resolvers when a new blockbeat is received.
3030
        c.launchResolvers()
6✔
3031

6✔
3032
        return nil
6✔
3033
}
3034

3035
// receiveAndProcessCloseEvent does a non-blocking read on all the channel
3036
// close event channels. If an event is received, it will be further processed.
3037
func (c *ChannelArbitrator) receiveAndProcessCloseEvent() {
6✔
3038
        select {
6✔
3039
        // Received a coop close event, we now mark the channel as resolved and
3040
        // exit.
3041
        case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure:
×
3042
                err := c.handleCoopCloseEvent(closeInfo)
×
3043
                if err != nil {
×
3044
                        log.Errorf("Failed to handle coop close: %v", err)
×
3045
                        return
×
3046
                }
×
3047

3048
        // We have broadcast our commitment, and it is now confirmed onchain.
UNCOV
3049
        case closeInfo := <-c.cfg.ChainEvents.LocalUnilateralClosure:
×
UNCOV
3050
                if c.state != StateCommitmentBroadcasted {
×
3051
                        log.Errorf("ChannelArbitrator(%v): unexpected "+
×
3052
                                "local on-chain channel close", c.cfg.ChanPoint)
×
3053
                }
×
3054

UNCOV
3055
                err := c.handleLocalForceCloseEvent(closeInfo)
×
UNCOV
3056
                if err != nil {
×
3057
                        log.Errorf("Failed to handle local force close: %v",
×
3058
                                err)
×
3059

×
3060
                        return
×
3061
                }
×
3062

3063
        // The remote party has broadcast the commitment. We'll examine our
3064
        // state to determine if we need to act at all.
3065
        case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure:
×
3066
                err := c.handleRemoteForceCloseEvent(uniClosure)
×
3067
                if err != nil {
×
3068
                        log.Errorf("Failed to handle remote force close: %v",
×
3069
                                err)
×
3070

×
3071
                        return
×
3072
                }
×
3073

3074
        // The remote has breached the channel! We now launch the breach
3075
        // contract resolvers.
3076
        case breachInfo := <-c.cfg.ChainEvents.ContractBreach:
×
3077
                err := c.handleContractBreach(breachInfo)
×
3078
                if err != nil {
×
3079
                        log.Errorf("Failed to handle contract breach: %v", err)
×
3080
                        return
×
3081
                }
×
3082

3083
        default:
6✔
3084
                log.Infof("ChannelArbitrator(%v) no close event",
6✔
3085
                        c.cfg.ChanPoint)
6✔
3086
        }
3087
}
3088

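// Illustrative sketch, not part of channel_arbitrator.go: the read above is
// non-blocking because the select carries a default case, so the arbitrator
// polls the close-event channels once per blockbeat and simply falls through
// when nothing is pending:
func pollOnceExample(events <-chan struct{}) bool {
        select {
        case <-events:
                return true
        default:
                return false
        }
}
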
3089
// Name returns a human-readable string for this subsystem.
3090
//
3091
// NOTE: Part of chainio.Consumer interface.
3092
func (c *ChannelArbitrator) Name() string {
51✔
3093
        return fmt.Sprintf("ChannelArbitrator(%v)", c.cfg.ChanPoint)
51✔
3094
}
51✔
3095

3096
// checkLegacyBreach returns StateFullyResolved if the channel was closed with
3097
// a breach transaction before the channel arbitrator launched its own breach
3098
// resolver. StateContractClosed is returned if this is a modern breach close
3099
// with a breach resolver. StateError is returned if the log lookup failed.
3100
func (c *ChannelArbitrator) checkLegacyBreach() (ArbitratorState, error) {
2✔
3101
        // A previous version of the channel arbitrator would make the breach
2✔
3102
        // close skip to StateFullyResolved. If there are no contract
2✔
3103
        // resolutions in the bolt arbitrator log, then this is an older breach
2✔
3104
        // close. Otherwise, if there are resolutions, the state should advance
2✔
3105
        // to StateContractClosed.
2✔
3106
        _, err := c.log.FetchContractResolutions()
2✔
3107
        if err == errNoResolutions {
2✔
3108
                // This is an older breach close still in the database.
×
3109
                return StateFullyResolved, nil
×
3110
        } else if err != nil {
2✔
3111
                return StateError, err
×
3112
        }
×
3113

3114
        // This is a modern breach close with resolvers.
3115
        return StateContractClosed, nil
2✔
3116
}
3117

3118
// sweepRequest wraps the arguments used when calling `SweepInput`.
3119
type sweepRequest struct {
3120
        // input is the input to be swept.
3121
        input input.Input
3122

3123
        // params holds the sweeping parameters.
3124
        params sweep.Params
3125
}
3126

3127
// createSweepRequest creates an anchor sweeping request for a particular
3128
// version (local/remote/remote pending) of the commitment.
3129
func (c *ChannelArbitrator) createSweepRequest(
3130
        anchor *lnwallet.AnchorResolution, htlcs htlcSet, anchorPath string,
3131
        heightHint uint32) (sweepRequest, error) {
8✔
3132

8✔
3133
        // Use the chan id as the exclusive group. This prevents any of the
8✔
3134
        // anchors from being batched together.
8✔
3135
        exclusiveGroup := c.cfg.ShortChanID.ToUint64()
8✔
3136

8✔
3137
        // Find the deadline for this specific anchor.
8✔
3138
        deadline, value, err := c.findCommitmentDeadlineAndValue(
8✔
3139
                heightHint, htlcs,
8✔
3140
        )
8✔
3141
        if err != nil {
8✔
3142
                return sweepRequest{}, err
×
3143
        }
×
3144

3145
        // If we cannot find a deadline, it means there are no HTLCs at stake,
3146
        // which means we can relax our anchor sweeping conditions as we don't
3147
        // have any time-sensitive outputs to sweep. However, we still need to
3148
        // register the anchor output with the sweeper so we are later able to
3149
        // bump the close fee.
3150
        if deadline.IsNone() {
11✔
3151
                log.Infof("ChannelArbitrator(%v): no HTLCs at stake, "+
3✔
3152
                        "sweeping anchor with default deadline",
3✔
3153
                        c.cfg.ChanPoint)
3✔
3154
        }
3✔
3155

3156
        witnessType := input.CommitmentAnchor
8✔
3157

8✔
3158
        // For taproot channels, we need to use the proper witness type.
8✔
3159
        if txscript.IsPayToTaproot(
8✔
3160
                anchor.AnchorSignDescriptor.Output.PkScript,
8✔
3161
        ) {
8✔
UNCOV
3162

×
UNCOV
3163
                witnessType = input.TaprootAnchorSweepSpend
×
UNCOV
3164
        }
×
3165

3166
        // Prepare anchor output for sweeping.
3167
        anchorInput := input.MakeBaseInput(
8✔
3168
                &anchor.CommitAnchor,
8✔
3169
                witnessType,
8✔
3170
                &anchor.AnchorSignDescriptor,
8✔
3171
                heightHint,
8✔
3172
                &input.TxInfo{
8✔
3173
                        Fee:    anchor.CommitFee,
8✔
3174
                        Weight: anchor.CommitWeight,
8✔
3175
                },
8✔
3176
        )
8✔
3177

8✔
3178
        // If we have a deadline, we'll use it to calculate the deadline
8✔
3179
        // height, otherwise default to none.
8✔
3180
        deadlineDesc := "None"
8✔
3181
        deadlineHeight := fn.MapOption(func(d int32) int32 {
13✔
3182
                deadlineDesc = fmt.Sprintf("%d", d)
5✔
3183

5✔
3184
                return d + int32(heightHint)
5✔
3185
        })(deadline)
5✔
3186

3187
        // Calculate the budget based on the value under protection, which is
3188
        // the sum of all HTLCs on this commitment minus their own budgets.
3189
        // The anchor output in itself has a small output value of 330 sats so
3190
        // we also include it in the budget to pay for the cpfp transaction.
3191
        budget := calculateBudget(
8✔
3192
                value, c.cfg.Budget.AnchorCPFPRatio, c.cfg.Budget.AnchorCPFP,
8✔
3193
        ) + AnchorOutputValue
8✔
3194

8✔
3195
        log.Infof("ChannelArbitrator(%v): offering anchor from %s commitment "+
8✔
3196
                "%v to sweeper with deadline=%v, budget=%v", c.cfg.ChanPoint,
8✔
3197
                anchorPath, anchor.CommitAnchor, deadlineDesc, budget)
8✔
3198

8✔
3199
        // Sweep anchor output with a confirmation target fee preference.
8✔
3200
        // Because this is a CPFP operation, the anchor sweep will only be
8✔
3201
        // attempted when the current fee estimate for the confirmation target
8✔
3202
        // exceeds the commit fee rate.
8✔
3203
        return sweepRequest{
8✔
3204
                input: &anchorInput,
8✔
3205
                params: sweep.Params{
8✔
3206
                        ExclusiveGroup: &exclusiveGroup,
8✔
3207
                        Budget:         budget,
8✔
3208
                        DeadlineHeight: deadlineHeight,
8✔
3209
                },
8✔
3210
        }, nil
8✔
3211
}
3212

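// Illustrative sketch, not part of channel_arbitrator.go: assuming the budget
// is simply the protected value scaled by AnchorCPFPRatio (the real
// calculateBudget may apply further caps from the config), an anchor CPFP
// protecting 1,000,000 sats of HTLCs at a ratio of 0.5 may spend up to
// 500,000 + 330 = 500,330 sats on fees:
func exampleAnchorBudget(htlcValue btcutil.Amount,
        ratio float64) btcutil.Amount {

        return btcutil.Amount(float64(htlcValue)*ratio) + AnchorOutputValue
}
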
3213
// prepareAnchorSweeps creates a list of requests to be used by the sweeper for
3214
// all possible commitment versions.
3215
func (c *ChannelArbitrator) prepareAnchorSweeps(heightHint uint32,
3216
        anchors *lnwallet.AnchorResolutions) ([]sweepRequest, error) {
19✔
3217

19✔
3218
        // requests holds all the possible anchor sweep requests. We can have
19✔
3219
        // up to 3 different versions of commitments (local/remote/remote
19✔
3220
        // dangling) to be CPFPed by the anchors.
19✔
3221
        requests := make([]sweepRequest, 0, 3)
19✔
3222

19✔
3223
        // remotePendingReq holds the request for sweeping the anchor output on
19✔
3224
        // the remote pending commitment. It's only set when there's an actual
19✔
3225
        // pending remote commitment and it's used to decide whether we need to
19✔
3226
        // update the fee budget when sweeping the anchor output on the local
19✔
3227
        // commitment.
19✔
3228
        remotePendingReq := fn.None[sweepRequest]()
19✔
3229

19✔
3230
        // First we check on the remote pending commitment and optionally
19✔
3231
        // create an anchor sweeping request.
19✔
3232
        htlcs, ok := c.activeHTLCs[RemotePendingHtlcSet]
19✔
3233
        if ok && anchors.RemotePending != nil {
21✔
3234
                req, err := c.createSweepRequest(
2✔
3235
                        anchors.RemotePending, htlcs, "remote pending",
2✔
3236
                        heightHint,
2✔
3237
                )
2✔
3238
                if err != nil {
2✔
3239
                        return nil, err
×
3240
                }
×
3241

3242
                // Save the request.
3243
                requests = append(requests, req)
2✔
3244

2✔
3245
                // Set the optional variable.
2✔
3246
                remotePendingReq = fn.Some(req)
2✔
3247
        }
3248

3249
        // Check the local commitment and optionally create an anchor sweeping
3250
        // request. The params used in this request will be influenced by the
3251
        // anchor sweeping request made from the pending remote commitment.
3252
        htlcs, ok = c.activeHTLCs[LocalHtlcSet]
19✔
3253
        if ok && anchors.Local != nil {
22✔
3254
                req, err := c.createSweepRequest(
3✔
3255
                        anchors.Local, htlcs, "local", heightHint,
3✔
3256
                )
3✔
3257
                if err != nil {
3✔
3258
                        return nil, err
×
3259
                }
×
3260

3261
                // If there's an anchor sweeping request from the pending
3262
                // remote commitment, we will compare its budget against the
3263
                // budget used here and choose the params that have the larger
3264
                // budget. The deadline when choosing the remote pending budget
3265
                // instead of the local one will always be earlier or equal to
3266
                // the local deadline because outgoing HTLCs are resolved on
3267
                // the local commitment first before they are removed from the
3268
                // remote one.
3269
                remotePendingReq.WhenSome(func(s sweepRequest) {
5✔
3270
                        if s.params.Budget <= req.params.Budget {
3✔
3271
                                return
1✔
3272
                        }
1✔
3273

3274
                        log.Infof("ChannelArbitrator(%v): replaced local "+
1✔
3275
                                "anchor(%v) sweep params with pending remote "+
1✔
3276
                                "anchor sweep params, \nold:[%v], \nnew:[%v]",
1✔
3277
                                c.cfg.ChanPoint, anchors.Local.CommitAnchor,
1✔
3278
                                req.params, s.params)
1✔
3279

1✔
3280
                        req.params = s.params
1✔
3281
                })
3282

3283
                // Save the request.
3284
                requests = append(requests, req)
3✔
3285
        }
3286

3287
        // Check the remote commitment and create an anchor sweeping request if
3288
        // needed.
3289
        htlcs, ok = c.activeHTLCs[RemoteHtlcSet]
19✔
3290
        if ok && anchors.Remote != nil {
22✔
3291
                req, err := c.createSweepRequest(
3✔
3292
                        anchors.Remote, htlcs, "remote", heightHint,
3✔
3293
                )
3✔
3294
                if err != nil {
3✔
3295
                        return nil, err
×
3296
                }
×
3297

3298
                requests = append(requests, req)
3✔
3299
        }
3300

3301
        return requests, nil
19✔
3302
}
3303

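// Illustrative sketch, not part of channel_arbitrator.go: when both a local
// and a pending-remote anchor request exist, the local request above simply
// adopts whichever parameter set carries the larger budget, since the
// pending-remote deadline can only be earlier than or equal to the local one.
// In isolation:
func pickParamsExample(local, remotePending sweep.Params) sweep.Params {
        if remotePending.Budget > local.Budget {
                return remotePending
        }

        return local
}
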
3304
// failIncomingDust resolves the incoming dust HTLCs because they do not have
3305
// an output on the commitment transaction and cannot be resolved onchain. We
3306
// mark them as failed here.
3307
func (c *ChannelArbitrator) failIncomingDust(
3308
        incomingDustHTLCs []channeldb.HTLC) error {
10✔
3309

10✔
3310
        for _, htlc := range incomingDustHTLCs {
11✔
3311
                if !htlc.Incoming || htlc.OutputIndex >= 0 {
1✔
3312
                        return fmt.Errorf("htlc with index %v is not incoming "+
×
3313
                                "dust", htlc.OutputIndex)
×
3314
                }
×
3315

3316
                key := models.CircuitKey{
1✔
3317
                        ChanID: c.cfg.ShortChanID,
1✔
3318
                        HtlcID: htlc.HtlcIndex,
1✔
3319
                }
1✔
3320

1✔
3321
                // Mark this dust htlc as final failed.
1✔
3322
                chainArbCfg := c.cfg.ChainArbitratorConfig
1✔
3323
                err := chainArbCfg.PutFinalHtlcOutcome(
1✔
3324
                        key.ChanID, key.HtlcID, false,
1✔
3325
                )
1✔
3326
                if err != nil {
1✔
3327
                        return err
×
3328
                }
×
3329

3330
                // Send notification.
3331
                chainArbCfg.HtlcNotifier.NotifyFinalHtlcEvent(
1✔
3332
                        key,
1✔
3333
                        channeldb.FinalHtlcInfo{
1✔
3334
                                Settled:  false,
1✔
3335
                                Offchain: false,
1✔
3336
                        },
1✔
3337
                )
1✔
3338
        }
3339

3340
        return nil
10✔
3341
}
3342

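// Illustrative sketch, not part of channel_arbitrator.go: a trimmed-to-dust
// HTLC has no output on the commitment transaction, which channeldb encodes
// as a negative OutputIndex. That is exactly the sanity check performed above:
func isIncomingDustExample(htlc channeldb.HTLC) bool {
        return htlc.Incoming && htlc.OutputIndex < 0
}
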
3343
// abandonForwards cancels back the incoming HTLCs for their corresponding
3344
// outgoing HTLCs. We use a set here to avoid sending duplicate failure messages
3345
// for the same HTLC. This also needs to be done for locally initiated outgoing
3346
// HTLCs, as they are special cased in the switch.
3347
func (c *ChannelArbitrator) abandonForwards(htlcs fn.Set[uint64]) error {
13✔
3348
        log.Debugf("ChannelArbitrator(%v): cancelling back %v incoming "+
13✔
3349
                "HTLC(s)", c.cfg.ChanPoint,
13✔
3350
                len(htlcs))
13✔
3351

13✔
3352
        msgsToSend := make([]ResolutionMsg, 0, len(htlcs))
13✔
3353
        failureMsg := &lnwire.FailPermanentChannelFailure{}
13✔
3354

13✔
3355
        for idx := range htlcs {
23✔
3356
                failMsg := ResolutionMsg{
10✔
3357
                        SourceChan: c.cfg.ShortChanID,
10✔
3358
                        HtlcIndex:  idx,
10✔
3359
                        Failure:    failureMsg,
10✔
3360
                }
10✔
3361

10✔
3362
                msgsToSend = append(msgsToSend, failMsg)
10✔
3363
        }
10✔
3364

3365
        // Send the msges to the switch, if there are any.
3366
        if len(msgsToSend) == 0 {
16✔
3367
                return nil
3✔
3368
        }
3✔
3369

3370
        log.Debugf("ChannelArbitrator(%v): sending resolution message=%v",
10✔
3371
                c.cfg.ChanPoint, lnutils.SpewLogClosure(msgsToSend))
10✔
3372

10✔
3373
        err := c.cfg.DeliverResolutionMsg(msgsToSend...)
10✔
3374
        if err != nil {
10✔
3375
                log.Errorf("Unable to send resolution msges to switch: %v", err)
×
3376
                return err
×
3377
        }
×
3378

3379
        return nil
10✔
3380
}
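
// Illustrative sketch, not part of channel_arbitrator.go: abandonForwards
// takes a set of HTLC indexes precisely so the same incoming HTLC is only
// failed back once, even if several chain actions nominate it. A minimal
// stand-in for that dedup step, using a plain map instead of fn.Set and a
// hypothetical input slice, might look as follows:
func dedupHtlcIndexes(htlcIndexes []uint64) map[uint64]struct{} {
	set := make(map[uint64]struct{}, len(htlcIndexes))
	for _, idx := range htlcIndexes {
		// Duplicate indexes collapse into a single map entry, so at
		// most one failure message per HTLC is built for the switch.
		set[idx] = struct{}{}
	}

	return set
}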

// handleCoopCloseEvent takes a coop close event from ChainEvents, marks the
// channel as closed and advances the state.
func (c *ChannelArbitrator) handleCoopCloseEvent(
	closeInfo *CooperativeCloseInfo) error {

	log.Infof("ChannelArbitrator(%v) marking channel cooperatively closed "+
		"at height %v", c.cfg.ChanPoint, closeInfo.CloseHeight)

	err := c.cfg.MarkChannelClosed(
		closeInfo.ChannelCloseSummary,
		channeldb.ChanStatusCoopBroadcasted,
	)
	if err != nil {
		return fmt.Errorf("unable to mark channel closed: %w", err)
	}

	// We'll now advance our state machine until it reaches a terminal
	// state, and the channel is marked resolved.
	_, _, err = c.advanceState(closeInfo.CloseHeight, coopCloseTrigger, nil)
	if err != nil {
		log.Errorf("Unable to advance state: %v", err)
	}

	return nil
}
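
// Illustrative sketch, not part of channel_arbitrator.go: each close handler
// in this section follows the same two-step shape seen above: persist the
// close (and, for unilateral closes, the resolutions), then drive the state
// machine forward, where a failure to advance is only logged because the
// persisted state lets the arbitrator retry the transition after a restart.
// A hypothetical helper making that ordering explicit:
func markThenAdvance(markClosed func() error, advance func() error) error {
	// Persisting the close must succeed, otherwise we could not recover
	// this channel after a restart.
	if err := markClosed(); err != nil {
		return fmt.Errorf("unable to mark channel closed: %w", err)
	}

	// Advancing the state machine can be retried from persisted state on
	// restart, so an error here is logged rather than returned.
	if err := advance(); err != nil {
		log.Errorf("Unable to advance state: %v", err)
	}

	return nil
}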

// handleLocalForceCloseEvent takes a local force close event from ChainEvents,
// saves the contract resolutions to disk, marks the channel as closed and
// advances the state.
func (c *ChannelArbitrator) handleLocalForceCloseEvent(
	closeInfo *LocalUnilateralCloseInfo) error {

	closeTx := closeInfo.CloseTx

	resolutions, err := closeInfo.ContractResolutions.UnwrapOrErr(
		fmt.Errorf("resolutions not found"),
	)
	if err != nil {
		return fmt.Errorf("unable to get resolutions: %w", err)
	}

	// We make sure that the htlc resolutions are present, otherwise we
	// would panic dereferencing the pointer.
	//
	// TODO(ziggie): Refactor ContractResolutions to use options.
	if resolutions.HtlcResolutions == nil {
		return fmt.Errorf("htlc resolutions is nil")
	}

	log.Infof("ChannelArbitrator(%v): local force close tx=%v confirmed",
		c.cfg.ChanPoint, closeTx.TxHash())

	contractRes := &ContractResolutions{
		CommitHash:       closeTx.TxHash(),
		CommitResolution: resolutions.CommitResolution,
		HtlcResolutions:  *resolutions.HtlcResolutions,
		AnchorResolution: resolutions.AnchorResolution,
	}

	// When processing a unilateral close event, we'll transition to the
	// ContractClosed state. We'll log out the set of resolutions such that
	// they are available to fetch in that state, and we'll also write the
	// commit set so we can reconstruct our chain actions on restart.
	err = c.log.LogContractResolutions(contractRes)
	if err != nil {
		return fmt.Errorf("unable to write resolutions: %w", err)
	}

	err = c.log.InsertConfirmedCommitSet(&closeInfo.CommitSet)
	if err != nil {
		return fmt.Errorf("unable to write commit set: %w", err)
	}

	// After the set of resolutions is successfully logged, we can safely
	// close the channel. After this succeeds we won't be getting chain
	// events anymore, so we must make sure we can recover on restart after
	// it is marked closed. If the next state transition fails, we'll start
	// up in the prior state again, and since we will no longer be getting
	// chain events, we must manually re-trigger the state transition into
	// StateContractClosed based on the close status of the channel.
	err = c.cfg.MarkChannelClosed(
		closeInfo.ChannelCloseSummary,
		channeldb.ChanStatusLocalCloseInitiator,
	)
	if err != nil {
		return fmt.Errorf("unable to mark channel closed: %w", err)
	}

	// We'll now advance our state machine until it reaches a terminal
	// state.
	_, _, err = c.advanceState(
		uint32(closeInfo.SpendingHeight),
		localCloseTrigger, &closeInfo.CommitSet,
	)
	if err != nil {
		log.Errorf("Unable to advance state: %v", err)
	}

	return nil
}
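
// Illustrative sketch, not part of channel_arbitrator.go: the handler above
// refuses to proceed when the optional resolutions, or their HTLC component,
// are missing, since dereferencing a nil pointer later would panic. A small
// generic guard expressing the same idea, with a hypothetical name, is:
func requireNonNil[T any](ptr *T, what string) (*T, error) {
	if ptr == nil {
		return nil, fmt.Errorf("%s is nil", what)
	}

	return ptr, nil
}

// For example, the nil check on the HTLC resolutions above could be phrased
// as (hypothetical usage):
//
//	htlcRes, err := requireNonNil(
//		resolutions.HtlcResolutions, "htlc resolutions",
//	)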

// handleRemoteForceCloseEvent takes a remote force close event from
// ChainEvents, saves the contract resolutions to disk, marks the channel as
// closed and advances the state.
func (c *ChannelArbitrator) handleRemoteForceCloseEvent(
	closeInfo *RemoteUnilateralCloseInfo) error {

	log.Infof("ChannelArbitrator(%v): remote party has force closed "+
		"channel at height %v", c.cfg.ChanPoint,
		closeInfo.SpendingHeight)

	// If we don't have a self output, and there are no active HTLCs, then
	// we can immediately mark the contract as fully resolved and exit.
	contractRes := &ContractResolutions{
		CommitHash:       *closeInfo.SpenderTxHash,
		CommitResolution: closeInfo.CommitResolution,
		HtlcResolutions:  *closeInfo.HtlcResolutions,
		AnchorResolution: closeInfo.AnchorResolution,
	}

	// When processing a unilateral close event, we'll transition to the
	// ContractClosed state. We'll log out the set of resolutions such that
	// they are available to fetch in that state, and we'll also write the
	// commit set so we can reconstruct our chain actions on restart.
	err := c.log.LogContractResolutions(contractRes)
	if err != nil {
		return fmt.Errorf("unable to write resolutions: %w", err)
	}

	err = c.log.InsertConfirmedCommitSet(&closeInfo.CommitSet)
	if err != nil {
		return fmt.Errorf("unable to write commit set: %w", err)
	}

	// After the set of resolutions is successfully logged, we can safely
	// close the channel. After this succeeds we won't be getting chain
	// events anymore, so we must make sure we can recover on restart after
	// it is marked closed. If the next state transition fails, we'll start
	// up in the prior state again, and since we will no longer be getting
	// chain events, we must manually re-trigger the state transition into
	// StateContractClosed based on the close status of the channel.
	closeSummary := &closeInfo.ChannelCloseSummary
	err = c.cfg.MarkChannelClosed(
		closeSummary,
		channeldb.ChanStatusRemoteCloseInitiator,
	)
	if err != nil {
		return fmt.Errorf("unable to mark channel closed: %w", err)
	}

	// We'll now advance our state machine until it reaches a terminal
	// state.
	_, _, err = c.advanceState(
		uint32(closeInfo.SpendingHeight),
		remoteCloseTrigger, &closeInfo.CommitSet,
	)
	if err != nil {
		log.Errorf("Unable to advance state: %v", err)
	}

	return nil
}
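
// Illustrative sketch, not part of channel_arbitrator.go: unlike the local
// handler, the remote handler above dereferences closeInfo.SpenderTxHash and
// closeInfo.HtlcResolutions directly, presumably because the chain watcher
// always populates them for a remote unilateral close. If that invariant were
// ever in doubt, the requireNonNil guard sketched earlier could be applied
// first (hypothetical usage):
//
//	spenderHash, err := requireNonNil(
//		closeInfo.SpenderTxHash, "spender txid",
//	)
//	if err != nil {
//		return err
//	}
//
//	htlcRes, err := requireNonNil(
//		closeInfo.HtlcResolutions, "htlc resolutions",
//	)
//	if err != nil {
//		return err
//	}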

// handleContractBreach takes a breach close event from ChainEvents, saves the
// contract resolutions to disk, marks the channel as closed and advances the
// state.
func (c *ChannelArbitrator) handleContractBreach(
	breachInfo *BreachCloseInfo) error {

	closeSummary := &breachInfo.CloseSummary

	log.Infof("ChannelArbitrator(%v): remote party has breached channel "+
		"at height %v!", c.cfg.ChanPoint, closeSummary.CloseHeight)

	// In the breach case, we'll only have anchor and breach resolutions.
	contractRes := &ContractResolutions{
		CommitHash:       breachInfo.CommitHash,
		BreachResolution: breachInfo.BreachResolution,
		AnchorResolution: breachInfo.AnchorResolution,
	}

	// We'll transition to the ContractClosed state and log the set of
	// resolutions such that they can be turned into resolvers later on.
	// We'll also insert the CommitSet of the latest set of commitments.
	err := c.log.LogContractResolutions(contractRes)
	if err != nil {
		return fmt.Errorf("unable to write resolutions: %w", err)
	}

	err = c.log.InsertConfirmedCommitSet(&breachInfo.CommitSet)
	if err != nil {
		return fmt.Errorf("unable to write commit set: %w", err)
	}

	// The channel is finally marked pending closed here, as both the
	// BreachArbitrator and the channel arbitrator have now persisted the
	// relevant states.
	err = c.cfg.MarkChannelClosed(
		closeSummary, channeldb.ChanStatusRemoteCloseInitiator,
	)
	if err != nil {
		return fmt.Errorf("unable to mark channel closed: %w", err)
	}

	log.Infof("Breached channel=%v marked pending-closed",
		breachInfo.BreachResolution.FundingOutPoint)

	// We'll advance our state machine until it reaches a terminal state.
	_, _, err = c.advanceState(
		closeSummary.CloseHeight, breachCloseTrigger,
		&breachInfo.CommitSet,
	)
	if err != nil {
		log.Errorf("Unable to advance state: %v", err)
	}

	return nil
}
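
// Illustrative sketch, not part of channel_arbitrator.go: across the handlers
// above, the status recorded with MarkChannelClosed depends on how the
// channel was closed: cooperative closes record ChanStatusCoopBroadcasted,
// local force closes record ChanStatusLocalCloseInitiator, and both remote
// force closes and breaches record ChanStatusRemoteCloseInitiator. A
// hypothetical helper summarising that choice, keyed by a plain string for
// the sake of the example:
func closeStatusFor(closeKind string) channeldb.ChannelStatus {
	switch closeKind {
	case "coop":
		return channeldb.ChanStatusCoopBroadcasted

	case "local":
		return channeldb.ChanStatusLocalCloseInitiator

	default:
		// Remote force closes and breaches both mark the remote party
		// as the close initiator.
		return channeldb.ChanStatusRemoteCloseInitiator
	}
}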