lightningnetwork / lnd / 13558005087

27 Feb 2025 03:04AM UTC coverage: 58.834% (-0.001%) from 58.835%

Pull Request #8453: [4/4] - multi: integrate new rbf coop close FSM into the existing peer flow
Author: Roasbeef
Commit: lnwallet/chancloser: increase test coverage of state machine
CI: github

1079 of 1370 new or added lines in 23 files covered. (78.76%)

578 existing lines in 40 files now uncovered.

137063 of 232965 relevant lines covered (58.83%)

19205.84 hits per line

Source File

/contractcourt/channel_arbitrator.go (83.26% covered)
1
package contractcourt
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "math"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcutil"
14
        "github.com/btcsuite/btcd/chaincfg/chainhash"
15
        "github.com/btcsuite/btcd/txscript"
16
        "github.com/btcsuite/btcd/wire"
17
        "github.com/lightningnetwork/lnd/chainio"
18
        "github.com/lightningnetwork/lnd/channeldb"
19
        "github.com/lightningnetwork/lnd/fn/v2"
20
        "github.com/lightningnetwork/lnd/graph/db/models"
21
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
22
        "github.com/lightningnetwork/lnd/input"
23
        "github.com/lightningnetwork/lnd/invoices"
24
        "github.com/lightningnetwork/lnd/kvdb"
25
        "github.com/lightningnetwork/lnd/labels"
26
        "github.com/lightningnetwork/lnd/lntypes"
27
        "github.com/lightningnetwork/lnd/lnutils"
28
        "github.com/lightningnetwork/lnd/lnwallet"
29
        "github.com/lightningnetwork/lnd/lnwire"
30
        "github.com/lightningnetwork/lnd/sweep"
31
)
32

33
var (
34
        // errAlreadyForceClosed is an error returned when we attempt to force
35
        // close a channel that's already in the process of doing so.
36
        errAlreadyForceClosed = errors.New("channel is already in the " +
37
                "process of being force closed")
38
)
39

40
const (
41
        // arbitratorBlockBufferSize is the size of the buffer we give to each
42
        // channel arbitrator.
43
        arbitratorBlockBufferSize = 20
44

45
        // AnchorOutputValue is the output value for the anchor output of an
46
        // anchor channel.
47
        // See BOLT 03 for more details:
48
        // https://github.com/lightning/bolts/blob/master/03-transactions.md
49
        AnchorOutputValue = btcutil.Amount(330)
50
)
51

52
// WitnessSubscription represents an intent to be notified once new witnesses
53
// are discovered by various active contract resolvers. A contract resolver may
54
// use this to be notified when it can satisfy an incoming contract after we
55
// discover the witness for an outgoing contract.
56
type WitnessSubscription struct {
57
        // WitnessUpdates is a channel that newly discovered witnesses will be
58
        // sent over.
59
        //
60
        // TODO(roasbeef): couple with WitnessType?
61
        WitnessUpdates <-chan lntypes.Preimage
62

63
        // CancelSubscription is a function closure that should be used by a
64
        // client to cancel the subscription once they are no longer interested
65
        // in receiving new updates.
66
        CancelSubscription func()
67
}
68

69
// WitnessBeacon is a global beacon of witnesses. Contract resolvers will use
70
// this interface to lookup witnesses (preimages typically) of contracts
71
// they're trying to resolve, add new preimages they resolve, and finally
72
// receive new updates each new time a preimage is discovered.
73
//
74
// TODO(roasbeef): need to delete the pre-images once we've used them
75
// and have been sufficiently confirmed?
76
type WitnessBeacon interface {
77
        // SubscribeUpdates returns a channel that will be sent upon *each* time
78
        // a new preimage is discovered.
79
        SubscribeUpdates(chanID lnwire.ShortChannelID, htlc *channeldb.HTLC,
80
                payload *hop.Payload,
81
                nextHopOnionBlob []byte) (*WitnessSubscription, error)
82

83
        // LookupPreimage attempts to lookup a preimage in the global cache.
84
        // True is returned for the second argument if the preimage is found.
85
        LookupPreimage(payhash lntypes.Hash) (lntypes.Preimage, bool)
86

87
        // AddPreimages adds a batch of newly discovered preimages to the global
88
        // cache, and also signals any subscribers of the newly discovered
89
        // witness.
90
        AddPreimages(preimages ...lntypes.Preimage) error
91
}
92

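// Illustrative sketch only, not part of this file: a minimal in-memory
// WitnessBeacon built solely from the interface above, e.g. for tests. The
// type and constructor names are hypothetical, and subscription cancellation
// is deliberately a no-op to keep the sketch short.
type memoryBeacon struct {
        mtx       sync.RWMutex
        preimages map[lntypes.Hash]lntypes.Preimage
        subs      []chan lntypes.Preimage
}

func newMemoryBeacon() *memoryBeacon {
        return &memoryBeacon{
                preimages: make(map[lntypes.Hash]lntypes.Preimage),
        }
}

// SubscribeUpdates registers a new subscriber for preimage updates.
func (m *memoryBeacon) SubscribeUpdates(_ lnwire.ShortChannelID,
        _ *channeldb.HTLC, _ *hop.Payload,
        _ []byte) (*WitnessSubscription, error) {

        m.mtx.Lock()
        defer m.mtx.Unlock()

        updates := make(chan lntypes.Preimage, 1)
        m.subs = append(m.subs, updates)

        return &WitnessSubscription{
                WitnessUpdates:     updates,
                CancelSubscription: func() {},
        }, nil
}

// LookupPreimage returns the cached preimage for the given payment hash.
func (m *memoryBeacon) LookupPreimage(
        payHash lntypes.Hash) (lntypes.Preimage, bool) {

        m.mtx.RLock()
        defer m.mtx.RUnlock()

        preimage, ok := m.preimages[payHash]
        return preimage, ok
}

// AddPreimages caches the preimages and notifies all subscribers.
func (m *memoryBeacon) AddPreimages(preimages ...lntypes.Preimage) error {
        m.mtx.Lock()
        defer m.mtx.Unlock()

        for _, preimage := range preimages {
                m.preimages[preimage.Hash()] = preimage

                for _, sub := range m.subs {
                        select {
                        case sub <- preimage:
                        default:
                        }
                }
        }

        return nil
}

// Compile-time check that the sketch satisfies WitnessBeacon.
var _ WitnessBeacon = (*memoryBeacon)(nil)
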
93
// ArbChannel is an abstraction that allows the channel arbitrator to interact
94
// with an open channel.
95
type ArbChannel interface {
96
        // ForceCloseChan should force close the contract that this attendant
97
        // is watching over. We'll use this when we decide that we need to go
98
        // to chain. It should in addition tell the switch to remove the
99
        // corresponding link, such that we won't accept any new updates. The
100
        // returned summary contains all items needed to eventually resolve all
101
        // outputs on chain.
102
        ForceCloseChan() (*wire.MsgTx, error)
103

104
        // NewAnchorResolutions returns the anchor resolutions for currently
105
        // valid commitment transactions.
106
        NewAnchorResolutions() (*lnwallet.AnchorResolutions, error)
107
}
108

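// Illustrative sketch only, not part of this file: a stub ArbChannel that
// returns canned values, e.g. for unit tests. The type name and its fields
// are hypothetical.
type stubArbChannel struct {
        closeTx *wire.MsgTx
        anchors *lnwallet.AnchorResolutions
}

// ForceCloseChan returns the canned force close transaction.
func (s *stubArbChannel) ForceCloseChan() (*wire.MsgTx, error) {
        return s.closeTx, nil
}

// NewAnchorResolutions returns the canned anchor resolutions.
func (s *stubArbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions,
        error) {

        return s.anchors, nil
}

// Compile-time check that the stub satisfies ArbChannel.
var _ ArbChannel = (*stubArbChannel)(nil)
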
109
// ChannelArbitratorConfig contains all the functionality that the
110
// ChannelArbitrator needs in order to properly arbitrate any contract dispute
111
// on chain.
112
type ChannelArbitratorConfig struct {
113
        // ChanPoint is the channel point that uniquely identifies this
114
        // channel.
115
        ChanPoint wire.OutPoint
116

117
        // Channel is the full channel data structure. For legacy channels, this
118
        // field may not always be set after a restart.
119
        Channel ArbChannel
120

121
        // ShortChanID describes the exact location of the channel within the
122
        // chain. We'll use this to address any messages that we need to send
123
        // to the switch during contract resolution.
124
        ShortChanID lnwire.ShortChannelID
125

126
        // ChainEvents is an active subscription to the chain watcher for this
127
        // channel to be notified of any on-chain activity related to this
128
        // channel.
129
        ChainEvents *ChainEventSubscription
130

131
        // MarkCommitmentBroadcasted should mark the channel's commitment as
132
        // broadcast, indicating that we are now waiting for it to confirm.
133
        MarkCommitmentBroadcasted func(*wire.MsgTx, lntypes.ChannelParty) error
134

135
        // MarkChannelClosed marks the channel closed in the database, with the
136
        // passed close summary. After this method successfully returns we can
137
        // no longer expect to receive chain events for this channel, and must
138
        // be able to recover from a failure without getting the close event
139
        // again. It takes an optional channel status which will update the
140
        // channel status in the record that we keep of historical channels.
141
        MarkChannelClosed func(*channeldb.ChannelCloseSummary,
142
                ...channeldb.ChannelStatus) error
143

144
        // IsPendingClose is a boolean indicating whether the channel is marked
145
        // as pending close in the database.
146
        IsPendingClose bool
147

148
        // ClosingHeight is the height at which the channel was closed. Note
149
        // that this value is only valid if IsPendingClose is true.
150
        ClosingHeight uint32
151

152
        // CloseType is the type of the close event in case IsPendingClose is
153
        // true. Otherwise this value is unset.
154
        CloseType channeldb.ClosureType
155

156
        // MarkChannelResolved is a function closure that serves to mark a
157
        // channel as "fully resolved". A channel itself can be considered
158
        // fully resolved once all active contracts have individually been
159
        // fully resolved.
160
        //
161
        // TODO(roasbeef): need RPC's to combine for pendingchannels RPC
162
        MarkChannelResolved func() error
163

164
        // PutResolverReport records a resolver report for the channel. If the
165
        // transaction provided is nil, the function should write the report
166
        // in a new transaction.
167
        PutResolverReport func(tx kvdb.RwTx,
168
                report *channeldb.ResolverReport) error
169

170
        // FetchHistoricalChannel retrieves the historical state of a channel.
171
        // This is mostly used to supplement the ContractResolvers with
172
        // additional information required for proper contract resolution.
173
        FetchHistoricalChannel func() (*channeldb.OpenChannel, error)
174

175
        // FindOutgoingHTLCDeadline returns the deadline in absolute block
176
        // height for the specified outgoing HTLC. For an outgoing HTLC, its
177
        // deadline is defined by the timeout height of its corresponding
178
        // incoming HTLC - this is the expiry height that the remote peer can
179
        // spend his/her outgoing HTLC via the timeout path.
180
        FindOutgoingHTLCDeadline func(htlc channeldb.HTLC) fn.Option[int32]
181

182
        ChainArbitratorConfig
183
}
184

185
// ReportOutputType describes the type of output that is being reported
186
// on.
187
type ReportOutputType uint8
188

189
const (
190
        // ReportOutputIncomingHtlc is an incoming hash time locked contract on
191
        // the commitment tx.
192
        ReportOutputIncomingHtlc ReportOutputType = iota
193

194
        // ReportOutputOutgoingHtlc is an outgoing hash time locked contract on
195
        // the commitment tx.
196
        ReportOutputOutgoingHtlc
197

198
        // ReportOutputUnencumbered is an uncontested output on the commitment
199
        // transaction paying to us directly.
200
        ReportOutputUnencumbered
201

202
        // ReportOutputAnchor is an anchor output on the commitment tx.
203
        ReportOutputAnchor
204
)
205

206
// ContractReport provides a summary of a commitment tx output.
207
type ContractReport struct {
208
        // Outpoint is the final output that will be swept back to the wallet.
209
        Outpoint wire.OutPoint
210

211
        // Type indicates the type of the reported output.
212
        Type ReportOutputType
213

214
        // Amount is the final value that will be swept back to the wallet.
215
        Amount btcutil.Amount
216

217
        // MaturityHeight is the absolute block height that this output will
218
        // mature at.
219
        MaturityHeight uint32
220

221
        // Stage indicates whether the htlc is in the CLTV-timeout stage (1) or
222
        // the CSV-delay stage (2). A stage 1 htlc's maturity height will be set
223
        // to its expiry height, while a stage 2 htlc's maturity height will be
224
        // set to its confirmation height plus the maturity requirement.
225
        Stage uint32
226

227
        // LimboBalance is the total number of frozen coins within this
228
        // contract.
229
        LimboBalance btcutil.Amount
230

231
        // RecoveredBalance is the total value that has been successfully swept
232
        // back to the user's wallet.
233
        RecoveredBalance btcutil.Amount
234
}
235

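// Illustrative sketch only, not part of this file: a stage-1 (CLTV-timeout)
// outgoing HTLC report. Its MaturityHeight is the HTLC's expiry height; once
// the timeout transaction confirms, the resolver would report stage 2 with a
// maturity of the confirmation height plus the CSV delay. All values below
// are made up.
var exampleStageOneReport = &ContractReport{
        Type:           ReportOutputOutgoingHtlc,
        Amount:         btcutil.Amount(50000),
        MaturityHeight: 880000,
        Stage:          1,
        LimboBalance:   btcutil.Amount(50000),
}
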
236
// resolverReport creates a resolve report using some of the information in the
237
// contract report.
238
func (c *ContractReport) resolverReport(spendTx *chainhash.Hash,
239
        resolverType channeldb.ResolverType,
240
        outcome channeldb.ResolverOutcome) *channeldb.ResolverReport {
11✔
241

11✔
242
        return &channeldb.ResolverReport{
11✔
243
                OutPoint:        c.Outpoint,
11✔
244
                Amount:          c.Amount,
11✔
245
                ResolverType:    resolverType,
11✔
246
                ResolverOutcome: outcome,
11✔
247
                SpendTxID:       spendTx,
11✔
248
        }
11✔
249
}
11✔
250

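// Illustrative sketch only, not part of this file: once a contract's sweep
// confirms, its report can be persisted through the arbitrator config. The
// helper name is hypothetical; passing a nil kvdb.RwTx asks
// PutResolverReport to write the report in its own transaction, per the
// field documentation above.
func examplePersistReport(cfg *ChannelArbitratorConfig,
        report *ContractReport, spendTxid *chainhash.Hash,
        resolverType channeldb.ResolverType,
        outcome channeldb.ResolverOutcome) error {

        // Convert the live contract report into the on-disk resolver report.
        resolverRep := report.resolverReport(spendTxid, resolverType, outcome)

        return cfg.PutResolverReport(nil, resolverRep)
}
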
251
// htlcSet represents the set of active HTLCs on a given commitment
252
// transaction.
253
type htlcSet struct {
254
        // incomingHTLCs is a map of all incoming HTLCs on the target
255
        // commitment transaction. We may potentially go onchain to claim the
256
        // funds sent to us within this set.
257
        incomingHTLCs map[uint64]channeldb.HTLC
258

259
        // outgoingHTLCs is a map of all outgoing HTLCs on the target
260
        // commitment transaction. We may potentially go onchain to reclaim the
261
        // funds that are currently in limbo.
262
        outgoingHTLCs map[uint64]channeldb.HTLC
263
}
264

265
// newHtlcSet constructs a new HTLC set from a slice of HTLC's.
266
func newHtlcSet(htlcs []channeldb.HTLC) htlcSet {
47✔
267
        outHTLCs := make(map[uint64]channeldb.HTLC)
47✔
268
        inHTLCs := make(map[uint64]channeldb.HTLC)
47✔
269
        for _, htlc := range htlcs {
79✔
270
                if htlc.Incoming {
39✔
271
                        inHTLCs[htlc.HtlcIndex] = htlc
7✔
272
                        continue
7✔
273
                }
274

275
                outHTLCs[htlc.HtlcIndex] = htlc
26✔
276
        }
277

278
        return htlcSet{
47✔
279
                incomingHTLCs: inHTLCs,
47✔
280
                outgoingHTLCs: outHTLCs,
47✔
281
        }
47✔
282
}
283

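// Illustrative sketch only, not part of this file: newHtlcSet partitions a
// commitment's HTLCs by direction, keyed by HTLC index. The literal HTLCs
// below are made up.
func exampleHtlcSet() htlcSet {
        htlcs := []channeldb.HTLC{
                {HtlcIndex: 0, Incoming: true},
                {HtlcIndex: 1, Incoming: false},
                {HtlcIndex: 2, Incoming: false},
        }

        // incomingHTLCs will hold index 0, outgoingHTLCs indices 1 and 2.
        return newHtlcSet(htlcs)
}
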
284
// HtlcSetKey is a two-tuple that uniquely identifies a set of HTLCs on a
285
// commitment transaction.
286
type HtlcSetKey struct {
287
        // IsRemote denotes if the HTLCs are on the remote commitment
288
        // transaction.
289
        IsRemote bool
290

291
        // IsPending denotes if the commitment transaction that the HTLCs are
292
        // on is pending (the higher of two unrevoked commitments).
293
        IsPending bool
294
}
295

296
var (
297
        // LocalHtlcSet is the HtlcSetKey used for local commitments.
298
        LocalHtlcSet = HtlcSetKey{IsRemote: false, IsPending: false}
299

300
        // RemoteHtlcSet is the HtlcSetKey used for remote commitments.
301
        RemoteHtlcSet = HtlcSetKey{IsRemote: true, IsPending: false}
302

303
        // RemotePendingHtlcSet is the HtlcSetKey used for dangling remote
304
        // commitment transactions.
305
        RemotePendingHtlcSet = HtlcSetKey{IsRemote: true, IsPending: true}
306
)
307

308
// String returns a human readable string describing the target HtlcSetKey.
309
func (h HtlcSetKey) String() string {
8✔
310
        switch h {
8✔
311
        case LocalHtlcSet:
4✔
312
                return "LocalHtlcSet"
4✔
313
        case RemoteHtlcSet:
2✔
314
                return "RemoteHtlcSet"
2✔
315
        case RemotePendingHtlcSet:
2✔
316
                return "RemotePendingHtlcSet"
2✔
317
        default:
×
318
                return "unknown HtlcSetKey"
×
319
        }
320
}
321

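// Illustrative sketch only, not part of this file: the three well-known
// HtlcSetKey values index the per-commitment HTLC sets, mirroring how
// activeHTLCs and unmergedSet are keyed below. The helper name is
// hypothetical.
func exampleHtlcSets(local, remote,
        remotePending []channeldb.HTLC) map[HtlcSetKey]htlcSet {

        sets := map[HtlcSetKey]htlcSet{
                LocalHtlcSet:  newHtlcSet(local),
                RemoteHtlcSet: newHtlcSet(remote),
        }

        // The pending remote set only exists while the remote party holds an
        // unrevoked, newer commitment.
        if len(remotePending) > 0 {
                sets[RemotePendingHtlcSet] = newHtlcSet(remotePending)
        }

        return sets
}
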
322
// ChannelArbitrator is the on-chain arbitrator for a particular channel. The
323
// struct stays in sync with the current set of HTLCs on the commitment
324
// transaction. The job of the attendant is to go on-chain to either settle or
325
// cancel an HTLC as necessary iff: an HTLC times out, or we know the
326
// pre-image to an HTLC, but it wasn't settled by the link off-chain. The
327
// ChannelArbitrator will factor in an expected confirmation delta when
328
// broadcasting to ensure that we avoid any possibility of race conditions, and
329
// sweep the output(s) without contest.
330
type ChannelArbitrator struct {
331
        started int32 // To be used atomically.
332
        stopped int32 // To be used atomically.
333

334
        // Embed the blockbeat consumer struct to get access to the method
335
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
336
        chainio.BeatConsumer
337

338
        // startTimestamp is the time when this ChannelArbitrator was started.
339
        startTimestamp time.Time
340

341
        // log is a persistent log that the attendant will use to checkpoint
342
        // its next action, and the state of any unresolved contracts.
343
        log ArbitratorLog
344

345
        // activeHTLCs is the set of active incoming/outgoing HTLC's on all
346
        // currently valid commitment transactions.
347
        activeHTLCs map[HtlcSetKey]htlcSet
348

349
        // unmergedSet is used to update the activeHTLCs map in two callsites:
350
        // checkLocalChainActions and sweepAnchors. It contains the latest
351
        // updates from the link. It is not deleted from; its entries may be
352
        // replaced on subsequent calls to notifyContractUpdate.
353
        unmergedSet map[HtlcSetKey]htlcSet
354
        unmergedMtx sync.RWMutex
355

356
        // cfg contains all the functionality that the ChannelArbitrator requires
357
        // to do its duty.
358
        cfg ChannelArbitratorConfig
359

360
        // signalUpdates is a channel that any new live signals for the channel
361
        // we're watching over will be sent.
362
        signalUpdates chan *signalUpdateMsg
363

364
        // activeResolvers is a slice of any active resolvers. This is used to
365
        // be able to signal them for shutdown in the case that we shutdown.
366
        activeResolvers []ContractResolver
367

368
        // activeResolversLock prevents simultaneous read and write to the
369
        // resolvers slice.
370
        activeResolversLock sync.RWMutex
371

372
        // resolutionSignal is a channel that will be sent upon by contract
373
        // resolvers once their contract has been fully resolved. With each
374
        // send, we'll check to see if the contract is fully resolved.
375
        resolutionSignal chan struct{}
376

377
        // forceCloseReqs is a channel that requests to forcibly close the
378
        // contract will be sent over.
379
        forceCloseReqs chan *forceCloseReq
380

381
        // state is the current state of the arbitrator. This state is examined
382
        // upon start up to decide which actions to take.
383
        state ArbitratorState
384

385
        wg   sync.WaitGroup
386
        quit chan struct{}
387
}
388

389
// NewChannelArbitrator returns a new instance of a ChannelArbitrator backed by
390
// the passed config struct.
391
func NewChannelArbitrator(cfg ChannelArbitratorConfig,
392
        htlcSets map[HtlcSetKey]htlcSet, log ArbitratorLog) *ChannelArbitrator {
52✔
393

52✔
394
        // Create a new map for unmerged HTLC's as we will overwrite the values
52✔
395
        // and want to avoid modifying activeHTLCs directly. This soft copying
52✔
396
        // is done to ensure that activeHTLCs isn't reset as an empty map later
52✔
397
        // on.
52✔
398
        unmerged := make(map[HtlcSetKey]htlcSet)
52✔
399
        unmerged[LocalHtlcSet] = htlcSets[LocalHtlcSet]
52✔
400
        unmerged[RemoteHtlcSet] = htlcSets[RemoteHtlcSet]
52✔
401

52✔
402
        // If the pending set exists, write that as well.
52✔
403
        if _, ok := htlcSets[RemotePendingHtlcSet]; ok {
52✔
404
                unmerged[RemotePendingHtlcSet] = htlcSets[RemotePendingHtlcSet]
×
405
        }
×
406

407
        c := &ChannelArbitrator{
52✔
408
                log:              log,
52✔
409
                signalUpdates:    make(chan *signalUpdateMsg),
52✔
410
                resolutionSignal: make(chan struct{}),
52✔
411
                forceCloseReqs:   make(chan *forceCloseReq),
52✔
412
                activeHTLCs:      htlcSets,
52✔
413
                unmergedSet:      unmerged,
52✔
414
                cfg:              cfg,
52✔
415
                quit:             make(chan struct{}),
52✔
416
        }
52✔
417

52✔
418
        // Mount the block consumer.
52✔
419
        c.BeatConsumer = chainio.NewBeatConsumer(c.quit, c.Name())
52✔
420

52✔
421
        return c
52✔
422
}
423

424
// Compile-time check for the chainio.Consumer interface.
425
var _ chainio.Consumer = (*ChannelArbitrator)(nil)
426

427
// chanArbStartState contains the information from disk that we need to start
428
// up a channel arbitrator.
429
type chanArbStartState struct {
430
        currentState ArbitratorState
431
        commitSet    *CommitSet
432
}
433

434
// getStartState retrieves the information from disk that our channel arbitrator
435
// requires to start.
436
func (c *ChannelArbitrator) getStartState(tx kvdb.RTx) (*chanArbStartState,
437
        error) {
49✔
438

49✔
439
        // First, we'll read our last state from disk, so our internal state
49✔
440
        // machine can act accordingly.
49✔
441
        state, err := c.log.CurrentState(tx)
49✔
442
        if err != nil {
49✔
443
                return nil, err
×
444
        }
×
445

446
        // Next we'll fetch our confirmed commitment set. This will only exist
447
        // if the channel has been closed out on chain for modern nodes. For
448
        // older nodes, this won't be found at all, and will rely on the
449
        // existing written chain actions. Additionally, if this channel hasn't
450
        // logged any actions in the log, then this field won't be present.
451
        commitSet, err := c.log.FetchConfirmedCommitSet(tx)
49✔
452
        if err != nil && err != errNoCommitSet && err != errScopeBucketNoExist {
49✔
453
                return nil, err
×
454
        }
×
455

456
        return &chanArbStartState{
49✔
457
                currentState: state,
49✔
458
                commitSet:    commitSet,
49✔
459
        }, nil
49✔
460
}
461

462
// Start starts all the goroutines that the ChannelArbitrator needs to operate.
463
// It takes a start state, which will be looked up on disk if it is not
464
// provided.
465
func (c *ChannelArbitrator) Start(state *chanArbStartState,
466
        beat chainio.Blockbeat) error {
49✔
467

49✔
468
        if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
49✔
469
                return nil
×
470
        }
×
471
        c.startTimestamp = c.cfg.Clock.Now()
49✔
472

49✔
473
        // If the state passed in is nil, we look it up now.
49✔
474
        if state == nil {
87✔
475
                var err error
38✔
476
                state, err = c.getStartState(nil)
38✔
477
                if err != nil {
38✔
478
                        return err
×
479
                }
×
480
        }
481

482
        log.Tracef("Starting ChannelArbitrator(%v), htlc_set=%v, state=%v",
49✔
483
                c.cfg.ChanPoint, lnutils.SpewLogClosure(c.activeHTLCs),
49✔
484
                state.currentState)
49✔
485

49✔
486
        // Set our state from our starting state.
49✔
487
        c.state = state.currentState
49✔
488

49✔
489
        // Get the starting height.
49✔
490
        bestHeight := beat.Height()
49✔
491

49✔
492
        c.wg.Add(1)
49✔
493
        go c.channelAttendant(bestHeight, state.commitSet)
49✔
494

49✔
495
        return nil
49✔
496
}
497

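// Illustrative sketch only, not part of this file: the call shape used to
// run an arbitrator. The cfg, log, HTLC sets and blockbeat are assumed to be
// built elsewhere (in lnd, by the ChainArbitrator); the helper name is
// hypothetical.
func exampleRunArbitrator(cfg ChannelArbitratorConfig, arbLog ArbitratorLog,
        htlcs map[HtlcSetKey]htlcSet, beat chainio.Blockbeat) error {

        arb := NewChannelArbitrator(cfg, htlcs, arbLog)

        // Passing a nil start state makes Start read the persisted state and
        // confirmed commit set from disk via getStartState.
        if err := arb.Start(nil, beat); err != nil {
                return err
        }

        // ... later, on shutdown, stop all active resolvers and wait for the
        // channelAttendant goroutine to exit.
        return arb.Stop()
}
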
498
// progressStateMachineAfterRestart attempts to progress the state machine
499
// after a restart. This makes sure that if the state transition failed, we
500
// will try to progress the state machine again. Moreover it will relaunch
501
// resolvers if the channel is still in the pending close state and has not
502
// been fully resolved yet.
503
func (c *ChannelArbitrator) progressStateMachineAfterRestart(bestHeight int32,
504
        commitSet *CommitSet) error {
49✔
505

49✔
506
        // If the channel has been marked pending close in the database, and we
49✔
507
        // haven't transitioned the state machine to StateContractClosed (or a
49✔
508
        // succeeding state), then a state transition most likely failed. We'll
49✔
509
        // try to recover from this by manually advancing the state by setting
49✔
510
        // the corresponding close trigger.
49✔
511
        trigger := chainTrigger
49✔
512
        triggerHeight := uint32(bestHeight)
49✔
513
        if c.cfg.IsPendingClose {
55✔
514
                switch c.state {
6✔
515
                case StateDefault:
4✔
516
                        fallthrough
4✔
517
                case StateBroadcastCommit:
5✔
518
                        fallthrough
5✔
519
                case StateCommitmentBroadcasted:
5✔
520
                        switch c.cfg.CloseType {
5✔
521

522
                        case channeldb.CooperativeClose:
1✔
523
                                trigger = coopCloseTrigger
1✔
524

525
                        case channeldb.BreachClose:
1✔
526
                                trigger = breachCloseTrigger
1✔
527

528
                        case channeldb.LocalForceClose:
1✔
529
                                trigger = localCloseTrigger
1✔
530

531
                        case channeldb.RemoteForceClose:
2✔
532
                                trigger = remoteCloseTrigger
2✔
533
                        }
534

535
                        log.Warnf("ChannelArbitrator(%v): detected stalled "+
5✔
536
                                "state=%v for closed channel",
5✔
537
                                c.cfg.ChanPoint, c.state)
5✔
538
                }
539

540
                triggerHeight = c.cfg.ClosingHeight
6✔
541
        }
542

543
        log.Infof("ChannelArbitrator(%v): starting state=%v, trigger=%v, "+
49✔
544
                "triggerHeight=%v", c.cfg.ChanPoint, c.state, trigger,
49✔
545
                triggerHeight)
49✔
546

49✔
547
        // We'll now attempt to advance our state forward based on the current
49✔
548
        // on-chain state, and our set of active contracts.
49✔
549
        startingState := c.state
49✔
550
        nextState, _, err := c.advanceState(
49✔
551
                triggerHeight, trigger, commitSet,
49✔
552
        )
49✔
553
        if err != nil {
51✔
554
                switch err {
2✔
555

556
                // If we detect that we tried to fetch resolutions, but failed,
557
                // this channel was marked closed in the database before the
558
                // resolutions were successfully written. In this case there
559
                // is not much we can do, so we don't return the error.
560
                case errScopeBucketNoExist:
×
561
                        fallthrough
×
562
                case errNoResolutions:
1✔
563
                        log.Warnf("ChannelArbitrator(%v): detected closed"+
1✔
564
                                "channel with no contract resolutions written.",
1✔
565
                                c.cfg.ChanPoint)
1✔
566

567
                default:
1✔
568
                        return err
1✔
569
                }
570
        }
571

572
        // If we start and ended at the awaiting full resolution state, then
573
        // we'll relaunch our set of unresolved contracts.
574
        if startingState == StateWaitingFullResolution &&
48✔
575
                nextState == StateWaitingFullResolution {
50✔
576

2✔
577
                // In order to relaunch the resolvers, we'll need to fetch the
2✔
578
                // set of HTLCs that were present in the commitment transaction
2✔
579
                // at the time it was confirmed. commitSet.ConfCommitKey can't
2✔
580
                // be nil at this point since we're in
2✔
581
                // StateWaitingFullResolution. We can only be in
2✔
582
                // StateWaitingFullResolution after we've transitioned from
2✔
583
                // StateContractClosed which can only be triggered by the local
2✔
584
                // or remote close trigger. This trigger is only fired when we
2✔
585
                // receive a chain event from the chain watcher that the
2✔
586
                // commitment has been confirmed on chain, and before we
2✔
587
                // advance our state step, we call InsertConfirmedCommitSet.
2✔
588
                err := c.relaunchResolvers(commitSet, triggerHeight)
2✔
589
                if err != nil {
2✔
590
                        return err
×
591
                }
×
592
        }
593

594
        return nil
48✔
595
}
596

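// Illustrative sketch only, not part of this file: the CloseType to
// transitionTrigger mapping applied above, factored into a standalone helper
// for clarity. chainTrigger is the fallback used when the close type doesn't
// match one of the four recognized cases. The helper name is hypothetical.
func exampleCloseTrigger(closeType channeldb.ClosureType) transitionTrigger {
        switch closeType {
        case channeldb.CooperativeClose:
                return coopCloseTrigger

        case channeldb.BreachClose:
                return breachCloseTrigger

        case channeldb.LocalForceClose:
                return localCloseTrigger

        case channeldb.RemoteForceClose:
                return remoteCloseTrigger

        default:
                return chainTrigger
        }
}
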
597
// maybeAugmentTaprootResolvers will update the contract resolution information
598
// for taproot channels. This ensures that all the resolvers have the latest
599
// resolution, which may also include data such as the control block and tap
600
// tweaks.
601
func maybeAugmentTaprootResolvers(chanType channeldb.ChannelType,
602
        resolver ContractResolver,
603
        contractResolutions *ContractResolutions) {
2✔
604

2✔
605
        if !chanType.IsTaproot() {
4✔
606
                return
2✔
607
        }
2✔
608

609
        // The on disk resolutions contains all the ctrl block
610
        // information, so we'll set that now for the relevant
611
        // resolvers.
612
        switch r := resolver.(type) {
1✔
613
        case *commitSweepResolver:
1✔
614
                if contractResolutions.CommitResolution != nil {
2✔
615
                        //nolint:ll
1✔
616
                        r.commitResolution = *contractResolutions.CommitResolution
1✔
617
                }
1✔
618
        case *htlcOutgoingContestResolver:
1✔
619
                //nolint:ll
1✔
620
                htlcResolutions := contractResolutions.HtlcResolutions.OutgoingHTLCs
1✔
621
                for _, htlcRes := range htlcResolutions {
2✔
622
                        htlcRes := htlcRes
1✔
623

1✔
624
                        if r.htlcResolution.ClaimOutpoint ==
1✔
625
                                htlcRes.ClaimOutpoint {
2✔
626

1✔
627
                                r.htlcResolution = htlcRes
1✔
628
                        }
1✔
629
                }
630

631
        case *htlcTimeoutResolver:
1✔
632
                //nolint:ll
1✔
633
                htlcResolutions := contractResolutions.HtlcResolutions.OutgoingHTLCs
1✔
634
                for _, htlcRes := range htlcResolutions {
2✔
635
                        htlcRes := htlcRes
1✔
636

1✔
637
                        if r.htlcResolution.ClaimOutpoint ==
1✔
638
                                htlcRes.ClaimOutpoint {
2✔
639

1✔
640
                                r.htlcResolution = htlcRes
1✔
641
                        }
1✔
642
                }
643

644
        case *htlcIncomingContestResolver:
1✔
645
                //nolint:ll
1✔
646
                htlcResolutions := contractResolutions.HtlcResolutions.IncomingHTLCs
1✔
647
                for _, htlcRes := range htlcResolutions {
2✔
648
                        htlcRes := htlcRes
1✔
649

1✔
650
                        if r.htlcResolution.ClaimOutpoint ==
1✔
651
                                htlcRes.ClaimOutpoint {
2✔
652

1✔
653
                                r.htlcResolution = htlcRes
1✔
654
                        }
1✔
655
                }
656
        case *htlcSuccessResolver:
×
657
                //nolint:ll
×
658
                htlcResolutions := contractResolutions.HtlcResolutions.IncomingHTLCs
×
659
                for _, htlcRes := range htlcResolutions {
×
660
                        htlcRes := htlcRes
×
661

×
662
                        if r.htlcResolution.ClaimOutpoint ==
×
663
                                htlcRes.ClaimOutpoint {
×
664

×
665
                                r.htlcResolution = htlcRes
×
666
                        }
×
667
                }
668
        }
669
}
670

671
// relaunchResolvers relaunches the set of resolvers for unresolved contracts in
672
// order to provide them with information that's not immediately available upon
673
// starting the ChannelArbitrator. This information should ideally be stored in
674
// the database, so this only serves as an intermediate workaround to prevent a
675
// migration.
676
func (c *ChannelArbitrator) relaunchResolvers(commitSet *CommitSet,
677
        heightHint uint32) error {
2✔
678

2✔
679
        // We'll now query our log to see if there are any active unresolved
2✔
680
        // contracts. If this is the case, then we'll relaunch all contract
2✔
681
        // resolvers.
2✔
682
        unresolvedContracts, err := c.log.FetchUnresolvedContracts()
2✔
683
        if err != nil {
2✔
684
                return err
×
685
        }
×
686

687
        // Retrieve the commitment tx hash from the log.
688
        contractResolutions, err := c.log.FetchContractResolutions()
2✔
689
        if err != nil {
2✔
690
                log.Errorf("unable to fetch contract resolutions: %v",
×
691
                        err)
×
692
                return err
×
693
        }
×
694
        commitHash := contractResolutions.CommitHash
2✔
695

2✔
696
        // In prior versions of lnd, the information needed to supplement the
2✔
697
        // resolvers (in most cases, the full amount of the HTLC) was found in
2✔
698
        // the chain action map, which is now deprecated.  As a result, if the
2✔
699
        // commitSet is nil (an older node with unresolved HTLCs at time of
2✔
700
        // upgrade), then we'll use the chain action information in place. The
2✔
701
        // chain actions may exclude some information, but we cannot recover it
2✔
702
        // for these older nodes at the moment.
2✔
703
        var confirmedHTLCs []channeldb.HTLC
2✔
704
        if commitSet != nil && commitSet.ConfCommitKey.IsSome() {
4✔
705
                confCommitKey, err := commitSet.ConfCommitKey.UnwrapOrErr(
2✔
706
                        fmt.Errorf("no commitKey available"),
2✔
707
                )
2✔
708
                if err != nil {
2✔
709
                        return err
×
710
                }
×
711
                confirmedHTLCs = commitSet.HtlcSets[confCommitKey]
2✔
712
        } else {
×
713
                chainActions, err := c.log.FetchChainActions()
×
714
                if err != nil {
×
715
                        log.Errorf("unable to fetch chain actions: %v", err)
×
716
                        return err
×
717
                }
×
718
                for _, htlcs := range chainActions {
×
719
                        confirmedHTLCs = append(confirmedHTLCs, htlcs...)
×
720
                }
×
721
        }
722

723
        // Reconstruct the htlc outpoints and data from the chain action log.
724
        // The purpose of the constructed htlc map is to supplement the
725
        // resolvers restored from database with extra data. Ideally this data
726
        // is stored as part of the resolver in the log. This is a workaround
727
        // to prevent a db migration. We use all available htlc sets here in
728
        // order to ensure we have complete coverage.
729
        htlcMap := make(map[wire.OutPoint]*channeldb.HTLC)
2✔
730
        for _, htlc := range confirmedHTLCs {
6✔
731
                htlc := htlc
4✔
732
                outpoint := wire.OutPoint{
4✔
733
                        Hash:  commitHash,
4✔
734
                        Index: uint32(htlc.OutputIndex),
4✔
735
                }
4✔
736
                htlcMap[outpoint] = &htlc
4✔
737
        }
4✔
738

739
        // We'll also fetch the historical state of this channel, as it should
740
        // have been marked as closed by now, and supplement it to each resolver
741
        // such that we can properly resolve our pending contracts.
742
        var chanState *channeldb.OpenChannel
2✔
743
        chanState, err = c.cfg.FetchHistoricalChannel()
2✔
744
        switch {
2✔
745
        // If we don't find this channel, then it may be the case that it
746
        // was closed before we started to retain the final state
747
        // information for open channels.
748
        case err == channeldb.ErrNoHistoricalBucket:
×
749
                fallthrough
×
750
        case err == channeldb.ErrChannelNotFound:
×
751
                log.Warnf("ChannelArbitrator(%v): unable to fetch historical "+
×
752
                        "state", c.cfg.ChanPoint)
×
753

754
        case err != nil:
×
755
                return err
×
756
        }
757

758
        log.Infof("ChannelArbitrator(%v): relaunching %v contract "+
2✔
759
                "resolvers", c.cfg.ChanPoint, len(unresolvedContracts))
2✔
760

2✔
761
        for i := range unresolvedContracts {
4✔
762
                resolver := unresolvedContracts[i]
2✔
763

2✔
764
                if chanState != nil {
4✔
765
                        resolver.SupplementState(chanState)
2✔
766

2✔
767
                        // For taproot channels, we'll need to also make sure
2✔
768
                        // the control block information was set properly.
2✔
769
                        maybeAugmentTaprootResolvers(
2✔
770
                                chanState.ChanType, resolver,
2✔
771
                                contractResolutions,
2✔
772
                        )
2✔
773
                }
2✔
774

775
                unresolvedContracts[i] = resolver
2✔
776

2✔
777
                htlcResolver, ok := resolver.(htlcContractResolver)
2✔
778
                if !ok {
3✔
779
                        continue
1✔
780
                }
781

782
                htlcPoint := htlcResolver.HtlcPoint()
2✔
783
                htlc, ok := htlcMap[htlcPoint]
2✔
784
                if !ok {
2✔
785
                        return fmt.Errorf(
×
786
                                "htlc resolver %T unavailable", resolver,
×
787
                        )
×
788
                }
×
789

790
                htlcResolver.Supplement(*htlc)
2✔
791

2✔
792
                // If this is an outgoing HTLC, we will also need to supplement
2✔
793
                // the resolver with the expiry block height of its
2✔
794
                // corresponding incoming HTLC.
2✔
795
                if !htlc.Incoming {
4✔
796
                        deadline := c.cfg.FindOutgoingHTLCDeadline(*htlc)
2✔
797
                        htlcResolver.SupplementDeadline(deadline)
2✔
798
                }
2✔
799
        }
800

801
        // The anchor resolver is stateless and can always be re-instantiated.
802
        if contractResolutions.AnchorResolution != nil {
3✔
803
                anchorResolver := newAnchorResolver(
1✔
804
                        contractResolutions.AnchorResolution.AnchorSignDescriptor,
1✔
805
                        contractResolutions.AnchorResolution.CommitAnchor,
1✔
806
                        heightHint, c.cfg.ChanPoint,
1✔
807
                        ResolverConfig{
1✔
808
                                ChannelArbitratorConfig: c.cfg,
1✔
809
                        },
1✔
810
                )
1✔
811

1✔
812
                anchorResolver.SupplementState(chanState)
1✔
813

1✔
814
                unresolvedContracts = append(unresolvedContracts, anchorResolver)
1✔
815

1✔
816
                // TODO(roasbeef): this isn't re-launched?
1✔
817
        }
1✔
818

819
        c.resolveContracts(unresolvedContracts)
2✔
820

2✔
821
        return nil
2✔
822
}
823

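// Illustrative sketch only, not part of this file: the outpoint-keyed HTLC
// map built in relaunchResolvers, shown in isolation. Each confirmed HTLC
// sits in an output of the confirmed commitment, so its outpoint is the
// commitment hash plus the HTLC's output index. The helper name is
// hypothetical.
func exampleHtlcMap(commitHash chainhash.Hash,
        confirmedHTLCs []channeldb.HTLC) map[wire.OutPoint]*channeldb.HTLC {

        htlcMap := make(map[wire.OutPoint]*channeldb.HTLC)
        for _, htlc := range confirmedHTLCs {
                htlc := htlc
                outpoint := wire.OutPoint{
                        Hash:  commitHash,
                        Index: uint32(htlc.OutputIndex),
                }
                htlcMap[outpoint] = &htlc
        }

        return htlcMap
}
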
824
// Report returns htlc reports for the active resolvers.
825
func (c *ChannelArbitrator) Report() []*ContractReport {
1✔
826
        c.activeResolversLock.RLock()
1✔
827
        defer c.activeResolversLock.RUnlock()
1✔
828

1✔
829
        var reports []*ContractReport
1✔
830
        for _, resolver := range c.activeResolvers {
2✔
831
                r, ok := resolver.(reportingContractResolver)
1✔
832
                if !ok {
1✔
833
                        continue
×
834
                }
835

836
                report := r.report()
1✔
837
                if report == nil {
2✔
838
                        continue
1✔
839
                }
840

841
                reports = append(reports, report)
1✔
842
        }
843

844
        return reports
1✔
845
}
846

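// Illustrative sketch only, not part of this file: consuming Report, for
// example to total the limbo and recovered balances across all active
// contracts for a pending-channels style summary. The helper name is
// hypothetical.
func exampleSummarizeReports(c *ChannelArbitrator) (btcutil.Amount,
        btcutil.Amount) {

        var limbo, recovered btcutil.Amount
        for _, report := range c.Report() {
                limbo += report.LimboBalance
                recovered += report.RecoveredBalance
        }

        return limbo, recovered
}
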
847
// Stop signals the ChannelArbitrator for a graceful shutdown.
848
func (c *ChannelArbitrator) Stop() error {
52✔
849
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
58✔
850
                return nil
6✔
851
        }
6✔
852

853
        log.Debugf("Stopping ChannelArbitrator(%v)", c.cfg.ChanPoint)
46✔
854

46✔
855
        if c.cfg.ChainEvents.Cancel != nil {
58✔
856
                go c.cfg.ChainEvents.Cancel()
12✔
857
        }
12✔
858

859
        c.activeResolversLock.RLock()
46✔
860
        for _, activeResolver := range c.activeResolvers {
53✔
861
                activeResolver.Stop()
7✔
862
        }
7✔
863
        c.activeResolversLock.RUnlock()
46✔
864

46✔
865
        close(c.quit)
46✔
866
        c.wg.Wait()
46✔
867

46✔
868
        return nil
46✔
869
}
870

871
// transitionTrigger is an enum that denotes exactly *why* a state transition
872
// was initiated. This is useful as depending on the initial trigger, we may
873
// skip certain states as those actions are expected to have already taken
874
// place as a result of the external trigger.
875
type transitionTrigger uint8
876

877
const (
878
        // chainTrigger is a transition trigger that has been attempted due to
879
        // changing on-chain conditions such as a block which times out HTLC's
880
        // being attached.
881
        chainTrigger transitionTrigger = iota
882

883
        // userTrigger is a transition trigger driven by user action. Examples
884
        // of such a trigger include a user requesting a force closure of the
885
        // channel.
886
        userTrigger
887

888
        // remoteCloseTrigger is a transition trigger driven by the remote
889
        // peer's commitment being confirmed.
890
        remoteCloseTrigger
891

892
        // localCloseTrigger is a transition trigger driven by our commitment
893
        // being confirmed.
894
        localCloseTrigger
895

896
        // coopCloseTrigger is a transition trigger driven by a cooperative
897
        // close transaction being confirmed.
898
        coopCloseTrigger
899

900
        // breachCloseTrigger is a transition trigger driven by a remote breach
901
        // being confirmed. In this case the channel arbitrator will wait for
902
        // the BreachArbitrator to finish and then clean up gracefully.
903
        breachCloseTrigger
904
)
905

906
// String returns a human readable string describing the passed
907
// transitionTrigger.
908
func (t transitionTrigger) String() string {
1✔
909
        switch t {
1✔
910
        case chainTrigger:
1✔
911
                return "chainTrigger"
1✔
912

913
        case remoteCloseTrigger:
1✔
914
                return "remoteCloseTrigger"
1✔
915

916
        case userTrigger:
1✔
917
                return "userTrigger"
1✔
918

919
        case localCloseTrigger:
1✔
920
                return "localCloseTrigger"
1✔
921

922
        case coopCloseTrigger:
1✔
923
                return "coopCloseTrigger"
1✔
924

925
        case breachCloseTrigger:
1✔
926
                return "breachCloseTrigger"
1✔
927

928
        default:
×
929
                return "unknown trigger"
×
930
        }
931
}
932

933
// stateStep is a helper method that examines our internal state, and attempts
934
// the appropriate state transition if necessary. The next state we transition
935
// to is returned. Additionally, if the next transition results in a commitment
936
// broadcast, the commitment transaction itself is returned.
937
func (c *ChannelArbitrator) stateStep(
938
        triggerHeight uint32, trigger transitionTrigger,
939
        confCommitSet *CommitSet) (ArbitratorState, *wire.MsgTx, error) {
175✔
940

175✔
941
        var (
175✔
942
                nextState ArbitratorState
175✔
943
                closeTx   *wire.MsgTx
175✔
944
        )
175✔
945
        switch c.state {
175✔
946

947
        // If we're in the default state, then we'll check our set of actions
948
        // to see if while we were down, conditions have changed.
949
        case StateDefault:
64✔
950
                log.Debugf("ChannelArbitrator(%v): new block (height=%v) "+
64✔
951
                        "examining active HTLC's", c.cfg.ChanPoint,
64✔
952
                        triggerHeight)
64✔
953

64✔
954
                // As a new block has been connected to the end of the main
64✔
955
                // chain, we'll check to see if we need to make any on-chain
64✔
956
                // claims on behalf of the channel contract that we're
64✔
957
                // arbitrating for. If a commitment has confirmed, then we'll
64✔
958
                // use the set snapshot from the chain, otherwise we'll use our
64✔
959
                // current set.
64✔
960
                var (
64✔
961
                        chainActions ChainActionMap
64✔
962
                        err          error
64✔
963
                )
64✔
964

64✔
965
                // Normally if we force close the channel locally we will have
64✔
966
                // no confCommitSet. However when the remote commitment confirms
64✔
967
                // without us ever broadcasting our local commitment we need to
64✔
968
                // make sure we cancel all upstream HTLCs for outgoing dust
64✔
969
                // HTLCs as well, hence we need to fetch the chain actions here
64✔
970
                // as well.
64✔
971
                if confCommitSet == nil {
119✔
972
                        // Update the set of activeHTLCs so
55✔
973
                        // checkLocalChainActions has an up-to-date view of the
55✔
974
                        // commitments.
55✔
975
                        c.updateActiveHTLCs()
55✔
976
                        htlcs := c.activeHTLCs
55✔
977
                        chainActions, err = c.checkLocalChainActions(
55✔
978
                                triggerHeight, trigger, htlcs, false,
55✔
979
                        )
55✔
980
                        if err != nil {
55✔
981
                                return StateDefault, nil, err
×
982
                        }
×
983
                } else {
10✔
984
                        chainActions, err = c.constructChainActions(
10✔
985
                                confCommitSet, triggerHeight, trigger,
10✔
986
                        )
10✔
987
                        if err != nil {
10✔
988
                                return StateDefault, nil, err
×
989
                        }
×
990
                }
991

992
                // If there are no actions to be made, then we'll remain in the
993
                // default state. If this isn't a self initiated event (we're
994
                // checking due to a chain update), then we'll exit now.
995
                if len(chainActions) == 0 && trigger == chainTrigger {
101✔
996
                        log.Debugf("ChannelArbitrator(%v): no actions for "+
37✔
997
                                "chain trigger, terminating", c.cfg.ChanPoint)
37✔
998

37✔
999
                        return StateDefault, closeTx, nil
37✔
1000
                }
37✔
1001

1002
                // Otherwise, we'll log that we checked the HTLC actions as the
1003
                // commitment transaction has already been broadcast.
1004
                log.Tracef("ChannelArbitrator(%v): logging chain_actions=%v",
28✔
1005
                        c.cfg.ChanPoint, lnutils.SpewLogClosure(chainActions))
28✔
1006

28✔
1007
                // Cancel upstream HTLCs for all outgoing dust HTLCs available
28✔
1008
                // either on the local or the remote/remote pending commitment
28✔
1009
                // transaction.
28✔
1010
                dustHTLCs := chainActions[HtlcFailDustAction]
28✔
1011
                if len(dustHTLCs) > 0 {
30✔
1012
                        log.Debugf("ChannelArbitrator(%v): canceling %v dust "+
2✔
1013
                                "HTLCs backwards", c.cfg.ChanPoint,
2✔
1014
                                len(dustHTLCs))
2✔
1015

2✔
1016
                        getIdx := func(htlc channeldb.HTLC) uint64 {
4✔
1017
                                return htlc.HtlcIndex
2✔
1018
                        }
2✔
1019
                        dustHTLCSet := fn.NewSet(fn.Map(dustHTLCs, getIdx)...)
2✔
1020
                        err = c.abandonForwards(dustHTLCSet)
2✔
1021
                        if err != nil {
2✔
1022
                                return StateError, closeTx, err
×
1023
                        }
×
1024
                }
1025

1026
                // Depending on the type of trigger, we'll either "tunnel"
1027
                // through to a farther state, or just proceed linearly to the
1028
                // next state.
1029
                switch trigger {
28✔
1030

1031
                // If this is a chain trigger, then we'll go straight to the
1032
                // next state, as we still need to broadcast the commitment
1033
                // transaction.
1034
                case chainTrigger:
6✔
1035
                        fallthrough
6✔
1036
                case userTrigger:
16✔
1037
                        nextState = StateBroadcastCommit
16✔
1038

1039
                // If the trigger is a cooperative close being confirmed, then
1040
                // we can go straight to StateFullyResolved, as there won't be
1041
                // any contracts to resolve.
1042
                case coopCloseTrigger:
4✔
1043
                        nextState = StateFullyResolved
4✔
1044

1045
                // Otherwise, if this state advance was triggered by a
1046
                // commitment being confirmed on chain, then we'll jump
1047
                // straight to the state where the contract has already been
1048
                // closed, and we will inspect the set of unresolved contracts.
1049
                case localCloseTrigger:
3✔
1050
                        log.Errorf("ChannelArbitrator(%v): unexpected local "+
3✔
1051
                                "commitment confirmed while in StateDefault",
3✔
1052
                                c.cfg.ChanPoint)
3✔
1053
                        fallthrough
3✔
1054
                case remoteCloseTrigger:
9✔
1055
                        nextState = StateContractClosed
9✔
1056

1057
                case breachCloseTrigger:
2✔
1058
                        nextContractState, err := c.checkLegacyBreach()
2✔
1059
                        if nextContractState == StateError {
2✔
1060
                                return nextContractState, nil, err
×
1061
                        }
×
1062

1063
                        nextState = nextContractState
2✔
1064
                }
1065

1066
        // If we're in this state, then we've decided to broadcast the
1067
        // commitment transaction. We enter this state either due to an outside
1068
        // sub-system, or because an on-chain action has been triggered.
1069
        case StateBroadcastCommit:
21✔
1070
                // Under normal operation, we can only enter
21✔
1071
                // StateBroadcastCommit via a user or chain trigger. On restart,
21✔
1072
                // this state may be reexecuted after closing the channel, but
21✔
1073
                // failing to commit to StateContractClosed or
21✔
1074
                // StateFullyResolved. In that case, one of the four close
21✔
1075
                // triggers will be presented, signifying that we should skip
21✔
1076
                // rebroadcasting, and go straight to resolving the on-chain
21✔
1077
                // contract or marking the channel resolved.
21✔
1078
                switch trigger {
21✔
1079
                case localCloseTrigger, remoteCloseTrigger:
×
1080
                        log.Infof("ChannelArbitrator(%v): detected %s "+
×
1081
                                "close after closing channel, fast-forwarding "+
×
1082
                                "to %s to resolve contract",
×
1083
                                c.cfg.ChanPoint, trigger, StateContractClosed)
×
1084
                        return StateContractClosed, closeTx, nil
×
1085

1086
                case breachCloseTrigger:
2✔
1087
                        nextContractState, err := c.checkLegacyBreach()
2✔
1088
                        if nextContractState == StateError {
2✔
1089
                                log.Infof("ChannelArbitrator(%v): unable to "+
×
1090
                                        "advance breach close resolution: %v",
×
1091
                                        c.cfg.ChanPoint, nextContractState)
×
1092
                                return StateError, closeTx, err
×
1093
                        }
×
1094

1095
                        log.Infof("ChannelArbitrator(%v): detected %s close "+
2✔
1096
                                "after closing channel, fast-forwarding to %s"+
2✔
1097
                                " to resolve contract", c.cfg.ChanPoint,
2✔
1098
                                trigger, nextContractState)
2✔
1099

2✔
1100
                        return nextContractState, closeTx, nil
2✔
1101

1102
                case coopCloseTrigger:
×
1103
                        log.Infof("ChannelArbitrator(%v): detected %s "+
×
1104
                                "close after closing channel, fast-forwarding "+
×
1105
                                "to %s to resolve contract",
×
1106
                                c.cfg.ChanPoint, trigger, StateFullyResolved)
×
1107
                        return StateFullyResolved, closeTx, nil
×
1108
                }
1109

1110
                log.Infof("ChannelArbitrator(%v): force closing "+
20✔
1111
                        "chan", c.cfg.ChanPoint)
20✔
1112

20✔
1113
                // Now that we have all the actions decided for the set of
20✔
1114
                // HTLC's, we'll broadcast the commitment transaction, and
20✔
1115
                // signal the link to exit.
20✔
1116

20✔
1117
                // We'll tell the switch that it should remove the link for
20✔
1118
                // this channel, in addition to fetching the force close
20✔
1119
                // summary needed to close this channel on chain.
20✔
1120
                forceCloseTx, err := c.cfg.Channel.ForceCloseChan()
20✔
1121
                if err != nil {
21✔
1122
                        log.Errorf("ChannelArbitrator(%v): unable to "+
1✔
1123
                                "force close: %v", c.cfg.ChanPoint, err)
1✔
1124

1✔
1125
                        // We tried to force close (HTLC may be expiring from
1✔
1126
                        // our PoV, etc), but we think we've lost data. In this
1✔
1127
                        // case, we'll not force close, but terminate the state
1✔
1128
                        // machine here to wait to see what confirms on chain.
1✔
1129
                        if errors.Is(err, lnwallet.ErrForceCloseLocalDataLoss) {
2✔
1130
                                log.Error("ChannelArbitrator(%v): broadcast "+
1✔
1131
                                        "failed due to local data loss, "+
1✔
1132
                                        "waiting for on chain confimation...",
1✔
1133
                                        c.cfg.ChanPoint)
1✔
1134

1✔
1135
                                return StateBroadcastCommit, nil, nil
1✔
1136
                        }
1✔
1137

1138
                        return StateError, closeTx, err
×
1139
                }
1140
                closeTx = forceCloseTx
19✔
1141

19✔
1142
                // Before publishing the transaction, we store it to the
19✔
1143
                // database, such that we can re-publish later in case it
19✔
1144
                // didn't propagate. We initiated the force close, so we
19✔
1145
                // mark broadcast with local initiator set to true.
19✔
1146
                err = c.cfg.MarkCommitmentBroadcasted(closeTx, lntypes.Local)
19✔
1147
                if err != nil {
19✔
1148
                        log.Errorf("ChannelArbitrator(%v): unable to "+
×
1149
                                "mark commitment broadcasted: %v",
×
1150
                                c.cfg.ChanPoint, err)
×
1151
                        return StateError, closeTx, err
×
1152
                }
×
1153

1154
                // With the close transaction in hand, broadcast the
1155
                // transaction to the network, thereby entering the post
1156
                // channel resolution state.
1157
                log.Infof("Broadcasting force close transaction %v, "+
19✔
1158
                        "ChannelPoint(%v): %v", closeTx.TxHash(),
19✔
1159
                        c.cfg.ChanPoint, lnutils.SpewLogClosure(closeTx))
19✔
1160

19✔
1161
                // At this point, we'll now broadcast the commitment
19✔
1162
                // transaction itself.
19✔
1163
                label := labels.MakeLabel(
19✔
1164
                        labels.LabelTypeChannelClose, &c.cfg.ShortChanID,
19✔
1165
                )
19✔
1166
                if err := c.cfg.PublishTx(closeTx, label); err != nil {
25✔
1167
                        log.Errorf("ChannelArbitrator(%v): unable to broadcast "+
6✔
1168
                                "close tx: %v", c.cfg.ChanPoint, err)
6✔
1169

6✔
1170
                        // This makes sure we don't fail at startup if the
6✔
1171
                        // commitment transaction has too low fees to make it
6✔
1172
                        // into the mempool. The rebroadcaster makes sure this
6✔
1173
                        // transaction is republished regularly until confirmed
6✔
1174
                        // or replaced.
6✔
1175
                        if !errors.Is(err, lnwallet.ErrDoubleSpend) &&
6✔
1176
                                !errors.Is(err, lnwallet.ErrMempoolFee) {
9✔
1177

3✔
1178
                                return StateError, closeTx, err
3✔
1179
                        }
3✔
1180
                }
1181

1182
                // We go to the StateCommitmentBroadcasted state, where we'll
1183
                // be waiting for the commitment to be confirmed.
1184
                nextState = StateCommitmentBroadcasted
17✔
1185

1186
        // In this state we have broadcasted our own commitment, and will need
1187
        // to wait for a commitment (not necessarily the one we broadcasted!)
1188
        // to be confirmed.
1189
        case StateCommitmentBroadcasted:
31✔
1190
                switch trigger {
31✔
1191

1192
                // We are waiting for a commitment to be confirmed.
1193
                case chainTrigger, userTrigger:
18✔
1194
                        // The commitment transaction has been broadcast, but it
18✔
1195
                        // doesn't necessarily need to be the commitment
18✔
1196
                        // transaction version that is going to be confirmed. To
18✔
1197
                        // be sure that any of those versions can be anchored
18✔
1198
                        // down, we now submit all anchor resolutions to the
18✔
1199
                        // sweeper. The sweeper will keep trying to sweep all of
18✔
1200
                        // them.
18✔
1201
                        //
18✔
1202
                        // Note that the sweeper is idempotent. If we ever
18✔
1203
                        // happen to end up at this point in the code again, no
18✔
1204
                        // harm is done by re-offering the anchors to the
18✔
1205
                        // sweeper.
18✔
1206
                        anchors, err := c.cfg.Channel.NewAnchorResolutions()
18✔
1207
                        if err != nil {
18✔
1208
                                return StateError, closeTx, err
×
1209
                        }
×
1210

1211
                        err = c.sweepAnchors(anchors, triggerHeight)
18✔
1212
                        if err != nil {
18✔
1213
                                return StateError, closeTx, err
×
1214
                        }
×
1215

1216
                        nextState = StateCommitmentBroadcasted
18✔
1217

1218
                // If this state advance was triggered by any of the
1219
                // commitments being confirmed, then we'll jump to the state
1220
                // where the contract has been closed.
1221
                case localCloseTrigger, remoteCloseTrigger:
14✔
1222
                        nextState = StateContractClosed
14✔
1223

1224
                // If a coop close was confirmed, jump straight to the fully
1225
                // resolved state.
1226
                case coopCloseTrigger:
×
1227
                        nextState = StateFullyResolved
×
1228

1229
                case breachCloseTrigger:
×
1230
                        nextContractState, err := c.checkLegacyBreach()
×
1231
                        if nextContractState == StateError {
×
1232
                                return nextContractState, closeTx, err
×
1233
                        }
×
1234

1235
                        nextState = nextContractState
×
1236
                }
1237

1238
                log.Infof("ChannelArbitrator(%v): trigger %v moving from "+
31✔
1239
                        "state %v to %v", c.cfg.ChanPoint, trigger, c.state,
31✔
1240
                        nextState)
31✔
1241

1242
        // If we're in this state, then the contract has been fully closed to
1243
        // outside sub-systems, so we'll process the prior set of on-chain
1244
        // contract actions and launch a set of resolvers.
1245
        case StateContractClosed:
23✔
1246
                // First, we'll fetch our chain actions, and both sets of
23✔
1247
                // resolutions so we can process them.
23✔
1248
                contractResolutions, err := c.log.FetchContractResolutions()
23✔
1249
                if err != nil {
25✔
1250
                        log.Errorf("unable to fetch contract resolutions: %v",
2✔
1251
                                err)
2✔
1252
                        return StateError, closeTx, err
2✔
1253
                }
2✔
1254

1255
                // If the resolution is empty, and we have no HTLCs at all to
1256
                // act on, then we're done here. We don't need to launch any
1257
                // resolvers, and can go straight to our final state.
1258
                if contractResolutions.IsEmpty() && confCommitSet.IsEmpty() {
30✔
1259
                        log.Infof("ChannelArbitrator(%v): contract "+
9✔
1260
                                "resolutions empty, marking channel as fully resolved!",
9✔
1261
                                c.cfg.ChanPoint)
9✔
1262
                        nextState = StateFullyResolved
9✔
1263
                        break
9✔
1264
                }
1265

1266
                // First, we'll reconstruct a fresh set of chain actions as the
1267
                // set of actions we need to act on may differ based on whether it
1268
                // was our commitment, or their commitment, that hit the chain.
1269
                htlcActions, err := c.constructChainActions(
13✔
1270
                        confCommitSet, triggerHeight, trigger,
13✔
1271
                )
13✔
1272
                if err != nil {
13✔
1273
                        return StateError, closeTx, err
×
1274
                }
×
1275

1276
                // In case it's a breach transaction we fail back all upstream
1277
                // HTLCs for their corresponding outgoing HTLCs on the remote
1278
                // commitment set (remote and remote pending set).
1279
                if contractResolutions.BreachResolution != nil {
16✔
1280
                        // cancelBreachedHTLCs is a set which holds HTLCs whose
3✔
1281
                        // corresponding incoming HTLCs will be failed back
3✔
1282
                        // because the peer broadcasted an old state.
3✔
1283
                        cancelBreachedHTLCs := fn.NewSet[uint64]()
3✔
1284

3✔
1285
                        // Using the CommitSet, we'll fail back all
3✔
1286
                        // upstream HTLCs for their corresponding outgoing
3✔
1287
                        // HTLCs that exist on either of the remote commitments.
3✔
1288
                        // The map is used to deduplicate any shared HTLC's.
3✔
1289
                        for htlcSetKey, htlcs := range confCommitSet.HtlcSets {
6✔
1290
                                if !htlcSetKey.IsRemote {
4✔
1291
                                        continue
1✔
1292
                                }
1293

1294
                                for _, htlc := range htlcs {
6✔
1295
                                        // Only outgoing HTLCs have a
3✔
1296
                                        // corresponding incoming HTLC.
3✔
1297
                                        if htlc.Incoming {
5✔
1298
                                                continue
2✔
1299
                                        }
1300

1301
                                        cancelBreachedHTLCs.Add(htlc.HtlcIndex)
2✔
1302
                                }
1303
                        }
1304

1305
                        err := c.abandonForwards(cancelBreachedHTLCs)
3✔
1306
                        if err != nil {
3✔
1307
                                return StateError, closeTx, err
×
1308
                        }
×
1309
                } else {
11✔
1310
                        // If it's not a breach, we resolve all incoming dust
11✔
1311
                        // HTLCs immediately after the commitment is confirmed.
11✔
1312
                        err = c.failIncomingDust(
11✔
1313
                                htlcActions[HtlcIncomingDustFinalAction],
11✔
1314
                        )
11✔
1315
                        if err != nil {
11✔
1316
                                return StateError, closeTx, err
×
1317
                        }
×
1318

1319
                        // We fail the upstream HTLCs for all remote pending
1320
                        // outgoing HTLCs as soon as the commitment is
1321
                        // confirmed. The upstream HTLCs for outgoing dust
1322
                        // HTLCs have already been resolved before we reach
1323
                        // this point.
1324
                        getIdx := func(htlc channeldb.HTLC) uint64 {
20✔
1325
                                return htlc.HtlcIndex
9✔
1326
                        }
9✔
1327
                        remoteDangling := fn.NewSet(fn.Map(
11✔
1328
                                htlcActions[HtlcFailDanglingAction], getIdx,
11✔
1329
                        )...)
11✔
1330
                        err := c.abandonForwards(remoteDangling)
11✔
1331
                        if err != nil {
11✔
1332
                                return StateError, closeTx, err
×
1333
                        }
×
1334
                }
1335

1336
                // Now that we know we'll need to act, we'll process all the
1337
                // resolvers, then create the structures we need to resolve all
1338
                // outstanding contracts.
1339
                resolvers, err := c.prepContractResolutions(
13✔
1340
                        contractResolutions, triggerHeight, htlcActions,
13✔
1341
                )
13✔
1342
                if err != nil {
13✔
1343
                        log.Errorf("ChannelArbitrator(%v): unable to "+
×
1344
                                "resolve contracts: %v", c.cfg.ChanPoint, err)
×
1345
                        return StateError, closeTx, err
×
1346
                }
×
1347

1348
                log.Debugf("ChannelArbitrator(%v): inserting %v contract "+
13✔
1349
                        "resolvers", c.cfg.ChanPoint, len(resolvers))
13✔
1350

13✔
1351
                err = c.log.InsertUnresolvedContracts(nil, resolvers...)
13✔
1352
                if err != nil {
13✔
1353
                        return StateError, closeTx, err
×
1354
                }
×
1355

1356
                // Finally, we'll launch all the required contract resolvers.
1357
                // Once they're all resolved, we're no longer needed.
1358
                c.resolveContracts(resolvers)
13✔
1359

13✔
1360
                nextState = StateWaitingFullResolution
13✔
1361

1362
        // This is our terminal state. We'll keep returning this state until
1363
        // all contracts are fully resolved.
1364
        case StateWaitingFullResolution:
18✔
1365
                log.Infof("ChannelArbitrator(%v): still awaiting contract "+
18✔
1366
                        "resolution", c.cfg.ChanPoint)
18✔
1367

18✔
1368
                unresolved, err := c.log.FetchUnresolvedContracts()
18✔
1369
                if err != nil {
18✔
1370
                        return StateError, closeTx, err
×
1371
                }
×
1372

1373
                // If we have no unresolved contracts, then we can move to the
1374
                // final state.
1375
                if len(unresolved) == 0 {
31✔
1376
                        nextState = StateFullyResolved
13✔
1377
                        break
13✔
1378
                }
1379

1380
                // Otherwise, we still have unresolved contracts, so we'll
1381
                // stay alive to oversee their resolution.
1382
                nextState = StateWaitingFullResolution
6✔
1383

6✔
1384
                // Add debug logs.
6✔
1385
                for _, r := range unresolved {
12✔
1386
                        log.Debugf("ChannelArbitrator(%v): still have "+
6✔
1387
                                "unresolved contract: %T", c.cfg.ChanPoint, r)
6✔
1388
                }
6✔
1389

1390
        // If we start as fully resolved, then we'll end as fully resolved.
1391
        case StateFullyResolved:
23✔
1392
                // To ensure that the state of the contract in persistent
23✔
1393
                // storage is properly reflected, we'll mark the contract as
23✔
1394
                // fully resolved now.
23✔
1395
                nextState = StateFullyResolved
23✔
1396

23✔
1397
                log.Infof("ChannelPoint(%v) has been fully resolved "+
23✔
1398
                        "on-chain at height=%v", c.cfg.ChanPoint, triggerHeight)
23✔
1399

23✔
1400
                if err := c.cfg.MarkChannelResolved(); err != nil {
23✔
1401
                        log.Errorf("unable to mark channel resolved: %v", err)
×
1402
                        return StateError, closeTx, err
×
1403
                }
×
1404
        }
1405

1406
        log.Tracef("ChannelArbitrator(%v): next_state=%v", c.cfg.ChanPoint,
133✔
1407
                nextState)
133✔
1408

133✔
1409
        return nextState, closeTx, nil
133✔
1410
}
1411

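// The publish error handling above treats two failure modes as non-fatal:
// ErrDoubleSpend (the transaction is already known) and ErrMempoolFee (the fee
// is currently too low for the mempool), both of which are left to the
// rebroadcaster. Below is an illustrative triage helper, not part of this
// file; the name isNonFatalPublishErr is hypothetical.
func isNonFatalPublishErr(err error) bool {
        // Both errors mean the transaction will either confirm on its own or
        // be re-published later, so they don't abort the state machine.
        return errors.Is(err, lnwallet.ErrDoubleSpend) ||
                errors.Is(err, lnwallet.ErrMempoolFee)
}
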
1412
// sweepAnchors offers all given anchor resolutions to the sweeper. It requests
1413
// sweeping at the minimum fee rate. This fee rate can be upped manually by the
1414
// user via the BumpFee rpc.
1415
func (c *ChannelArbitrator) sweepAnchors(anchors *lnwallet.AnchorResolutions,
1416
        heightHint uint32) error {
20✔
1417

20✔
1418
        // Update the set of activeHTLCs so that the sweeping routine has an
20✔
1419
        // up-to-date view of the set of commitments.
20✔
1420
        c.updateActiveHTLCs()
20✔
1421

20✔
1422
        // Prepare the sweeping requests for all possible versions of
20✔
1423
        // commitments.
20✔
1424
        sweepReqs, err := c.prepareAnchorSweeps(heightHint, anchors)
20✔
1425
        if err != nil {
20✔
1426
                return err
×
1427
        }
×
1428

1429
        // Send out the sweeping requests to the sweeper.
1430
        for _, req := range sweepReqs {
29✔
1431
                _, err = c.cfg.Sweeper.SweepInput(req.input, req.params)
9✔
1432
                if err != nil {
9✔
1433
                        return err
×
1434
                }
×
1435
        }
1436

1437
        return nil
20✔
1438
}
1439

1440
// findCommitmentDeadlineAndValue finds the deadline (relative block height)
1441
// for a commitment transaction by extracting the minimum CLTV from its HTLCs.
1442
// From our PoV, the deadline delta is defined to be the smaller of,
1443
//   - half of the least CLTV from outgoing HTLCs' corresponding incoming
1444
//     HTLCs,  or,
1445
//   - half of the least CLTV from incoming HTLCs if the preimage is available.
1446
//
1447
// We use half of the CLTV value to ensure that we have enough time to sweep
1448
// the second-level HTLCs.
1449
//
1450
// It also finds the total value that is time-sensitive, which is the sum of
1451
// all the outgoing HTLCs plus incoming HTLCs whose preimages are known. It
1452
// then returns the value left after subtracting the budget used for sweeping
1453
// the time-sensitive HTLCs.
1454
//
1455
// NOTE: when the deadline turns out to be 0 blocks, we will replace it with 1
1456
// block because our fee estimator doesn't allow a 0 conf target. This also
1457
// means we've fallen behind and should increase our fee to get the transaction
1458
// confirmed asap.
1459
func (c *ChannelArbitrator) findCommitmentDeadlineAndValue(heightHint uint32,
1460
        htlcs htlcSet) (fn.Option[int32], btcutil.Amount, error) {
14✔
1461

14✔
1462
        deadlineMinHeight := uint32(math.MaxUint32)
14✔
1463
        totalValue := btcutil.Amount(0)
14✔
1464

14✔
1465
        // First, iterate through the outgoingHTLCs to find the lowest CLTV
14✔
1466
        // value.
14✔
1467
        for _, htlc := range htlcs.outgoingHTLCs {
25✔
1468
                // Skip if the HTLC is dust.
11✔
1469
                if htlc.OutputIndex < 0 {
15✔
1470
                        log.Debugf("ChannelArbitrator(%v): skipped deadline "+
4✔
1471
                                "for dust htlc=%x",
4✔
1472
                                c.cfg.ChanPoint, htlc.RHash[:])
4✔
1473

4✔
1474
                        continue
4✔
1475
                }
1476

1477
                value := htlc.Amt.ToSatoshis()
8✔
1478

8✔
1479
                // Find the expiry height for this outgoing HTLC's incoming
8✔
1480
                // HTLC.
8✔
1481
                deadlineOpt := c.cfg.FindOutgoingHTLCDeadline(htlc)
8✔
1482

8✔
1483
                // The deadline defaults to the current deadlineMinHeight,
8✔
1484
                // and it's overwritten when it's not none.
8✔
1485
                deadline := deadlineMinHeight
8✔
1486
                deadlineOpt.WhenSome(func(d int32) {
14✔
1487
                        deadline = uint32(d)
6✔
1488

6✔
1489
                        // We only consider the value to be under protection when
6✔
1490
                        // it's time-sensitive.
6✔
1491
                        totalValue += value
6✔
1492
                })
6✔
1493

1494
                if deadline < deadlineMinHeight {
14✔
1495
                        deadlineMinHeight = deadline
6✔
1496

6✔
1497
                        log.Tracef("ChannelArbitrator(%v): outgoing HTLC has "+
6✔
1498
                                "deadline=%v, value=%v", c.cfg.ChanPoint,
6✔
1499
                                deadlineMinHeight, value)
6✔
1500
                }
6✔
1501
        }
1502

1503
        // Then go through the incomingHTLCs and update the minHeight when the
1504
        // conditions are met.
1505
        for _, htlc := range htlcs.incomingHTLCs {
24✔
1506
                // Skip if the HTLC is dust.
10✔
1507
                if htlc.OutputIndex < 0 {
11✔
1508
                        log.Debugf("ChannelArbitrator(%v): skipped deadline "+
1✔
1509
                                "for dust htlc=%x",
1✔
1510
                                c.cfg.ChanPoint, htlc.RHash[:])
1✔
1511

1✔
1512
                        continue
1✔
1513
                }
1514

1515
                // Since it's an HTLC sent to us, check if we have preimage for
1516
                // this HTLC.
1517
                preimageAvailable, err := c.isPreimageAvailable(htlc.RHash)
9✔
1518
                if err != nil {
9✔
1519
                        return fn.None[int32](), 0, err
×
1520
                }
×
1521

1522
                if !preimageAvailable {
12✔
1523
                        continue
3✔
1524
                }
1525

1526
                value := htlc.Amt.ToSatoshis()
7✔
1527
                totalValue += value
7✔
1528

7✔
1529
                if htlc.RefundTimeout < deadlineMinHeight {
13✔
1530
                        deadlineMinHeight = htlc.RefundTimeout
6✔
1531

6✔
1532
                        log.Tracef("ChannelArbitrator(%v): incoming HTLC has "+
6✔
1533
                                "deadline=%v, amt=%v", c.cfg.ChanPoint,
6✔
1534
                                deadlineMinHeight, value)
6✔
1535
                }
6✔
1536
        }
1537

1538
        // Calculate the deadline. There are two cases to be handled here,
1539
        //   - when the deadlineMinHeight never gets updated, which could
1540
        //     happen when we have no outgoing HTLCs, and, for incoming HTLCs,
1541
        //       * either we have none, or,
1542
        //       * none of the HTLCs are preimageAvailable.
1543
        //   - when our deadlineMinHeight is no greater than the heightHint,
1544
        //     which means we are behind our schedule.
1545
        var deadline uint32
14✔
1546
        switch {
14✔
1547
        // When we couldn't find a deadline height from our HTLCs, we will fall
1548
        // back to the default value as there's no time pressure here.
1549
        case deadlineMinHeight == math.MaxUint32:
5✔
1550
                return fn.None[int32](), 0, nil
5✔
1551

1552
        // When the deadline is passed, we will fall back to the smallest conf
1553
        // target (1 block).
1554
        case deadlineMinHeight <= heightHint:
2✔
1555
                log.Warnf("ChannelArbitrator(%v): deadline is passed with "+
2✔
1556
                        "deadlineMinHeight=%d, heightHint=%d",
2✔
1557
                        c.cfg.ChanPoint, deadlineMinHeight, heightHint)
2✔
1558
                deadline = 1
2✔
1559

1560
        // Use half of the deadline delta, and leave the other half to be used
1561
        // to sweep the HTLCs.
1562
        default:
9✔
1563
                deadline = (deadlineMinHeight - heightHint) / 2
9✔
1564
        }
1565

1566
        // Calculate the value left after subtracting the budget used for
1567
        // sweeping the time-sensitive HTLCs.
1568
        valueLeft := totalValue - calculateBudget(
10✔
1569
                totalValue, c.cfg.Budget.DeadlineHTLCRatio,
10✔
1570
                c.cfg.Budget.DeadlineHTLC,
10✔
1571
        )
10✔
1572

10✔
1573
        log.Debugf("ChannelArbitrator(%v): calculated valueLeft=%v, "+
10✔
1574
                "deadline=%d, using deadlineMinHeight=%d, heightHint=%d",
10✔
1575
                c.cfg.ChanPoint, valueLeft, deadline, deadlineMinHeight,
10✔
1576
                heightHint)
10✔
1577

10✔
1578
        return fn.Some(int32(deadline)), valueLeft, nil
10✔
1579
}
1580

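// The deadline rule documented above reduces to "half of the remaining CLTV
// delta, clamped to at least one block". The helper below is an illustrative,
// self-contained sketch of that rule and is not part of this file; the name
// deadlineFromExpiry and the zero sentinel for "no deadline" are assumptions
// made for the example only.
func deadlineFromExpiry(minExpiry, currentHeight uint32) uint32 {
        // No time-sensitive HTLC found: the real method returns fn.None and
        // lets the caller pick a default conf target. We use 0 as a sentinel.
        if minExpiry == 0 {
                return 0
        }

        // The expiry has already passed: fall back to the smallest conf
        // target, since we are behind schedule.
        if minExpiry <= currentHeight {
                return 1
        }

        // Otherwise use half of the remaining delta, leaving the other half
        // for sweeping the second-level HTLC transactions.
        deadline := (minExpiry - currentHeight) / 2
        if deadline == 0 {
                deadline = 1
        }

        return deadline
}
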
1581
// resolveContracts updates the activeResolvers list, launches the resolvers,
1582
// and starts to resolve each contract concurrently.
1583
func (c *ChannelArbitrator) resolveContracts(resolvers []ContractResolver) {
14✔
1584
        c.activeResolversLock.Lock()
14✔
1585
        c.activeResolvers = resolvers
14✔
1586
        c.activeResolversLock.Unlock()
14✔
1587

14✔
1588
        // Launch all resolvers.
14✔
1589
        c.launchResolvers()
14✔
1590

14✔
1591
        for _, contract := range resolvers {
21✔
1592
                c.wg.Add(1)
7✔
1593
                go c.resolveContract(contract)
7✔
1594
        }
7✔
1595
}
1596

1597
// launchResolvers launches all the active resolvers concurrently.
1598
func (c *ChannelArbitrator) launchResolvers() {
20✔
1599
        c.activeResolversLock.Lock()
20✔
1600
        resolvers := c.activeResolvers
20✔
1601
        c.activeResolversLock.Unlock()
20✔
1602

20✔
1603
        // errChans is a map of channels that will be used to receive errors
20✔
1604
        // returned from launching the resolvers.
20✔
1605
        errChans := make(map[ContractResolver]chan error, len(resolvers))
20✔
1606

20✔
1607
        // Launch each resolver in goroutines.
20✔
1608
        for _, r := range resolvers {
27✔
1609
                // If the contract is already resolved, there's no need to
7✔
1610
                // launch it again.
7✔
1611
                if r.IsResolved() {
8✔
1612
                        log.Debugf("ChannelArbitrator(%v): skipping resolver "+
1✔
1613
                                "%T as it's already resolved", c.cfg.ChanPoint,
1✔
1614
                                r)
1✔
1615

1✔
1616
                        continue
1✔
1617
                }
1618

1619
                // Create a signal chan.
1620
                errChan := make(chan error, 1)
7✔
1621
                errChans[r] = errChan
7✔
1622

7✔
1623
                go func() {
14✔
1624
                        err := r.Launch()
7✔
1625
                        errChan <- err
7✔
1626
                }()
7✔
1627
        }
1628

1629
        // Wait for all resolvers to finish launching.
1630
        for r, errChan := range errChans {
27✔
1631
                select {
7✔
1632
                case err := <-errChan:
7✔
1633
                        if err == nil {
14✔
1634
                                continue
7✔
1635
                        }
1636

1637
                        log.Errorf("ChannelArbitrator(%v): unable to launch "+
×
1638
                                "contract resolver(%T): %v", c.cfg.ChanPoint, r,
×
1639
                                err)
×
1640

1641
                case <-c.quit:
×
1642
                        log.Debugf("ChannelArbitrator quit signal received, " +
×
1643
                                "exit launchResolvers")
×
1644

×
1645
                        return
×
1646
                }
1647
        }
1648
}
1649

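// The launch pattern above is "start each task in its own goroutine with a
// buffered error channel, then wait on every channel or a quit signal". Below
// is an illustrative, standalone version of that pattern, not part of this
// file; launchAll and its parameters are hypothetical, and unlike the real
// method it collects errors instead of logging them.
func launchAll(tasks []func() error, quit <-chan struct{}) []error {
        errChans := make([]chan error, len(tasks))

        // Fire off every task. The buffer of one lets a goroutine exit even
        // if its result is never read (e.g. after an early quit).
        for i, task := range tasks {
                errChans[i] = make(chan error, 1)
                go func(run func() error, errChan chan<- error) {
                        errChan <- run()
                }(task, errChans[i])
        }

        // Wait for each task to report back, bailing out early on quit.
        var errs []error
        for _, errChan := range errChans {
                select {
                case err := <-errChan:
                        if err != nil {
                                errs = append(errs, err)
                        }

                case <-quit:
                        return errs
                }
        }

        return errs
}
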
1650
// advanceState is the main driver of our state machine. This method is an
1651
// iterative function which repeatedly attempts to advance the internal state
1652
// of the channel arbitrator. The state will be advanced until we reach a
1653
// redundant transition, meaning that the state transition is a noop. The final
1654
// param describes the set of HTLCs on the commitment that confirmed on
1655
// chain, if any.
1656
func (c *ChannelArbitrator) advanceState(
1657
        triggerHeight uint32, trigger transitionTrigger,
1658
        confCommitSet *CommitSet) (ArbitratorState, *wire.MsgTx, error) {
89✔
1659

89✔
1660
        var (
89✔
1661
                priorState   ArbitratorState
89✔
1662
                forceCloseTx *wire.MsgTx
89✔
1663
        )
89✔
1664

89✔
1665
        // We'll continue to advance our state forward until the state we
89✔
1666
        // transition to is that same state that we started at.
89✔
1667
        for {
264✔
1668
                priorState = c.state
175✔
1669
                log.Debugf("ChannelArbitrator(%v): attempting state step with "+
175✔
1670
                        "trigger=%v from state=%v at height=%v",
175✔
1671
                        c.cfg.ChanPoint, trigger, priorState, triggerHeight)
175✔
1672

175✔
1673
                nextState, closeTx, err := c.stateStep(
175✔
1674
                        triggerHeight, trigger, confCommitSet,
175✔
1675
                )
175✔
1676
                if err != nil {
180✔
1677
                        log.Errorf("ChannelArbitrator(%v): unable to advance "+
5✔
1678
                                "state: %v", c.cfg.ChanPoint, err)
5✔
1679
                        return priorState, nil, err
5✔
1680
                }
5✔
1681

1682
                if forceCloseTx == nil && closeTx != nil {
188✔
1683
                        forceCloseTx = closeTx
17✔
1684
                }
17✔
1685

1686
                // Our termination transition is a noop transition. If we get
1687
                // our prior state back as the next state, then we'll
1688
                // terminate.
1689
                if nextState == priorState {
253✔
1690
                        log.Debugf("ChannelArbitrator(%v): terminating at "+
82✔
1691
                                "state=%v", c.cfg.ChanPoint, nextState)
82✔
1692
                        return nextState, forceCloseTx, nil
82✔
1693
                }
82✔
1694

1695
                // As the prior state was successfully executed, we can now
1696
                // commit the next state. This ensures that we will re-execute
1697
                // the prior state if anything fails.
1698
                if err := c.log.CommitState(nextState); err != nil {
93✔
1699
                        log.Errorf("ChannelArbitrator(%v): unable to commit "+
3✔
1700
                                "next state(%v): %v", c.cfg.ChanPoint,
3✔
1701
                                nextState, err)
3✔
1702
                        return priorState, nil, err
3✔
1703
                }
3✔
1704
                c.state = nextState
87✔
1705
        }
1706
}
1707

1708
// ChainAction is an enum that encompasses all possible on-chain actions
1709
// we'll take for a set of HTLC's.
1710
type ChainAction uint8
1711

1712
const (
1713
        // NoAction is the min chainAction type, indicating that no action
1714
        // needs to be taken for a given HTLC.
1715
        NoAction ChainAction = 0
1716

1717
        // HtlcTimeoutAction indicates that the HTLC will timeout soon. As a
1718
        // result, we should get ready to sweep it on chain after the timeout.
1719
        HtlcTimeoutAction = 1
1720

1721
        // HtlcClaimAction indicates that we should claim the HTLC on chain
1722
        // before its timeout period.
1723
        HtlcClaimAction = 2
1724

1725
        // HtlcFailDustAction indicates that we should fail the upstream HTLC
1726
        // for an outgoing dust HTLC immediately (even before the commitment
1727
        // transaction is confirmed) because it has no output on the commitment
1728
        // transaction. This also includes remote pending outgoing dust HTLCs.
1729
        HtlcFailDustAction = 3
1730

1731
        // HtlcOutgoingWatchAction indicates that we can't yet timeout this
1732
        // HTLC, but we had to go to chain in order to resolve an existing
1733
        // HTLC.  In this case, we'll either: time it out once it expires, or
1734
        // will learn the pre-image if the remote party claims the output. In
1735
        // this case, we'll add the pre-image to our global store.
1736
        HtlcOutgoingWatchAction = 4
1737

1738
        // HtlcIncomingWatchAction indicates that we don't yet have the
1739
        // pre-image to claim the incoming HTLC, but we had to go to chain in order
1740
        // to resolve an existing HTLC. In this case, we'll either: let the
1741
        // other party time it out, or eventually learn of the pre-image, in
1742
        // which case we'll claim on chain.
1743
        HtlcIncomingWatchAction = 5
1744

1745
        // HtlcIncomingDustFinalAction indicates that we should mark an incoming
1746
        // dust htlc as final because it can't be claimed on-chain.
1747
        HtlcIncomingDustFinalAction = 6
1748

1749
        // HtlcFailDanglingAction indicates that we should fail the upstream
1750
        // HTLC for an outgoing HTLC immediately after the commitment
1751
        // transaction has confirmed because it has no corresponding output on
1752
        // the commitment transaction. This category does NOT include any dust
1753
        // HTLCs which are mapped in the "HtlcFailDustAction" category.
1754
        HtlcFailDanglingAction = 7
1755
)
1756

1757
// String returns a human readable string describing a chain action.
1758
func (c ChainAction) String() string {
×
1759
        switch c {
×
1760
        case NoAction:
×
1761
                return "NoAction"
×
1762

1763
        case HtlcTimeoutAction:
×
1764
                return "HtlcTimeoutAction"
×
1765

1766
        case HtlcClaimAction:
×
1767
                return "HtlcClaimAction"
×
1768

1769
        case HtlcFailDustAction:
×
1770
                return "HtlcFailDustAction"
×
1771

1772
        case HtlcOutgoingWatchAction:
×
1773
                return "HtlcOutgoingWatchAction"
×
1774

1775
        case HtlcIncomingWatchAction:
×
1776
                return "HtlcIncomingWatchAction"
×
1777

1778
        case HtlcIncomingDustFinalAction:
×
1779
                return "HtlcIncomingDustFinalAction"
×
1780

1781
        case HtlcFailDanglingAction:
×
1782
                return "HtlcFailDanglingAction"
×
1783

1784
        default:
×
1785
                return "<unknown action>"
×
1786
        }
1787
}
1788

1789
// ChainActionMap is a map of a chain action, to the set of HTLC's that need to
1790
// be acted upon for a given action type. The channel
1791
type ChainActionMap map[ChainAction][]channeldb.HTLC
1792

1793
// Merge merges the passed chain actions with the target chain action map.
1794
func (c ChainActionMap) Merge(actions ChainActionMap) {
69✔
1795
        for chainAction, htlcs := range actions {
83✔
1796
                c[chainAction] = append(c[chainAction], htlcs...)
14✔
1797
        }
14✔
1798
}
1799

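// A minimal usage sketch of the Merge-style accumulation above, not part of
// this file. It uses plain HTLC indices instead of channeldb.HTLC values to
// stay self-contained; the function name mergeExample is hypothetical.
func mergeExample() map[ChainAction][]uint64 {
        local := map[ChainAction][]uint64{
                HtlcTimeoutAction:  {1},
                HtlcFailDustAction: {2},
        }
        remote := map[ChainAction][]uint64{
                HtlcFailDustAction:     {3},
                HtlcFailDanglingAction: {4},
        }

        // Append every slice from remote onto local, mirroring Merge above.
        for action, ids := range remote {
                local[action] = append(local[action], ids...)
        }

        // local now maps HtlcFailDustAction to both index 2 and index 3.
        return local
}
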
1800
// shouldGoOnChain takes into account the absolute timeout of the HTLC and the
1801
// confirmation delta that we need, and returns a bool indicating if
1802
// we should go on chain to claim.  We do this rather than waiting up until the
1803
// last minute as we want to ensure that when we *need* (HTLC is timed out) to
1804
// sweep, the commitment is already confirmed.
1805
func (c *ChannelArbitrator) shouldGoOnChain(htlc channeldb.HTLC,
1806
        broadcastDelta, currentHeight uint32) bool {
27✔
1807

27✔
1808
        // We'll calculate the broadcast cut off for this HTLC. This is the
27✔
1809
        // height that (based on our current fee estimation) we should
27✔
1810
        // broadcast in order to ensure the commitment transaction is confirmed
27✔
1811
        // before the HTLC fully expires.
27✔
1812
        broadcastCutOff := htlc.RefundTimeout - broadcastDelta
27✔
1813

27✔
1814
        log.Tracef("ChannelArbitrator(%v): examining outgoing contract: "+
27✔
1815
                "expiry=%v, cutoff=%v, height=%v", c.cfg.ChanPoint, htlc.RefundTimeout,
27✔
1816
                broadcastCutOff, currentHeight)
27✔
1817

27✔
1818
        // TODO(roasbeef): take into account default HTLC delta, don't need to
27✔
1819
        // broadcast immediately
27✔
1820
        //  * can then batch with SINGLE | ANYONECANPAY
27✔
1821

27✔
1822
        // We should go on-chain for this HTLC iff we're within our broadcast
27✔
1823
        // cutoff window.
27✔
1824
        if currentHeight < broadcastCutOff {
47✔
1825
                return false
20✔
1826
        }
20✔
1827

1828
        // In case of incoming htlc we should go to chain.
1829
        if htlc.Incoming {
9✔
1830
                return true
1✔
1831
        }
1✔
1832

1833
        // For htlcs that result from payments we initiated, we give some grace
1834
        // period before force closing the channel. During this time we expect
1835
        // both nodes to connect and give a chance to the other node to send its
1836
        // updates and cancel the htlc.
1837
        // This shouldn't add any security risk as there is no incoming htlc to
1838
        // fulfill in this case, and the expectation is that when the channel is
1839
        // active the other node will send update_fail_htlc to remove the htlc
1840
        // without closing the channel. It is up to the user to force close the
1841
        // channel if the peer misbehaves and doesn't send the update_fail_htlc.
1842
        // It is useful when this node is offline most of the time and is
1843
        // likely to miss the time slot where the htlc may be cancelled.
1844
        isForwarded := c.cfg.IsForwardedHTLC(c.cfg.ShortChanID, htlc.HtlcIndex)
8✔
1845
        upTime := c.cfg.Clock.Now().Sub(c.startTimestamp)
8✔
1846
        return isForwarded || upTime > c.cfg.PaymentsExpirationGracePeriod
8✔
1847
}
1848

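// The decision above hinges on a single cutoff height: once the chain reaches
// refundTimeout - broadcastDelta, the commitment should be broadcast so it can
// confirm before the HTLC expires. The function below is an illustrative
// sketch of just that comparison, not part of this file; the real method
// additionally applies the incoming-HTLC and grace-period logic.
func pastBroadcastCutoff(refundTimeout, broadcastDelta, currentHeight uint32) bool {
        // The broadcast cutoff is the height at which, given our estimate of
        // confirmation time, we must publish the commitment transaction.
        broadcastCutOff := refundTimeout - broadcastDelta

        return currentHeight >= broadcastCutOff
}
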
1849
// checkCommitChainActions is called for each new block connected to the end of
1850
// the main chain. Given the new block height, this new method will examine all
1851
// active HTLC's, and determine if we need to go on-chain to claim any of them.
1852
// A map of action -> []htlc is returned, detailing what action (if any) should
1853
// be performed for each HTLC. For timed out HTLC's, once the commitment has
1854
// been sufficiently confirmed, the HTLC's should be canceled backwards. For
1855
// redeemed HTLC's, we should send the pre-image back to the incoming link.
1856
func (c *ChannelArbitrator) checkCommitChainActions(height uint32,
1857
        trigger transitionTrigger, htlcs htlcSet) (ChainActionMap, error) {
69✔
1858

69✔
1859
        // TODO(roasbeef): would need to lock channel? channel totem?
69✔
1860
        //  * race condition if adding and we broadcast, etc
69✔
1861
        //  * or would make each instance sync?
69✔
1862

69✔
1863
        log.Debugf("ChannelArbitrator(%v): checking commit chain actions at "+
69✔
1864
                "height=%v, in_htlc_count=%v, out_htlc_count=%v",
69✔
1865
                c.cfg.ChanPoint, height,
69✔
1866
                len(htlcs.incomingHTLCs), len(htlcs.outgoingHTLCs))
69✔
1867

69✔
1868
        actionMap := make(ChainActionMap)
69✔
1869

69✔
1870
        // First, we'll make an initial pass over the set of incoming and
69✔
1871
        // outgoing HTLC's to decide if we need to go on chain at all.
69✔
1872
        haveChainActions := false
69✔
1873
        for _, htlc := range htlcs.outgoingHTLCs {
77✔
1874
                // We'll need to go on-chain for an outgoing HTLC if it was
8✔
1875
                // never resolved downstream, and it's "close" to timing out.
8✔
1876
                //
8✔
1877
                // TODO(yy): If there's no corresponding incoming HTLC, it
8✔
1878
                // means we are the first hop, hence the payer. This is a
8✔
1879
                // tricky case - unlike a forwarding hop, we don't have an
8✔
1880
                // incoming HTLC that will time out, which means as long as we
8✔
1881
                // can learn the preimage, we can settle the invoice (before it
8✔
1882
                // expires?).
8✔
1883
                toChain := c.shouldGoOnChain(
8✔
1884
                        htlc, c.cfg.OutgoingBroadcastDelta, height,
8✔
1885
                )
8✔
1886

8✔
1887
                if toChain {
9✔
1888
                        // Convert to int64 in case of overflow.
1✔
1889
                        remainingBlocks := int64(htlc.RefundTimeout) -
1✔
1890
                                int64(height)
1✔
1891

1✔
1892
                        log.Infof("ChannelArbitrator(%v): go to chain for "+
1✔
1893
                                "outgoing htlc %x: timeout=%v, amount=%v, "+
1✔
1894
                                "blocks_until_expiry=%v, broadcast_delta=%v",
1✔
1895
                                c.cfg.ChanPoint, htlc.RHash[:],
1✔
1896
                                htlc.RefundTimeout, htlc.Amt, remainingBlocks,
1✔
1897
                                c.cfg.OutgoingBroadcastDelta,
1✔
1898
                        )
1✔
1899
                }
1✔
1900

1901
                haveChainActions = haveChainActions || toChain
8✔
1902
        }
1903

1904
        for _, htlc := range htlcs.incomingHTLCs {
75✔
1905
                // We'll need to go on-chain to pull an incoming HTLC iff we
6✔
1906
                // know the pre-image and it's close to timing out. We need to
6✔
1907
                // ensure that we claim the funds that are rightfully ours
6✔
1908
                // on-chain.
6✔
1909
                preimageAvailable, err := c.isPreimageAvailable(htlc.RHash)
6✔
1910
                if err != nil {
6✔
1911
                        return nil, err
×
1912
                }
×
1913

1914
                if !preimageAvailable {
11✔
1915
                        continue
5✔
1916
                }
1917

1918
                toChain := c.shouldGoOnChain(
2✔
1919
                        htlc, c.cfg.IncomingBroadcastDelta, height,
2✔
1920
                )
2✔
1921

2✔
1922
                if toChain {
3✔
1923
                        // Convert to int64 in case of overflow.
1✔
1924
                        remainingBlocks := int64(htlc.RefundTimeout) -
1✔
1925
                                int64(height)
1✔
1926

1✔
1927
                        log.Infof("ChannelArbitrator(%v): go to chain for "+
1✔
1928
                                "incoming htlc %x: timeout=%v, amount=%v, "+
1✔
1929
                                "blocks_until_expiry=%v, broadcast_delta=%v",
1✔
1930
                                c.cfg.ChanPoint, htlc.RHash[:],
1✔
1931
                                htlc.RefundTimeout, htlc.Amt, remainingBlocks,
1✔
1932
                                c.cfg.IncomingBroadcastDelta,
1✔
1933
                        )
1✔
1934
                }
1✔
1935

1936
                haveChainActions = haveChainActions || toChain
2✔
1937
        }
1938

1939
        // If we don't have any actions to make, then we'll return an empty
1940
        // action map. We only do this if this was a chain trigger though, as
1941
        // if we're going to broadcast the commitment (or the remote party did)
1942
        // we're *forced* to act on each HTLC.
1943
        if !haveChainActions && trigger == chainTrigger {
110✔
1944
                log.Tracef("ChannelArbitrator(%v): no actions to take at "+
41✔
1945
                        "height=%v", c.cfg.ChanPoint, height)
41✔
1946
                return actionMap, nil
41✔
1947
        }
41✔
1948

1949
        // Now that we know we'll need to go on-chain, we'll examine all of our
1950
        // active outgoing HTLC's to see if we either need to: sweep them after
1951
        // a timeout (then cancel backwards), cancel them backwards
1952
        // immediately, or watch them as they're still active contracts.
1953
        for _, htlc := range htlcs.outgoingHTLCs {
37✔
1954
                switch {
8✔
1955
                // If the HTLC is dust, then we can cancel it backwards
1956
                // immediately as there's no matching contract to arbitrate
1957
                // on-chain. We know the HTLC is dust if the OutputIndex is
1958
                // negative.
1959
                case htlc.OutputIndex < 0:
3✔
1960
                        log.Tracef("ChannelArbitrator(%v): immediately "+
3✔
1961
                                "failing dust htlc=%x", c.cfg.ChanPoint,
3✔
1962
                                htlc.RHash[:])
3✔
1963

3✔
1964
                        actionMap[HtlcFailDustAction] = append(
3✔
1965
                                actionMap[HtlcFailDustAction], htlc,
3✔
1966
                        )
3✔
1967

1968
                // If we don't need to immediately act on this HTLC, then we'll
1969
                // mark it still "live". After we broadcast, we'll monitor it
1970
                // until the HTLC times out to see if we can also redeem it
1971
                // on-chain.
1972
                case !c.shouldGoOnChain(htlc, c.cfg.OutgoingBroadcastDelta,
1973
                        height,
1974
                ):
6✔
1975
                        // TODO(roasbeef): also need to be able to query
6✔
1976
                        // circuit map to see if HTLC hasn't been fully
6✔
1977
                        // resolved
6✔
1978
                        //
6✔
1979
                        //  * can't fail incoming until if outgoing not yet
6✔
1980
                        //  failed
6✔
1981

6✔
1982
                        log.Tracef("ChannelArbitrator(%v): watching chain to "+
6✔
1983
                                "decide action for outgoing htlc=%x",
6✔
1984
                                c.cfg.ChanPoint, htlc.RHash[:])
6✔
1985

6✔
1986
                        actionMap[HtlcOutgoingWatchAction] = append(
6✔
1987
                                actionMap[HtlcOutgoingWatchAction], htlc,
6✔
1988
                        )
6✔
1989

1990
                // Otherwise, we'll update our actionMap to mark that we need
1991
                // to sweep this HTLC on-chain
1992
                default:
1✔
1993
                        log.Tracef("ChannelArbitrator(%v): going on-chain to "+
1✔
1994
                                "timeout htlc=%x", c.cfg.ChanPoint, htlc.RHash[:])
1✔
1995

1✔
1996
                        actionMap[HtlcTimeoutAction] = append(
1✔
1997
                                actionMap[HtlcTimeoutAction], htlc,
1✔
1998
                        )
1✔
1999
                }
2000
        }
2001

2002
        // Similarly, for each incoming HTLC, now that we need to go on-chain,
2003
        // we'll either: sweep it immediately if we know the pre-image, or
2004
        // observe the output on-chain if we don't. In this latter case, we'll
2005
        // either learn of it eventually from the outgoing HTLC, or the sender
2006
        // will timeout the HTLC.
2007
        for _, htlc := range htlcs.incomingHTLCs {
35✔
2008
                // If the HTLC is dust, there is no action to be taken.
6✔
2009
                if htlc.OutputIndex < 0 {
9✔
2010
                        log.Debugf("ChannelArbitrator(%v): no resolution "+
3✔
2011
                                "needed for incoming dust htlc=%x",
3✔
2012
                                c.cfg.ChanPoint, htlc.RHash[:])
3✔
2013

3✔
2014
                        actionMap[HtlcIncomingDustFinalAction] = append(
3✔
2015
                                actionMap[HtlcIncomingDustFinalAction], htlc,
3✔
2016
                        )
3✔
2017

3✔
2018
                        continue
3✔
2019
                }
2020

2021
                log.Tracef("ChannelArbitrator(%v): watching chain to decide "+
4✔
2022
                        "action for incoming htlc=%x", c.cfg.ChanPoint,
4✔
2023
                        htlc.RHash[:])
4✔
2024

4✔
2025
                actionMap[HtlcIncomingWatchAction] = append(
4✔
2026
                        actionMap[HtlcIncomingWatchAction], htlc,
4✔
2027
                )
4✔
2028
        }
2029

2030
        return actionMap, nil
29✔
2031
}
2032

2033
// isPreimageAvailable returns whether the hash preimage is available in either
2034
// the preimage cache or the invoice database.
2035
func (c *ChannelArbitrator) isPreimageAvailable(hash lntypes.Hash) (bool,
2036
        error) {
18✔
2037

18✔
2038
        // Start by checking the preimage cache for preimages of
18✔
2039
        // forwarded HTLCs.
18✔
2040
        _, preimageAvailable := c.cfg.PreimageDB.LookupPreimage(
18✔
2041
                hash,
18✔
2042
        )
18✔
2043
        if preimageAvailable {
26✔
2044
                return true, nil
8✔
2045
        }
8✔
2046

2047
        // Then check if we have an invoice that can be settled by this HTLC.
2048
        //
2049
        // TODO(joostjager): Check that there are still more blocks remaining
2050
        // than the invoice cltv delta. We don't want to go to chain only to
2051
        // have the incoming contest resolver decide that we don't want to
2052
        // settle this invoice.
2053
        invoice, err := c.cfg.Registry.LookupInvoice(context.Background(), hash)
11✔
2054
        switch {
11✔
2055
        case err == nil:
1✔
2056
        case errors.Is(err, invoices.ErrInvoiceNotFound) ||
2057
                errors.Is(err, invoices.ErrNoInvoicesCreated):
11✔
2058

11✔
2059
                return false, nil
11✔
2060
        default:
×
2061
                return false, err
×
2062
        }
2063

2064
        preimageAvailable = invoice.Terms.PaymentPreimage != nil
1✔
2065

1✔
2066
        return preimageAvailable, nil
1✔
2067
}
2068

2069
// checkLocalChainActions is similar to checkCommitChainActions, but it also
2070
// examines the set of HTLCs on the remote party's commitment. This allows us
2071
// to ensure we're able to satisfy the HTLC timeout constraints for incoming vs
2072
// outgoing HTLCs.
2073
func (c *ChannelArbitrator) checkLocalChainActions(
2074
        height uint32, trigger transitionTrigger,
2075
        activeHTLCs map[HtlcSetKey]htlcSet,
2076
        commitsConfirmed bool) (ChainActionMap, error) {
61✔
2077

61✔
2078
        // First, we'll check our local chain actions as normal. This will only
61✔
2079
        // examine HTLCs on our local commitment (timeout or settle).
61✔
2080
        localCommitActions, err := c.checkCommitChainActions(
61✔
2081
                height, trigger, activeHTLCs[LocalHtlcSet],
61✔
2082
        )
61✔
2083
        if err != nil {
61✔
2084
                return nil, err
×
2085
        }
×
2086

2087
        // Next, we'll examine the remote commitment (and maybe a dangling one)
2088
        // to see if the set difference of our HTLCs is non-empty. If so, then
2089
        // we may need to cancel back some HTLCs if we decide to go to chain.
2090
        remoteDanglingActions := c.checkRemoteDanglingActions(
61✔
2091
                height, activeHTLCs, commitsConfirmed,
61✔
2092
        )
61✔
2093

61✔
2094
        // Finally, we'll merge the two set of chain actions.
61✔
2095
        localCommitActions.Merge(remoteDanglingActions)
61✔
2096

61✔
2097
        return localCommitActions, nil
61✔
2098
}
2099

2100
// checkRemoteDanglingActions examines the set of remote commitments for any
2101
// HTLCs that are close to timing out. If we find any, then we'll return a set
2102
// of chain actions for HTLCs that are on our commitment, but not theirs to
2103
// cancel immediately.
2104
func (c *ChannelArbitrator) checkRemoteDanglingActions(
2105
        height uint32, activeHTLCs map[HtlcSetKey]htlcSet,
2106
        commitsConfirmed bool) ChainActionMap {
61✔
2107

61✔
2108
        var (
61✔
2109
                pendingRemoteHTLCs []channeldb.HTLC
61✔
2110
                localHTLCs         = make(map[uint64]struct{})
61✔
2111
                remoteHTLCs        = make(map[uint64]channeldb.HTLC)
61✔
2112
                actionMap          = make(ChainActionMap)
61✔
2113
        )
61✔
2114

61✔
2115
        // First, we'll construct two sets of the outgoing HTLCs: those on our
61✔
2116
        // local commitment, and those that are on the remote commitment(s).
61✔
2117
        for htlcSetKey, htlcs := range activeHTLCs {
179✔
2118
                if htlcSetKey.IsRemote {
181✔
2119
                        for _, htlc := range htlcs.outgoingHTLCs {
78✔
2120
                                remoteHTLCs[htlc.HtlcIndex] = htlc
15✔
2121
                        }
15✔
2122
                } else {
56✔
2123
                        for _, htlc := range htlcs.outgoingHTLCs {
62✔
2124
                                localHTLCs[htlc.HtlcIndex] = struct{}{}
6✔
2125
                        }
6✔
2126
                }
2127
        }
2128

2129
        // With both sets constructed, we'll now compute the set difference of
2130
        // our two sets of HTLCs. This'll give us the HTLCs that exist on the
2131
        // remote commitment transaction, but not on ours.
2132
        for htlcIndex, htlc := range remoteHTLCs {
76✔
2133
                if _, ok := localHTLCs[htlcIndex]; ok {
17✔
2134
                        continue
2✔
2135
                }
2136

2137
                pendingRemoteHTLCs = append(pendingRemoteHTLCs, htlc)
14✔
2138
        }
2139

2140
        // Finally, we'll examine all the pending remote HTLCs for those that
2141
        // have expired. If we find any, then we'll recommend that they be
2142
        // failed now so we can free up the incoming HTLC.
2143
        for _, htlc := range pendingRemoteHTLCs {
75✔
2144
                // We'll now check if we need to go to chain in order to cancel
14✔
2145
                // the incoming HTLC.
14✔
2146
                goToChain := c.shouldGoOnChain(htlc, c.cfg.OutgoingBroadcastDelta,
14✔
2147
                        height,
14✔
2148
                )
14✔
2149

14✔
2150
                // If we don't need to go to chain, and no commitments have
14✔
2151
                // been confirmed, then we can move on. Otherwise, if
14✔
2152
                // commitments have been confirmed, then we need to cancel back
14✔
2153
                // *all* of the pending remote HTLCS.
14✔
2154
                if !goToChain && !commitsConfirmed {
19✔
2155
                        continue
5✔
2156
                }
2157

2158
                // Dust htlcs can be canceled back even before the commitment
2159
                // transaction confirms. Dust htlcs are not enforceable onchain.
2160
                // If another version of the commit tx would confirm we either
2161
                // gain or lose those dust amounts but there is no other way
2162
                // than cancelling the incoming HTLC back because we will never learn
2163
                // the preimage.
2164
                if htlc.OutputIndex < 0 {
10✔
2165
                        log.Infof("ChannelArbitrator(%v): fail dangling dust "+
×
2166
                                "htlc=%x from local/remote commitments diff",
×
2167
                                c.cfg.ChanPoint, htlc.RHash[:])
×
2168

×
2169
                        actionMap[HtlcFailDustAction] = append(
×
2170
                                actionMap[HtlcFailDustAction], htlc,
×
2171
                        )
×
2172

×
2173
                        continue
×
2174
                }
2175

2176
                log.Infof("ChannelArbitrator(%v): fail dangling htlc=%x from "+
10✔
2177
                        "local/remote commitments diff",
10✔
2178
                        c.cfg.ChanPoint, htlc.RHash[:])
10✔
2179

10✔
2180
                actionMap[HtlcFailDanglingAction] = append(
10✔
2181
                        actionMap[HtlcFailDanglingAction], htlc,
10✔
2182
                )
10✔
2183
        }
2184

2185
        return actionMap
61✔
2186
}
2187

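// The "dangling" HTLCs above are computed as a plain set difference on HTLC
// indices: outgoing HTLCs present on a remote commitment but missing from our
// local one. The helper below is an illustrative, standalone restatement of
// that step, not part of this file; the name danglingIndices is hypothetical.
func danglingIndices(localOutgoing, remoteOutgoing []uint64) []uint64 {
        // Index the local outgoing HTLCs for constant-time membership checks.
        local := make(map[uint64]struct{}, len(localOutgoing))
        for _, idx := range localOutgoing {
                local[idx] = struct{}{}
        }

        // Anything on the remote side that we don't also have locally is
        // dangling, and is a candidate to be failed back upstream.
        var dangling []uint64
        for _, idx := range remoteOutgoing {
                if _, ok := local[idx]; ok {
                        continue
                }

                dangling = append(dangling, idx)
        }

        return dangling
}
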
2188
// checkRemoteChainActions examines the two possible remote commitment chains
2189
// and returns the set of chain actions we need to carry out if the remote
2190
// commitment (non pending) confirms. The pendingConf indicates if the pending
2191
// remote commitment confirmed. This is similar to checkCommitChainActions, but
2192
// we'll immediately fail any HTLCs on the pending remote commit, but not the
2193
// remote commit (or the other way around).
2194
func (c *ChannelArbitrator) checkRemoteChainActions(
2195
        height uint32, trigger transitionTrigger,
2196
        activeHTLCs map[HtlcSetKey]htlcSet,
2197
        pendingConf bool) (ChainActionMap, error) {
9✔
2198

9✔
2199
        // First, we'll examine all the normal chain actions on the remote
9✔
2200
        // commitment that confirmed.
9✔
2201
        confHTLCs := activeHTLCs[RemoteHtlcSet]
9✔
2202
        if pendingConf {
11✔
2203
                confHTLCs = activeHTLCs[RemotePendingHtlcSet]
2✔
2204
        }
2✔
2205
        remoteCommitActions, err := c.checkCommitChainActions(
9✔
2206
                height, trigger, confHTLCs,
9✔
2207
        )
9✔
2208
        if err != nil {
9✔
2209
                return nil, err
×
2210
        }
×
2211

2212
        // With these actions computed, we'll now check the diff of the HTLCs on
2213
        // the commitments, and cancel back any that are on the pending but not
2214
        // the non-pending.
2215
        remoteDiffActions := c.checkRemoteDiffActions(
9✔
2216
                activeHTLCs, pendingConf,
9✔
2217
        )
9✔
2218

9✔
2219
        // Finally, we'll merge all the chain actions and the final set of
9✔
2220
        // chain actions.
9✔
2221
        remoteCommitActions.Merge(remoteDiffActions)
9✔
2222
        return remoteCommitActions, nil
9✔
2223
}
2224

2225
// checkRemoteDiffActions checks the set difference of the HTLCs on the remote
2226
// confirmed commit and remote pending commit for HTLCs that we need to cancel
2227
// back. If we find any HTLCs on the remote pending but not the remote, then
2228
// we'll mark them to be failed immediately.
2229
func (c *ChannelArbitrator) checkRemoteDiffActions(
2230
        activeHTLCs map[HtlcSetKey]htlcSet,
2231
        pendingConf bool) ChainActionMap {
9✔
2232

9✔
2233
        // First, we'll partition the HTLCs into those that are present on the
9✔
2234
        // confirmed commitment, and those on the dangling commitment.
9✔
2235
        confHTLCs := activeHTLCs[RemoteHtlcSet]
9✔
2236
        danglingHTLCs := activeHTLCs[RemotePendingHtlcSet]
9✔
2237
        if pendingConf {
11✔
2238
                confHTLCs = activeHTLCs[RemotePendingHtlcSet]
2✔
2239
                danglingHTLCs = activeHTLCs[RemoteHtlcSet]
2✔
2240
        }
2✔
2241

2242
        // Next, we'll create a set of all the HTLCs on the confirmed commitment.
2243
        remoteHtlcs := make(map[uint64]struct{})
9✔
2244
        for _, htlc := range confHTLCs.outgoingHTLCs {
12✔
2245
                remoteHtlcs[htlc.HtlcIndex] = struct{}{}
3✔
2246
        }
3✔
2247

2248
        // With the remote HTLCs assembled, we'll mark any HTLCs only on the
2249
        // remote pending commitment to be failed asap.
2250
        actionMap := make(ChainActionMap)
9✔
2251
        for _, htlc := range danglingHTLCs.outgoingHTLCs {
13✔
2252
                if _, ok := remoteHtlcs[htlc.HtlcIndex]; ok {
4✔
2253
                        continue
×
2254
                }
2255

2256
                preimageAvailable, err := c.isPreimageAvailable(htlc.RHash)
4✔
2257
                if err != nil {
4✔
2258
                        log.Errorf("ChannelArbitrator(%v): failed to query "+
×
2259
                                "preimage for dangling htlc=%x from remote "+
×
2260
                                "commitments diff", c.cfg.ChanPoint,
×
2261
                                htlc.RHash[:])
×
2262

×
2263
                        continue
×
2264
                }
2265

2266
                if preimageAvailable {
4✔
2267
                        continue
×
2268
                }
2269

2270
                // Dust HTLCs on the remote commitment can be failed back.
2271
                if htlc.OutputIndex < 0 {
4✔
2272
                        log.Infof("ChannelArbitrator(%v): fail dangling dust "+
×
2273
                                "htlc=%x from remote commitments diff",
×
2274
                                c.cfg.ChanPoint, htlc.RHash[:])
×
2275

×
2276
                        actionMap[HtlcFailDustAction] = append(
×
2277
                                actionMap[HtlcFailDustAction], htlc,
×
2278
                        )
×
2279

×
2280
                        continue
×
2281
                }
2282

2283
                actionMap[HtlcFailDanglingAction] = append(
4✔
2284
                        actionMap[HtlcFailDanglingAction], htlc,
4✔
2285
                )
4✔
2286

4✔
2287
                log.Infof("ChannelArbitrator(%v): fail dangling htlc=%x from "+
4✔
2288
                        "remote commitments diff",
4✔
2289
                        c.cfg.ChanPoint, htlc.RHash[:])
4✔
2290
        }
2291

2292
        return actionMap
9✔
2293
}
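
The diff logic above reduces to a set difference keyed by HTLC index: anything present on the dangling commitment but absent from the confirmed one gets failed back. A minimal, self-contained sketch of that pattern follows; the htlc type and field names are illustrative stand-ins, not lnd's actual channeldb.HTLC.

package main

import "fmt"

// htlc is a simplified stand-in for an HTLC entry on a commitment.
type htlc struct {
        Index       uint64
        OutputIndex int32 // negative means dust (no on-chain output)
}

// danglingOnly returns the HTLCs that appear on the dangling commitment but
// not on the confirmed one, keyed by HTLC index.
func danglingOnly(confirmed, dangling []htlc) []htlc {
        seen := make(map[uint64]struct{}, len(confirmed))
        for _, h := range confirmed {
                seen[h.Index] = struct{}{}
        }

        var diff []htlc
        for _, h := range dangling {
                if _, ok := seen[h.Index]; ok {
                        continue
                }
                diff = append(diff, h)
        }

        return diff
}

func main() {
        confirmed := []htlc{{Index: 1, OutputIndex: 0}, {Index: 2, OutputIndex: 1}}
        dangling := []htlc{{Index: 2, OutputIndex: 1}, {Index: 3, OutputIndex: -1}}

        for _, h := range danglingOnly(confirmed, dangling) {
                // Index 3 is only on the dangling commitment, so it would be
                // failed back (as dust here, since OutputIndex < 0).
                fmt.Printf("fail back htlc index=%d dust=%v\n", h.Index, h.OutputIndex < 0)
        }
}
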
2294

2295
// constructChainActions returns the set of actions that should be taken for
2296
// confirmed HTLCs at the specified height. Our actions will depend on the set
2297
// of HTLCs that were active across all channels at the time of channel
2298
// closure.
2299
func (c *ChannelArbitrator) constructChainActions(confCommitSet *CommitSet,
2300
        height uint32, trigger transitionTrigger) (ChainActionMap, error) {
22✔
2301

22✔
2302
        // If we've reached this point and have no confirmed commitment set,
22✔
2303
        // then this is an older node that had a pending close channel before
22✔
2304
        // the CommitSet was introduced. In this case, we'll just return the
22✔
2305
        // existing ChainActionMap they had on disk.
22✔
2306
        if confCommitSet == nil || confCommitSet.ConfCommitKey.IsNone() {
29✔
2307
                return c.log.FetchChainActions()
7✔
2308
        }
7✔
2309

2310
        // Otherwise, we have the full commitment set written to disk, and can
2311
        // proceed as normal.
2312
        htlcSets := confCommitSet.toActiveHTLCSets()
15✔
2313
        confCommitKey, err := confCommitSet.ConfCommitKey.UnwrapOrErr(
15✔
2314
                fmt.Errorf("no commitKey available"),
15✔
2315
        )
15✔
2316
        if err != nil {
15✔
2317
                return nil, err
×
2318
        }
×
2319

2320
        switch confCommitKey {
15✔
2321
        // If the local commitment transaction confirmed, then we'll examine
2322
        // it along with the remote commitments to compute the chain actions.
2323
        case LocalHtlcSet:
7✔
2324
                return c.checkLocalChainActions(
7✔
2325
                        height, trigger, htlcSets, true,
7✔
2326
                )
7✔
2327

2328
        // If the remote commitment confirmed, then we'll grab all the chain
2329
        // actions for the remote commit, and check the pending commit for any
2330
        // HTLCS we need to handle immediately (dust).
2331
        case RemoteHtlcSet:
7✔
2332
                return c.checkRemoteChainActions(
7✔
2333
                        height, trigger, htlcSets, false,
7✔
2334
                )
7✔
2335

2336
        // Otherwise, the remote pending commitment confirmed, so we'll examine
2337
        // the HTLCs on that unrevoked dangling commitment.
2338
        case RemotePendingHtlcSet:
2✔
2339
                return c.checkRemoteChainActions(
2✔
2340
                        height, trigger, htlcSets, true,
2✔
2341
                )
2✔
2342
        }
2343

2344
        return nil, fmt.Errorf("unable to locate chain actions")
×
2345
}
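
constructChainActions is essentially a three-way dispatch on which commitment confirmed, with a fallback to the persisted action map for pre-CommitSet nodes. The sketch below mirrors that control flow under simplified, hypothetical types (commitKey, chainActions) rather than the real CommitSet/HtlcSetKey.

package main

import (
        "errors"
        "fmt"
)

type commitKey int

const (
        localCommit commitKey = iota
        remoteCommit
        remotePendingCommit
)

// chainActions stands in for the ChainActionMap computed per commitment.
type chainActions map[string][]uint64

// constructActions mirrors the dispatch: no confirmed key means fall back to
// what was persisted, otherwise compute actions for the confirmed commitment.
func constructActions(confirmed *commitKey,
        fromDisk func() (chainActions, error),
        forCommit func(key commitKey, pending bool) (chainActions, error)) (chainActions, error) {

        if confirmed == nil {
                // Older close without a stored commit set: reuse the actions
                // that were written to disk at close time.
                return fromDisk()
        }

        switch *confirmed {
        case localCommit:
                return forCommit(localCommit, false)
        case remoteCommit:
                return forCommit(remoteCommit, false)
        case remotePendingCommit:
                // The unrevoked dangling commitment confirmed.
                return forCommit(remotePendingCommit, true)
        }

        return nil, errors.New("unable to locate chain actions")
}

func main() {
        key := remotePendingCommit
        actions, _ := constructActions(&key,
                func() (chainActions, error) { return nil, nil },
                func(k commitKey, pending bool) (chainActions, error) {
                        return chainActions{"fail": {7}}, nil
                },
        )
        fmt.Println(actions)
}
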
2346

2347
// prepContractResolutions is called either in the case that we decide we need
2348
// to go to chain, or the remote party goes to chain. Given a set of actions we
2349
// need to take for each HTLC, this method will return a set of contract
2350
// resolvers that will resolve the contracts on-chain if needed, so that all
2351
// incoming and outgoing HTLCs on the confirmed commitment are properly
2352
// resolved.
2353
func (c *ChannelArbitrator) prepContractResolutions(
2354
        contractResolutions *ContractResolutions, height uint32,
2355
        htlcActions ChainActionMap) ([]ContractResolver, error) {
13✔
2356

13✔
2357
        // We'll also fetch the historical state of this channel, as it should
13✔
2358
        // have been marked as closed by now, and supplement it to each resolver
13✔
2359
        // such that we can properly resolve our pending contracts.
13✔
2360
        var chanState *channeldb.OpenChannel
13✔
2361
        chanState, err := c.cfg.FetchHistoricalChannel()
13✔
2362
        switch {
13✔
2363
        // If we don't find this channel, then it may be the case that it
2364
        // was closed before we started to retain the final state
2365
        // information for open channels.
2366
        case err == channeldb.ErrNoHistoricalBucket:
×
2367
                fallthrough
×
2368
        case err == channeldb.ErrChannelNotFound:
×
2369
                log.Warnf("ChannelArbitrator(%v): unable to fetch historical "+
×
2370
                        "state", c.cfg.ChanPoint)
×
2371

2372
        case err != nil:
×
2373
                return nil, err
×
2374
        }
2375

2376
        incomingResolutions := contractResolutions.HtlcResolutions.IncomingHTLCs
13✔
2377
        outgoingResolutions := contractResolutions.HtlcResolutions.OutgoingHTLCs
13✔
2378

13✔
2379
        // We'll use these two maps to quickly look up an active HTLC with its
13✔
2380
        // matching HTLC resolution.
13✔
2381
        outResolutionMap := make(map[wire.OutPoint]lnwallet.OutgoingHtlcResolution)
13✔
2382
        inResolutionMap := make(map[wire.OutPoint]lnwallet.IncomingHtlcResolution)
13✔
2383
        for i := 0; i < len(incomingResolutions); i++ {
14✔
2384
                inRes := incomingResolutions[i]
1✔
2385
                inResolutionMap[inRes.HtlcPoint()] = inRes
1✔
2386
        }
1✔
2387
        for i := 0; i < len(outgoingResolutions); i++ {
15✔
2388
                outRes := outgoingResolutions[i]
2✔
2389
                outResolutionMap[outRes.HtlcPoint()] = outRes
2✔
2390
        }
2✔
2391

2392
        // We'll create the resolver kit that we'll be cloning for each
2393
        // resolver so they each can do their duty.
2394
        resolverCfg := ResolverConfig{
13✔
2395
                ChannelArbitratorConfig: c.cfg,
13✔
2396
                Checkpoint: func(res ContractResolver,
13✔
2397
                        reports ...*channeldb.ResolverReport) error {
16✔
2398

3✔
2399
                        return c.log.InsertUnresolvedContracts(reports, res)
3✔
2400
                },
3✔
2401
        }
2402

2403
        commitHash := contractResolutions.CommitHash
13✔
2404

13✔
2405
        var htlcResolvers []ContractResolver
13✔
2406

13✔
2407
        // We instantiate an anchor resolver if the commitment tx has an
13✔
2408
        // anchor.
13✔
2409
        if contractResolutions.AnchorResolution != nil {
16✔
2410
                anchorResolver := newAnchorResolver(
3✔
2411
                        contractResolutions.AnchorResolution.AnchorSignDescriptor,
3✔
2412
                        contractResolutions.AnchorResolution.CommitAnchor,
3✔
2413
                        height, c.cfg.ChanPoint, resolverCfg,
3✔
2414
                )
3✔
2415
                anchorResolver.SupplementState(chanState)
3✔
2416

3✔
2417
                htlcResolvers = append(htlcResolvers, anchorResolver)
3✔
2418
        }
3✔
2419

2420
        // If this is a breach close, we'll create a breach resolver, determine
2421
        // the htlc's to fail back, and exit. This is done because the other
2422
        // steps taken for non-breach-closes do not matter for breach-closes.
2423
        if contractResolutions.BreachResolution != nil {
16✔
2424
                breachResolver := newBreachResolver(resolverCfg)
3✔
2425
                htlcResolvers = append(htlcResolvers, breachResolver)
3✔
2426

3✔
2427
                return htlcResolvers, nil
3✔
2428
        }
3✔
2429

2430
        // For each HTLC, we'll either act immediately, meaning we'll instantly
2431
        // fail the HTLC, or we'll act only once the transaction has been
2432
        // confirmed, in which case we'll need an HTLC resolver.
2433
        for htlcAction, htlcs := range htlcActions {
23✔
2434
                switch htlcAction {
12✔
2435
                // If we can claim this HTLC, we'll create an HTLC resolver to
2436
                // claim the HTLC (second-level or directly), then add the pre
2437
                case HtlcClaimAction:
×
2438
                        for _, htlc := range htlcs {
×
2439
                                htlc := htlc
×
2440

×
2441
                                htlcOp := wire.OutPoint{
×
2442
                                        Hash:  commitHash,
×
2443
                                        Index: uint32(htlc.OutputIndex),
×
2444
                                }
×
2445

×
2446
                                resolution, ok := inResolutionMap[htlcOp]
×
2447
                                if !ok {
×
2448
                                        // TODO(roasbeef): panic?
×
2449
                                        log.Errorf("ChannelArbitrator(%v) unable to find "+
×
2450
                                                "incoming resolution: %v",
×
2451
                                                c.cfg.ChanPoint, htlcOp)
×
2452
                                        continue
×
2453
                                }
2454

2455
                                resolver := newSuccessResolver(
×
2456
                                        resolution, height, htlc, resolverCfg,
×
2457
                                )
×
2458
                                if chanState != nil {
×
2459
                                        resolver.SupplementState(chanState)
×
2460
                                }
×
2461
                                htlcResolvers = append(htlcResolvers, resolver)
×
2462
                        }
2463

2464
                // If we can timeout the HTLC directly, then we'll create the
2465
                // proper resolver to do so, who will then cancel the packet
2466
                // backwards.
2467
                case HtlcTimeoutAction:
1✔
2468
                        for _, htlc := range htlcs {
2✔
2469
                                htlc := htlc
1✔
2470

1✔
2471
                                htlcOp := wire.OutPoint{
1✔
2472
                                        Hash:  commitHash,
1✔
2473
                                        Index: uint32(htlc.OutputIndex),
1✔
2474
                                }
1✔
2475

1✔
2476
                                resolution, ok := outResolutionMap[htlcOp]
1✔
2477
                                if !ok {
1✔
2478
                                        log.Errorf("ChannelArbitrator(%v) unable to find "+
×
2479
                                                "outgoing resolution: %v", c.cfg.ChanPoint, htlcOp)
×
2480
                                        continue
×
2481
                                }
2482

2483
                                resolver := newTimeoutResolver(
1✔
2484
                                        resolution, height, htlc, resolverCfg,
1✔
2485
                                )
1✔
2486
                                if chanState != nil {
2✔
2487
                                        resolver.SupplementState(chanState)
1✔
2488
                                }
1✔
2489

2490
                                // For outgoing HTLCs, we will also need to
2491
                                // supplement the resolver with the expiry
2492
                                // block height of its corresponding incoming
2493
                                // HTLC.
2494
                                deadline := c.cfg.FindOutgoingHTLCDeadline(htlc)
1✔
2495
                                resolver.SupplementDeadline(deadline)
1✔
2496

1✔
2497
                                htlcResolvers = append(htlcResolvers, resolver)
1✔
2498
                        }
2499

2500
                // If this is an incoming HTLC, but we can't act yet, then
2501
                // we'll create an incoming resolver to redeem the HTLC if we
2502
                // learn of the pre-image, or let the remote party time out.
2503
                case HtlcIncomingWatchAction:
1✔
2504
                        for _, htlc := range htlcs {
2✔
2505
                                htlc := htlc
1✔
2506

1✔
2507
                                htlcOp := wire.OutPoint{
1✔
2508
                                        Hash:  commitHash,
1✔
2509
                                        Index: uint32(htlc.OutputIndex),
1✔
2510
                                }
1✔
2511

1✔
2512
                                // TODO(roasbeef): need to handle incoming dust...
1✔
2513

1✔
2514
                                // TODO(roasbeef): can't be negative!!!
1✔
2515
                                resolution, ok := inResolutionMap[htlcOp]
1✔
2516
                                if !ok {
1✔
2517
                                        log.Errorf("ChannelArbitrator(%v) unable to find "+
×
2518
                                                "incoming resolution: %v",
×
2519
                                                c.cfg.ChanPoint, htlcOp)
×
2520
                                        continue
×
2521
                                }
2522

2523
                                resolver := newIncomingContestResolver(
1✔
2524
                                        resolution, height, htlc,
1✔
2525
                                        resolverCfg,
1✔
2526
                                )
1✔
2527
                                if chanState != nil {
2✔
2528
                                        resolver.SupplementState(chanState)
1✔
2529
                                }
1✔
2530
                                htlcResolvers = append(htlcResolvers, resolver)
1✔
2531
                        }
2532

2533
                // Finally, if this is an outgoing HTLC we've sent, then we'll
2534
                // launch a resolver to watch for the pre-image (and settle
2535
                // backwards), or just timeout.
2536
                case HtlcOutgoingWatchAction:
2✔
2537
                        for _, htlc := range htlcs {
4✔
2538
                                htlc := htlc
2✔
2539

2✔
2540
                                htlcOp := wire.OutPoint{
2✔
2541
                                        Hash:  commitHash,
2✔
2542
                                        Index: uint32(htlc.OutputIndex),
2✔
2543
                                }
2✔
2544

2✔
2545
                                resolution, ok := outResolutionMap[htlcOp]
2✔
2546
                                if !ok {
2✔
2547
                                        log.Errorf("ChannelArbitrator(%v) "+
×
2548
                                                "unable to find outgoing "+
×
2549
                                                "resolution: %v",
×
2550
                                                c.cfg.ChanPoint, htlcOp)
×
2551

×
2552
                                        continue
×
2553
                                }
2554

2555
                                resolver := newOutgoingContestResolver(
2✔
2556
                                        resolution, height, htlc, resolverCfg,
2✔
2557
                                )
2✔
2558
                                if chanState != nil {
4✔
2559
                                        resolver.SupplementState(chanState)
2✔
2560
                                }
2✔
2561

2562
                                // For outgoing HTLCs, we will also need to
2563
                                // supplement the resolver with the expiry
2564
                                // block height of its corresponding incoming
2565
                                // HTLC.
2566
                                deadline := c.cfg.FindOutgoingHTLCDeadline(htlc)
2✔
2567
                                resolver.SupplementDeadline(deadline)
2✔
2568

2✔
2569
                                htlcResolvers = append(htlcResolvers, resolver)
2✔
2570
                        }
2571
                }
2572
        }
2573

2574
        // If this was a unilateral closure, then we'll also create a
2575
        // resolver to sweep our commitment output (but only if it wasn't
2576
        // trimmed).
2577
        if contractResolutions.CommitResolution != nil {
12✔
2578
                resolver := newCommitSweepResolver(
1✔
2579
                        *contractResolutions.CommitResolution, height,
1✔
2580
                        c.cfg.ChanPoint, resolverCfg,
1✔
2581
                )
1✔
2582
                if chanState != nil {
2✔
2583
                        resolver.SupplementState(chanState)
1✔
2584
                }
1✔
2585
                htlcResolvers = append(htlcResolvers, resolver)
1✔
2586
        }
2587

2588
        return htlcResolvers, nil
11✔
2589
}
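
A recurring pattern in prepContractResolutions is indexing the pre-computed HTLC resolutions by outpoint so each chain action can be matched to its resolution in constant time, with misses logged and skipped rather than treated as fatal. A small sketch of that lookup, using simplified types in place of lnwallet's resolution structs:

package main

import "fmt"

// outPoint is a simplified (txid, index) pair.
type outPoint struct {
        TxID  string
        Index uint32
}

// resolution is a stand-in for an incoming/outgoing HTLC resolution.
type resolution struct {
        Point outPoint
        Note  string
}

func main() {
        resolutions := []resolution{
                {Point: outPoint{"commit", 0}, Note: "timeout path"},
                {Point: outPoint{"commit", 2}, Note: "success path"},
        }

        // Index the resolutions by their HTLC outpoint.
        byPoint := make(map[outPoint]resolution, len(resolutions))
        for _, r := range resolutions {
                byPoint[r.Point] = r
        }

        // When processing an action for a given HTLC output, look up its
        // resolution; a miss is logged and skipped, mirroring the code above.
        htlcOp := outPoint{"commit", 2}
        if res, ok := byPoint[htlcOp]; ok {
                fmt.Printf("building resolver for %v: %s\n", htlcOp, res.Note)
        } else {
                fmt.Printf("no resolution found for %v, skipping\n", htlcOp)
        }
}
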
2590

2591
// replaceResolver replaces a resolver in the active set. If the resolver
2592
// to be replaced is not found, it returns an error.
2593
func (c *ChannelArbitrator) replaceResolver(oldResolver,
2594
        newResolver ContractResolver) error {
2✔
2595

2✔
2596
        c.activeResolversLock.Lock()
2✔
2597
        defer c.activeResolversLock.Unlock()
2✔
2598

2✔
2599
        oldKey := oldResolver.ResolverKey()
2✔
2600
        for i, r := range c.activeResolvers {
4✔
2601
                if bytes.Equal(r.ResolverKey(), oldKey) {
4✔
2602
                        c.activeResolvers[i] = newResolver
2✔
2603
                        return nil
2✔
2604
                }
2✔
2605
        }
2606

2607
        return errors.New("resolver to be replaced not found")
×
2608
}
2609

2610
// resolveContract is a goroutine tasked with fully resolving an unresolved
2611
// contract. Either the initial contract will be resolved after a single step,
2612
// or the contract will itself create another contract to be resolved. In
2613
// either case, once the contract has been fully resolved, we'll signal back to
2614
// the main goroutine so it can properly keep track of the set of unresolved
2615
// contracts.
2616
//
2617
// NOTE: This MUST be run as a goroutine.
2618
func (c *ChannelArbitrator) resolveContract(currentContract ContractResolver) {
7✔
2619
        defer c.wg.Done()
7✔
2620

7✔
2621
        log.Tracef("ChannelArbitrator(%v): attempting to resolve %T",
7✔
2622
                c.cfg.ChanPoint, currentContract)
7✔
2623

7✔
2624
        // Until the contract is fully resolved, we'll continue to iteratively
7✔
2625
        // resolve the contract one step at a time.
7✔
2626
        for !currentContract.IsResolved() {
15✔
2627
                log.Tracef("ChannelArbitrator(%v): contract %T not yet "+
8✔
2628
                        "resolved", c.cfg.ChanPoint, currentContract)
8✔
2629

8✔
2630
                select {
8✔
2631

2632
                // If we've been signalled to quit, then we'll exit early.
2633
                case <-c.quit:
×
2634
                        return
×
2635

2636
                default:
8✔
2637
                        // Otherwise, we'll attempt to resolve the current
8✔
2638
                        // contract.
8✔
2639
                        nextContract, err := currentContract.Resolve()
8✔
2640
                        if err != nil {
10✔
2641
                                if err == errResolverShuttingDown {
4✔
2642
                                        return
2✔
2643
                                }
2✔
2644

2645
                                log.Errorf("ChannelArbitrator(%v): unable to "+
1✔
2646
                                        "progress %T: %v",
1✔
2647
                                        c.cfg.ChanPoint, currentContract, err)
1✔
2648
                                return
1✔
2649
                        }
2650

2651
                        switch {
7✔
2652
                        // If this contract produced another, then this means
2653
                        // the current contract was only able to be partially
2654
                        // resolved in this step. So we'll do a contract swap
2655
                        // within our logs: the new contract will take the
2656
                        // place of the old one.
2657
                        case nextContract != nil:
2✔
2658
                                log.Debugf("ChannelArbitrator(%v): swapping "+
2✔
2659
                                        "out contract %T for %T ",
2✔
2660
                                        c.cfg.ChanPoint, currentContract,
2✔
2661
                                        nextContract)
2✔
2662

2✔
2663
                                // Swap contract in log.
2✔
2664
                                err := c.log.SwapContract(
2✔
2665
                                        currentContract, nextContract,
2✔
2666
                                )
2✔
2667
                                if err != nil {
2✔
2668
                                        log.Errorf("unable to add recurse "+
×
2669
                                                "contract: %v", err)
×
2670
                                }
×
2671

2672
                                // Swap contract in resolvers list. This is to
2673
                                // make sure that reports are queried from the
2674
                                // new resolver.
2675
                                err = c.replaceResolver(
2✔
2676
                                        currentContract, nextContract,
2✔
2677
                                )
2✔
2678
                                if err != nil {
2✔
2679
                                        log.Errorf("unable to replace "+
×
2680
                                                "contract: %v", err)
×
2681
                                }
×
2682

2683
                                // As this contract produced another, we'll
2684
                                // re-assign, so we can continue our resolution
2685
                                // loop.
2686
                                currentContract = nextContract
2✔
2687

2✔
2688
                                // Launch the new contract.
2✔
2689
                                err = currentContract.Launch()
2✔
2690
                                if err != nil {
2✔
2691
                                        log.Errorf("Failed to launch %T: %v",
×
2692
                                                currentContract, err)
×
2693
                                }
×
2694

2695
                        // If this contract is actually fully resolved, then
2696
                        // we'll mark it as such within the database.
2697
                        case currentContract.IsResolved():
6✔
2698
                                log.Debugf("ChannelArbitrator(%v): marking "+
6✔
2699
                                        "contract %T fully resolved",
6✔
2700
                                        c.cfg.ChanPoint, currentContract)
6✔
2701

6✔
2702
                                err := c.log.ResolveContract(currentContract)
6✔
2703
                                if err != nil {
6✔
2704
                                        log.Errorf("unable to resolve contract: %v",
×
2705
                                                err)
×
2706
                                }
×
2707

2708
                                // Now that the contract has been resolved,
2709
                                // we'll signal to the main goroutine.
2710
                                select {
6✔
2711
                                case c.resolutionSignal <- struct{}{}:
5✔
2712
                                case <-c.quit:
2✔
2713
                                        return
2✔
2714
                                }
2715
                        }
2716

2717
                }
2718
        }
2719
}
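
resolveContract treats resolution as a chain: each Resolve step either finishes the contract or hands back a successor that replaces it. Below is a stripped-down sketch of that loop with a hypothetical resolver interface (not lnd's ContractResolver); the quit handling and the log/resolver-set swapping are omitted.

package main

import "fmt"

// resolver is a minimal stand-in for a contract resolver: one step of
// resolution may produce a successor contract.
type resolver interface {
        Resolve() (resolver, error)
        IsResolved() bool
}

type twoStep struct{ step int }

func (t *twoStep) IsResolved() bool { return t.step >= 2 }

func (t *twoStep) Resolve() (resolver, error) {
        t.step++
        if t.step == 1 {
                // Partially resolved: hand back a successor (here, ourselves
                // advanced one step) to be swapped in and resolved further.
                return t, nil
        }
        return nil, nil
}

// resolveLoop drives a resolver until it reports itself fully resolved,
// swapping in any successor contract it produces along the way.
func resolveLoop(r resolver) error {
        for !r.IsResolved() {
                next, err := r.Resolve()
                if err != nil {
                        return err
                }
                if next != nil {
                        // Swap the successor in (the real code also swaps it
                        // in the log and active resolver set, then launches it).
                        r = next
                }
        }
        return nil
}

func main() {
        if err := resolveLoop(&twoStep{}); err != nil {
                fmt.Println("resolve failed:", err)
                return
        }
        fmt.Println("contract fully resolved")
}
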
2720

2721
// signalUpdateMsg is a struct that carries fresh signals to the
2722
// ChannelArbitrator. We need to receive a message like this each time the
2723
// channel becomes active, as its internal state may change.
2724
type signalUpdateMsg struct {
2725
        // newSignals is the set of new active signals to be sent to the
2726
        // arbitrator.
2727
        newSignals *ContractSignals
2728

2729
        // doneChan is a channel that will be closed once the arbitrator has
2730
        // attached the new signals.
2731
        doneChan chan struct{}
2732
}
2733

2734
// UpdateContractSignals updates the set of signals the ChannelArbitrator needs
2735
// to receive from a channel in real-time in order to keep in sync with the
2736
// latest state of the contract.
2737
func (c *ChannelArbitrator) UpdateContractSignals(newSignals *ContractSignals) {
12✔
2738
        done := make(chan struct{})
12✔
2739

12✔
2740
        select {
12✔
2741
        case c.signalUpdates <- &signalUpdateMsg{
2742
                newSignals: newSignals,
2743
                doneChan:   done,
2744
        }:
12✔
2745
        case <-c.quit:
×
2746
        }
2747

2748
        select {
12✔
2749
        case <-done:
12✔
2750
        case <-c.quit:
×
2751
        }
2752
}
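
UpdateContractSignals is a synchronous handshake: the caller sends a message carrying a done channel, the event loop applies the update and closes done, and both sides also select on quit so neither can block during shutdown. A self-contained sketch of that handshake, with illustrative names:

package main

import "fmt"

type signalMsg struct {
        shortChanID uint64
        done        chan struct{}
}

type arbiter struct {
        signalUpdates chan *signalMsg
        quit          chan struct{}
        shortChanID   uint64
}

// updateSignals blocks until the event loop has applied the new signals, or
// until the arbiter is shutting down.
func (a *arbiter) updateSignals(id uint64) {
        done := make(chan struct{})

        select {
        case a.signalUpdates <- &signalMsg{shortChanID: id, done: done}:
        case <-a.quit:
                return
        }

        select {
        case <-done:
        case <-a.quit:
        }
}

// loop is the single goroutine that owns the arbiter's state.
func (a *arbiter) loop() {
        for {
                select {
                case msg := <-a.signalUpdates:
                        a.shortChanID = msg.shortChanID
                        close(msg.done)
                case <-a.quit:
                        return
                }
        }
}

func main() {
        a := &arbiter{
                signalUpdates: make(chan *signalMsg),
                quit:          make(chan struct{}),
        }
        go a.loop()

        a.updateSignals(42)
        fmt.Println("signals applied, short chan id:", a.shortChanID)
        close(a.quit)
}
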
2753

2754
// notifyContractUpdate updates the ChannelArbitrator's unmerged mappings such
2755
// that it can later be merged with activeHTLCs when calling
2756
// checkLocalChainActions or sweepAnchors. These are the only two places that
2757
// activeHTLCs is used.
2758
func (c *ChannelArbitrator) notifyContractUpdate(upd *ContractUpdate) {
13✔
2759
        c.unmergedMtx.Lock()
13✔
2760
        defer c.unmergedMtx.Unlock()
13✔
2761

13✔
2762
        // Update the mapping.
13✔
2763
        c.unmergedSet[upd.HtlcKey] = newHtlcSet(upd.Htlcs)
13✔
2764

13✔
2765
        log.Tracef("ChannelArbitrator(%v): fresh set of htlcs=%v",
13✔
2766
                c.cfg.ChanPoint, lnutils.SpewLogClosure(upd))
13✔
2767
}
13✔
2768

2769
// updateActiveHTLCs merges the unmerged set of HTLCs from the link with
2770
// activeHTLCs.
2771
func (c *ChannelArbitrator) updateActiveHTLCs() {
74✔
2772
        c.unmergedMtx.RLock()
74✔
2773
        defer c.unmergedMtx.RUnlock()
74✔
2774

74✔
2775
        // Update the mapping.
74✔
2776
        c.activeHTLCs[LocalHtlcSet] = c.unmergedSet[LocalHtlcSet]
74✔
2777
        c.activeHTLCs[RemoteHtlcSet] = c.unmergedSet[RemoteHtlcSet]
74✔
2778

74✔
2779
        // If the pending set exists, update that as well.
74✔
2780
        if _, ok := c.unmergedSet[RemotePendingHtlcSet]; ok {
84✔
2781
                pendingSet := c.unmergedSet[RemotePendingHtlcSet]
10✔
2782
                c.activeHTLCs[RemotePendingHtlcSet] = pendingSet
10✔
2783
        }
10✔
2784
}
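
notifyContractUpdate and updateActiveHTLCs form a small staging pattern: the link writes fresh HTLC sets into an unmerged map under a mutex, and the arbitrator folds them into its working view only when it needs them. A minimal sketch of that pattern with hypothetical keys and types:

package main

import (
        "fmt"
        "sync"
)

type setKey int

const (
        localSet setKey = iota
        remoteSet
        remotePendingSet
)

type htlcSet []uint64

type tracker struct {
        mu       sync.RWMutex
        unmerged map[setKey]htlcSet // written by the link
        active   map[setKey]htlcSet // read by the arbitrator
}

// notifyUpdate stages a fresh HTLC set from the link.
func (t *tracker) notifyUpdate(key setKey, htlcs htlcSet) {
        t.mu.Lock()
        defer t.mu.Unlock()
        t.unmerged[key] = htlcs
}

// updateActive folds the staged sets into the active view. The pending set is
// only copied over if it actually exists.
func (t *tracker) updateActive() {
        t.mu.RLock()
        defer t.mu.RUnlock()

        t.active[localSet] = t.unmerged[localSet]
        t.active[remoteSet] = t.unmerged[remoteSet]
        if pending, ok := t.unmerged[remotePendingSet]; ok {
                t.active[remotePendingSet] = pending
        }
}

func main() {
        t := &tracker{
                unmerged: make(map[setKey]htlcSet),
                active:   make(map[setKey]htlcSet),
        }
        t.notifyUpdate(localSet, htlcSet{1, 2})
        t.notifyUpdate(remoteSet, htlcSet{1})
        t.updateActive()
        fmt.Println(t.active)
}
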
2785

2786
// channelAttendant is the primary goroutine that acts at the judicial
2787
// arbitrator between our channel state, the remote channel peer, and the
2788
// blockchain (Our judge). This goroutine will ensure that we faithfully execute
2789
// all clauses of our contract in the case that we need to go on-chain for a
2790
// dispute. Currently, two such conditions warrant our intervention: when an
2791
// outgoing HTLC is about to timeout, and when we know the pre-image for an
2792
// incoming HTLC, but it hasn't yet been settled off-chain. In these cases,
2793
// we'll: broadcast our commitment, cancel/settle any HTLC's backwards after
2794
// sufficient confirmation, and finally send our set of outputs to the UTXO
2795
// Nursery for incubation, and ultimate sweeping.
2796
//
2797
// NOTE: This MUST be run as a goroutine.
2798
func (c *ChannelArbitrator) channelAttendant(bestHeight int32,
2799
        commitSet *CommitSet) {
49✔
2800

49✔
2801
        // TODO(roasbeef): tell top chain arb we're done
49✔
2802
        defer func() {
95✔
2803
                c.wg.Done()
46✔
2804
        }()
46✔
2805

2806
        err := c.progressStateMachineAfterRestart(bestHeight, commitSet)
49✔
2807
        if err != nil {
50✔
2808
                // In case of an error, we return early but we do not shutdown
1✔
2809
                // LND, because there might be other channels that still can be
1✔
2810
                // resolved and we don't want to interfere with that.
1✔
2811
                // We continue to run the channel attendant in case the channel
1✔
2812
                // closes via other means, for example if the remote party force
1✔
2813
                // closes the channel. So we log the error and continue.
1✔
2814
                log.Errorf("Unable to progress state machine after "+
1✔
2815
                        "restart: %v", err)
1✔
2816
        }
1✔
2817

2818
        for {
148✔
2819
                select {
99✔
2820

2821
                // A new block has arrived, we'll examine all the active HTLC's
2822
                // to see if any of them have expired, and also update our
2823
                // track of the best current height.
2824
                case beat := <-c.BlockbeatChan:
7✔
2825
                        bestHeight = beat.Height()
7✔
2826

7✔
2827
                        log.Debugf("ChannelArbitrator(%v): new block height=%v",
7✔
2828
                                c.cfg.ChanPoint, bestHeight)
7✔
2829

7✔
2830
                        err := c.handleBlockbeat(beat)
7✔
2831
                        if err != nil {
7✔
2832
                                log.Errorf("Handle block=%v got err: %v",
×
2833
                                        bestHeight, err)
×
2834
                        }
×
2835

2836
                        // If as a result of this trigger, the contract is
2837
                        // fully resolved, then we'll exit.
2838
                        if c.state == StateFullyResolved {
7✔
2839
                                return
×
2840
                        }
×
2841

2842
                // A new signal update was just sent. This indicates that the
2843
                // channel under watch is now live, and may modify its internal
2844
                // state, so we'll get the most up to date signals so we can
2845
                // properly do our job.
2846
                case signalUpdate := <-c.signalUpdates:
12✔
2847
                        log.Tracef("ChannelArbitrator(%v): got new signal "+
12✔
2848
                                "update!", c.cfg.ChanPoint)
12✔
2849

12✔
2850
                        // We'll update the ShortChannelID.
12✔
2851
                        c.cfg.ShortChanID = signalUpdate.newSignals.ShortChanID
12✔
2852

12✔
2853
                        // Now that the signal has been updated, we'll now
12✔
2854
                        // close the done channel to signal to the caller we've
12✔
2855
                        // registered the new ShortChannelID.
12✔
2856
                        close(signalUpdate.doneChan)
12✔
2857

2858
                // We've cooperatively closed the channel, so we're no longer
2859
                // needed. We'll mark the channel as resolved and exit.
2860
                case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure:
3✔
2861
                        err := c.handleCoopCloseEvent(closeInfo)
3✔
2862
                        if err != nil {
3✔
2863
                                log.Errorf("Failed to handle coop close: %v",
×
2864
                                        err)
×
2865

×
2866
                                return
×
2867
                        }
×
2868

2869
                // We have broadcasted our commitment, and it is now confirmed
2870
                // on-chain.
2871
                case closeInfo := <-c.cfg.ChainEvents.LocalUnilateralClosure:
13✔
2872
                        if c.state != StateCommitmentBroadcasted {
15✔
2873
                                log.Errorf("ChannelArbitrator(%v): unexpected "+
2✔
2874
                                        "local on-chain channel close",
2✔
2875
                                        c.cfg.ChanPoint)
2✔
2876
                        }
2✔
2877

2878
                        err := c.handleLocalForceCloseEvent(closeInfo)
13✔
2879
                        if err != nil {
13✔
2880
                                log.Errorf("Failed to handle local force "+
×
2881
                                        "close: %v", err)
×
2882

×
2883
                                return
×
2884
                        }
×
2885

2886
                // The remote party has broadcast the commitment on-chain.
2887
                // We'll examine our state to determine if we need to act at
2888
                // all.
2889
                case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure:
9✔
2890
                        err := c.handleRemoteForceCloseEvent(uniClosure)
9✔
2891
                        if err != nil {
11✔
2892
                                log.Errorf("Failed to handle remote force "+
2✔
2893
                                        "close: %v", err)
2✔
2894

2✔
2895
                                return
2✔
2896
                        }
2✔
2897

2898
                // The remote has breached the channel. As this is handled by
2899
                // the ChainWatcher and BreachArbitrator, we don't have to do
2900
                // anything in particular, so just advance our state and
2901
                // gracefully exit.
2902
                case breachInfo := <-c.cfg.ChainEvents.ContractBreach:
2✔
2903
                        err := c.handleContractBreach(breachInfo)
2✔
2904
                        if err != nil {
2✔
2905
                                log.Errorf("Failed to handle contract breach: "+
×
2906
                                        "%v", err)
×
2907

×
2908
                                return
×
2909
                        }
×
2910

2911
                // A new contract has just been resolved, we'll now check our
2912
                // log to see if all contracts have been resolved. If so, then
2913
                // we can exit as the contract is fully resolved.
2914
                case <-c.resolutionSignal:
5✔
2915
                        log.Infof("ChannelArbitrator(%v): a contract has been "+
5✔
2916
                                "fully resolved!", c.cfg.ChanPoint)
5✔
2917

5✔
2918
                        nextState, _, err := c.advanceState(
5✔
2919
                                uint32(bestHeight), chainTrigger, nil,
5✔
2920
                        )
5✔
2921
                        if err != nil {
5✔
2922
                                log.Errorf("Unable to advance state: %v", err)
×
2923
                        }
×
2924

2925
                        // If we don't have anything further to do after
2926
                        // advancing our state, then we'll exit.
2927
                        if nextState == StateFullyResolved {
8✔
2928
                                log.Infof("ChannelArbitrator(%v): all "+
3✔
2929
                                        "contracts fully resolved, exiting",
3✔
2930
                                        c.cfg.ChanPoint)
3✔
2931

3✔
2932
                                return
3✔
2933
                        }
3✔
2934

2935
                // We've just received a request to forcibly close out the
2936
                // channel. We'll
2937
                case closeReq := <-c.forceCloseReqs:
12✔
2938
                        log.Infof("ChannelArbitrator(%v): received force "+
12✔
2939
                                "close request", c.cfg.ChanPoint)
12✔
2940

12✔
2941
                        if c.state != StateDefault {
13✔
2942
                                select {
1✔
2943
                                case closeReq.closeTx <- nil:
1✔
2944
                                case <-c.quit:
×
2945
                                }
2946

2947
                                select {
1✔
2948
                                case closeReq.errResp <- errAlreadyForceClosed:
1✔
2949
                                case <-c.quit:
×
2950
                                }
2951

2952
                                continue
1✔
2953
                        }
2954

2955
                        nextState, closeTx, err := c.advanceState(
11✔
2956
                                uint32(bestHeight), userTrigger, nil,
11✔
2957
                        )
11✔
2958
                        if err != nil {
13✔
2959
                                log.Errorf("Unable to advance state: %v", err)
2✔
2960
                        }
2✔
2961

2962
                        select {
11✔
2963
                        case closeReq.closeTx <- closeTx:
11✔
2964
                        case <-c.quit:
×
2965
                                return
×
2966
                        }
2967

2968
                        select {
11✔
2969
                        case closeReq.errResp <- err:
11✔
2970
                        case <-c.quit:
×
2971
                                return
×
2972
                        }
2973

2974
                        // If we don't have anything further to do after
2975
                        // advancing our state, then we'll exit.
2976
                        if nextState == StateFullyResolved {
11✔
2977
                                log.Infof("ChannelArbitrator(%v): all "+
×
2978
                                        "contracts resolved, exiting",
×
2979
                                        c.cfg.ChanPoint)
×
2980
                                return
×
2981
                        }
×
2982

2983
                case <-c.quit:
41✔
2984
                        return
41✔
2985
                }
2986
        }
2987
}
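
channelAttendant is a single-goroutine event loop: every external input (blocks, signal updates, close events, user force-close requests) arrives on its own channel, and the loop exits on quit or once the contract is fully resolved. A compressed sketch of that shape, with placeholder event types rather than lnd's ChainEvents:

package main

import "fmt"

// events bundles the channels an attendant-style loop would select over.
type events struct {
        blocks     chan int32
        coopClose  chan struct{}
        forceClose chan chan error
        quit       chan struct{}
}

// attend processes events until the contract is resolved or quit is closed.
func attend(ev events) {
        resolved := false
        for !resolved {
                select {
                case height := <-ev.blocks:
                        // A new block: re-evaluate time-sensitive HTLCs.
                        fmt.Println("new block at height", height)

                case <-ev.coopClose:
                        // Cooperative close: nothing left to arbitrate.
                        fmt.Println("channel cooperatively closed")
                        resolved = true

                case errResp := <-ev.forceClose:
                        // User asked for a force close; report the outcome.
                        fmt.Println("force close requested")
                        errResp <- nil

                case <-ev.quit:
                        return
                }
        }
}

func main() {
        ev := events{
                blocks:     make(chan int32, 1),
                coopClose:  make(chan struct{}, 1),
                forceClose: make(chan chan error),
                quit:       make(chan struct{}),
        }
        ev.blocks <- 100
        ev.coopClose <- struct{}{}
        attend(ev)
}
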
2988

2989
// handleBlockbeat processes a newly received blockbeat by advancing the
2990
// arbitrator's internal state using the received block height.
2991
func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error {
7✔
2992
        // Notify we've processed the block.
7✔
2993
        defer c.NotifyBlockProcessed(beat, nil)
7✔
2994

7✔
2995
        // If the state is StateContractClosed, StateWaitingFullResolution, or
7✔
2996
        // StateFullyResolved, there's no need to read the close event channel
7✔
2997
        // since the arbitrator can only get to these states after processing
7✔
2998
        // a previous close event and launching all its resolvers.
7✔
2999
        if c.state.IsContractClosed() {
8✔
3000
                log.Infof("ChannelArbitrator(%v): skipping reading close "+
1✔
3001
                        "events in state=%v", c.cfg.ChanPoint, c.state)
1✔
3002

1✔
3003
                // Launch all active resolvers when a new blockbeat is
1✔
3004
                // received, even when the contract is closed, we still need
1✔
3005
                // this as the resolvers may transform into new ones. For
1✔
3006
                // already launched resolvers this will be NOOP as they track
1✔
3007
                // their own `launched` states.
1✔
3008
                c.launchResolvers()
1✔
3009

1✔
3010
                return nil
1✔
3011
        }
1✔
3012

3013
        // Perform a non-blocking read on the close events in case the channel
3014
        // is closed in this blockbeat.
3015
        c.receiveAndProcessCloseEvent()
7✔
3016

7✔
3017
        // Try to advance the state if we are in StateDefault.
7✔
3018
        if c.state == StateDefault {
13✔
3019
                // Now that a new block has arrived, we'll attempt to advance
6✔
3020
                // our state forward.
6✔
3021
                _, _, err := c.advanceState(
6✔
3022
                        uint32(beat.Height()), chainTrigger, nil,
6✔
3023
                )
6✔
3024
                if err != nil {
6✔
3025
                        return fmt.Errorf("unable to advance state: %w", err)
×
3026
                }
×
3027
        }
3028

3029
        // Launch all active resolvers when a new blockbeat is received.
3030
        c.launchResolvers()
7✔
3031

7✔
3032
        return nil
7✔
3033
}
3034

3035
// receiveAndProcessCloseEvent does a non-blocking read on all the channel
3036
// close event channels. If an event is received, it will be further processed.
3037
func (c *ChannelArbitrator) receiveAndProcessCloseEvent() {
7✔
3038
        select {
7✔
3039
        // Received a coop close event, we now mark the channel as resolved and
3040
        // exit.
3041
        case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure:
×
3042
                err := c.handleCoopCloseEvent(closeInfo)
×
3043
                if err != nil {
×
3044
                        log.Errorf("Failed to handle coop close: %v", err)
×
3045
                        return
×
3046
                }
×
3047

3048
        // We have broadcast our commitment, and it is now confirmed onchain.
UNCOV
3049
        case closeInfo := <-c.cfg.ChainEvents.LocalUnilateralClosure:
×
UNCOV
3050
                if c.state != StateCommitmentBroadcasted {
×
3051
                        log.Errorf("ChannelArbitrator(%v): unexpected "+
×
3052
                                "local on-chain channel close", c.cfg.ChanPoint)
×
3053
                }
×
3054

UNCOV
3055
                err := c.handleLocalForceCloseEvent(closeInfo)
×
UNCOV
3056
                if err != nil {
×
3057
                        log.Errorf("Failed to handle local force close: %v",
×
3058
                                err)
×
3059

×
3060
                        return
×
3061
                }
×
3062

3063
        // The remote party has broadcast the commitment. We'll examine our
3064
        // state to determine if we need to act at all.
UNCOV
3065
        case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure:
×
UNCOV
3066
                err := c.handleRemoteForceCloseEvent(uniClosure)
×
UNCOV
3067
                if err != nil {
×
3068
                        log.Errorf("Failed to handle remote force close: %v",
×
3069
                                err)
×
3070

×
3071
                        return
×
3072
                }
×
3073

3074
        // The remote has breached the channel! We now launch the breach
3075
        // contract resolvers.
3076
        case breachInfo := <-c.cfg.ChainEvents.ContractBreach:
×
3077
                err := c.handleContractBreach(breachInfo)
×
3078
                if err != nil {
×
3079
                        log.Errorf("Failed to handle contract breach: %v", err)
×
3080
                        return
×
3081
                }
×
3082

3083
        default:
7✔
3084
                log.Infof("ChannelArbitrator(%v) no close event",
7✔
3085
                        c.cfg.ChanPoint)
7✔
3086
        }
3087
}
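
receiveAndProcessCloseEvent relies on a select with a default case, which makes the read non-blocking: if no close event is pending in this blockbeat, the arbitrator simply carries on. A tiny sketch of that idiom:

package main

import "fmt"

// drainOne does a single non-blocking read of a pending close event. It
// returns true if an event was handled, false if none was queued.
func drainOne(closeEvents <-chan string) bool {
        select {
        case ev := <-closeEvents:
                fmt.Println("handling close event:", ev)
                return true
        default:
                // Nothing pending: do not block the blockbeat pipeline.
                return false
        }
}

func main() {
        events := make(chan string, 1)

        // No event queued yet.
        fmt.Println("handled:", drainOne(events))

        // An event arrives and is picked up on the next beat.
        events <- "remote unilateral close"
        fmt.Println("handled:", drainOne(events))
}
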
3088

3089
// Name returns a human-readable string for this subsystem.
3090
//
3091
// NOTE: Part of chainio.Consumer interface.
3092
func (c *ChannelArbitrator) Name() string {
52✔
3093
        return fmt.Sprintf("ChannelArbitrator(%v)", c.cfg.ChanPoint)
52✔
3094
}
52✔
3095

3096
// checkLegacyBreach returns StateFullyResolved if the channel was closed with
3097
// a breach transaction before the channel arbitrator launched its own breach
3098
// resolver. StateContractClosed is returned if this is a modern breach close
3099
// with a breach resolver. StateError is returned if the log lookup failed.
3100
func (c *ChannelArbitrator) checkLegacyBreach() (ArbitratorState, error) {
3✔
3101
        // A previous version of the channel arbitrator would make the breach
3✔
3102
        // close skip to StateFullyResolved. If there are no contract
3✔
3103
        // resolutions in the bolt arbitrator log, then this is an older breach
3✔
3104
        // close. Otherwise, if there are resolutions, the state should advance
3✔
3105
        // to StateContractClosed.
3✔
3106
        _, err := c.log.FetchContractResolutions()
3✔
3107
        if err == errNoResolutions {
3✔
3108
                // This is an older breach close still in the database.
×
3109
                return StateFullyResolved, nil
×
3110
        } else if err != nil {
3✔
3111
                return StateError, err
×
3112
        }
×
3113

3114
        // This is a modern breach close with resolvers.
3115
        return StateContractClosed, nil
3✔
3116
}
3117

3118
// sweepRequest wraps the arguments used when calling `SweepInput`.
3119
type sweepRequest struct {
3120
        // input is the input to be swept.
3121
        input input.Input
3122

3123
        // params holds the sweeping parameters.
3124
        params sweep.Params
3125
}
3126

3127
// createSweepRequest creates an anchor sweeping request for a particular
3128
// version (local/remote/remote pending) of the commitment.
3129
func (c *ChannelArbitrator) createSweepRequest(
3130
        anchor *lnwallet.AnchorResolution, htlcs htlcSet, anchorPath string,
3131
        heightHint uint32) (sweepRequest, error) {
9✔
3132

9✔
3133
        // Use the chan id as the exclusive group. This prevents any of the
9✔
3134
        // anchors from being batched together.
9✔
3135
        exclusiveGroup := c.cfg.ShortChanID.ToUint64()
9✔
3136

9✔
3137
        // Find the deadline for this specific anchor.
9✔
3138
        deadline, value, err := c.findCommitmentDeadlineAndValue(
9✔
3139
                heightHint, htlcs,
9✔
3140
        )
9✔
3141
        if err != nil {
9✔
3142
                return sweepRequest{}, err
×
3143
        }
×
3144

3145
        // If we cannot find a deadline, it means there's no HTLCs at stake,
3146
        // which means we can relax our anchor sweeping conditions as we don't
3147
        // have any time sensitive outputs to sweep. However we need to
3148
        // register the anchor output with the sweeper so we are later able to
3149
        // bump the close fee.
3150
        if deadline.IsNone() {
13✔
3151
                log.Infof("ChannelArbitrator(%v): no HTLCs at stake, "+
4✔
3152
                        "sweeping anchor with default deadline",
4✔
3153
                        c.cfg.ChanPoint)
4✔
3154
        }
4✔
3155

3156
        witnessType := input.CommitmentAnchor
9✔
3157

9✔
3158
        // For taproot channels, we need to use the proper witness type.
9✔
3159
        if txscript.IsPayToTaproot(
9✔
3160
                anchor.AnchorSignDescriptor.Output.PkScript,
9✔
3161
        ) {
10✔
3162

1✔
3163
                witnessType = input.TaprootAnchorSweepSpend
1✔
3164
        }
1✔
3165

3166
        // Prepare anchor output for sweeping.
3167
        anchorInput := input.MakeBaseInput(
9✔
3168
                &anchor.CommitAnchor,
9✔
3169
                witnessType,
9✔
3170
                &anchor.AnchorSignDescriptor,
9✔
3171
                heightHint,
9✔
3172
                &input.TxInfo{
9✔
3173
                        Fee:    anchor.CommitFee,
9✔
3174
                        Weight: anchor.CommitWeight,
9✔
3175
                },
9✔
3176
        )
9✔
3177

9✔
3178
        // If we have a deadline, we'll use it to calculate the deadline
9✔
3179
        // height, otherwise default to none.
9✔
3180
        deadlineDesc := "None"
9✔
3181
        deadlineHeight := fn.MapOption(func(d int32) int32 {
15✔
3182
                deadlineDesc = fmt.Sprintf("%d", d)
6✔
3183

6✔
3184
                return d + int32(heightHint)
6✔
3185
        })(deadline)
6✔
3186

3187
        // Calculate the budget based on the value under protection, which is
3188
        // the sum of all HTLCs on this commitment subtracted by their budgets.
3189
        // The anchor output in itself has a small output value of 330 sats so
3190
        // we also include it in the budget to pay for the cpfp transaction.
3191
        budget := calculateBudget(
9✔
3192
                value, c.cfg.Budget.AnchorCPFPRatio, c.cfg.Budget.AnchorCPFP,
9✔
3193
        ) + AnchorOutputValue
9✔
3194

9✔
3195
        log.Infof("ChannelArbitrator(%v): offering anchor from %s commitment "+
9✔
3196
                "%v to sweeper with deadline=%v, budget=%v", c.cfg.ChanPoint,
9✔
3197
                anchorPath, anchor.CommitAnchor, deadlineDesc, budget)
9✔
3198

9✔
3199
        // Sweep anchor output with a confirmation target fee preference.
9✔
3200
        // Because this is a cpfp-operation, the anchor will only be attempted
9✔
3201
        // to sweep when the current fee estimate for the confirmation target
9✔
3202
        // exceeds the commit fee rate.
9✔
3203
        return sweepRequest{
9✔
3204
                input: &anchorInput,
9✔
3205
                params: sweep.Params{
9✔
3206
                        ExclusiveGroup: &exclusiveGroup,
9✔
3207
                        Budget:         budget,
9✔
3208
                        DeadlineHeight: deadlineHeight,
9✔
3209
                },
9✔
3210
        }, nil
9✔
3211
}
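
The anchor CPFP budget above is derived from the value at stake plus the 330-sat anchor output itself, and the relative deadline is turned into an absolute height by adding the height hint. The sketch below shows that arithmetic; ratioBudget is a hypothetical stand-in for calculateBudget, and its cap behaviour is an assumption rather than lnd's exact rule.

package main

import "fmt"

const anchorOutputValue = 330 // sats, the anchor output itself

// ratioBudget is an illustrative stand-in for calculateBudget: spend a ratio
// of the value under protection, capped at maxBudget when one is set. The cap
// semantics here are an assumption, not necessarily lnd's exact rule.
func ratioBudget(value int64, ratio float64, maxBudget int64) int64 {
        budget := int64(float64(value) * ratio)
        if maxBudget > 0 && budget > maxBudget {
                budget = maxBudget
        }
        return budget
}

func main() {
        var (
                valueAtStake   int64 = 1_000_000 // sum of HTLCs at stake, in sats
                heightHint     int32 = 800_000   // height the sweep is offered at
                relativeExpiry int32 = 40        // blocks until the closest deadline
        )

        // The anchor output value is added on top so the CPFP can also pay
        // for spending the 330-sat anchor itself.
        budget := ratioBudget(valueAtStake, 0.5, 0) + anchorOutputValue

        // A relative deadline becomes an absolute height by adding the hint;
        // with no HTLCs at stake there would simply be no deadline height.
        deadlineHeight := heightHint + relativeExpiry

        fmt.Printf("budget=%d sats, deadline height=%d\n", budget, deadlineHeight)
}
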
3212

3213
// prepareAnchorSweeps creates a list of requests to be used by the sweeper for
3214
// all possible commitment versions.
3215
func (c *ChannelArbitrator) prepareAnchorSweeps(heightHint uint32,
3216
        anchors *lnwallet.AnchorResolutions) ([]sweepRequest, error) {
20✔
3217

20✔
3218
        // requests holds all the possible anchor sweep requests. We can have
20✔
3219
        // up to 3 different versions of commitments (local/remote/remote
20✔
3220
        // dangling) to be CPFPed by the anchors.
20✔
3221
        requests := make([]sweepRequest, 0, 3)
20✔
3222

20✔
3223
        // remotePendingReq holds the request for sweeping the anchor output on
20✔
3224
        // the remote pending commitment. It's only set when there's an actual
20✔
3225
        // pending remote commitment and it's used to decide whether we need to
20✔
3226
        // update the fee budget when sweeping the anchor output on the local
20✔
3227
        // commitment.
20✔
3228
        remotePendingReq := fn.None[sweepRequest]()
20✔
3229

20✔
3230
        // First we check on the remote pending commitment and optionally
20✔
3231
        // create an anchor sweeping request.
20✔
3232
        htlcs, ok := c.activeHTLCs[RemotePendingHtlcSet]
20✔
3233
        if ok && anchors.RemotePending != nil {
22✔
3234
                req, err := c.createSweepRequest(
2✔
3235
                        anchors.RemotePending, htlcs, "remote pending",
2✔
3236
                        heightHint,
2✔
3237
                )
2✔
3238
                if err != nil {
2✔
3239
                        return nil, err
×
3240
                }
×
3241

3242
                // Save the request.
3243
                requests = append(requests, req)
2✔
3244

2✔
3245
                // Set the optional variable.
2✔
3246
                remotePendingReq = fn.Some(req)
2✔
3247
        }
3248

3249
        // Check the local commitment and optionally create an anchor sweeping
3250
        // request. The params used in this request will be influenced by the
3251
        // anchor sweeping request made from the pending remote commitment.
3252
        htlcs, ok = c.activeHTLCs[LocalHtlcSet]
20✔
3253
        if ok && anchors.Local != nil {
24✔
3254
                req, err := c.createSweepRequest(
4✔
3255
                        anchors.Local, htlcs, "local", heightHint,
4✔
3256
                )
4✔
3257
                if err != nil {
4✔
3258
                        return nil, err
×
3259
                }
×
3260

3261
                // If there's an anchor sweeping request from the pending
3262
                // remote commitment, we will compare its budget against the
3263
                // budget used here and choose the params that has a larger
3264
                // budget. The deadline when choosing the remote pending budget
3265
                // instead of the local one will always be earlier or equal to
3266
                // the local deadline because outgoing HTLCs are resolved on
3267
                // the local commitment first before they are removed from the
3268
                // remote one.
3269
                remotePendingReq.WhenSome(func(s sweepRequest) {
6✔
3270
                        if s.params.Budget <= req.params.Budget {
3✔
3271
                                return
1✔
3272
                        }
1✔
3273

3274
                        log.Infof("ChannelArbitrator(%v): replaced local "+
1✔
3275
                                "anchor(%v) sweep params with pending remote "+
1✔
3276
                                "anchor sweep params, \nold:[%v], \nnew:[%v]",
1✔
3277
                                c.cfg.ChanPoint, anchors.Local.CommitAnchor,
1✔
3278
                                req.params, s.params)
1✔
3279

1✔
3280
                        req.params = s.params
1✔
3281
                })
3282

3283
                // Save the request.
3284
                requests = append(requests, req)
4✔
3285
        }
3286

3287
        // Check the remote commitment and create an anchor sweeping request if
3288
        // needed.
3289
        htlcs, ok = c.activeHTLCs[RemoteHtlcSet]
20✔
3290
        if ok && anchors.Remote != nil {
24✔
3291
                req, err := c.createSweepRequest(
4✔
3292
                        anchors.Remote, htlcs, "remote", heightHint,
4✔
3293
                )
4✔
3294
                if err != nil {
4✔
3295
                        return nil, err
×
3296
                }
×
3297

3298
                requests = append(requests, req)
4✔
3299
        }
3300

3301
        return requests, nil
20✔
3302
}
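
When both a pending-remote and a local anchor request exist, the local request adopts whichever params carry the larger budget, since the pending-remote deadline can only be earlier or equal. A small sketch of that comparison, using a plain pointer in place of the fn.Option used above:

package main

import "fmt"

// sweepParams is a trimmed-down stand-in for the sweeper params.
type sweepParams struct {
        Budget         int64
        DeadlineHeight int32
}

// pickLocalParams returns the params the local anchor sweep should use: its
// own, unless the pending-remote request (if any) has a strictly larger
// budget.
func pickLocalParams(local sweepParams, remotePending *sweepParams) sweepParams {
        if remotePending == nil || remotePending.Budget <= local.Budget {
                return local
        }
        return *remotePending
}

func main() {
        local := sweepParams{Budget: 10_000, DeadlineHeight: 800_050}
        pending := &sweepParams{Budget: 25_000, DeadlineHeight: 800_030}

        chosen := pickLocalParams(local, pending)
        fmt.Printf("local anchor sweeps with budget=%d deadline=%d\n",
                chosen.Budget, chosen.DeadlineHeight)
}
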
3303

3304
// failIncomingDust resolves the incoming dust HTLCs because they do not have
3305
// an output on the commitment transaction and cannot be resolved onchain. We
3306
// mark them as failed here.
3307
func (c *ChannelArbitrator) failIncomingDust(
3308
        incomingDustHTLCs []channeldb.HTLC) error {
11✔
3309

11✔
3310
        for _, htlc := range incomingDustHTLCs {
13✔
3311
                if !htlc.Incoming || htlc.OutputIndex >= 0 {
2✔
3312
                        return fmt.Errorf("htlc with index %v is not incoming "+
×
3313
                                "dust", htlc.OutputIndex)
×
3314
                }
×
3315

3316
                key := models.CircuitKey{
2✔
3317
                        ChanID: c.cfg.ShortChanID,
2✔
3318
                        HtlcID: htlc.HtlcIndex,
2✔
3319
                }
2✔
3320

2✔
3321
                // Mark this dust htlc as final failed.
2✔
3322
                chainArbCfg := c.cfg.ChainArbitratorConfig
2✔
3323
                err := chainArbCfg.PutFinalHtlcOutcome(
2✔
3324
                        key.ChanID, key.HtlcID, false,
2✔
3325
                )
2✔
3326
                if err != nil {
2✔
3327
                        return err
×
3328
                }
×
3329

3330
                // Send notification.
3331
                chainArbCfg.HtlcNotifier.NotifyFinalHtlcEvent(
2✔
3332
                        key,
2✔
3333
                        channeldb.FinalHtlcInfo{
2✔
3334
                                Settled:  false,
2✔
3335
                                Offchain: false,
2✔
3336
                        },
2✔
3337
                )
2✔
3338
        }
3339

3340
        return nil
11✔
3341
}
3342

3343
// abandonForwards cancels back the incoming HTLCs for their corresponding
3344
// outgoing HTLCs. We use a set here to avoid sending duplicate failure messages
3345
// for the same HTLC. This also needs to be done for locally initiated outgoing
3346
// HTLCs because they are special cased in the switch.
3347
func (c *ChannelArbitrator) abandonForwards(htlcs fn.Set[uint64]) error {
14✔
3348
        log.Debugf("ChannelArbitrator(%v): cancelling back %v incoming "+
14✔
3349
                "HTLC(s)", c.cfg.ChanPoint,
14✔
3350
                len(htlcs))
14✔
3351

14✔
3352
        msgsToSend := make([]ResolutionMsg, 0, len(htlcs))
14✔
3353
        failureMsg := &lnwire.FailPermanentChannelFailure{}
14✔
3354

14✔
3355
        for idx := range htlcs {
25✔
3356
                failMsg := ResolutionMsg{
11✔
3357
                        SourceChan: c.cfg.ShortChanID,
11✔
3358
                        HtlcIndex:  idx,
11✔
3359
                        Failure:    failureMsg,
11✔
3360
                }
11✔
3361

11✔
3362
                msgsToSend = append(msgsToSend, failMsg)
11✔
3363
        }
11✔
3364

3365
        // Send the messages to the switch, if there are any.
3366
        if len(msgsToSend) == 0 {
18✔
3367
                return nil
4✔
3368
        }
4✔
3369

3370
        log.Debugf("ChannelArbitrator(%v): sending resolution message=%v",
11✔
3371
                c.cfg.ChanPoint, lnutils.SpewLogClosure(msgsToSend))
11✔
3372

11✔
3373
        err := c.cfg.DeliverResolutionMsg(msgsToSend...)
11✔
3374
        if err != nil {
11✔
3375
                log.Errorf("Unable to send resolution msges to switch: %v", err)
×
3376
                return err
×
3377
        }
×
3378

3379
        return nil
11✔
3380
}

// handleCoopCloseEvent takes a coop close event from ChainEvents, marks the
// channel as closed and advances the state.
func (c *ChannelArbitrator) handleCoopCloseEvent(
        closeInfo *CooperativeCloseInfo) error {

        log.Infof("ChannelArbitrator(%v) marking channel cooperatively closed "+
                "at height %v", c.cfg.ChanPoint, closeInfo.CloseHeight)

        err := c.cfg.MarkChannelClosed(
                closeInfo.ChannelCloseSummary,
                channeldb.ChanStatusCoopBroadcasted,
        )
        if err != nil {
                return fmt.Errorf("unable to mark channel closed: %w", err)
        }

        // We'll now advance our state machine until it reaches a terminal
        // state, and the channel is marked resolved.
        _, _, err = c.advanceState(closeInfo.CloseHeight, coopCloseTrigger, nil)
        if err != nil {
                log.Errorf("Unable to advance state: %v", err)
        }

        return nil
}

// handleLocalForceCloseEvent takes a local force close event from ChainEvents,
// saves the contract resolutions to disk, marks the channel as closed and
// advances the state.
func (c *ChannelArbitrator) handleLocalForceCloseEvent(
        closeInfo *LocalUnilateralCloseInfo) error {

        closeTx := closeInfo.CloseTx

        resolutions, err := closeInfo.ContractResolutions.
                UnwrapOrErr(
                        fmt.Errorf("resolutions not found"),
                )
        if err != nil {
                return fmt.Errorf("unable to get resolutions: %w", err)
        }

        // We make sure that the htlc resolutions are present, otherwise we
        // would panic dereferencing the pointer.
        //
        // TODO(ziggie): Refactor ContractResolutions to use options.
        if resolutions.HtlcResolutions == nil {
                return fmt.Errorf("htlc resolutions is nil")
        }

        log.Infof("ChannelArbitrator(%v): local force close tx=%v confirmed",
                c.cfg.ChanPoint, closeTx.TxHash())

        contractRes := &ContractResolutions{
                CommitHash:       closeTx.TxHash(),
                CommitResolution: resolutions.CommitResolution,
                HtlcResolutions:  *resolutions.HtlcResolutions,
                AnchorResolution: resolutions.AnchorResolution,
        }

        // When processing a unilateral close event, we'll transition to the
        // ContractClosed state. We'll log out the set of resolutions such that
        // they are available to fetch in that state, and we'll also write the
        // commit set so we can reconstruct our chain actions on restart.
        err = c.log.LogContractResolutions(contractRes)
        if err != nil {
                return fmt.Errorf("unable to write resolutions: %w", err)
        }

        err = c.log.InsertConfirmedCommitSet(&closeInfo.CommitSet)
        if err != nil {
                return fmt.Errorf("unable to write commit set: %w", err)
        }

        // After the set of resolutions is successfully logged, we can safely
        // close the channel. After this succeeds we won't be getting chain
        // events anymore, so we must make sure we can recover on restart after
        // it is marked closed. If the next state transition fails, we'll start
        // up in the prior state again, and we'll no longer be getting chain
        // events. In this case we must manually re-trigger the state
        // transition into StateContractClosed based on the close status of the
        // channel.
        err = c.cfg.MarkChannelClosed(
                closeInfo.ChannelCloseSummary,
                channeldb.ChanStatusLocalCloseInitiator,
        )
        if err != nil {
                return fmt.Errorf("unable to mark channel closed: %w", err)
        }

        // We'll now advance our state machine until it reaches a terminal
        // state.
        _, _, err = c.advanceState(
                uint32(closeInfo.SpendingHeight),
                localCloseTrigger, &closeInfo.CommitSet,
        )
        if err != nil {
                log.Errorf("Unable to advance state: %v", err)
        }

        return nil
}
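
// Editorial note, not part of channel_arbitrator.go: ContractResolutions on
// LocalUnilateralCloseInfo is an option type (the handler above unwraps it
// with UnwrapOrErr instead of dereferencing a possibly missing value). The
// same pattern in isolation looks like this; the names opt and val are
// hypothetical, and fn.Some is assumed to be the usual option constructor:
//
//	opt := fn.Some(42)
//	val, err := opt.UnwrapOrErr(fmt.Errorf("value not set"))
//	if err != nil {
//		// The option was None.
//	}
//	_ = val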

// handleRemoteForceCloseEvent takes a remote force close event from
// ChainEvents, saves the contract resolutions to disk, marks the channel as
// closed and advances the state.
func (c *ChannelArbitrator) handleRemoteForceCloseEvent(
        closeInfo *RemoteUnilateralCloseInfo) error {

        log.Infof("ChannelArbitrator(%v): remote party has force closed "+
                "channel at height %v", c.cfg.ChanPoint,
                closeInfo.SpendingHeight)

        // If we don't have a self output, and there are no active HTLCs, then
        // we can immediately mark the contract as fully resolved and exit.
        contractRes := &ContractResolutions{
                CommitHash:       *closeInfo.SpenderTxHash,
                CommitResolution: closeInfo.CommitResolution,
                HtlcResolutions:  *closeInfo.HtlcResolutions,
                AnchorResolution: closeInfo.AnchorResolution,
        }

        // When processing a unilateral close event, we'll transition to the
        // ContractClosed state. We'll log out the set of resolutions such that
        // they are available to fetch in that state, and we'll also write the
        // commit set so we can reconstruct our chain actions on restart.
        err := c.log.LogContractResolutions(contractRes)
        if err != nil {
                return fmt.Errorf("unable to write resolutions: %w", err)
        }

        err = c.log.InsertConfirmedCommitSet(&closeInfo.CommitSet)
        if err != nil {
                return fmt.Errorf("unable to write commit set: %w", err)
        }

        // After the set of resolutions is successfully logged, we can safely
        // close the channel. After this succeeds we won't be getting chain
        // events anymore, so we must make sure we can recover on restart after
        // it is marked closed. If the next state transition fails, we'll start
        // up in the prior state again, and we'll no longer be getting chain
        // events. In this case we must manually re-trigger the state
        // transition into StateContractClosed based on the close status of the
        // channel.
        closeSummary := &closeInfo.ChannelCloseSummary
        err = c.cfg.MarkChannelClosed(
                closeSummary,
                channeldb.ChanStatusRemoteCloseInitiator,
        )
        if err != nil {
                return fmt.Errorf("unable to mark channel closed: %w", err)
        }

        // We'll now advance our state machine until it reaches a terminal
        // state.
        _, _, err = c.advanceState(
                uint32(closeInfo.SpendingHeight),
                remoteCloseTrigger, &closeInfo.CommitSet,
        )
        if err != nil {
                log.Errorf("Unable to advance state: %v", err)
        }

        return nil
}

// handleContractBreach takes a breach close event from ChainEvents, saves the
// contract resolutions to disk, marks the channel as closed and advances the
// state.
func (c *ChannelArbitrator) handleContractBreach(
        breachInfo *BreachCloseInfo) error {

        closeSummary := &breachInfo.CloseSummary

        log.Infof("ChannelArbitrator(%v): remote party has breached channel "+
                "at height %v!", c.cfg.ChanPoint, closeSummary.CloseHeight)

        // In the breach case, we'll only have anchor and breach resolutions.
        contractRes := &ContractResolutions{
                CommitHash:       breachInfo.CommitHash,
                BreachResolution: breachInfo.BreachResolution,
                AnchorResolution: breachInfo.AnchorResolution,
        }

        // We'll transition to the ContractClosed state and log the set of
        // resolutions such that they can be turned into resolvers later on.
        // We'll also insert the CommitSet of the latest set of commitments.
        err := c.log.LogContractResolutions(contractRes)
        if err != nil {
                return fmt.Errorf("unable to write resolutions: %w", err)
        }

        err = c.log.InsertConfirmedCommitSet(&breachInfo.CommitSet)
        if err != nil {
                return fmt.Errorf("unable to write commit set: %w", err)
        }

        // The channel is finally marked pending closed here as the
        // BreachArbitrator and channel arbitrator have persisted the relevant
        // states.
        err = c.cfg.MarkChannelClosed(
                closeSummary, channeldb.ChanStatusRemoteCloseInitiator,
        )
        if err != nil {
                return fmt.Errorf("unable to mark channel closed: %w", err)
        }

        log.Infof("Breached channel=%v marked pending-closed",
                breachInfo.BreachResolution.FundingOutPoint)

        // We'll advance our state machine until it reaches a terminal state.
        _, _, err = c.advanceState(
                closeSummary.CloseHeight, breachCloseTrigger,
                &breachInfo.CommitSet,
        )
        if err != nil {
                log.Errorf("Unable to advance state: %v", err)
        }

        return nil
}
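
// Editorial sketch, not part of channel_arbitrator.go: the four handlers above
// are fed from the arbitrator's chain-event subscription, as their doc
// comments note. A simplified dispatcher has roughly the shape below. The
// function name and the coopChan/localChan/remoteChan/breachChan parameters
// are hypothetical stand-ins for the real subscription fields; the real event
// loop also handles block notifications, shutdown, and more.
func (c *ChannelArbitrator) dispatchCloseEvents(
        coopChan <-chan *CooperativeCloseInfo,
        localChan <-chan *LocalUnilateralCloseInfo,
        remoteChan <-chan *RemoteUnilateralCloseInfo,
        breachChan <-chan *BreachCloseInfo) {

        for {
                var err error
                select {
                case closeInfo := <-coopChan:
                        err = c.handleCoopCloseEvent(closeInfo)

                case closeInfo := <-localChan:
                        err = c.handleLocalForceCloseEvent(closeInfo)

                case closeInfo := <-remoteChan:
                        err = c.handleRemoteForceCloseEvent(closeInfo)

                case breachInfo := <-breachChan:
                        err = c.handleContractBreach(breachInfo)
                }
                if err != nil {
                        log.Errorf("ChannelArbitrator(%v): unable to handle "+
                                "close event: %v", c.cfg.ChanPoint, err)
                }
        }
}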