lightningnetwork / lnd / 12343072627

15 Dec 2024 11:09PM UTC coverage: 57.504% (-1.1%) from 58.636%

Pull #9315 (github) by yyforyongyu
contractcourt: offer outgoing htlc one block earlier before its expiry

We need to offer the outgoing htlc one block earlier to make sure when
the expiry height hits, the sweeper will not miss sweeping it in the
same block. This also means the outgoing contest resolver now only does
one thing - watch for preimage spend till height expiry-1, which can
easily be moved into the timeout resolver instead in the future.
Pull Request #9315: Implement `blockbeat`
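
The change above amounts to a single height rule: hand an outgoing HTLC to the sweeper once the chain reaches expiry - 1 rather than waiting for the expiry height itself, so the timeout sweep can confirm in the same block in which the HTLC expires. A minimal, self-contained Go sketch of that rule follows; the function name and signature are illustrative assumptions, not lnd's actual API.

package main

import "fmt"

// offerOutgoingHTLC reports whether an outgoing HTLC whose timeout path
// matures at expiry should be handed to the sweeper at the given block
// height. Offering at expiry-1 means the sweep is already queued when the
// expiry height hits, so the sweeper does not act one block late.
func offerOutgoingHTLC(height, expiry uint32) bool {
        return height >= expiry-1
}

func main() {
        const expiry uint32 = 150
        for _, h := range []uint32{148, 149, 150} {
                fmt.Printf("height=%d offer=%v\n", h, offerOutgoingHTLC(h, expiry))
        }
}

Until that point (height < expiry-1) the outgoing contest resolver only watches for a preimage spend, which is why the description notes that this remaining duty could later be folded into the timeout resolver.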

1445 of 2007 new or added lines in 26 files covered. (72.0%)

19246 existing lines in 249 files now uncovered.

102342 of 177975 relevant lines covered (57.5%)

24772.24 hits per line

Source File: /contractcourt/channel_arbitrator.go (75.08% covered)
1
package contractcourt
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "math"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcutil"
14
        "github.com/btcsuite/btcd/chaincfg/chainhash"
15
        "github.com/btcsuite/btcd/txscript"
16
        "github.com/btcsuite/btcd/wire"
17
        "github.com/lightningnetwork/lnd/chainio"
18
        "github.com/lightningnetwork/lnd/channeldb"
19
        "github.com/lightningnetwork/lnd/fn/v2"
20
        "github.com/lightningnetwork/lnd/graph/db/models"
21
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
22
        "github.com/lightningnetwork/lnd/input"
23
        "github.com/lightningnetwork/lnd/invoices"
24
        "github.com/lightningnetwork/lnd/kvdb"
25
        "github.com/lightningnetwork/lnd/labels"
26
        "github.com/lightningnetwork/lnd/lntypes"
27
        "github.com/lightningnetwork/lnd/lnutils"
28
        "github.com/lightningnetwork/lnd/lnwallet"
29
        "github.com/lightningnetwork/lnd/lnwire"
30
        "github.com/lightningnetwork/lnd/sweep"
31
)
32

33
var (
34
        // errAlreadyForceClosed is an error returned when we attempt to force
35
        // close a channel that's already in the process of doing so.
36
        errAlreadyForceClosed = errors.New("channel is already in the " +
37
                "process of being force closed")
38
)
39

40
const (
41
        // arbitratorBlockBufferSize is the size of the buffer we give to each
42
        // channel arbitrator.
43
        arbitratorBlockBufferSize = 20
44

45
        // AnchorOutputValue is the output value for the anchor output of an
46
        // anchor channel.
47
        // See BOLT 03 for more details:
48
        // https://github.com/lightning/bolts/blob/master/03-transactions.md
49
        AnchorOutputValue = btcutil.Amount(330)
50
)
51

52
// WitnessSubscription represents an intent to be notified once new witnesses
53
// are discovered by various active contract resolvers. A contract resolver may
54
// use this to be notified of when it can satisfy an incoming contract after we
55
// discover the witness for an outgoing contract.
56
type WitnessSubscription struct {
57
        // WitnessUpdates is a channel that newly discovered witnesses will be
58
        // sent over.
59
        //
60
        // TODO(roasbeef): couple with WitnessType?
61
        WitnessUpdates <-chan lntypes.Preimage
62

63
        // CancelSubscription is a function closure that should be used by a
64
        // client to cancel the subscription once they are no longer interested
65
        // in receiving new updates.
66
        CancelSubscription func()
67
}
68

69
// WitnessBeacon is a global beacon of witnesses. Contract resolvers will use
70
// this interface to look up witnesses (preimages typically) of contracts
71
// they're trying to resolve, add new preimages they resolve, and finally
72
// receive new updates each time a preimage is discovered.
73
//
74
// TODO(roasbeef): need to delete the pre-images once we've used them
75
// and have been sufficiently confirmed?
76
type WitnessBeacon interface {
77
        // SubscribeUpdates returns a channel that will be sent upon *each* time
78
        // a new preimage is discovered.
79
        SubscribeUpdates(chanID lnwire.ShortChannelID, htlc *channeldb.HTLC,
80
                payload *hop.Payload,
81
                nextHopOnionBlob []byte) (*WitnessSubscription, error)
82

83
        // LookupPreimage attempts to look up a preimage in the global cache.
84
        // True is returned for the second argument if the preimage is found.
85
        LookupPreimage(payhash lntypes.Hash) (lntypes.Preimage, bool)
86

87
        // AddPreimages adds a batch of newly discovered preimages to the global
88
        // cache, and also signals any subscribers of the newly discovered
89
        // witness.
90
        AddPreimages(preimages ...lntypes.Preimage) error
91
}
92

93
// ArbChannel is an abstraction that allows the channel arbitrator to interact
94
// with an open channel.
95
type ArbChannel interface {
96
        // ForceCloseChan should force close the contract that this attendant
97
        // is watching over. We'll use this when we decide that we need to go
98
        // to chain. It should in addition tell the switch to remove the
99
        // corresponding link, such that we won't accept any new updates. The
100
        // returned summary contains all items needed to eventually resolve all
101
        // outputs on chain.
102
        ForceCloseChan() (*wire.MsgTx, error)
103

104
        // NewAnchorResolutions returns the anchor resolutions for currently
105
        // valid commitment transactions.
106
        NewAnchorResolutions() (*lnwallet.AnchorResolutions, error)
107
}
108

109
// ChannelArbitratorConfig contains all the functionality that the
110
// ChannelArbitrator needs in order to properly arbitrate any contract dispute
111
// on chain.
112
type ChannelArbitratorConfig struct {
113
        // ChanPoint is the channel point that uniquely identifies this
114
        // channel.
115
        ChanPoint wire.OutPoint
116

117
        // Channel is the full channel data structure. For legacy channels, this
118
        // field may not always be set after a restart.
119
        Channel ArbChannel
120

121
        // ShortChanID describes the exact location of the channel within the
122
        // chain. We'll use this to address any messages that we need to send
123
        // to the switch during contract resolution.
124
        ShortChanID lnwire.ShortChannelID
125

126
        // ChainEvents is an active subscription to the chain watcher for this
127
        // channel to be notified of any on-chain activity related to this
128
        // channel.
129
        ChainEvents *ChainEventSubscription
130

131
        // MarkCommitmentBroadcasted should mark the channel as the commitment
132
        // being broadcast, and we are waiting for the commitment to confirm.
133
        MarkCommitmentBroadcasted func(*wire.MsgTx, lntypes.ChannelParty) error
134

135
        // MarkChannelClosed marks the channel closed in the database, with the
136
        // passed close summary. After this method successfully returns we can
137
        // no longer expect to receive chain events for this channel, and must
138
        // be able to recover from a failure without getting the close event
139
        // again. It takes an optional channel status which will update the
140
        // channel status in the record that we keep of historical channels.
141
        MarkChannelClosed func(*channeldb.ChannelCloseSummary,
142
                ...channeldb.ChannelStatus) error
143

144
        // IsPendingClose is a boolean indicating whether the channel is marked
145
        // as pending close in the database.
146
        IsPendingClose bool
147

148
        // ClosingHeight is the height at which the channel was closed. Note
149
        // that this value is only valid if IsPendingClose is true.
150
        ClosingHeight uint32
151

152
        // CloseType is the type of the close event in case IsPendingClose is
153
        // true. Otherwise this value is unset.
154
        CloseType channeldb.ClosureType
155

156
        // MarkChannelResolved is a function closure that serves to mark a
157
        // channel as "fully resolved". A channel itself can be considered
158
        // fully resolved once all active contracts have individually been
159
        // fully resolved.
160
        //
161
        // TODO(roasbeef): need RPC's to combine for pendingchannels RPC
162
        MarkChannelResolved func() error
163

164
        // PutResolverReport records a resolver report for the channel. If the
165
        // transaction provided is nil, the function should write the report
166
        // in a new transaction.
167
        PutResolverReport func(tx kvdb.RwTx,
168
                report *channeldb.ResolverReport) error
169

170
        // FetchHistoricalChannel retrieves the historical state of a channel.
171
        // This is mostly used to supplement the ContractResolvers with
172
        // additional information required for proper contract resolution.
173
        FetchHistoricalChannel func() (*channeldb.OpenChannel, error)
174

175
        // FindOutgoingHTLCDeadline returns the deadline in absolute block
176
        // height for the specified outgoing HTLC. For an outgoing HTLC, its
177
        // deadline is defined by the timeout height of its corresponding
178
        // incoming HTLC - this is the expiry height that the remote peer can
179
        // spend his/her outgoing HTLC via the timeout path.
180
        FindOutgoingHTLCDeadline func(htlc channeldb.HTLC) fn.Option[int32]
181

182
        ChainArbitratorConfig
183
}
184

185
// ReportOutputType describes the type of output that is being reported
186
// on.
187
type ReportOutputType uint8
188

189
const (
190
        // ReportOutputIncomingHtlc is an incoming hash time locked contract on
191
        // the commitment tx.
192
        ReportOutputIncomingHtlc ReportOutputType = iota
193

194
        // ReportOutputOutgoingHtlc is an outgoing hash time locked contract on
195
        // the commitment tx.
196
        ReportOutputOutgoingHtlc
197

198
        // ReportOutputUnencumbered is an uncontested output on the commitment
199
        // transaction paying to us directly.
200
        ReportOutputUnencumbered
201

202
        // ReportOutputAnchor is an anchor output on the commitment tx.
203
        ReportOutputAnchor
204
)
205

206
// ContractReport provides a summary of a commitment tx output.
207
type ContractReport struct {
208
        // Outpoint is the final output that will be swept back to the wallet.
209
        Outpoint wire.OutPoint
210

211
        // Type indicates the type of the reported output.
212
        Type ReportOutputType
213

214
        // Amount is the final value that will be swept in back to the wallet.
215
        Amount btcutil.Amount
216

217
        // MaturityHeight is the absolute block height that this output will
218
        // mature at.
219
        MaturityHeight uint32
220

221
        // Stage indicates whether the htlc is in the CLTV-timeout stage (1) or
222
        // the CSV-delay stage (2). A stage 1 htlc's maturity height will be set
223
        // to its expiry height, while a stage 2 htlc's maturity height will be
224
        // set to its confirmation height plus the maturity requirement.
225
        Stage uint32
226

227
        // LimboBalance is the total number of frozen coins within this
228
        // contract.
229
        LimboBalance btcutil.Amount
230

231
        // RecoveredBalance is the total value that has been successfully swept
232
        // back to the user's wallet.
233
        RecoveredBalance btcutil.Amount
234
}
235

236
// resolverReport creates a resolve report using some of the information in the
237
// contract report.
238
func (c *ContractReport) resolverReport(spendTx *chainhash.Hash,
239
        resolverType channeldb.ResolverType,
240
        outcome channeldb.ResolverOutcome) *channeldb.ResolverReport {
10✔
241

10✔
242
        return &channeldb.ResolverReport{
10✔
243
                OutPoint:        c.Outpoint,
10✔
244
                Amount:          c.Amount,
10✔
245
                ResolverType:    resolverType,
10✔
246
                ResolverOutcome: outcome,
10✔
247
                SpendTxID:       spendTx,
10✔
248
        }
10✔
249
}
10✔
250

251
// htlcSet represents the set of active HTLCs on a given commitment
252
// transaction.
253
type htlcSet struct {
254
        // incomingHTLCs is a map of all incoming HTLCs on the target
255
        // commitment transaction. We may potentially go onchain to claim the
256
        // funds sent to us within this set.
257
        incomingHTLCs map[uint64]channeldb.HTLC
258

259
        // outgoingHTLCs is a map of all outgoing HTLCs on the target
260
        // commitment transaction. We may potentially go onchain to reclaim the
261
        // funds that are currently in limbo.
262
        outgoingHTLCs map[uint64]channeldb.HTLC
263
}
264

265
// newHtlcSet constructs a new HTLC set from a slice of HTLC's.
266
func newHtlcSet(htlcs []channeldb.HTLC) htlcSet {
46✔
267
        outHTLCs := make(map[uint64]channeldb.HTLC)
46✔
268
        inHTLCs := make(map[uint64]channeldb.HTLC)
46✔
269
        for _, htlc := range htlcs {
77✔
270
                if htlc.Incoming {
37✔
271
                        inHTLCs[htlc.HtlcIndex] = htlc
6✔
272
                        continue
6✔
273
                }
274

275
                outHTLCs[htlc.HtlcIndex] = htlc
25✔
276
        }
277

278
        return htlcSet{
46✔
279
                incomingHTLCs: inHTLCs,
46✔
280
                outgoingHTLCs: outHTLCs,
46✔
281
        }
46✔
282
}
283

284
// HtlcSetKey is a two-tuple that uniquely identifies a set of HTLCs on a
285
// commitment transaction.
286
type HtlcSetKey struct {
287
        // IsRemote denotes if the HTLCs are on the remote commitment
288
        // transaction.
289
        IsRemote bool
290

291
        // IsPending denotes if the commitment transaction that the HTLCs are on
292
        // is pending (the higher of two unrevoked commitments).
293
        IsPending bool
294
}
295

296
var (
297
        // LocalHtlcSet is the HtlcSetKey used for local commitments.
298
        LocalHtlcSet = HtlcSetKey{IsRemote: false, IsPending: false}
299

300
        // RemoteHtlcSet is the HtlcSetKey used for remote commitments.
301
        RemoteHtlcSet = HtlcSetKey{IsRemote: true, IsPending: false}
302

303
        // RemotePendingHtlcSet is the HtlcSetKey used for dangling remote
304
        // commitment transactions.
305
        RemotePendingHtlcSet = HtlcSetKey{IsRemote: true, IsPending: true}
306
)
307

308
// String returns a human readable string describing the target HtlcSetKey.
309
func (h HtlcSetKey) String() string {
8✔
310
        switch h {
8✔
311
        case LocalHtlcSet:
4✔
312
                return "LocalHtlcSet"
4✔
313
        case RemoteHtlcSet:
2✔
314
                return "RemoteHtlcSet"
2✔
315
        case RemotePendingHtlcSet:
2✔
316
                return "RemotePendingHtlcSet"
2✔
317
        default:
×
318
                return "unknown HtlcSetKey"
×
319
        }
320
}
321

322
// ChannelArbitrator is the on-chain arbitrator for a particular channel. The
323
// struct will keep in sync with the current set of HTLCs on the commitment
324
// transaction. The job of the attendant is to go on-chain to either settle or
325
// cancel an HTLC as necessary iff: an HTLC times out, or we know the
326
// pre-image to an HTLC, but it wasn't settled by the link off-chain. The
327
// ChannelArbitrator will factor in an expected confirmation delta when
328
// broadcasting to ensure that we avoid any possibility of race conditions, and
329
// sweep the output(s) without contest.
330
type ChannelArbitrator struct {
331
        started int32 // To be used atomically.
332
        stopped int32 // To be used atomically.
333

334
        // Embed the blockbeat consumer struct to get access to the method
335
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
336
        chainio.BeatConsumer
337

338
        // startTimestamp is the time when this ChannelArbitrator was started.
339
        startTimestamp time.Time
340

341
        // log is a persistent log that the attendant will use to checkpoint
342
        // its next action, and the state of any unresolved contracts.
343
        log ArbitratorLog
344

345
        // activeHTLCs is the set of active incoming/outgoing HTLC's on all
346
        // currently valid commitment transactions.
347
        activeHTLCs map[HtlcSetKey]htlcSet
348

349
        // unmergedSet is used to update the activeHTLCs map in two callsites:
350
        // checkLocalChainActions and sweepAnchors. It contains the latest
351
        // updates from the link. It is not deleted from, its entries may be
352
        // replaced on subsequent calls to notifyContractUpdate.
353
        unmergedSet map[HtlcSetKey]htlcSet
354
        unmergedMtx sync.RWMutex
355

356
        // cfg contains all the functionality that the ChannelArbitrator requires
357
        // to do its duty.
358
        cfg ChannelArbitratorConfig
359

360
        // signalUpdates is a channel that any new live signals for the channel
361
        // we're watching over will be sent.
362
        signalUpdates chan *signalUpdateMsg
363

364
        // activeResolvers is a slice of any active resolvers. This is used to
365
        // be able to signal them for shutdown in the case that we shutdown.
366
        activeResolvers []ContractResolver
367

368
        // activeResolversLock prevents simultaneous read and write to the
369
        // resolvers slice.
370
        activeResolversLock sync.RWMutex
371

372
        // resolutionSignal is a channel that will be sent upon by contract
373
        // resolvers once their contract has been fully resolved. With each
374
        // send, we'll check to see if the contract is fully resolved.
375
        resolutionSignal chan struct{}
376

377
        // forceCloseReqs is a channel that requests to forcibly close the
378
        // contract will be sent over.
379
        forceCloseReqs chan *forceCloseReq
380

381
        // state is the current state of the arbitrator. This state is examined
382
        // upon start up to decide which actions to take.
383
        state ArbitratorState
384

385
        wg   sync.WaitGroup
386
        quit chan struct{}
387
}
388

389
// NewChannelArbitrator returns a new instance of a ChannelArbitrator backed by
390
// the passed config struct.
391
func NewChannelArbitrator(cfg ChannelArbitratorConfig,
392
        htlcSets map[HtlcSetKey]htlcSet, log ArbitratorLog) *ChannelArbitrator {
51✔
393

51✔
394
        // Create a new map for unmerged HTLC's as we will overwrite the values
51✔
395
        // and want to avoid modifying activeHTLCs directly. This soft copying
51✔
396
        // is done to ensure that activeHTLCs isn't reset as an empty map later
51✔
397
        // on.
51✔
398
        unmerged := make(map[HtlcSetKey]htlcSet)
51✔
399
        unmerged[LocalHtlcSet] = htlcSets[LocalHtlcSet]
51✔
400
        unmerged[RemoteHtlcSet] = htlcSets[RemoteHtlcSet]
51✔
401

51✔
402
        // If the pending set exists, write that as well.
51✔
403
        if _, ok := htlcSets[RemotePendingHtlcSet]; ok {
51✔
404
                unmerged[RemotePendingHtlcSet] = htlcSets[RemotePendingHtlcSet]
×
405
        }
×
406

407
        c := &ChannelArbitrator{
51✔
408
                log:              log,
51✔
409
                signalUpdates:    make(chan *signalUpdateMsg),
51✔
410
                resolutionSignal: make(chan struct{}),
51✔
411
                forceCloseReqs:   make(chan *forceCloseReq),
51✔
412
                activeHTLCs:      htlcSets,
51✔
413
                unmergedSet:      unmerged,
51✔
414
                cfg:              cfg,
51✔
415
                quit:             make(chan struct{}),
51✔
416
        }
51✔
417

51✔
418
        // Mount the block consumer.
51✔
419
        c.BeatConsumer = chainio.NewBeatConsumer(c.quit, c.Name())
51✔
420

51✔
421
        return c
51✔
422
}
423

424
// Compile-time check for the chainio.Consumer interface.
425
var _ chainio.Consumer = (*ChannelArbitrator)(nil)
426

427
// chanArbStartState contains the information from disk that we need to start
428
// up a channel arbitrator.
429
type chanArbStartState struct {
430
        currentState ArbitratorState
431
        commitSet    *CommitSet
432
}
433

434
// getStartState retrieves the information from disk that our channel arbitrator
435
// requires to start.
436
func (c *ChannelArbitrator) getStartState(tx kvdb.RTx) (*chanArbStartState,
437
        error) {
48✔
438

48✔
439
        // First, we'll read our last state from disk, so our internal state
48✔
440
        // machine can act accordingly.
48✔
441
        state, err := c.log.CurrentState(tx)
48✔
442
        if err != nil {
48✔
443
                return nil, err
×
444
        }
×
445

446
        // Next we'll fetch our confirmed commitment set. This will only exist
447
        // if the channel has been closed out on chain for modern nodes. For
448
        // older nodes, this won't be found at all, and we will rely on the
449
        // existing written chain actions. Additionally, if this channel hasn't
450
        // logged any actions in the log, then this field won't be present.
451
        commitSet, err := c.log.FetchConfirmedCommitSet(tx)
48✔
452
        if err != nil && err != errNoCommitSet && err != errScopeBucketNoExist {
48✔
453
                return nil, err
×
454
        }
×
455

456
        return &chanArbStartState{
48✔
457
                currentState: state,
48✔
458
                commitSet:    commitSet,
48✔
459
        }, nil
48✔
460
}
461

462
// Start starts all the goroutines that the ChannelArbitrator needs to operate.
463
// It takes a start state, which will be looked up on disk if it is not
464
// provided.
465
func (c *ChannelArbitrator) Start(state *chanArbStartState,
466
        beat chainio.Blockbeat) error {
48✔
467

48✔
468
        if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
48✔
469
                return nil
×
470
        }
×
471
        c.startTimestamp = c.cfg.Clock.Now()
48✔
472

48✔
473
        // If the state passed in is nil, we look it up now.
48✔
474
        if state == nil {
85✔
475
                var err error
37✔
476
                state, err = c.getStartState(nil)
37✔
477
                if err != nil {
37✔
478
                        return err
×
479
                }
×
480
        }
481

482
        log.Debugf("Starting ChannelArbitrator(%v), htlc_set=%v, state=%v",
48✔
483
                c.cfg.ChanPoint, lnutils.SpewLogClosure(c.activeHTLCs),
48✔
484
                state.currentState)
48✔
485

48✔
486
        // Set our state from our starting state.
48✔
487
        c.state = state.currentState
48✔
488

48✔
489
        // Get the starting height.
48✔
490
        bestHeight := beat.Height()
48✔
491

48✔
492
        c.wg.Add(1)
48✔
493
        go c.channelAttendant(bestHeight, state.commitSet)
48✔
494

48✔
495
        return nil
48✔
496
}
497

498
// progressStateMachineAfterRestart attempts to progress the state machine
499
// after a restart. This makes sure that if the state transition failed, we
500
// will try to progress the state machine again. Moreover it will relaunch
501
// resolvers if the channel is still in the pending close state and has not
502
// been fully resolved yet.
503
func (c *ChannelArbitrator) progressStateMachineAfterRestart(bestHeight int32,
504
        commitSet *CommitSet) error {
48✔
505

48✔
506
        // If the channel has been marked pending close in the database, and we
48✔
507
        // haven't transitioned the state machine to StateContractClosed (or a
48✔
508
        // succeeding state), then a state transition most likely failed. We'll
48✔
509
        // try to recover from this by manually advancing the state by setting
48✔
510
        // the corresponding close trigger.
48✔
511
        trigger := chainTrigger
48✔
512
        triggerHeight := uint32(bestHeight)
48✔
513
        if c.cfg.IsPendingClose {
53✔
514
                switch c.state {
5✔
515
                case StateDefault:
4✔
516
                        fallthrough
4✔
517
                case StateBroadcastCommit:
5✔
518
                        fallthrough
5✔
519
                case StateCommitmentBroadcasted:
5✔
520
                        switch c.cfg.CloseType {
5✔
521

522
                        case channeldb.CooperativeClose:
1✔
523
                                trigger = coopCloseTrigger
1✔
524

525
                        case channeldb.BreachClose:
1✔
526
                                trigger = breachCloseTrigger
1✔
527

528
                        case channeldb.LocalForceClose:
1✔
529
                                trigger = localCloseTrigger
1✔
530

531
                        case channeldb.RemoteForceClose:
2✔
532
                                trigger = remoteCloseTrigger
2✔
533
                        }
534

535
                        log.Warnf("ChannelArbitrator(%v): detected stalled "+
5✔
536
                                "state=%v for closed channel",
5✔
537
                                c.cfg.ChanPoint, c.state)
5✔
538
                }
539

540
                triggerHeight = c.cfg.ClosingHeight
5✔
541
        }
542

543
        log.Infof("ChannelArbitrator(%v): starting state=%v, trigger=%v, "+
48✔
544
                "triggerHeight=%v", c.cfg.ChanPoint, c.state, trigger,
48✔
545
                triggerHeight)
48✔
546

48✔
547
        // We'll now attempt to advance our state forward based on the current
48✔
548
        // on-chain state, and our set of active contracts.
48✔
549
        startingState := c.state
48✔
550
        nextState, _, err := c.advanceState(
48✔
551
                triggerHeight, trigger, commitSet,
48✔
552
        )
48✔
553
        if err != nil {
50✔
554
                switch err {
2✔
555

556
                // If we detect that we tried to fetch resolutions, but failed,
557
                // this channel was marked closed in the database before
558
                // resolutions successfully written. In this case there is not
559
                // much we can do, so we don't return the error.
560
                case errScopeBucketNoExist:
×
561
                        fallthrough
×
562
                case errNoResolutions:
1✔
563
                        log.Warnf("ChannelArbitrator(%v): detected closed"+
1✔
564
                                "channel with no contract resolutions written.",
1✔
565
                                c.cfg.ChanPoint)
1✔
566

567
                default:
1✔
568
                        return err
1✔
569
                }
570
        }
571

572
        // If we start and ended at the awaiting full resolution state, then
573
        // we'll relaunch our set of unresolved contracts.
574
        if startingState == StateWaitingFullResolution &&
47✔
575
                nextState == StateWaitingFullResolution {
48✔
576

1✔
577
                // In order to relaunch the resolvers, we'll need to fetch the
1✔
578
                // set of HTLCs that were present in the commitment transaction
1✔
579
                // at the time it was confirmed. commitSet.ConfCommitKey can't
1✔
580
                // be nil at this point since we're in
1✔
581
                // StateWaitingFullResolution. We can only be in
1✔
582
                // StateWaitingFullResolution after we've transitioned from
1✔
583
                // StateContractClosed which can only be triggered by the local
1✔
584
                // or remote close trigger. This trigger is only fired when we
1✔
585
                // receive a chain event from the chain watcher that the
1✔
586
                // commitment has been confirmed on chain, and before we
1✔
587
                // advance our state step, we call InsertConfirmedCommitSet.
1✔
588
                err := c.relaunchResolvers(commitSet, triggerHeight)
1✔
589
                if err != nil {
1✔
590
                        return err
×
591
                }
×
592
        }
593

594
        return nil
47✔
595
}
596

597
// maybeAugmentTaprootResolvers will update the contract resolution information
598
// for taproot channels. This ensures that all the resolvers have the latest
599
// resolution, which may also include data such as the control block and tap
600
// tweaks.
601
func maybeAugmentTaprootResolvers(chanType channeldb.ChannelType,
602
        resolver ContractResolver,
603
        contractResolutions *ContractResolutions) {
1✔
604

1✔
605
        if !chanType.IsTaproot() {
2✔
606
                return
1✔
607
        }
1✔
608

609
        // The on disk resolutions contains all the ctrl block
610
        // information, so we'll set that now for the relevant
611
        // resolvers.
UNCOV
612
        switch r := resolver.(type) {
×
UNCOV
613
        case *commitSweepResolver:
×
UNCOV
614
                if contractResolutions.CommitResolution != nil {
×
UNCOV
615
                        //nolint:ll
×
UNCOV
616
                        r.commitResolution = *contractResolutions.CommitResolution
×
UNCOV
617
                }
×
UNCOV
618
        case *htlcOutgoingContestResolver:
×
UNCOV
619
                //nolint:ll
×
UNCOV
620
                htlcResolutions := contractResolutions.HtlcResolutions.OutgoingHTLCs
×
UNCOV
621
                for _, htlcRes := range htlcResolutions {
×
UNCOV
622
                        htlcRes := htlcRes
×
UNCOV
623

×
UNCOV
624
                        if r.htlcResolution.ClaimOutpoint ==
×
UNCOV
625
                                htlcRes.ClaimOutpoint {
×
UNCOV
626

×
UNCOV
627
                                r.htlcResolution = htlcRes
×
UNCOV
628
                        }
×
629
                }
630

UNCOV
631
        case *htlcTimeoutResolver:
×
UNCOV
632
                //nolint:ll
×
UNCOV
633
                htlcResolutions := contractResolutions.HtlcResolutions.OutgoingHTLCs
×
UNCOV
634
                for _, htlcRes := range htlcResolutions {
×
UNCOV
635
                        htlcRes := htlcRes
×
UNCOV
636

×
UNCOV
637
                        if r.htlcResolution.ClaimOutpoint ==
×
UNCOV
638
                                htlcRes.ClaimOutpoint {
×
UNCOV
639

×
UNCOV
640
                                r.htlcResolution = htlcRes
×
UNCOV
641
                        }
×
642
                }
643

UNCOV
644
        case *htlcIncomingContestResolver:
×
UNCOV
645
                //nolint:ll
×
UNCOV
646
                htlcResolutions := contractResolutions.HtlcResolutions.IncomingHTLCs
×
UNCOV
647
                for _, htlcRes := range htlcResolutions {
×
UNCOV
648
                        htlcRes := htlcRes
×
UNCOV
649

×
UNCOV
650
                        if r.htlcResolution.ClaimOutpoint ==
×
UNCOV
651
                                htlcRes.ClaimOutpoint {
×
UNCOV
652

×
UNCOV
653
                                r.htlcResolution = htlcRes
×
UNCOV
654
                        }
×
655
                }
656
        case *htlcSuccessResolver:
×
657
                //nolint:ll
×
658
                htlcResolutions := contractResolutions.HtlcResolutions.IncomingHTLCs
×
659
                for _, htlcRes := range htlcResolutions {
×
660
                        htlcRes := htlcRes
×
661

×
662
                        if r.htlcResolution.ClaimOutpoint ==
×
663
                                htlcRes.ClaimOutpoint {
×
664

×
665
                                r.htlcResolution = htlcRes
×
666
                        }
×
667
                }
668
        }
669
}
670

671
// relaunchResolvers relaunches the set of resolvers for unresolved contracts in
672
// order to provide them with information that's not immediately available upon
673
// starting the ChannelArbitrator. This information should ideally be stored in
674
// the database, so this only serves as an intermediate work-around to prevent a
675
// migration.
676
func (c *ChannelArbitrator) relaunchResolvers(commitSet *CommitSet,
677
        heightHint uint32) error {
1✔
678

1✔
679
        // We'll now query our log to see if there are any active unresolved
1✔
680
        // contracts. If this is the case, then we'll relaunch all contract
1✔
681
        // resolvers.
1✔
682
        unresolvedContracts, err := c.log.FetchUnresolvedContracts()
1✔
683
        if err != nil {
1✔
684
                return err
×
685
        }
×
686

687
        // Retrieve the commitment tx hash from the log.
688
        contractResolutions, err := c.log.FetchContractResolutions()
1✔
689
        if err != nil {
1✔
690
                log.Errorf("unable to fetch contract resolutions: %v",
×
691
                        err)
×
692
                return err
×
693
        }
×
694
        commitHash := contractResolutions.CommitHash
1✔
695

1✔
696
        // In prior versions of lnd, the information needed to supplement the
1✔
697
        // resolvers (in most cases, the full amount of the HTLC) was found in
1✔
698
        // the chain action map, which is now deprecated.  As a result, if the
1✔
699
        // commitSet is nil (an older node with unresolved HTLCs at time of
1✔
700
        // upgrade), then we'll use the chain action information in place. The
1✔
701
        // chain actions may exclude some information, but we cannot recover it
1✔
702
        // for these older nodes at the moment.
1✔
703
        var confirmedHTLCs []channeldb.HTLC
1✔
704
        if commitSet != nil && commitSet.ConfCommitKey.IsSome() {
2✔
705
                confCommitKey, err := commitSet.ConfCommitKey.UnwrapOrErr(
1✔
706
                        fmt.Errorf("no commitKey available"),
1✔
707
                )
1✔
708
                if err != nil {
1✔
709
                        return err
×
710
                }
×
711
                confirmedHTLCs = commitSet.HtlcSets[confCommitKey]
1✔
712
        } else {
×
713
                chainActions, err := c.log.FetchChainActions()
×
714
                if err != nil {
×
715
                        log.Errorf("unable to fetch chain actions: %v", err)
×
716
                        return err
×
717
                }
×
718
                for _, htlcs := range chainActions {
×
719
                        confirmedHTLCs = append(confirmedHTLCs, htlcs...)
×
720
                }
×
721
        }
722

723
        // Reconstruct the htlc outpoints and data from the chain action log.
724
        // The purpose of the constructed htlc map is to supplement the
725
        // resolvers restored from the database with extra data. Ideally this data
726
        // is stored as part of the resolver in the log. This is a workaround
727
        // to prevent a db migration. We use all available htlc sets here in
728
        // order to ensure we have complete coverage.
729
        htlcMap := make(map[wire.OutPoint]*channeldb.HTLC)
1✔
730
        for _, htlc := range confirmedHTLCs {
4✔
731
                htlc := htlc
3✔
732
                outpoint := wire.OutPoint{
3✔
733
                        Hash:  commitHash,
3✔
734
                        Index: uint32(htlc.OutputIndex),
3✔
735
                }
3✔
736
                htlcMap[outpoint] = &htlc
3✔
737
        }
3✔
738

739
        // We'll also fetch the historical state of this channel, as it should
740
        // have been marked as closed by now, and supplement it to each resolver
741
        // such that we can properly resolve our pending contracts.
742
        var chanState *channeldb.OpenChannel
1✔
743
        chanState, err = c.cfg.FetchHistoricalChannel()
1✔
744
        switch {
1✔
745
        // If we don't find this channel, then it may be the case that it
746
        // was closed before we started to retain the final state
747
        // information for open channels.
748
        case err == channeldb.ErrNoHistoricalBucket:
×
749
                fallthrough
×
750
        case err == channeldb.ErrChannelNotFound:
×
751
                log.Warnf("ChannelArbitrator(%v): unable to fetch historical "+
×
752
                        "state", c.cfg.ChanPoint)
×
753

754
        case err != nil:
×
755
                return err
×
756
        }
757

758
        log.Infof("ChannelArbitrator(%v): relaunching %v contract "+
1✔
759
                "resolvers", c.cfg.ChanPoint, len(unresolvedContracts))
1✔
760

1✔
761
        for i := range unresolvedContracts {
2✔
762
                resolver := unresolvedContracts[i]
1✔
763

1✔
764
                if chanState != nil {
2✔
765
                        resolver.SupplementState(chanState)
1✔
766

1✔
767
                        // For taproot channels, we'll need to also make sure
1✔
768
                        // the control block information was set properly.
1✔
769
                        maybeAugmentTaprootResolvers(
1✔
770
                                chanState.ChanType, resolver,
1✔
771
                                contractResolutions,
1✔
772
                        )
1✔
773
                }
1✔
774

775
                unresolvedContracts[i] = resolver
1✔
776

1✔
777
                htlcResolver, ok := resolver.(htlcContractResolver)
1✔
778
                if !ok {
1✔
UNCOV
779
                        continue
×
780
                }
781

782
                htlcPoint := htlcResolver.HtlcPoint()
1✔
783
                htlc, ok := htlcMap[htlcPoint]
1✔
784
                if !ok {
1✔
785
                        return fmt.Errorf(
×
786
                                "htlc resolver %T unavailable", resolver,
×
787
                        )
×
788
                }
×
789

790
                htlcResolver.Supplement(*htlc)
1✔
791

1✔
792
                // If this is an outgoing HTLC, we will also need to supplement
1✔
793
                // the resolver with the expiry block height of its
1✔
794
                // corresponding incoming HTLC.
1✔
795
                if !htlc.Incoming {
2✔
796
                        deadline := c.cfg.FindOutgoingHTLCDeadline(*htlc)
1✔
797
                        htlcResolver.SupplementDeadline(deadline)
1✔
798
                }
1✔
799
        }
800

801
        // The anchor resolver is stateless and can always be re-instantiated.
802
        if contractResolutions.AnchorResolution != nil {
1✔
UNCOV
803
                anchorResolver := newAnchorResolver(
×
UNCOV
804
                        contractResolutions.AnchorResolution.AnchorSignDescriptor,
×
UNCOV
805
                        contractResolutions.AnchorResolution.CommitAnchor,
×
UNCOV
806
                        heightHint, c.cfg.ChanPoint,
×
UNCOV
807
                        ResolverConfig{
×
UNCOV
808
                                ChannelArbitratorConfig: c.cfg,
×
UNCOV
809
                        },
×
UNCOV
810
                )
×
UNCOV
811

×
UNCOV
812
                anchorResolver.SupplementState(chanState)
×
UNCOV
813

×
UNCOV
814
                unresolvedContracts = append(unresolvedContracts, anchorResolver)
×
UNCOV
815

×
UNCOV
816
                // TODO(roasbeef): this isn't re-launched?
×
UNCOV
817
        }
×
818

819
        c.resolveContracts(unresolvedContracts)
1✔
820

1✔
821
        return nil
1✔
822
}
823

824
// Report returns htlc reports for the active resolvers.
UNCOV
825
func (c *ChannelArbitrator) Report() []*ContractReport {
×
UNCOV
826
        c.activeResolversLock.RLock()
×
UNCOV
827
        defer c.activeResolversLock.RUnlock()
×
UNCOV
828

×
UNCOV
829
        var reports []*ContractReport
×
UNCOV
830
        for _, resolver := range c.activeResolvers {
×
UNCOV
831
                r, ok := resolver.(reportingContractResolver)
×
UNCOV
832
                if !ok {
×
833
                        continue
×
834
                }
835

UNCOV
836
                report := r.report()
×
UNCOV
837
                if report == nil {
×
UNCOV
838
                        continue
×
839
                }
840

UNCOV
841
                reports = append(reports, report)
×
842
        }
843

UNCOV
844
        return reports
×
845
}
846

847
// Stop signals the ChannelArbitrator for a graceful shutdown.
848
func (c *ChannelArbitrator) Stop() error {
51✔
849
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
57✔
850
                return nil
6✔
851
        }
6✔
852

853
        log.Debugf("Stopping ChannelArbitrator(%v)", c.cfg.ChanPoint)
45✔
854

45✔
855
        if c.cfg.ChainEvents.Cancel != nil {
56✔
856
                go c.cfg.ChainEvents.Cancel()
11✔
857
        }
11✔
858

859
        c.activeResolversLock.RLock()
45✔
860
        for _, activeResolver := range c.activeResolvers {
51✔
861
                activeResolver.Stop()
6✔
862
        }
6✔
863
        c.activeResolversLock.RUnlock()
45✔
864

45✔
865
        close(c.quit)
45✔
866
        c.wg.Wait()
45✔
867

45✔
868
        return nil
45✔
869
}
870

871
// transitionTrigger is an enum that denotes exactly *why* a state transition
872
// was initiated. This is useful as depending on the initial trigger, we may
873
// skip certain states as those actions are expected to have already taken
874
// place as a result of the external trigger.
875
type transitionTrigger uint8
876

877
const (
878
        // chainTrigger is a transition trigger that has been attempted due to
879
        // changing on-chain conditions such as a block which times out HTLC's
880
        // being attached.
881
        chainTrigger transitionTrigger = iota
882

883
        // userTrigger is a transition trigger driven by user action. Examples
884
        // of such a trigger include a user requesting a force closure of the
885
        // channel.
886
        userTrigger
887

888
        // remoteCloseTrigger is a transition trigger driven by the remote
889
        // peer's commitment being confirmed.
890
        remoteCloseTrigger
891

892
        // localCloseTrigger is a transition trigger driven by our commitment
893
        // being confirmed.
894
        localCloseTrigger
895

896
        // coopCloseTrigger is a transition trigger driven by a cooperative
897
        // close transaction being confirmed.
898
        coopCloseTrigger
899

900
        // breachCloseTrigger is a transition trigger driven by a remote breach
901
        // being confirmed. In this case the channel arbitrator will wait for
902
        // the BreachArbitrator to finish and then clean up gracefully.
903
        breachCloseTrigger
904
)
905

906
// String returns a human readable string describing the passed
907
// transitionTrigger.
UNCOV
908
func (t transitionTrigger) String() string {
×
UNCOV
909
        switch t {
×
UNCOV
910
        case chainTrigger:
×
UNCOV
911
                return "chainTrigger"
×
912

UNCOV
913
        case remoteCloseTrigger:
×
UNCOV
914
                return "remoteCloseTrigger"
×
915

UNCOV
916
        case userTrigger:
×
UNCOV
917
                return "userTrigger"
×
918

UNCOV
919
        case localCloseTrigger:
×
UNCOV
920
                return "localCloseTrigger"
×
921

UNCOV
922
        case coopCloseTrigger:
×
UNCOV
923
                return "coopCloseTrigger"
×
924

UNCOV
925
        case breachCloseTrigger:
×
UNCOV
926
                return "breachCloseTrigger"
×
927

928
        default:
×
929
                return "unknown trigger"
×
930
        }
931
}
932

933
// stateStep is a helper method that examines our internal state, and attempts
934
// the appropriate state transition if necessary. The next state we transition
935
// to is returned. Additionally, if the next transition results in a commitment
936
// broadcast, the commitment transaction itself is returned.
937
func (c *ChannelArbitrator) stateStep(
938
        triggerHeight uint32, trigger transitionTrigger,
939
        confCommitSet *CommitSet) (ArbitratorState, *wire.MsgTx, error) {
173✔
940

173✔
941
        var (
173✔
942
                nextState ArbitratorState
173✔
943
                closeTx   *wire.MsgTx
173✔
944
        )
173✔
945
        switch c.state {
173✔
946

947
        // If we're in the default state, then we'll check our set of actions
948
        // to see if while we were down, conditions have changed.
949
        case StateDefault:
63✔
950
                log.Debugf("ChannelArbitrator(%v): new block (height=%v) "+
63✔
951
                        "examining active HTLC's", c.cfg.ChanPoint,
63✔
952
                        triggerHeight)
63✔
953

63✔
954
                // As a new block has been connected to the end of the main
63✔
955
                // chain, we'll check to see if we need to make any on-chain
63✔
956
                // claims on behalf of the channel contract that we're
63✔
957
                // arbitrating for. If a commitment has confirmed, then we'll
63✔
958
                // use the set snapshot from the chain, otherwise we'll use our
63✔
959
                // current set.
63✔
960
                var (
63✔
961
                        chainActions ChainActionMap
63✔
962
                        err          error
63✔
963
                )
63✔
964

63✔
965
                // Normally if we force close the channel locally we will have
63✔
966
                // no confCommitSet. However when the remote commitment confirms
63✔
967
                // without us ever broadcasting our local commitment we need to
63✔
968
                // make sure we cancel all upstream HTLCs for outgoing dust
63✔
969
                // HTLCs as well hence we need to fetch the chain actions here
63✔
970
                // as well.
63✔
971
                if confCommitSet == nil {
117✔
972
                        // Update the set of activeHTLCs so
54✔
973
                        // checkLocalChainActions has an up-to-date view of the
54✔
974
                        // commitments.
54✔
975
                        c.updateActiveHTLCs()
54✔
976
                        htlcs := c.activeHTLCs
54✔
977
                        chainActions, err = c.checkLocalChainActions(
54✔
978
                                triggerHeight, trigger, htlcs, false,
54✔
979
                        )
54✔
980
                        if err != nil {
54✔
981
                                return StateDefault, nil, err
×
982
                        }
×
983
                } else {
9✔
984
                        chainActions, err = c.constructChainActions(
9✔
985
                                confCommitSet, triggerHeight, trigger,
9✔
986
                        )
9✔
987
                        if err != nil {
9✔
988
                                return StateDefault, nil, err
×
989
                        }
×
990
                }
991

992
                // If there are no actions to be made, then we'll remain in the
993
                // default state. If this isn't a self initiated event (we're
994
                // checking due to a chain update), then we'll exit now.
995
                if len(chainActions) == 0 && trigger == chainTrigger {
99✔
996
                        log.Debugf("ChannelArbitrator(%v): no actions for "+
36✔
997
                                "chain trigger, terminating", c.cfg.ChanPoint)
36✔
998

36✔
999
                        return StateDefault, closeTx, nil
36✔
1000
                }
36✔
1001

1002
                // Otherwise, we'll log that we checked the HTLC actions as the
1003
                // commitment transaction has already been broadcast.
1004
                log.Tracef("ChannelArbitrator(%v): logging chain_actions=%v",
27✔
1005
                        c.cfg.ChanPoint, lnutils.SpewLogClosure(chainActions))
27✔
1006

27✔
1007
                // Cancel upstream HTLCs for all outgoing dust HTLCs available
27✔
1008
                // either on the local or the remote/remote pending commitment
27✔
1009
                // transaction.
27✔
1010
                dustHTLCs := chainActions[HtlcFailDustAction]
27✔
1011
                if len(dustHTLCs) > 0 {
28✔
1012
                        log.Debugf("ChannelArbitrator(%v): canceling %v dust "+
1✔
1013
                                "HTLCs backwards", c.cfg.ChanPoint,
1✔
1014
                                len(dustHTLCs))
1✔
1015

1✔
1016
                        getIdx := func(htlc channeldb.HTLC) uint64 {
2✔
1017
                                return htlc.HtlcIndex
1✔
1018
                        }
1✔
1019
                        dustHTLCSet := fn.NewSet(fn.Map(dustHTLCs, getIdx)...)
1✔
1020
                        err = c.abandonForwards(dustHTLCSet)
1✔
1021
                        if err != nil {
1✔
1022
                                return StateError, closeTx, err
×
1023
                        }
×
1024
                }
1025

1026
                // Depending on the type of trigger, we'll either "tunnel"
1027
                // through to a farther state, or just proceed linearly to the
1028
                // next state.
1029
                switch trigger {
27✔
1030

1031
                // If this is a chain trigger, then we'll go straight to the
1032
                // next state, as we still need to broadcast the commitment
1033
                // transaction.
1034
                case chainTrigger:
5✔
1035
                        fallthrough
5✔
1036
                case userTrigger:
15✔
1037
                        nextState = StateBroadcastCommit
15✔
1038

1039
                // If the trigger is a cooperative close being confirmed, then
1040
                // we can go straight to StateFullyResolved, as there won't be
1041
                // any contracts to resolve.
1042
                case coopCloseTrigger:
3✔
1043
                        nextState = StateFullyResolved
3✔
1044

1045
                // Otherwise, if this state advance was triggered by a
1046
                // commitment being confirmed on chain, then we'll jump
1047
                // straight to the state where the contract has already been
1048
                // closed, and we will inspect the set of unresolved contracts.
1049
                case localCloseTrigger:
2✔
1050
                        log.Errorf("ChannelArbitrator(%v): unexpected local "+
2✔
1051
                                "commitment confirmed while in StateDefault",
2✔
1052
                                c.cfg.ChanPoint)
2✔
1053
                        fallthrough
2✔
1054
                case remoteCloseTrigger:
8✔
1055
                        nextState = StateContractClosed
8✔
1056

1057
                case breachCloseTrigger:
1✔
1058
                        nextContractState, err := c.checkLegacyBreach()
1✔
1059
                        if nextContractState == StateError {
1✔
1060
                                return nextContractState, nil, err
×
1061
                        }
×
1062

1063
                        nextState = nextContractState
1✔
1064
                }
1065

1066
        // If we're in this state, then we've decided to broadcast the
1067
        // commitment transaction. We enter this state either due to an outside
1068
        // sub-system, or because an on-chain action has been triggered.
1069
        case StateBroadcastCommit:
20✔
1070
                // Under normal operation, we can only enter
20✔
1071
                // StateBroadcastCommit via a user or chain trigger. On restart,
20✔
1072
                // this state may be reexecuted after closing the channel, but
20✔
1073
                // failing to commit to StateContractClosed or
20✔
1074
                // StateFullyResolved. In that case, one of the four close
20✔
1075
                // triggers will be presented, signifying that we should skip
20✔
1076
                // rebroadcasting, and go straight to resolving the on-chain
20✔
1077
                // contract or marking the channel resolved.
20✔
1078
                switch trigger {
20✔
1079
                case localCloseTrigger, remoteCloseTrigger:
×
1080
                        log.Infof("ChannelArbitrator(%v): detected %s "+
×
1081
                                "close after closing channel, fast-forwarding "+
×
1082
                                "to %s to resolve contract",
×
1083
                                c.cfg.ChanPoint, trigger, StateContractClosed)
×
1084
                        return StateContractClosed, closeTx, nil
×
1085

1086
                case breachCloseTrigger:
1✔
1087
                        nextContractState, err := c.checkLegacyBreach()
1✔
1088
                        if nextContractState == StateError {
1✔
1089
                                log.Infof("ChannelArbitrator(%v): unable to "+
×
1090
                                        "advance breach close resolution: %v",
×
1091
                                        c.cfg.ChanPoint, nextContractState)
×
1092
                                return StateError, closeTx, err
×
1093
                        }
×
1094

1095
                        log.Infof("ChannelArbitrator(%v): detected %s close "+
1✔
1096
                                "after closing channel, fast-forwarding to %s"+
1✔
1097
                                " to resolve contract", c.cfg.ChanPoint,
1✔
1098
                                trigger, nextContractState)
1✔
1099

1✔
1100
                        return nextContractState, closeTx, nil
1✔
1101

1102
                case coopCloseTrigger:
×
1103
                        log.Infof("ChannelArbitrator(%v): detected %s "+
×
1104
                                "close after closing channel, fast-forwarding "+
×
1105
                                "to %s to resolve contract",
×
1106
                                c.cfg.ChanPoint, trigger, StateFullyResolved)
×
1107
                        return StateFullyResolved, closeTx, nil
×
1108
                }
1109

1110
                log.Infof("ChannelArbitrator(%v): force closing "+
19✔
1111
                        "chan", c.cfg.ChanPoint)
19✔
1112

19✔
1113
                // Now that we have all the actions decided for the set of
19✔
1114
                // HTLC's, we'll broadcast the commitment transaction, and
19✔
1115
                // signal the link to exit.
19✔
1116

19✔
1117
                // We'll tell the switch that it should remove the link for
19✔
1118
                // this channel, in addition to fetching the force close
19✔
1119
                // summary needed to close this channel on chain.
19✔
1120
                forceCloseTx, err := c.cfg.Channel.ForceCloseChan()
19✔
1121
                if err != nil {
20✔
1122
                        log.Errorf("ChannelArbitrator(%v): unable to "+
1✔
1123
                                "force close: %v", c.cfg.ChanPoint, err)
1✔
1124

1✔
1125
                        // We tried to force close (HTLC may be expiring from
1✔
1126
                        // our PoV, etc), but we think we've lost data. In this
1✔
1127
                        // case, we'll not force close, but terminate the state
1✔
1128
                        // machine here to wait to see what confirms on chain.
1✔
1129
                        if errors.Is(err, lnwallet.ErrForceCloseLocalDataLoss) {
2✔
1130
                                log.Error("ChannelArbitrator(%v): broadcast "+
1✔
1131
                                        "failed due to local data loss, "+
1✔
1132
                                        "waiting for on chain confimation...",
1✔
1133
                                        c.cfg.ChanPoint)
1✔
1134

1✔
1135
                                return StateBroadcastCommit, nil, nil
1✔
1136
                        }
1✔
1137

1138
                        return StateError, closeTx, err
×
1139
                }
1140
                closeTx = forceCloseTx
18✔
1141

18✔
1142
                // Before publishing the transaction, we store it to the
18✔
1143
                // database, such that we can re-publish later in case it
18✔
1144
                // didn't propagate. We initiated the force close, so we
18✔
1145
                // mark broadcast with local initiator set to true.
18✔
1146
                err = c.cfg.MarkCommitmentBroadcasted(closeTx, lntypes.Local)
18✔
1147
                if err != nil {
18✔
1148
                        log.Errorf("ChannelArbitrator(%v): unable to "+
×
1149
                                "mark commitment broadcasted: %v",
×
1150
                                c.cfg.ChanPoint, err)
×
1151
                        return StateError, closeTx, err
×
1152
                }
×
1153

1154
                // With the close transaction in hand, broadcast the
1155
                // transaction to the network, thereby entering the post
1156
                // channel resolution state.
1157
                log.Infof("Broadcasting force close transaction %v, "+
18✔
1158
                        "ChannelPoint(%v): %v", closeTx.TxHash(),
18✔
1159
                        c.cfg.ChanPoint, lnutils.SpewLogClosure(closeTx))
18✔
1160

18✔
1161
                // At this point, we'll now broadcast the commitment
18✔
1162
                // transaction itself.
18✔
1163
                label := labels.MakeLabel(
18✔
1164
                        labels.LabelTypeChannelClose, &c.cfg.ShortChanID,
18✔
1165
                )
18✔
1166
                if err := c.cfg.PublishTx(closeTx, label); err != nil {
23✔
1167
                        log.Errorf("ChannelArbitrator(%v): unable to broadcast "+
5✔
1168
                                "close tx: %v", c.cfg.ChanPoint, err)
5✔
1169

5✔
1170
                        // This makes sure we don't fail at startup if the
5✔
1171
                        // commitment transaction has too low fees to make it
5✔
1172
                        // into mempool. The rebroadcaster makes sure this
5✔
1173
                        // transaction is republished regularly until confirmed
5✔
1174
                        // or replaced.
5✔
1175
                        if !errors.Is(err, lnwallet.ErrDoubleSpend) &&
5✔
1176
                                !errors.Is(err, lnwallet.ErrMempoolFee) {
7✔
1177

2✔
1178
                                return StateError, closeTx, err
2✔
1179
                        }
2✔
1180
                }
1181

1182
                // We go to the StateCommitmentBroadcasted state, where we'll
1183
                // be waiting for the commitment to be confirmed.
1184
                nextState = StateCommitmentBroadcasted
16✔
1185

1186
        // In this state we have broadcasted our own commitment, and will need
1187
        // to wait for a commitment (not necessarily the one we broadcasted!)
1188
        // to be confirmed.
1189
        case StateCommitmentBroadcasted:
30✔
1190
                switch trigger {
30✔
1191

1192
                // We are waiting for a commitment to be confirmed.
1193
                case chainTrigger, userTrigger:
17✔
1194
                        // The commitment transaction has been broadcast, but it
17✔
1195
                        // doesn't necessarily need to be the commitment
17✔
1196
                        // transaction version that is going to be confirmed. To
17✔
1197
                        // be sure that any of those versions can be anchored
17✔
1198
                        // down, we now submit all anchor resolutions to the
17✔
1199
                        // sweeper. The sweeper will keep trying to sweep all of
17✔
1200
                        // them.
17✔
1201
                        //
17✔
1202
                        // Note that the sweeper is idempotent. If we ever
17✔
1203
                        // happen to end up at this point in the code again, no
17✔
1204
                        // harm is done by re-offering the anchors to the
17✔
1205
                        // sweeper.
17✔
1206
                        anchors, err := c.cfg.Channel.NewAnchorResolutions()
17✔
1207
                        if err != nil {
17✔
1208
                                return StateError, closeTx, err
×
1209
                        }
×
1210

1211
                        err = c.sweepAnchors(anchors, triggerHeight)
17✔
1212
                        if err != nil {
17✔
1213
                                return StateError, closeTx, err
×
1214
                        }
×
1215

1216
                        nextState = StateCommitmentBroadcasted
17✔
1217

1218
                // If this state advance was triggered by any of the
1219
                // commitments being confirmed, then we'll jump to the state
1220
                // where the contract has been closed.
1221
                case localCloseTrigger, remoteCloseTrigger:
13✔
1222
                        nextState = StateContractClosed
13✔
1223

1224
                // If a coop close was confirmed, jump straight to the fully
1225
                // resolved state.
1226
                case coopCloseTrigger:
×
1227
                        nextState = StateFullyResolved
×
1228

1229
                case breachCloseTrigger:
×
1230
                        nextContractState, err := c.checkLegacyBreach()
×
1231
                        if nextContractState == StateError {
×
1232
                                return nextContractState, closeTx, err
×
1233
                        }
×
1234

1235
                        nextState = nextContractState
×
1236
                }
1237

1238
                log.Infof("ChannelArbitrator(%v): trigger %v moving from "+
30✔
1239
                        "state %v to %v", c.cfg.ChanPoint, trigger, c.state,
30✔
1240
                        nextState)
30✔
1241

1242
        // If we're in this state, then the contract has been fully closed to
1243
        // outside sub-systems, so we'll process the prior set of on-chain
1244
        // contract actions and launch a set of resolvers.
1245
        case StateContractClosed:
22✔
1246
                // First, we'll fetch our chain actions, and both sets of
22✔
1247
                // resolutions so we can process them.
22✔
1248
                contractResolutions, err := c.log.FetchContractResolutions()
22✔
1249
                if err != nil {
24✔
1250
                        log.Errorf("unable to fetch contract resolutions: %v",
2✔
1251
                                err)
2✔
1252
                        return StateError, closeTx, err
2✔
1253
                }
2✔
1254

1255
                // If the resolution is empty, and we have no HTLCs at all to
1256
                // act on, then we're done here. We don't need to launch any
1257
                // resolvers, and can go straight to our final state.
1258
                if contractResolutions.IsEmpty() && confCommitSet.IsEmpty() {
28✔
1259
                        log.Infof("ChannelArbitrator(%v): contract "+
8✔
1260
                                "resolutions empty, marking channel as fully resolved!",
8✔
1261
                                c.cfg.ChanPoint)
8✔
1262
                        nextState = StateFullyResolved
8✔
1263
                        break
8✔
1264
                }
1265

1266
                // First, we'll reconstruct a fresh set of chain actions as the
1267
                // set of actions we need to act on may differ based on if it
1268
                // was our commitment, or their commitment, that hit the chain.
1269
                htlcActions, err := c.constructChainActions(
12✔
1270
                        confCommitSet, triggerHeight, trigger,
12✔
1271
                )
12✔
1272
                if err != nil {
12✔
1273
                        return StateError, closeTx, err
×
1274
                }
×
1275

1276
                // In case it's a breach transaction, we fail back all upstream
1277
                // HTLCs for their corresponding outgoing HTLCs on the remote
1278
                // commitment set (remote and remote pending set).
1279
                if contractResolutions.BreachResolution != nil {
14✔
1280
                        // cancelBreachedHTLCs is a set which holds HTLCs whose
2✔
1281
                        // corresponding incoming HTLCs will be failed back
2✔
1282
                        // because the peer broadcasted an old state.
2✔
1283
                        cancelBreachedHTLCs := fn.NewSet[uint64]()
2✔
1284

2✔
1285
                        // Using the CommitSet, we'll fail back all
2✔
1286
                        // upstream HTLCs for their corresponding outgoing
2✔
1287
                        // HTLC that exist on either of the remote commitments.
2✔
1288
                        // The map is used to deduplicate any shared HTLC's.
2✔
1289
                        for htlcSetKey, htlcs := range confCommitSet.HtlcSets {
4✔
1290
                                if !htlcSetKey.IsRemote {
2✔
UNCOV
1291
                                        continue
×
1292
                                }
1293

1294
                                for _, htlc := range htlcs {
4✔
1295
                                        // Only outgoing HTLCs have a
2✔
1296
                                        // corresponding incoming HTLC.
2✔
1297
                                        if htlc.Incoming {
3✔
1298
                                                continue
1✔
1299
                                        }
1300

1301
                                        cancelBreachedHTLCs.Add(htlc.HtlcIndex)
1✔
1302
                                }
1303
                        }
1304

1305
                        err := c.abandonForwards(cancelBreachedHTLCs)
2✔
1306
                        if err != nil {
2✔
1307
                                return StateError, closeTx, err
×
1308
                        }
×
1309
                } else {
10✔
1310
                        // If it's not a breach, we resolve all incoming dust
10✔
1311
                        // HTLCs immediately after the commitment is confirmed.
10✔
1312
                        err = c.failIncomingDust(
10✔
1313
                                htlcActions[HtlcIncomingDustFinalAction],
10✔
1314
                        )
10✔
1315
                        if err != nil {
10✔
1316
                                return StateError, closeTx, err
×
1317
                        }
×
1318

1319
                        // We fail the upstream HTLCs for all remote pending
1320
                        // outgoing HTLCs as soon as the commitment is
1321
                        // confirmed. The upstream HTLCs for outgoing dust
1322
                        // HTLCs have already been resolved before we reach
1323
                        // this point.
1324
                        getIdx := func(htlc channeldb.HTLC) uint64 {
18✔
1325
                                return htlc.HtlcIndex
8✔
1326
                        }
8✔
1327
                        remoteDangling := fn.NewSet(fn.Map(
10✔
1328
                                htlcActions[HtlcFailDanglingAction], getIdx,
10✔
1329
                        )...)
10✔
1330
                        err := c.abandonForwards(remoteDangling)
10✔
1331
                        if err != nil {
10✔
1332
                                return StateError, closeTx, err
×
1333
                        }
×
1334
                }
1335

1336
                // Now that we know we'll need to act, we'll process all the
1337
                // resolvers, then create the structures we need to resolve all
1338
                // outstanding contracts.
1339
                resolvers, err := c.prepContractResolutions(
12✔
1340
                        contractResolutions, triggerHeight, htlcActions,
12✔
1341
                )
12✔
1342
                if err != nil {
12✔
1343
                        log.Errorf("ChannelArbitrator(%v): unable to "+
×
1344
                                "resolve contracts: %v", c.cfg.ChanPoint, err)
×
1345
                        return StateError, closeTx, err
×
1346
                }
×
1347

1348
                log.Debugf("ChannelArbitrator(%v): inserting %v contract "+
12✔
1349
                        "resolvers", c.cfg.ChanPoint, len(resolvers))
12✔
1350

12✔
1351
                err = c.log.InsertUnresolvedContracts(nil, resolvers...)
12✔
1352
                if err != nil {
12✔
1353
                        return StateError, closeTx, err
×
1354
                }
×
1355

1356
                // Finally, we'll launch all the required contract resolvers.
1357
                // Once they're all resolved, we're no longer needed.
1358
                c.resolveContracts(resolvers)
12✔
1359

12✔
1360
                nextState = StateWaitingFullResolution
12✔
1361

1362
        // This is our terminal state. We'll keep returning this state until
1363
        // all contracts are fully resolved.
1364
        case StateWaitingFullResolution:
16✔
1365
                log.Infof("ChannelArbitrator(%v): still awaiting contract "+
16✔
1366
                        "resolution", c.cfg.ChanPoint)
16✔
1367

16✔
1368
                unresolved, err := c.log.FetchUnresolvedContracts()
16✔
1369
                if err != nil {
16✔
1370
                        return StateError, closeTx, err
×
1371
                }
×
1372

1373
                // If we have no unresolved contracts, then we can move to the
1374
                // final state.
1375
                if len(unresolved) == 0 {
28✔
1376
                        nextState = StateFullyResolved
12✔
1377
                        break
12✔
1378
                }
1379

1380
                // Otherwise, we still have unresolved contracts, so we'll
1381
                // stay alive to oversee their resolution.
1382
                nextState = StateWaitingFullResolution
4✔
1383

4✔
1384
                // Add debug logs.
4✔
1385
                for _, r := range unresolved {
8✔
1386
                        log.Debugf("ChannelArbitrator(%v): still have "+
4✔
1387
                                "unresolved contract: %T", c.cfg.ChanPoint, r)
4✔
1388
                }
4✔
1389

1390
        // If we start as fully resolved, then we'll end as fully resolved.
1391
        case StateFullyResolved:
22✔
1392
                // To ensure that the state of the contract in persistent
22✔
1393
                // storage is properly reflected, we'll mark the contract as
22✔
1394
                // fully resolved now.
22✔
1395
                nextState = StateFullyResolved
22✔
1396

22✔
1397
                log.Infof("ChannelPoint(%v) has been fully resolved "+
22✔
1398
                        "on-chain at height=%v", c.cfg.ChanPoint, triggerHeight)
22✔
1399

22✔
1400
                if err := c.cfg.MarkChannelResolved(); err != nil {
22✔
1401
                        log.Errorf("unable to mark channel resolved: %v", err)
×
1402
                        return StateError, closeTx, err
×
1403
                }
×
1404
        }
1405

1406
        log.Tracef("ChannelArbitrator(%v): next_state=%v", c.cfg.ChanPoint,
131✔
1407
                nextState)
131✔
1408

131✔
1409
        return nextState, closeTx, nil
131✔
1410
}
1411

1412
// sweepAnchors offers all given anchor resolutions to the sweeper. It requests
1413
// sweeping at the minimum fee rate. This fee rate can be upped manually by the
1414
// user via the BumpFee rpc.
1415
func (c *ChannelArbitrator) sweepAnchors(anchors *lnwallet.AnchorResolutions,
1416
        heightHint uint32) error {
19✔
1417

19✔
1418
        // Update the set of activeHTLCs so that the sweeping routine has an
19✔
1419
        // up-to-date view of the set of commitments.
19✔
1420
        c.updateActiveHTLCs()
19✔
1421

19✔
1422
        // Prepare the sweeping requests for all possible versions of
19✔
1423
        // commitments.
19✔
1424
        sweepReqs, err := c.prepareAnchorSweeps(heightHint, anchors)
19✔
1425
        if err != nil {
19✔
1426
                return err
×
1427
        }
×
1428

1429
        // Send out the sweeping requests to the sweeper.
1430
        for _, req := range sweepReqs {
27✔
1431
                _, err = c.cfg.Sweeper.SweepInput(req.input, req.params)
8✔
1432
                if err != nil {
8✔
1433
                        return err
×
1434
                }
×
1435
        }
1436

1437
        return nil
19✔
1438
}
1439

1440
// findCommitmentDeadlineAndValue finds the deadline (relative block height)
1441
// for a commitment transaction by extracting the minimum CLTV from its HTLCs.
1442
// From our PoV, the deadline delta is defined to be the smaller of,
1443
//   - half of the least CLTV from outgoing HTLCs' corresponding incoming
1444
//     HTLCs,  or,
1445
//   - half of the least CLTV from incoming HTLCs if the preimage is available.
1446
//
1447
// We use half of the CLTV value to ensure that we have enough time to sweep
1448
// the second-level HTLCs.
1449
//
1450
// It also finds the total value that is time-sensitive, which is the sum of
1451
// all the outgoing HTLCs plus incoming HTLCs whose preimages are known. It
1452
// then returns the value left after subtracting the budget used for sweeping
1453
// the time-sensitive HTLCs.
1454
//
1455
// NOTE: when the deadline turns out to be 0 blocks, we will replace it with 1
1456
// block because our fee estimator doesn't allow a 0 conf target. This also
1457
// means we're behind schedule and should increase our fee to get the transaction
1458
// confirmed asap.
1459
func (c *ChannelArbitrator) findCommitmentDeadlineAndValue(heightHint uint32,
1460
        htlcs htlcSet) (fn.Option[int32], btcutil.Amount, error) {
13✔
1461

13✔
1462
        deadlineMinHeight := uint32(math.MaxUint32)
13✔
1463
        totalValue := btcutil.Amount(0)
13✔
1464

13✔
1465
        // First, iterate through the outgoingHTLCs to find the lowest CLTV
13✔
1466
        // value.
13✔
1467
        for _, htlc := range htlcs.outgoingHTLCs {
23✔
1468
                // Skip if the HTLC is dust.
10✔
1469
                if htlc.OutputIndex < 0 {
13✔
1470
                        log.Debugf("ChannelArbitrator(%v): skipped deadline "+
3✔
1471
                                "for dust htlc=%x",
3✔
1472
                                c.cfg.ChanPoint, htlc.RHash[:])
3✔
1473

3✔
1474
                        continue
3✔
1475
                }
1476

1477
                value := htlc.Amt.ToSatoshis()
7✔
1478

7✔
1479
                // Find the expiry height for this outgoing HTLC's incoming
7✔
1480
                // HTLC.
7✔
1481
                deadlineOpt := c.cfg.FindOutgoingHTLCDeadline(htlc)
7✔
1482

7✔
1483
                // The deadline defaults to the current deadlineMinHeight,
7✔
1484
                // and it's overwritten when it's not none.
7✔
1485
                deadline := deadlineMinHeight
7✔
1486
                deadlineOpt.WhenSome(func(d int32) {
12✔
1487
                        deadline = uint32(d)
5✔
1488

5✔
1489
                        // We only consider the value to be under protection when
5✔
1490
                        // it's time-sensitive.
5✔
1491
                        totalValue += value
5✔
1492
                })
5✔
1493

1494
                if deadline < deadlineMinHeight {
12✔
1495
                        deadlineMinHeight = deadline
5✔
1496

5✔
1497
                        log.Tracef("ChannelArbitrator(%v): outgoing HTLC has "+
5✔
1498
                                "deadline=%v, value=%v", c.cfg.ChanPoint,
5✔
1499
                                deadlineMinHeight, value)
5✔
1500
                }
5✔
1501
        }
1502

1503
        // Then go through the incomingHTLCs and update the minHeight when the
1504
        // conditions are met.
1505
        for _, htlc := range htlcs.incomingHTLCs {
22✔
1506
                // Skip if the HTLC is dust.
9✔
1507
                if htlc.OutputIndex < 0 {
10✔
1508
                        log.Debugf("ChannelArbitrator(%v): skipped deadline "+
1✔
1509
                                "for dust htlc=%x",
1✔
1510
                                c.cfg.ChanPoint, htlc.RHash[:])
1✔
1511

1✔
1512
                        continue
1✔
1513
                }
1514

1515
                // Since it's an HTLC sent to us, check if we have preimage for
1516
                // this HTLC.
1517
                preimageAvailable, err := c.isPreimageAvailable(htlc.RHash)
8✔
1518
                if err != nil {
8✔
1519
                        return fn.None[int32](), 0, err
×
1520
                }
×
1521

1522
                if !preimageAvailable {
10✔
1523
                        continue
2✔
1524
                }
1525

1526
                value := htlc.Amt.ToSatoshis()
6✔
1527
                totalValue += value
6✔
1528

6✔
1529
                if htlc.RefundTimeout < deadlineMinHeight {
11✔
1530
                        deadlineMinHeight = htlc.RefundTimeout
5✔
1531

5✔
1532
                        log.Tracef("ChannelArbitrator(%v): incoming HTLC has "+
5✔
1533
                                "deadline=%v, amt=%v", c.cfg.ChanPoint,
5✔
1534
                                deadlineMinHeight, value)
5✔
1535
                }
5✔
1536
        }
1537

1538
        // Calculate the deadline. There are two cases to be handled here,
1539
        //   - when the deadlineMinHeight never gets updated, which could
1540
        //     happen when we have no outgoing HTLCs, and, for incoming HTLCs,
1541
        //       * either we have none, or,
1542
        //       * none of the HTLCs are preimageAvailable.
1543
        //   - when our deadlineMinHeight is no greater than the heightHint,
1544
        //     which means we are behind our schedule.
1545
        var deadline uint32
13✔
1546
        switch {
13✔
1547
        // When we couldn't find a deadline height from our HTLCs, we will fall
1548
        // back to the default value as there's no time pressure here.
1549
        case deadlineMinHeight == math.MaxUint32:
4✔
1550
                return fn.None[int32](), 0, nil
4✔
1551

1552
        // When the deadline is passed, we will fall back to the smallest conf
1553
        // target (1 block).
1554
        case deadlineMinHeight <= heightHint:
1✔
1555
                log.Warnf("ChannelArbitrator(%v): deadline is passed with "+
1✔
1556
                        "deadlineMinHeight=%d, heightHint=%d",
1✔
1557
                        c.cfg.ChanPoint, deadlineMinHeight, heightHint)
1✔
1558
                deadline = 1
1✔
1559

1560
        // Use half of the deadline delta, and leave the other half to be used
1561
        // to sweep the HTLCs.
1562
        default:
8✔
1563
                deadline = (deadlineMinHeight - heightHint) / 2
8✔
1564
        }
1565

1566
        // Calculate the value left after subtracting the budget used for
1567
        // sweeping the time-sensitive HTLCs.
1568
        valueLeft := totalValue - calculateBudget(
9✔
1569
                totalValue, c.cfg.Budget.DeadlineHTLCRatio,
9✔
1570
                c.cfg.Budget.DeadlineHTLC,
9✔
1571
        )
9✔
1572

9✔
1573
        log.Debugf("ChannelArbitrator(%v): calculated valueLeft=%v, "+
9✔
1574
                "deadline=%d, using deadlineMinHeight=%d, heightHint=%d",
9✔
1575
                c.cfg.ChanPoint, valueLeft, deadline, deadlineMinHeight,
9✔
1576
                heightHint)
9✔
1577

9✔
1578
        return fn.Some(int32(deadline)), valueLeft, nil
9✔
1579
}
1580
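// The following is an illustrative sketch, not part of lnd: it isolates the
// conf-target arithmetic documented above for findCommitmentDeadlineAndValue.
// The identifier exampleConfTarget and its parameters are hypothetical, and
// the 0 return stands in for the fn.None case in the real code.
func exampleConfTarget(heightHint, deadlineMinHeight uint32) uint32 {
        switch {
        // No time-sensitive HTLC was found, so there is no deadline at all
        // (the real code returns fn.None here).
        case deadlineMinHeight == math.MaxUint32:
                return 0

        // The deadline has already passed, so fall back to the smallest
        // conf target of one block.
        case deadlineMinHeight <= heightHint:
                return 1

        // Otherwise use half of the remaining delta, leaving the other half
        // for sweeping the second-level HTLCs, e.g. heightHint=1000 and
        // deadlineMinHeight=1060 gives a conf target of 30 blocks.
        default:
                return (deadlineMinHeight - heightHint) / 2
        }
}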

1581
// resolveContracts updates the activeResolvers list, launches each resolver,
1582
// and then resolves each contract concurrently.
1583
func (c *ChannelArbitrator) resolveContracts(resolvers []ContractResolver) {
13✔
1584
        c.activeResolversLock.Lock()
13✔
1585
        c.activeResolvers = resolvers
13✔
1586
        c.activeResolversLock.Unlock()
13✔
1587

13✔
1588
        // Launch all resolvers.
13✔
1589
        c.launchResolvers()
13✔
1590

13✔
1591
        for _, contract := range resolvers {
19✔
1592
                c.wg.Add(1)
6✔
1593
                go c.resolveContract(contract)
6✔
1594
        }
6✔
1595
}
1596

1597
// launchResolvers launches all the active resolvers concurrently.
1598
func (c *ChannelArbitrator) launchResolvers() {
19✔
1599
        c.activeResolversLock.Lock()
19✔
1600
        resolvers := c.activeResolvers
19✔
1601
        c.activeResolversLock.Unlock()
19✔
1602

19✔
1603
        // errChans is a map of channels that will be used to receive errors
19✔
1604
        // returned from launching the resolvers.
19✔
1605
        errChans := make(map[ContractResolver]chan error, len(resolvers))
19✔
1606

19✔
1607
        // Launch each resolver in goroutines.
19✔
1608
        for _, r := range resolvers {
25✔
1609
                // If the contract is already resolved, there's no need to
6✔
1610
                // launch it again.
6✔
1611
                if r.IsResolved() {
6✔
NEW
1612
                        log.Debugf("ChannelArbitrator(%v): skipping resolver "+
×
NEW
1613
                                "%T as it's already resolved", c.cfg.ChanPoint,
×
NEW
1614
                                r)
×
NEW
1615

×
NEW
1616
                        continue
×
1617
                }
1618

1619
                // Create a signal chan.
1620
                errChan := make(chan error, 1)
6✔
1621
                errChans[r] = errChan
6✔
1622

6✔
1623
                go func() {
12✔
1624
                        err := r.Launch()
6✔
1625
                        errChan <- err
6✔
1626
                }()
6✔
1627
        }
1628

1629
        // Wait for all resolvers to finish launching.
1630
        for r, errChan := range errChans {
25✔
1631
                select {
6✔
1632
                case err := <-errChan:
6✔
1633
                        if err == nil {
12✔
1634
                                continue
6✔
1635
                        }
1636

NEW
1637
                        log.Errorf("ChannelArbitrator(%v): unable to launch "+
×
NEW
1638
                                "contract resolver(%T): %v", c.cfg.ChanPoint, r,
×
NEW
1639
                                err)
×
1640

NEW
1641
                case <-c.quit:
×
NEW
1642
                        log.Debugf("ChannelArbitrator quit signal received, " +
×
NEW
1643
                                "exit launchResolvers")
×
NEW
1644

×
NEW
1645
                        return
×
1646
                }
1647
        }
1648
}
1649
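// Illustrative sketch, not part of lnd: the launch pattern used by
// launchResolvers above in isolation - fan out one goroutine per task with a
// buffered error channel each, then fan the results back in while honoring a
// quit signal. All identifiers below are hypothetical.
func launchAll(tasks []func() error, quit <-chan struct{}) {
        errChans := make([]chan error, len(tasks))
        for i, task := range tasks {
                // Buffered so the goroutine never blocks on send.
                errChans[i] = make(chan error, 1)
                go func(run func() error, errChan chan<- error) {
                        errChan <- run()
                }(task, errChans[i])
        }

        // Collect one result per task, bailing out early on shutdown.
        for _, errChan := range errChans {
                select {
                case err := <-errChan:
                        if err != nil {
                                // The real code logs the failure and keeps
                                // going; this sketch does the same.
                                continue
                        }

                case <-quit:
                        return
                }
        }
}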

1650
// advanceState is the main driver of our state machine. This method is an
1651
// iterative function which repeatedly attempts to advance the internal state
1652
// of the channel arbitrator. The state will be advanced until we reach a
1653
// redundant transition, meaning that the state transition is a noop. The final
1654
// param is a callback that allows the caller to execute an arbitrary action
1655
// after each state transition.
1656
func (c *ChannelArbitrator) advanceState(
1657
        triggerHeight uint32, trigger transitionTrigger,
1658
        confCommitSet *CommitSet) (ArbitratorState, *wire.MsgTx, error) {
87✔
1659

87✔
1660
        var (
87✔
1661
                priorState   ArbitratorState
87✔
1662
                forceCloseTx *wire.MsgTx
87✔
1663
        )
87✔
1664

87✔
1665
        // We'll continue to advance our state forward until the state we
87✔
1666
        // transition to is that same state that we started at.
87✔
1667
        for {
260✔
1668
                priorState = c.state
173✔
1669
                log.Debugf("ChannelArbitrator(%v): attempting state step with "+
173✔
1670
                        "trigger=%v from state=%v at height=%v",
173✔
1671
                        c.cfg.ChanPoint, trigger, priorState, triggerHeight)
173✔
1672

173✔
1673
                nextState, closeTx, err := c.stateStep(
173✔
1674
                        triggerHeight, trigger, confCommitSet,
173✔
1675
                )
173✔
1676
                if err != nil {
177✔
1677
                        log.Errorf("ChannelArbitrator(%v): unable to advance "+
4✔
1678
                                "state: %v", c.cfg.ChanPoint, err)
4✔
1679
                        return priorState, nil, err
4✔
1680
                }
4✔
1681

1682
                if forceCloseTx == nil && closeTx != nil {
185✔
1683
                        forceCloseTx = closeTx
16✔
1684
                }
16✔
1685

1686
                // Our termination transition is a noop transition. If we get
1687
                // our prior state back as the next state, then we'll
1688
                // terminate.
1689
                if nextState == priorState {
249✔
1690
                        log.Debugf("ChannelArbitrator(%v): terminating at "+
80✔
1691
                                "state=%v", c.cfg.ChanPoint, nextState)
80✔
1692
                        return nextState, forceCloseTx, nil
80✔
1693
                }
80✔
1694

1695
                // As the prior state was successfully executed, we can now
1696
                // commit the next state. This ensures that we will re-execute
1697
                // the prior state if anything fails.
1698
                if err := c.log.CommitState(nextState); err != nil {
92✔
1699
                        log.Errorf("ChannelArbitrator(%v): unable to commit "+
3✔
1700
                                "next state(%v): %v", c.cfg.ChanPoint,
3✔
1701
                                nextState, err)
3✔
1702
                        return priorState, nil, err
3✔
1703
                }
3✔
1704
                c.state = nextState
86✔
1705
        }
1706
}
1707
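// Illustrative sketch, not part of lnd: the loop shape used by advanceState
// above - repeatedly apply a step function until it returns the state it was
// given (a no-op transition), persisting every newly reached state before
// adopting it. All identifiers below are hypothetical.
func driveStateMachine(start ArbitratorState,
        step func(ArbitratorState) (ArbitratorState, error),
        persist func(ArbitratorState) error) (ArbitratorState, error) {

        state := start
        for {
                next, err := step(state)
                if err != nil {
                        return state, err
                }

                // A no-op transition terminates the loop.
                if next == state {
                        return state, nil
                }

                // Persist before adopting the new state so that a restart
                // replays the prior step rather than skipping it.
                if err := persist(next); err != nil {
                        return state, err
                }
                state = next
        }
}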

1708
// ChainAction is an enum that encompasses all possible on-chain actions
1709
// we'll take for a set of HTLC's.
1710
type ChainAction uint8
1711

1712
const (
1713
        // NoAction is the min chainAction type, indicating that no action
1714
        // needs to be taken for a given HTLC.
1715
        NoAction ChainAction = 0
1716

1717
        // HtlcTimeoutAction indicates that the HTLC will timeout soon. As a
1718
        // result, we should get ready to sweep it on chain after the timeout.
1719
        HtlcTimeoutAction = 1
1720

1721
        // HtlcClaimAction indicates that we should claim the HTLC on chain
1722
        // before its timeout period.
1723
        HtlcClaimAction = 2
1724

1725
        // HtlcFailDustAction indicates that we should fail the upstream HTLC
1726
        // for an outgoing dust HTLC immediately (even before the commitment
1727
        // transaction is confirmed) because it has no output on the commitment
1728
        // transaction. This also includes remote pending outgoing dust HTLCs.
1729
        HtlcFailDustAction = 3
1730

1731
        // HtlcOutgoingWatchAction indicates that we can't yet timeout this
1732
        // HTLC, but we had to go to chain in order to resolve an existing
1733
        // HTLC.  In this case, we'll either: time it out once it expires, or
1734
        // will learn the pre-image if the remote party claims the output. In
1735
        // that case, we'll add the pre-image to our global store.
1736
        HtlcOutgoingWatchAction = 4
1737

1738
        // HtlcIncomingWatchAction indicates that we don't yet have the
1739
        // pre-image to claim the incoming HTLC, but we had to go to chain in order
1740
        // to resolve an existing HTLC. In this case, we'll either: let the
1741
        // other party time it out, or eventually learn of the pre-image, in
1742
        // which case we'll claim on chain.
1743
        HtlcIncomingWatchAction = 5
1744

1745
        // HtlcIncomingDustFinalAction indicates that we should mark an incoming
1746
        // dust htlc as final because it can't be claimed on-chain.
1747
        HtlcIncomingDustFinalAction = 6
1748

1749
        // HtlcFailDanglingAction indicates that we should fail the upstream
1750
        // HTLC for an outgoing HTLC immediately after the commitment
1751
        // transaction has confirmed because it has no corresponding output on
1752
        // the commitment transaction. This category does NOT include any dust
1753
        // HTLCs which are mapped in the "HtlcFailDustAction" category.
1754
        HtlcFailDanglingAction = 7
1755
)
1756

1757
// String returns a human readable string describing a chain action.
1758
func (c ChainAction) String() string {
×
1759
        switch c {
×
1760
        case NoAction:
×
1761
                return "NoAction"
×
1762

1763
        case HtlcTimeoutAction:
×
1764
                return "HtlcTimeoutAction"
×
1765

1766
        case HtlcClaimAction:
×
1767
                return "HtlcClaimAction"
×
1768

1769
        case HtlcFailDustAction:
×
1770
                return "HtlcFailDustAction"
×
1771

1772
        case HtlcOutgoingWatchAction:
×
1773
                return "HtlcOutgoingWatchAction"
×
1774

1775
        case HtlcIncomingWatchAction:
×
1776
                return "HtlcIncomingWatchAction"
×
1777

1778
        case HtlcIncomingDustFinalAction:
×
1779
                return "HtlcIncomingDustFinalAction"
×
1780

1781
        case HtlcFailDanglingAction:
×
1782
                return "HtlcFailDanglingAction"
×
1783

1784
        default:
×
1785
                return "<unknown action>"
×
1786
        }
1787
}
1788

1789
// ChainActionMap is a map from a chain action to the set of HTLC's that need to
1790
// be acted upon for a given action type.
1791
type ChainActionMap map[ChainAction][]channeldb.HTLC
1792

1793
// Merge merges the passed chain actions with the target chain action map.
1794
func (c ChainActionMap) Merge(actions ChainActionMap) {
68✔
1795
        for chainAction, htlcs := range actions {
81✔
1796
                c[chainAction] = append(c[chainAction], htlcs...)
13✔
1797
        }
13✔
1798
}
1799
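// Illustrative usage, not part of lnd: Merge appends per action key, so HTLCs
// collected for the same ChainAction by different checks accumulate rather
// than overwrite each other. The identifiers below are hypothetical.
func exampleMergeActions(local, remote ChainActionMap) ChainActionMap {
        merged := make(ChainActionMap)
        merged.Merge(local)
        merged.Merge(remote)

        return merged
}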

1800
// shouldGoOnChain takes into account the absolute timeout of the HTLC, if the
1801
// confirmation delta that we need is close, and returns a bool indicating if
1802
// we should go on chain to claim.  We do this rather than waiting up until the
1803
// last minute as we want to ensure that when we *need* (HTLC is timed out) to
1804
// sweep, the commitment is already confirmed.
1805
func (c *ChannelArbitrator) shouldGoOnChain(htlc channeldb.HTLC,
1806
        broadcastDelta, currentHeight uint32) bool {
26✔
1807

26✔
1808
        // We'll calculate the broadcast cut off for this HTLC. This is the
26✔
1809
        // height that (based on our current fee estimation) we should
26✔
1810
        // broadcast in order to ensure the commitment transaction is confirmed
26✔
1811
        // before the HTLC fully expires.
26✔
1812
        broadcastCutOff := htlc.RefundTimeout - broadcastDelta
26✔
1813

26✔
1814
        log.Tracef("ChannelArbitrator(%v): examining outgoing contract: "+
26✔
1815
                "expiry=%v, cutoff=%v, height=%v", c.cfg.ChanPoint, htlc.RefundTimeout,
26✔
1816
                broadcastCutOff, currentHeight)
26✔
1817

26✔
1818
        // TODO(roasbeef): take into account default HTLC delta, don't need to
26✔
1819
        // broadcast immediately
26✔
1820
        //  * can then batch with SINGLE | ANYONECANPAY
26✔
1821

26✔
1822
        // We should go on-chain for this HTLC iff we're within our broadcast
26✔
1823
        // cutoff window.
26✔
1824
        if currentHeight < broadcastCutOff {
45✔
1825
                return false
19✔
1826
        }
19✔
1827

1828
        // In case of incoming htlc we should go to chain.
1829
        if htlc.Incoming {
7✔
UNCOV
1830
                return true
×
UNCOV
1831
        }
×
1832

1833
        // For htlcs that are the result of our initiated payments we give some grace
1834
        // period before force closing the channel. During this time we expect
1835
        // both nodes to connect and give a chance to the other node to send its
1836
        // updates and cancel the htlc.
1837
        // This shouldn't add any security risk as there is no incoming htlc to
1838
        // fulfill in this case and the expectation is that when the channel is
1839
        // active the other node will send update_fail_htlc to remove the htlc
1840
        // without closing the channel. It is up to the user to force close the
1841
        // channel if the peer misbehaves and doesn't send the update_fail_htlc.
1842
        // It is useful when this node is most of the time not online and is
1843
        // likely to miss the time slot where the htlc may be cancelled.
1844
        isForwarded := c.cfg.IsForwardedHTLC(c.cfg.ShortChanID, htlc.HtlcIndex)
7✔
1845
        upTime := c.cfg.Clock.Now().Sub(c.startTimestamp)
7✔
1846
        return isForwarded || upTime > c.cfg.PaymentsExpirationGracePeriod
7✔
1847
}
1848
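// Illustrative sketch, not part of lnd: the broadcast cut-off check performed
// by shouldGoOnChain above, in isolation. With refundTimeout=800_000 and
// broadcastDelta=10 the cut-off is 799_990, so the result flips to true once
// currentHeight reaches that value. Identifiers are hypothetical.
func pastBroadcastCutOff(refundTimeout, broadcastDelta,
        currentHeight uint32) bool {

        broadcastCutOff := refundTimeout - broadcastDelta

        return currentHeight >= broadcastCutOff
}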

1849
// checkCommitChainActions is called for each new block connected to the end of
1850
// the main chain. Given the new block height, this method will examine all
1851
// active HTLC's, and determine if we need to go on-chain to claim any of them.
1852
// A map of action -> []htlc is returned, detailing what action (if any) should
1853
// be performed for each HTLC. For timed out HTLC's, once the commitment has
1854
// been sufficiently confirmed, the HTLC's should be canceled backwards. For
1855
// redeemed HTLC's, we should send the pre-image back to the incoming link.
1856
func (c *ChannelArbitrator) checkCommitChainActions(height uint32,
1857
        trigger transitionTrigger, htlcs htlcSet) (ChainActionMap, error) {
68✔
1858

68✔
1859
        // TODO(roasbeef): would need to lock channel? channel totem?
68✔
1860
        //  * race condition if adding and we broadcast, etc
68✔
1861
        //  * or would make each instance sync?
68✔
1862

68✔
1863
        log.Debugf("ChannelArbitrator(%v): checking commit chain actions at "+
68✔
1864
                "height=%v, in_htlc_count=%v, out_htlc_count=%v",
68✔
1865
                c.cfg.ChanPoint, height,
68✔
1866
                len(htlcs.incomingHTLCs), len(htlcs.outgoingHTLCs))
68✔
1867

68✔
1868
        actionMap := make(ChainActionMap)
68✔
1869

68✔
1870
        // First, we'll make an initial pass over the set of incoming and
68✔
1871
        // outgoing HTLC's to decide if we need to go on chain at all.
68✔
1872
        haveChainActions := false
68✔
1873
        for _, htlc := range htlcs.outgoingHTLCs {
75✔
1874
                // We'll need to go on-chain for an outgoing HTLC if it was
7✔
1875
                // never resolved downstream, and it's "close" to timing out.
7✔
1876
                //
7✔
1877
                // TODO(yy): If there's no corresponding incoming HTLC, it
7✔
1878
                // means we are the first hop, hence the payer. This is a
7✔
1879
                // tricky case - unlike a forwarding hop, we don't have an
7✔
1880
                // incoming HTLC that will time out, which means as long as we
7✔
1881
                // can learn the preimage, we can settle the invoice (before it
7✔
1882
                // expires?).
7✔
1883
                toChain := c.shouldGoOnChain(
7✔
1884
                        htlc, c.cfg.OutgoingBroadcastDelta, height,
7✔
1885
                )
7✔
1886

7✔
1887
                if toChain {
7✔
UNCOV
1888
                        // Convert to int64 in case of overflow.
×
UNCOV
1889
                        remainingBlocks := int64(htlc.RefundTimeout) -
×
UNCOV
1890
                                int64(height)
×
UNCOV
1891

×
UNCOV
1892
                        log.Infof("ChannelArbitrator(%v): go to chain for "+
×
UNCOV
1893
                                "outgoing htlc %x: timeout=%v, amount=%v, "+
×
UNCOV
1894
                                "blocks_until_expiry=%v, broadcast_delta=%v",
×
UNCOV
1895
                                c.cfg.ChanPoint, htlc.RHash[:],
×
UNCOV
1896
                                htlc.RefundTimeout, htlc.Amt, remainingBlocks,
×
UNCOV
1897
                                c.cfg.OutgoingBroadcastDelta,
×
UNCOV
1898
                        )
×
UNCOV
1899
                }
×
1900

1901
                haveChainActions = haveChainActions || toChain
7✔
1902
        }
1903

1904
        for _, htlc := range htlcs.incomingHTLCs {
73✔
1905
                // We'll need to go on-chain to pull an incoming HTLC iff we
5✔
1906
                // know the pre-image and it's close to timing out. We need to
5✔
1907
                // ensure that we claim the funds that are rightfully ours
5✔
1908
                // on-chain.
5✔
1909
                preimageAvailable, err := c.isPreimageAvailable(htlc.RHash)
5✔
1910
                if err != nil {
5✔
1911
                        return nil, err
×
1912
                }
×
1913

1914
                if !preimageAvailable {
9✔
1915
                        continue
4✔
1916
                }
1917

1918
                toChain := c.shouldGoOnChain(
1✔
1919
                        htlc, c.cfg.IncomingBroadcastDelta, height,
1✔
1920
                )
1✔
1921

1✔
1922
                if toChain {
1✔
UNCOV
1923
                        // Convert to int64 in case of overflow.
×
UNCOV
1924
                        remainingBlocks := int64(htlc.RefundTimeout) -
×
UNCOV
1925
                                int64(height)
×
UNCOV
1926

×
UNCOV
1927
                        log.Infof("ChannelArbitrator(%v): go to chain for "+
×
UNCOV
1928
                                "incoming htlc %x: timeout=%v, amount=%v, "+
×
UNCOV
1929
                                "blocks_until_expiry=%v, broadcast_delta=%v",
×
UNCOV
1930
                                c.cfg.ChanPoint, htlc.RHash[:],
×
UNCOV
1931
                                htlc.RefundTimeout, htlc.Amt, remainingBlocks,
×
UNCOV
1932
                                c.cfg.IncomingBroadcastDelta,
×
UNCOV
1933
                        )
×
UNCOV
1934
                }
×
1935

1936
                haveChainActions = haveChainActions || toChain
1✔
1937
        }
1938

1939
        // If we don't have any actions to make, then we'll return an empty
1940
        // action map. We only do this if this was a chain trigger though, as
1941
        // if we're going to broadcast the commitment (or the remote party did)
1942
        // we're *forced* to act on each HTLC.
1943
        if !haveChainActions && trigger == chainTrigger {
108✔
1944
                log.Tracef("ChannelArbitrator(%v): no actions to take at "+
40✔
1945
                        "height=%v", c.cfg.ChanPoint, height)
40✔
1946
                return actionMap, nil
40✔
1947
        }
40✔
1948

1949
        // Now that we know we'll need to go on-chain, we'll examine all of our
1950
        // active outgoing HTLC's to see if we either need to: sweep them after
1951
        // a timeout (then cancel backwards), cancel them backwards
1952
        // immediately, or watch them as they're still active contracts.
1953
        for _, htlc := range htlcs.outgoingHTLCs {
35✔
1954
                switch {
7✔
1955
                // If the HTLC is dust, then we can cancel it backwards
1956
                // immediately as there's no matching contract to arbitrate
1957
                // on-chain. We know the HTLC is dust if the OutputIndex is
1958
                // negative.
1959
                case htlc.OutputIndex < 0:
2✔
1960
                        log.Tracef("ChannelArbitrator(%v): immediately "+
2✔
1961
                                "failing dust htlc=%x", c.cfg.ChanPoint,
2✔
1962
                                htlc.RHash[:])
2✔
1963

2✔
1964
                        actionMap[HtlcFailDustAction] = append(
2✔
1965
                                actionMap[HtlcFailDustAction], htlc,
2✔
1966
                        )
2✔
1967

1968
                // If we don't need to immediately act on this HTLC, then we'll
1969
                // mark it still "live". After we broadcast, we'll monitor it
1970
                // until the HTLC times out to see if we can also redeem it
1971
                // on-chain.
1972
                case !c.shouldGoOnChain(htlc, c.cfg.OutgoingBroadcastDelta,
1973
                        height,
1974
                ):
5✔
1975
                        // TODO(roasbeef): also need to be able to query
5✔
1976
                        // circuit map to see if HTLC hasn't been fully
5✔
1977
                        // resolved
5✔
1978
                        //
5✔
1979
                        //  * can't fail incoming until if outgoing not yet
5✔
1980
                        //  failed
5✔
1981

5✔
1982
                        log.Tracef("ChannelArbitrator(%v): watching chain to "+
5✔
1983
                                "decide action for outgoing htlc=%x",
5✔
1984
                                c.cfg.ChanPoint, htlc.RHash[:])
5✔
1985

5✔
1986
                        actionMap[HtlcOutgoingWatchAction] = append(
5✔
1987
                                actionMap[HtlcOutgoingWatchAction], htlc,
5✔
1988
                        )
5✔
1989

1990
                // Otherwise, we'll update our actionMap to mark that we need
1991
                // to sweep this HTLC on-chain
UNCOV
1992
                default:
×
UNCOV
1993
                        log.Tracef("ChannelArbitrator(%v): going on-chain to "+
×
UNCOV
1994
                                "timeout htlc=%x", c.cfg.ChanPoint, htlc.RHash[:])
×
UNCOV
1995

×
UNCOV
1996
                        actionMap[HtlcTimeoutAction] = append(
×
UNCOV
1997
                                actionMap[HtlcTimeoutAction], htlc,
×
UNCOV
1998
                        )
×
1999
                }
2000
        }
2001

2002
        // Similarly, for each incoming HTLC, now that we need to go on-chain,
2003
        // we'll either: sweep it immediately if we know the pre-image, or
2004
        // observe the output on-chain if we don't. In this last case, we'll
2005
        // either learn of it eventually from the outgoing HTLC, or the sender
2006
        // will timeout the HTLC.
2007
        for _, htlc := range htlcs.incomingHTLCs {
33✔
2008
                // If the HTLC is dust, there is no action to be taken.
5✔
2009
                if htlc.OutputIndex < 0 {
7✔
2010
                        log.Debugf("ChannelArbitrator(%v): no resolution "+
2✔
2011
                                "needed for incoming dust htlc=%x",
2✔
2012
                                c.cfg.ChanPoint, htlc.RHash[:])
2✔
2013

2✔
2014
                        actionMap[HtlcIncomingDustFinalAction] = append(
2✔
2015
                                actionMap[HtlcIncomingDustFinalAction], htlc,
2✔
2016
                        )
2✔
2017

2✔
2018
                        continue
2✔
2019
                }
2020

2021
                log.Tracef("ChannelArbitrator(%v): watching chain to decide "+
3✔
2022
                        "action for incoming htlc=%x", c.cfg.ChanPoint,
3✔
2023
                        htlc.RHash[:])
3✔
2024

3✔
2025
                actionMap[HtlcIncomingWatchAction] = append(
3✔
2026
                        actionMap[HtlcIncomingWatchAction], htlc,
3✔
2027
                )
3✔
2028
        }
2029

2030
        return actionMap, nil
28✔
2031
}
2032

2033
// isPreimageAvailable returns whether the hash preimage is available in either
2034
// the preimage cache or the invoice database.
2035
func (c *ChannelArbitrator) isPreimageAvailable(hash lntypes.Hash) (bool,
2036
        error) {
17✔
2037

17✔
2038
        // Start by checking the preimage cache for preimages of
17✔
2039
        // forwarded HTLCs.
17✔
2040
        _, preimageAvailable := c.cfg.PreimageDB.LookupPreimage(
17✔
2041
                hash,
17✔
2042
        )
17✔
2043
        if preimageAvailable {
24✔
2044
                return true, nil
7✔
2045
        }
7✔
2046

2047
        // Then check if we have an invoice that can be settled by this HTLC.
2048
        //
2049
        // TODO(joostjager): Check that there are still more blocks remaining
2050
        // than the invoice cltv delta. We don't want to go to chain only to
2051
        // have the incoming contest resolver decide that we don't want to
2052
        // settle this invoice.
2053
        invoice, err := c.cfg.Registry.LookupInvoice(context.Background(), hash)
10✔
2054
        switch {
10✔
UNCOV
2055
        case err == nil:
×
2056
        case errors.Is(err, invoices.ErrInvoiceNotFound) ||
2057
                errors.Is(err, invoices.ErrNoInvoicesCreated):
10✔
2058

10✔
2059
                return false, nil
10✔
2060
        default:
×
2061
                return false, err
×
2062
        }
2063

UNCOV
2064
        preimageAvailable = invoice.Terms.PaymentPreimage != nil
×
UNCOV
2065

×
UNCOV
2066
        return preimageAvailable, nil
×
2067
}
2068

2069
// checkLocalChainActions is similar to checkCommitChainActions, but it also
2070
// examines the set of HTLCs on the remote party's commitment. This allows us
2071
// to ensure we're able to satisfy the HTLC timeout constraints for incoming vs
2072
// outgoing HTLCs.
2073
func (c *ChannelArbitrator) checkLocalChainActions(
2074
        height uint32, trigger transitionTrigger,
2075
        activeHTLCs map[HtlcSetKey]htlcSet,
2076
        commitsConfirmed bool) (ChainActionMap, error) {
60✔
2077

60✔
2078
        // First, we'll check our local chain actions as normal. This will only
60✔
2079
        // examine HTLCs on our local commitment (timeout or settle).
60✔
2080
        localCommitActions, err := c.checkCommitChainActions(
60✔
2081
                height, trigger, activeHTLCs[LocalHtlcSet],
60✔
2082
        )
60✔
2083
        if err != nil {
60✔
2084
                return nil, err
×
2085
        }
×
2086

2087
        // Next, we'll examine the remote commitment (and maybe a dangling one)
2088
        // to see if the set difference of our HTLCs is non-empty. If so, then
2089
        // we may need to cancel back some HTLCs if we decide to go to chain.
2090
        remoteDanglingActions := c.checkRemoteDanglingActions(
60✔
2091
                height, activeHTLCs, commitsConfirmed,
60✔
2092
        )
60✔
2093

60✔
2094
        // Finally, we'll merge the two set of chain actions.
60✔
2095
        localCommitActions.Merge(remoteDanglingActions)
60✔
2096

60✔
2097
        return localCommitActions, nil
60✔
2098
}
2099

2100
// checkRemoteDanglingActions examines the set of remote commitments for any
2101
// HTLCs that are close to timing out. If we find any, then we'll return a set
2102
// of chain actions for HTLCs that are on their commitment, but not ours, to
2103
// cancel immediately.
2104
func (c *ChannelArbitrator) checkRemoteDanglingActions(
2105
        height uint32, activeHTLCs map[HtlcSetKey]htlcSet,
2106
        commitsConfirmed bool) ChainActionMap {
60✔
2107

60✔
2108
        var (
60✔
2109
                pendingRemoteHTLCs []channeldb.HTLC
60✔
2110
                localHTLCs         = make(map[uint64]struct{})
60✔
2111
                remoteHTLCs        = make(map[uint64]channeldb.HTLC)
60✔
2112
                actionMap          = make(ChainActionMap)
60✔
2113
        )
60✔
2114

60✔
2115
        // First, we'll construct two sets of the outgoing HTLCs: those on our
60✔
2116
        // local commitment, and those that are on the remote commitment(s).
60✔
2117
        for htlcSetKey, htlcs := range activeHTLCs {
177✔
2118
                if htlcSetKey.IsRemote {
179✔
2119
                        for _, htlc := range htlcs.outgoingHTLCs {
76✔
2120
                                remoteHTLCs[htlc.HtlcIndex] = htlc
14✔
2121
                        }
14✔
2122
                } else {
55✔
2123
                        for _, htlc := range htlcs.outgoingHTLCs {
60✔
2124
                                localHTLCs[htlc.HtlcIndex] = struct{}{}
5✔
2125
                        }
5✔
2126
                }
2127
        }
2128

2129
        // With both sets constructed, we'll now compute the set difference of
2130
        // our two sets of HTLCs. This'll give us the HTLCs that exist on the
2131
        // remote commitment transaction, but not on ours.
2132
        for htlcIndex, htlc := range remoteHTLCs {
74✔
2133
                if _, ok := localHTLCs[htlcIndex]; ok {
15✔
2134
                        continue
1✔
2135
                }
2136

2137
                pendingRemoteHTLCs = append(pendingRemoteHTLCs, htlc)
13✔
2138
        }
2139

2140
        // Finally, we'll examine all the pending remote HTLCs for those that
2141
        // have expired. If we find any, then we'll recommend that they be
2142
        // failed now so we can free up the incoming HTLC.
2143
        for _, htlc := range pendingRemoteHTLCs {
73✔
2144
                // We'll now check if we need to go to chain in order to cancel
13✔
2145
                // the incoming HTLC.
13✔
2146
                goToChain := c.shouldGoOnChain(htlc, c.cfg.OutgoingBroadcastDelta,
13✔
2147
                        height,
13✔
2148
                )
13✔
2149

13✔
2150
                // If we don't need to go to chain, and no commitments have
13✔
2151
                // been confirmed, then we can move on. Otherwise, if
13✔
2152
                // commitments have been confirmed, then we need to cancel back
13✔
2153
                // *all* of the pending remote HTLCS.
13✔
2154
                if !goToChain && !commitsConfirmed {
17✔
2155
                        continue
4✔
2156
                }
2157

2158
                // Dust htlcs can be canceled back even before the commitment
2159
                // transaction confirms. Dust htlcs are not enforceable onchain.
2160
                // If another version of the commit tx would confirm we either
2161
                // gain or lose those dust amounts but there is no other way
2162
                // than cancelling the incoming HTLC back because we will never learn
2163
                // the preimage.
2164
                if htlc.OutputIndex < 0 {
9✔
2165
                        log.Infof("ChannelArbitrator(%v): fail dangling dust "+
×
2166
                                "htlc=%x from local/remote commitments diff",
×
2167
                                c.cfg.ChanPoint, htlc.RHash[:])
×
2168

×
2169
                        actionMap[HtlcFailDustAction] = append(
×
2170
                                actionMap[HtlcFailDustAction], htlc,
×
2171
                        )
×
2172

×
2173
                        continue
×
2174
                }
2175

2176
                log.Infof("ChannelArbitrator(%v): fail dangling htlc=%x from "+
9✔
2177
                        "local/remote commitments diff",
9✔
2178
                        c.cfg.ChanPoint, htlc.RHash[:])
9✔
2179

9✔
2180
                actionMap[HtlcFailDanglingAction] = append(
9✔
2181
                        actionMap[HtlcFailDanglingAction], htlc,
9✔
2182
                )
9✔
2183
        }
2184

2185
        return actionMap
60✔
2186
}
2187
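// Illustrative sketch, not part of lnd: the set-difference step used by
// checkRemoteDanglingActions above - keep only the remote HTLCs whose index
// does not appear on our local commitment. Identifiers are hypothetical.
func remoteOnlyHTLCs(local map[uint64]struct{},
        remote map[uint64]channeldb.HTLC) []channeldb.HTLC {

        var diff []channeldb.HTLC
        for idx, htlc := range remote {
                if _, ok := local[idx]; ok {
                        continue
                }
                diff = append(diff, htlc)
        }

        return diff
}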

2188
// checkRemoteChainActions examines the two possible remote commitment chains
2189
// and returns the set of chain actions we need to carry out if the remote
2190
// commitment (non pending) confirms. The pendingConf indicates if the pending
2191
// remote commitment confirmed. This is similar to checkCommitChainActions, but
2192
// we'll immediately fail any HTLCs on the pending remote commit, but not the
2193
// remote commit (or the other way around).
2194
func (c *ChannelArbitrator) checkRemoteChainActions(
2195
        height uint32, trigger transitionTrigger,
2196
        activeHTLCs map[HtlcSetKey]htlcSet,
2197
        pendingConf bool) (ChainActionMap, error) {
8✔
2198

8✔
2199
        // First, we'll examine all the normal chain actions on the remote
8✔
2200
        // commitment that confirmed.
8✔
2201
        confHTLCs := activeHTLCs[RemoteHtlcSet]
8✔
2202
        if pendingConf {
10✔
2203
                confHTLCs = activeHTLCs[RemotePendingHtlcSet]
2✔
2204
        }
2✔
2205
        remoteCommitActions, err := c.checkCommitChainActions(
8✔
2206
                height, trigger, confHTLCs,
8✔
2207
        )
8✔
2208
        if err != nil {
8✔
2209
                return nil, err
×
2210
        }
×
2211

2212
        // With these actions computed, we'll now check the diff of the HTLCs on
2213
        // the commitments, and cancel back any that are on the pending but not
2214
        // the non-pending.
2215
        remoteDiffActions := c.checkRemoteDiffActions(
8✔
2216
                activeHTLCs, pendingConf,
8✔
2217
        )
8✔
2218

8✔
2219
        // Finally, we'll merge all the chain actions and the final set of
8✔
2220
        // chain actions.
8✔
2221
        remoteCommitActions.Merge(remoteDiffActions)
8✔
2222
        return remoteCommitActions, nil
8✔
2223
}
2224

2225
// checkRemoteDiffActions checks the set difference of the HTLCs on the remote
2226
// confirmed commit and remote pending commit for HTLCs that we need to cancel
2227
// back. If we find any HTLCs on the remote pending but not the remote, then
2228
// we'll mark them to be failed immediately.
2229
func (c *ChannelArbitrator) checkRemoteDiffActions(
2230
        activeHTLCs map[HtlcSetKey]htlcSet,
2231
        pendingConf bool) ChainActionMap {
8✔
2232

8✔
2233
        // First, we'll partition the HTLCs into those that are present on the
8✔
2234
        // confirmed commitment, and those on the dangling commitment.
8✔
2235
        confHTLCs := activeHTLCs[RemoteHtlcSet]
8✔
2236
        danglingHTLCs := activeHTLCs[RemotePendingHtlcSet]
8✔
2237
        if pendingConf {
10✔
2238
                confHTLCs = activeHTLCs[RemotePendingHtlcSet]
2✔
2239
                danglingHTLCs = activeHTLCs[RemoteHtlcSet]
2✔
2240
        }
2✔
2241

2242
        // Next, we'll create a set of all the HTLCs on the confirmed commitment.
2243
        remoteHtlcs := make(map[uint64]struct{})
8✔
2244
        for _, htlc := range confHTLCs.outgoingHTLCs {
10✔
2245
                remoteHtlcs[htlc.HtlcIndex] = struct{}{}
2✔
2246
        }
2✔
2247

2248
        // With the remote HTLCs assembled, we'll mark any HTLCs only on the
2249
        // remote pending commitment to be failed asap.
2250
        actionMap := make(ChainActionMap)
8✔
2251
        for _, htlc := range danglingHTLCs.outgoingHTLCs {
12✔
2252
                if _, ok := remoteHtlcs[htlc.HtlcIndex]; ok {
4✔
UNCOV
2253
                        continue
×
2254
                }
2255

2256
                preimageAvailable, err := c.isPreimageAvailable(htlc.RHash)
4✔
2257
                if err != nil {
4✔
2258
                        log.Errorf("ChannelArbitrator(%v): failed to query "+
×
2259
                                "preimage for dangling htlc=%x from remote "+
×
2260
                                "commitments diff", c.cfg.ChanPoint,
×
2261
                                htlc.RHash[:])
×
2262

×
2263
                        continue
×
2264
                }
2265

2266
                if preimageAvailable {
4✔
2267
                        continue
×
2268
                }
2269

2270
                // Dust HTLCs on the remote commitment can be failed back.
2271
                if htlc.OutputIndex < 0 {
4✔
2272
                        log.Infof("ChannelArbitrator(%v): fail dangling dust "+
×
2273
                                "htlc=%x from remote commitments diff",
×
2274
                                c.cfg.ChanPoint, htlc.RHash[:])
×
2275

×
2276
                        actionMap[HtlcFailDustAction] = append(
×
2277
                                actionMap[HtlcFailDustAction], htlc,
×
2278
                        )
×
2279

×
2280
                        continue
×
2281
                }
2282

2283
                actionMap[HtlcFailDanglingAction] = append(
4✔
2284
                        actionMap[HtlcFailDanglingAction], htlc,
4✔
2285
                )
4✔
2286

4✔
2287
                log.Infof("ChannelArbitrator(%v): fail dangling htlc=%x from "+
4✔
2288
                        "remote commitments diff",
4✔
2289
                        c.cfg.ChanPoint, htlc.RHash[:])
4✔
2290
        }
2291

2292
        return actionMap
8✔
2293
}
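checkRemoteDiffActions above is essentially a set-difference walk: index the confirmed commitment's outgoing HTLCs, then fail back anything that lives only on the dangling commitment and whose preimage we don't know. A minimal sketch of that pattern with placeholder types (outHTLC is not lnd's channeldb.HTLC; the dust handling and logging are omitted):

// Sketch only: simplified stand-in for the set-difference check above.
package main

import "fmt"

// outHTLC is a placeholder: just an index plus a flag saying whether the
// preimage is already known.
type outHTLC struct {
	Index    uint64
	Preimage bool
}

// danglingToFail returns the outgoing HTLCs present only on the dangling
// (non-confirmed) commitment whose preimage we don't know.
func danglingToFail(confirmed, dangling []outHTLC) []outHTLC {
	onConfirmed := make(map[uint64]struct{})
	for _, h := range confirmed {
		onConfirmed[h.Index] = struct{}{}
	}

	var fail []outHTLC
	for _, h := range dangling {
		if _, ok := onConfirmed[h.Index]; ok {
			continue // also on the confirmed commitment, leave it alone
		}
		if h.Preimage {
			continue // we may still settle it, so don't fail it back
		}
		fail = append(fail, h)
	}

	return fail
}

func main() {
	confirmed := []outHTLC{{Index: 1}}
	dangling := []outHTLC{{Index: 1}, {Index: 2}, {Index: 3, Preimage: true}}

	fmt.Println(danglingToFail(confirmed, dangling)) // only index 2 is failed
}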
2294

2295
// constructChainActions returns the set of actions that should be taken for
2296
// confirmed HTLCs at the specified height. Our actions will depend on the set
2297
// of HTLCs that were active across all commitments at the time of channel
2298
// closure.
2299
func (c *ChannelArbitrator) constructChainActions(confCommitSet *CommitSet,
2300
        height uint32, trigger transitionTrigger) (ChainActionMap, error) {
21✔
2301

21✔
2302
        // If we've reached this point and have no confirmed commitment set,
21✔
2303
        // then this is an older node that had a pending channel close before
21✔
2304
        // the CommitSet was introduced. In this case, we'll just return the
21✔
2305
        // existing ChainActionMap they had on disk.
21✔
2306
        if confCommitSet == nil || confCommitSet.ConfCommitKey.IsNone() {
28✔
2307
                return c.log.FetchChainActions()
7✔
2308
        }
7✔
2309

2310
        // Otherwise, we have the full commitment set written to disk, and can
2311
        // proceed as normal.
2312
        htlcSets := confCommitSet.toActiveHTLCSets()
14✔
2313
        confCommitKey, err := confCommitSet.ConfCommitKey.UnwrapOrErr(
14✔
2314
                fmt.Errorf("no commitKey available"),
14✔
2315
        )
14✔
2316
        if err != nil {
14✔
2317
                return nil, err
×
2318
        }
×
2319

2320
        switch confCommitKey {
14✔
2321
        // If the local commitment transaction confirmed, then we'll examine
2322
        // that as well as their commitments to the set of chain actions.
2323
        case LocalHtlcSet:
6✔
2324
                return c.checkLocalChainActions(
6✔
2325
                        height, trigger, htlcSets, true,
6✔
2326
                )
6✔
2327

2328
        // If the remote commitment confirmed, then we'll grab all the chain
2329
        // actions for the remote commit, and check the pending commit for any
2330
        // HTLCs we need to handle immediately (dust).
2331
        case RemoteHtlcSet:
6✔
2332
                return c.checkRemoteChainActions(
6✔
2333
                        height, trigger, htlcSets, false,
6✔
2334
                )
6✔
2335

2336
        // Otherwise, the remote pending commitment confirmed, so we'll examine
2337
        // the HTLCs on that unrevoked dangling commitment.
2338
        case RemotePendingHtlcSet:
2✔
2339
                return c.checkRemoteChainActions(
2✔
2340
                        height, trigger, htlcSets, true,
2✔
2341
                )
2✔
2342
        }
2343

2344
        return nil, fmt.Errorf("unable to locate chain actions")
×
2345
}
2346

2347
// prepContractResolutions is called either in the case that we decide we need
2348
// to go to chain, or the remote party goes to chain. Given a set of actions we
2349
// need to take for each HTLC, this method will return a set of contract
2350
// resolvers that will resolve the contracts on-chain if needed, and also a set
2351
// of packets to send to the htlcswitch in order to ensure all incoming HTLC's
2352
// are properly resolved.
2353
func (c *ChannelArbitrator) prepContractResolutions(
2354
        contractResolutions *ContractResolutions, height uint32,
2355
        htlcActions ChainActionMap) ([]ContractResolver, error) {
12✔
2356

12✔
2357
        // We'll also fetch the historical state of this channel, as it should
12✔
2358
        // have been marked as closed by now, and supplement it to each resolver
12✔
2359
        // such that we can properly resolve our pending contracts.
12✔
2360
        var chanState *channeldb.OpenChannel
12✔
2361
        chanState, err := c.cfg.FetchHistoricalChannel()
12✔
2362
        switch {
12✔
2363
        // If we don't find this channel, then it may be the case that it
2364
        // was closed before we started to retain the final state
2365
        // information for open channels.
2366
        case err == channeldb.ErrNoHistoricalBucket:
×
2367
                fallthrough
×
2368
        case err == channeldb.ErrChannelNotFound:
×
2369
                log.Warnf("ChannelArbitrator(%v): unable to fetch historical "+
×
2370
                        "state", c.cfg.ChanPoint)
×
2371

2372
        case err != nil:
×
2373
                return nil, err
×
2374
        }
2375

2376
        incomingResolutions := contractResolutions.HtlcResolutions.IncomingHTLCs
12✔
2377
        outgoingResolutions := contractResolutions.HtlcResolutions.OutgoingHTLCs
12✔
2378

12✔
2379
        // We'll use these two maps to quickly look up an active HTLC with its
12✔
2380
        // matching HTLC resolution.
12✔
2381
        outResolutionMap := make(map[wire.OutPoint]lnwallet.OutgoingHtlcResolution)
12✔
2382
        inResolutionMap := make(map[wire.OutPoint]lnwallet.IncomingHtlcResolution)
12✔
2383
        for i := 0; i < len(incomingResolutions); i++ {
12✔
UNCOV
2384
                inRes := incomingResolutions[i]
×
UNCOV
2385
                inResolutionMap[inRes.HtlcPoint()] = inRes
×
UNCOV
2386
        }
×
2387
        for i := 0; i < len(outgoingResolutions); i++ {
13✔
2388
                outRes := outgoingResolutions[i]
1✔
2389
                outResolutionMap[outRes.HtlcPoint()] = outRes
1✔
2390
        }
1✔
2391

2392
        // We'll create the resolver kit that we'll be cloning for each
2393
        // resolver so they each can do their duty.
2394
        resolverCfg := ResolverConfig{
12✔
2395
                ChannelArbitratorConfig: c.cfg,
12✔
2396
                Checkpoint: func(res ContractResolver,
12✔
2397
                        reports ...*channeldb.ResolverReport) error {
14✔
2398

2✔
2399
                        return c.log.InsertUnresolvedContracts(reports, res)
2✔
2400
                },
2✔
2401
        }
2402

2403
        commitHash := contractResolutions.CommitHash
12✔
2404

12✔
2405
        var htlcResolvers []ContractResolver
12✔
2406

12✔
2407
        // We instantiate an anchor resolver if the commitment tx has an
12✔
2408
        // anchor.
12✔
2409
        if contractResolutions.AnchorResolution != nil {
14✔
2410
                anchorResolver := newAnchorResolver(
2✔
2411
                        contractResolutions.AnchorResolution.AnchorSignDescriptor,
2✔
2412
                        contractResolutions.AnchorResolution.CommitAnchor,
2✔
2413
                        height, c.cfg.ChanPoint, resolverCfg,
2✔
2414
                )
2✔
2415
                anchorResolver.SupplementState(chanState)
2✔
2416

2✔
2417
                htlcResolvers = append(htlcResolvers, anchorResolver)
2✔
2418
        }
2✔
2419

2420
        // If this is a breach close, we'll create a breach resolver, determine
2421
        // the htlc's to fail back, and exit. This is done because the other
2422
        // steps taken for non-breach-closes do not matter for breach-closes.
2423
        if contractResolutions.BreachResolution != nil {
14✔
2424
                breachResolver := newBreachResolver(resolverCfg)
2✔
2425
                htlcResolvers = append(htlcResolvers, breachResolver)
2✔
2426

2✔
2427
                return htlcResolvers, nil
2✔
2428
        }
2✔
2429

2430
        // For each HTLC, we'll either act immediately, meaning we'll instantly
2431
        // fail the HTLC, or we'll act only once the transaction has been
2432
        // confirmed, in which case we'll need an HTLC resolver.
2433
        for htlcAction, htlcs := range htlcActions {
21✔
2434
                switch htlcAction {
11✔
2435
                // If we can claim this HTLC, we'll create an HTLC resolver to
2436
                // claim the HTLC (second-level or directly), then add the resolver to our set.
2437
                case HtlcClaimAction:
×
2438
                        for _, htlc := range htlcs {
×
2439
                                htlc := htlc
×
2440

×
2441
                                htlcOp := wire.OutPoint{
×
2442
                                        Hash:  commitHash,
×
2443
                                        Index: uint32(htlc.OutputIndex),
×
2444
                                }
×
2445

×
2446
                                resolution, ok := inResolutionMap[htlcOp]
×
2447
                                if !ok {
×
2448
                                        // TODO(roasbeef): panic?
×
2449
                                        log.Errorf("ChannelArbitrator(%v) unable to find "+
×
2450
                                                "incoming resolution: %v",
×
2451
                                                c.cfg.ChanPoint, htlcOp)
×
2452
                                        continue
×
2453
                                }
2454

2455
                                resolver := newSuccessResolver(
×
2456
                                        resolution, height, htlc, resolverCfg,
×
2457
                                )
×
2458
                                if chanState != nil {
×
2459
                                        resolver.SupplementState(chanState)
×
2460
                                }
×
2461
                                htlcResolvers = append(htlcResolvers, resolver)
×
2462
                        }
2463

2464
                // If we can timeout the HTLC directly, then we'll create the
2465
                // proper resolver to do so, who will then cancel the packet
2466
                // backwards.
UNCOV
2467
                case HtlcTimeoutAction:
×
UNCOV
2468
                        for _, htlc := range htlcs {
×
UNCOV
2469
                                htlc := htlc
×
UNCOV
2470

×
UNCOV
2471
                                htlcOp := wire.OutPoint{
×
UNCOV
2472
                                        Hash:  commitHash,
×
UNCOV
2473
                                        Index: uint32(htlc.OutputIndex),
×
UNCOV
2474
                                }
×
UNCOV
2475

×
UNCOV
2476
                                resolution, ok := outResolutionMap[htlcOp]
×
UNCOV
2477
                                if !ok {
×
2478
                                        log.Errorf("ChannelArbitrator(%v) unable to find "+
×
2479
                                                "outgoing resolution: %v", c.cfg.ChanPoint, htlcOp)
×
2480
                                        continue
×
2481
                                }
2482

UNCOV
2483
                                resolver := newTimeoutResolver(
×
UNCOV
2484
                                        resolution, height, htlc, resolverCfg,
×
UNCOV
2485
                                )
×
UNCOV
2486
                                if chanState != nil {
×
UNCOV
2487
                                        resolver.SupplementState(chanState)
×
UNCOV
2488
                                }
×
2489

2490
                                // For outgoing HTLCs, we will also need to
2491
                                // supplement the resolver with the expiry
2492
                                // block height of its corresponding incoming
2493
                                // HTLC.
UNCOV
2494
                                deadline := c.cfg.FindOutgoingHTLCDeadline(htlc)
×
UNCOV
2495
                                resolver.SupplementDeadline(deadline)
×
UNCOV
2496

×
UNCOV
2497
                                htlcResolvers = append(htlcResolvers, resolver)
×
2498
                        }
2499

2500
                // If this is an incoming HTLC, but we can't act yet, then
2501
                // we'll create an incoming resolver to redeem the HTLC if we
2502
                // learn of the pre-image, or let the remote party time out.
UNCOV
2503
                case HtlcIncomingWatchAction:
×
UNCOV
2504
                        for _, htlc := range htlcs {
×
UNCOV
2505
                                htlc := htlc
×
UNCOV
2506

×
UNCOV
2507
                                htlcOp := wire.OutPoint{
×
UNCOV
2508
                                        Hash:  commitHash,
×
UNCOV
2509
                                        Index: uint32(htlc.OutputIndex),
×
UNCOV
2510
                                }
×
UNCOV
2511

×
UNCOV
2512
                                // TODO(roasbeef): need to handle incoming dust...
×
UNCOV
2513

×
UNCOV
2514
                                // TODO(roasbeef): can't be negative!!!
×
UNCOV
2515
                                resolution, ok := inResolutionMap[htlcOp]
×
UNCOV
2516
                                if !ok {
×
2517
                                        log.Errorf("ChannelArbitrator(%v) unable to find "+
×
2518
                                                "incoming resolution: %v",
×
2519
                                                c.cfg.ChanPoint, htlcOp)
×
2520
                                        continue
×
2521
                                }
2522

UNCOV
2523
                                resolver := newIncomingContestResolver(
×
UNCOV
2524
                                        resolution, height, htlc,
×
UNCOV
2525
                                        resolverCfg,
×
UNCOV
2526
                                )
×
UNCOV
2527
                                if chanState != nil {
×
UNCOV
2528
                                        resolver.SupplementState(chanState)
×
UNCOV
2529
                                }
×
UNCOV
2530
                                htlcResolvers = append(htlcResolvers, resolver)
×
2531
                        }
2532

2533
                // Finally, if this is an outgoing HTLC we've sent, then we'll
2534
                // launch a resolver to watch for the pre-image (and settle
2535
                // backwards), or just timeout.
2536
                case HtlcOutgoingWatchAction:
1✔
2537
                        for _, htlc := range htlcs {
2✔
2538
                                htlc := htlc
1✔
2539

1✔
2540
                                htlcOp := wire.OutPoint{
1✔
2541
                                        Hash:  commitHash,
1✔
2542
                                        Index: uint32(htlc.OutputIndex),
1✔
2543
                                }
1✔
2544

1✔
2545
                                resolution, ok := outResolutionMap[htlcOp]
1✔
2546
                                if !ok {
1✔
2547
                                        log.Errorf("ChannelArbitrator(%v) "+
×
2548
                                                "unable to find outgoing "+
×
2549
                                                "resolution: %v",
×
2550
                                                c.cfg.ChanPoint, htlcOp)
×
2551

×
2552
                                        continue
×
2553
                                }
2554

2555
                                resolver := newOutgoingContestResolver(
1✔
2556
                                        resolution, height, htlc, resolverCfg,
1✔
2557
                                )
1✔
2558
                                if chanState != nil {
2✔
2559
                                        resolver.SupplementState(chanState)
1✔
2560
                                }
1✔
2561

2562
                                // For outgoing HTLCs, we will also need to
2563
                                // supplement the resolver with the expiry
2564
                                // block height of its corresponding incoming
2565
                                // HTLC.
2566
                                deadline := c.cfg.FindOutgoingHTLCDeadline(htlc)
1✔
2567
                                resolver.SupplementDeadline(deadline)
1✔
2568

1✔
2569
                                htlcResolvers = append(htlcResolvers, resolver)
1✔
2570
                        }
2571
                }
2572
        }
2573

2574
        // If this was a unilateral closure, then we'll also create a
2575
        // resolver to sweep our commitment output (but only if it wasn't
2576
        // trimmed).
2577
        if contractResolutions.CommitResolution != nil {
10✔
UNCOV
2578
                resolver := newCommitSweepResolver(
×
UNCOV
2579
                        *contractResolutions.CommitResolution, height,
×
UNCOV
2580
                        c.cfg.ChanPoint, resolverCfg,
×
UNCOV
2581
                )
×
UNCOV
2582
                if chanState != nil {
×
UNCOV
2583
                        resolver.SupplementState(chanState)
×
UNCOV
2584
                }
×
UNCOV
2585
                htlcResolvers = append(htlcResolvers, resolver)
×
2586
        }
2587

2588
        return htlcResolvers, nil
10✔
2589
}
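The body above leans on two lookup maps keyed by HTLC outpoint so that each entry in the action map can find its matching resolution. A compact sketch of that pattern, with placeholder types standing in for wire.OutPoint and the lnwallet resolution structs:

// Sketch only: placeholder types, not the real lnd structs.
package main

import "fmt"

type outPoint struct {
	Hash  string
	Index uint32
}

type resolution struct{ Kind string }

// buildIndex keys each resolution by the HTLC's outpoint on the commitment
// transaction, mirroring how the resolver preparation above matches an HTLC
// from the action map to its on-chain resolution.
func buildIndex(commitHash string, byOutputIndex map[uint32]resolution) map[outPoint]resolution {
	m := make(map[outPoint]resolution, len(byOutputIndex))
	for idx, res := range byOutputIndex {
		m[outPoint{Hash: commitHash, Index: idx}] = res
	}
	return m
}

func main() {
	index := buildIndex("aabbcc", map[uint32]resolution{
		0: {Kind: "outgoing/timeout"},
		2: {Kind: "incoming/success"},
	})

	// An HTLC with output index 2 finds its resolution; in the real code a
	// miss is logged and skipped rather than treated as fatal.
	if res, ok := index[outPoint{Hash: "aabbcc", Index: 2}]; ok {
		fmt.Println(res.Kind)
	}
}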
2590

2591
// replaceResolver replaces a resolver in the list of active resolvers. If the resolver
2592
// to be replaced is not found, it returns an error.
2593
func (c *ChannelArbitrator) replaceResolver(oldResolver,
2594
        newResolver ContractResolver) error {
1✔
2595

1✔
2596
        c.activeResolversLock.Lock()
1✔
2597
        defer c.activeResolversLock.Unlock()
1✔
2598

1✔
2599
        oldKey := oldResolver.ResolverKey()
1✔
2600
        for i, r := range c.activeResolvers {
2✔
2601
                if bytes.Equal(r.ResolverKey(), oldKey) {
2✔
2602
                        c.activeResolvers[i] = newResolver
1✔
2603
                        return nil
1✔
2604
                }
1✔
2605
        }
2606

2607
        return errors.New("resolver to be replaced not found")
×
2608
}
2609

2610
// resolveContract is a goroutine tasked with fully resolving an unresolved
2611
// contract. Either the initial contract will be resolved after a single step,
2612
// or the contract will itself create another contract to be resolved. In
2613
// either case, once the contract has been fully resolved, we'll signal back to
2614
// the main goroutine so it can properly keep track of the set of unresolved
2615
// contracts.
2616
//
2617
// NOTE: This MUST be run as a goroutine.
2618
func (c *ChannelArbitrator) resolveContract(currentContract ContractResolver) {
6✔
2619
        defer c.wg.Done()
6✔
2620

6✔
2621
        log.Debugf("ChannelArbitrator(%v): attempting to resolve %T",
6✔
2622
                c.cfg.ChanPoint, currentContract)
6✔
2623

6✔
2624
        // Until the contract is fully resolved, we'll continue to iteratively
6✔
2625
        // resolve the contract one step at a time.
6✔
2626
        for !currentContract.IsResolved() {
13✔
2627
                log.Debugf("ChannelArbitrator(%v): contract %T not yet resolved",
7✔
2628
                        c.cfg.ChanPoint, currentContract)
7✔
2629

7✔
2630
                select {
7✔
2631

2632
                // If we've been signalled to quit, then we'll exit early.
2633
                case <-c.quit:
×
2634
                        return
×
2635

2636
                default:
7✔
2637
                        // Otherwise, we'll attempt to resolve the current
7✔
2638
                        // contract.
7✔
2639
                        nextContract, err := currentContract.Resolve()
7✔
2640
                        if err != nil {
8✔
2641
                                if err == errResolverShuttingDown {
2✔
2642
                                        return
1✔
2643
                                }
1✔
2644

UNCOV
2645
                                log.Errorf("ChannelArbitrator(%v): unable to "+
×
UNCOV
2646
                                        "progress %T: %v",
×
UNCOV
2647
                                        c.cfg.ChanPoint, currentContract, err)
×
UNCOV
2648
                                return
×
2649
                        }
2650

2651
                        switch {
6✔
2652
                        // If this contract produced another, then this means
2653
                        // the current contract was only able to be partially
2654
                        // resolved in this step. So we'll do a contract swap
2655
                        // within our logs: the new contract will take the
2656
                        // place of the old one.
2657
                        case nextContract != nil:
1✔
2658
                                log.Debugf("ChannelArbitrator(%v): swapping "+
1✔
2659
                                        "out contract %T for %T ",
1✔
2660
                                        c.cfg.ChanPoint, currentContract,
1✔
2661
                                        nextContract)
1✔
2662

1✔
2663
                                // Swap contract in log.
1✔
2664
                                err := c.log.SwapContract(
1✔
2665
                                        currentContract, nextContract,
1✔
2666
                                )
1✔
2667
                                if err != nil {
1✔
2668
                                        log.Errorf("unable to add recurse "+
×
2669
                                                "contract: %v", err)
×
2670
                                }
×
2671

2672
                                // Swap contract in resolvers list. This is to
2673
                                // make sure that reports are queried from the
2674
                                // new resolver.
2675
                                err = c.replaceResolver(
1✔
2676
                                        currentContract, nextContract,
1✔
2677
                                )
1✔
2678
                                if err != nil {
1✔
2679
                                        log.Errorf("unable to replace "+
×
2680
                                                "contract: %v", err)
×
2681
                                }
×
2682

2683
                                // As this contract produced another, we'll
2684
                                // re-assign, so we can continue our resolution
2685
                                // loop.
2686
                                currentContract = nextContract
1✔
2687

1✔
2688
                                // Launch the new contract.
1✔
2689
                                err = currentContract.Launch()
1✔
2690
                                if err != nil {
1✔
NEW
2691
                                        log.Errorf("Failed to launch %T: %v",
×
NEW
2692
                                                currentContract, err)
×
NEW
2693
                                }
×
2694

2695
                        // If this contract is actually fully resolved, then
2696
                        // we'll mark it as such within the database.
2697
                        case currentContract.IsResolved():
5✔
2698
                                log.Debugf("ChannelArbitrator(%v): marking "+
5✔
2699
                                        "contract %T fully resolved",
5✔
2700
                                        c.cfg.ChanPoint, currentContract)
5✔
2701

5✔
2702
                                err := c.log.ResolveContract(currentContract)
5✔
2703
                                if err != nil {
5✔
2704
                                        log.Errorf("unable to resolve contract: %v",
×
2705
                                                err)
×
2706
                                }
×
2707

2708
                                // Now that the contract has been resolved,
2709
                                // we'll signal to the main goroutine.
2710
                                select {
5✔
2711
                                case c.resolutionSignal <- struct{}{}:
3✔
2712
                                case <-c.quit:
2✔
2713
                                        return
2✔
2714
                                }
2715
                        }
2716

2717
                }
2718
        }
2719
}
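Stripped of logging, checkpointing and quit handling, the loop above reduces to: resolve, swap in any follow-up contract, repeat until the contract reports itself resolved. A minimal sketch against a pared-down interface (not lnd's ContractResolver):

// Sketch only: a toy resolver that needs several steps to finish.
package main

import "fmt"

type resolver interface {
	Resolve() (resolver, error) // returns a follow-up resolver, or nil
	IsResolved() bool
}

type step struct{ left int }

func (s *step) Resolve() (resolver, error) {
	if s.left > 1 {
		// Partial progress: produce the next-stage contract.
		return &step{left: s.left - 1}, nil
	}
	s.left = 0
	return nil, nil
}

func (s *step) IsResolved() bool { return s.left == 0 }

// resolveLoop mirrors the shape of resolveContract above: keep resolving,
// swapping in any follow-up contract, until fully resolved.
func resolveLoop(cur resolver) error {
	for !cur.IsResolved() {
		next, err := cur.Resolve()
		if err != nil {
			return err
		}
		if next != nil {
			cur = next // contract swap, keep iterating
		}
	}
	return nil
}

func main() {
	fmt.Println(resolveLoop(&step{left: 3})) // <nil>
}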
2720

2721
// signalUpdateMsg is a struct that carries fresh signals to the
2722
// ChannelArbitrator. We need to receive a message like this each time the
2723
// channel becomes active, as its internal state may change.
2724
type signalUpdateMsg struct {
2725
        // newSignals is the set of new active signals to be sent to the
2726
        // arbitrator.
2727
        newSignals *ContractSignals
2728

2729
        // doneChan is a channel that will be closed once the arbitrator has
2730
        // attached the new signals.
2731
        doneChan chan struct{}
2732
}
2733

2734
// UpdateContractSignals updates the set of signals the ChannelArbitrator needs
2735
// to receive from a channel in real-time in order to keep in sync with the
2736
// latest state of the contract.
2737
func (c *ChannelArbitrator) UpdateContractSignals(newSignals *ContractSignals) {
11✔
2738
        done := make(chan struct{})
11✔
2739

11✔
2740
        select {
11✔
2741
        case c.signalUpdates <- &signalUpdateMsg{
2742
                newSignals: newSignals,
2743
                doneChan:   done,
2744
        }:
11✔
2745
        case <-c.quit:
×
2746
        }
2747

2748
        select {
11✔
2749
        case <-done:
11✔
2750
        case <-c.quit:
×
2751
        }
2752
}
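UpdateContractSignals above uses a small request/acknowledge handshake so the caller knows the attendant has picked up the new signals, with both selects guarded by the quit channel so a shutdown can never deadlock the caller. A self-contained sketch of that pattern (placeholder names, mirroring signalUpdateMsg above):

// Sketch only: the caller-side half of the send-then-wait-for-ack handshake.
package main

import "fmt"

type updateMsg struct {
	payload string
	done    chan struct{} // closed by the worker once the update is applied
}

func sendUpdate(reqs chan<- *updateMsg, quit <-chan struct{}, payload string) {
	done := make(chan struct{})

	select {
	case reqs <- &updateMsg{payload: payload, done: done}:
	case <-quit:
		return
	}

	select {
	case <-done: // worker acknowledged the update
	case <-quit:
	}
}

func main() {
	reqs := make(chan *updateMsg)
	quit := make(chan struct{})

	// Stand-in for the attendant goroutine applying the update.
	go func() {
		msg := <-reqs
		fmt.Println("applied:", msg.payload)
		close(msg.done)
	}()

	sendUpdate(reqs, quit, "new short channel id")
}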
2753

2754
// notifyContractUpdate updates the ChannelArbitrator's unmerged mappings such
2755
// that it can later be merged with activeHTLCs when calling
2756
// checkLocalChainActions or sweepAnchors. These are the only two places that
2757
// activeHTLCs is used.
2758
func (c *ChannelArbitrator) notifyContractUpdate(upd *ContractUpdate) {
12✔
2759
        c.unmergedMtx.Lock()
12✔
2760
        defer c.unmergedMtx.Unlock()
12✔
2761

12✔
2762
        // Update the mapping.
12✔
2763
        c.unmergedSet[upd.HtlcKey] = newHtlcSet(upd.Htlcs)
12✔
2764

12✔
2765
        log.Tracef("ChannelArbitrator(%v): fresh set of htlcs=%v",
12✔
2766
                c.cfg.ChanPoint, lnutils.SpewLogClosure(upd))
12✔
2767
}
12✔
2768

2769
// updateActiveHTLCs merges the unmerged set of HTLCs from the link with
2770
// activeHTLCs.
2771
func (c *ChannelArbitrator) updateActiveHTLCs() {
73✔
2772
        c.unmergedMtx.RLock()
73✔
2773
        defer c.unmergedMtx.RUnlock()
73✔
2774

73✔
2775
        // Update the mapping.
73✔
2776
        c.activeHTLCs[LocalHtlcSet] = c.unmergedSet[LocalHtlcSet]
73✔
2777
        c.activeHTLCs[RemoteHtlcSet] = c.unmergedSet[RemoteHtlcSet]
73✔
2778

73✔
2779
        // If the pending set exists, update that as well.
73✔
2780
        if _, ok := c.unmergedSet[RemotePendingHtlcSet]; ok {
82✔
2781
                pendingSet := c.unmergedSet[RemotePendingHtlcSet]
9✔
2782
                c.activeHTLCs[RemotePendingHtlcSet] = pendingSet
9✔
2783
        }
9✔
2784
}
2785

2786
// channelAttendant is the primary goroutine that acts as the judicial
2787
// arbitrator between our channel state, the remote channel peer, and the
2788
// blockchain (Our judge). This goroutine will ensure that we faithfully execute
2789
// all clauses of our contract in the case that we need to go on-chain for a
2790
// dispute. Currently, two such conditions warrant our intervention: when an
2791
// outgoing HTLC is about to timeout, and when we know the pre-image for an
2792
// incoming HTLC, but it hasn't yet been settled off-chain. In these cases,
2793
// we'll: broadcast our commitment, cancel/settle any HTLC's backwards after
2794
// sufficient confirmation, and finally send our set of outputs to the UTXO
2795
// Nursery for incubation, and ultimate sweeping.
2796
//
2797
// NOTE: This MUST be run as a goroutine.
2798
//
2799
//nolint:funlen
2800
func (c *ChannelArbitrator) channelAttendant(bestHeight int32,
2801
        commitSet *CommitSet) {
48✔
2802

48✔
2803
        // TODO(roasbeef): tell top chain arb we're done
48✔
2804
        defer func() {
93✔
2805
                c.wg.Done()
45✔
2806
        }()
45✔
2807

2808
        err := c.progressStateMachineAfterRestart(bestHeight, commitSet)
48✔
2809
        if err != nil {
49✔
2810
                // In case of an error, we return early but we do not shutdown
1✔
2811
                // LND, because there might be other channels that still can be
1✔
2812
                // resolved and we don't want to interfere with that.
1✔
2813
                // We continue to run the channel attendant in case the channel
1✔
2814
                // closes via other means, for example the remote party force
1✔
2815
                // closes the channel. So we log the error and continue.
1✔
2816
                log.Errorf("Unable to progress state machine after "+
1✔
2817
                        "restart: %v", err)
1✔
2818
        }
1✔
2819

2820
        for {
144✔
2821
                select {
96✔
2822

2823
                // A new block has arrived, we'll examine all the active HTLC's
2824
                // to see if any of them have expired, and also update our
2825
                // track of the best current height.
2826
                case beat := <-c.BlockbeatChan:
6✔
2827
                        bestHeight = beat.Height()
6✔
2828

6✔
2829
                        log.Debugf("ChannelArbitrator(%v): new block height=%v",
6✔
2830
                                c.cfg.ChanPoint, bestHeight)
6✔
2831

6✔
2832
                        err := c.handleBlockbeat(beat)
6✔
2833
                        if err != nil {
6✔
NEW
2834
                                log.Errorf("Handle block=%v got err: %v",
×
NEW
2835
                                        bestHeight, err)
×
UNCOV
2836
                        }
×
2837

2838
                        // If as a result of this trigger, the contract is
2839
                        // fully resolved, then well exit.
2840
                        if c.state == StateFullyResolved {
6✔
2841
                                return
×
2842
                        }
×
2843

2844
                // A new signal update was just sent. This indicates that the
2845
                // channel under watch is now live, and may modify its internal
2846
                // state, so we'll get the most up-to-date signals so we can
2847
                // properly do our job.
2848
                case signalUpdate := <-c.signalUpdates:
11✔
2849
                        log.Tracef("ChannelArbitrator(%v): got new signal "+
11✔
2850
                                "update!", c.cfg.ChanPoint)
11✔
2851

11✔
2852
                        // We'll update the ShortChannelID.
11✔
2853
                        c.cfg.ShortChanID = signalUpdate.newSignals.ShortChanID
11✔
2854

11✔
2855
                        // Now that the signal has been updated, we'll now
11✔
2856
                        // close the done channel to signal to the caller we've
11✔
2857
                        // registered the new ShortChannelID.
11✔
2858
                        close(signalUpdate.doneChan)
11✔
2859

2860
                // We've cooperatively closed the channel, so we're no longer
2861
                // needed. We'll mark the channel as resolved and exit.
2862
                case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure:
2✔
2863
                        log.Infof("ChannelArbitrator(%v) marking channel "+
2✔
2864
                                "cooperatively closed", c.cfg.ChanPoint)
2✔
2865

2✔
2866
                        err := c.cfg.MarkChannelClosed(
2✔
2867
                                closeInfo.ChannelCloseSummary,
2✔
2868
                                channeldb.ChanStatusCoopBroadcasted,
2✔
2869
                        )
2✔
2870
                        if err != nil {
2✔
2871
                                log.Errorf("Unable to mark channel closed: "+
×
2872
                                        "%v", err)
×
2873
                                return
×
2874
                        }
×
2875

2876
                        // We'll now advance our state machine until it reaches
2877
                        // a terminal state, and the channel is marked resolved.
2878
                        _, _, err = c.advanceState(
2✔
2879
                                closeInfo.CloseHeight, coopCloseTrigger, nil,
2✔
2880
                        )
2✔
2881
                        if err != nil {
3✔
2882
                                log.Errorf("Unable to advance state: %v", err)
1✔
2883
                                return
1✔
2884
                        }
1✔
2885

2886
                // We have broadcasted our commitment, and it is now confirmed
2887
                // on-chain.
2888
                case closeInfo := <-c.cfg.ChainEvents.LocalUnilateralClosure:
12✔
2889
                        if c.state != StateCommitmentBroadcasted {
13✔
2890
                                log.Errorf("ChannelArbitrator(%v): unexpected "+
1✔
2891
                                        "local on-chain channel close",
1✔
2892
                                        c.cfg.ChanPoint)
1✔
2893
                        }
1✔
2894

2895
                        closeTx := closeInfo.CloseTx
12✔
2896

12✔
2897
                        resolutions, err := closeInfo.ContractResolutions.
12✔
2898
                                UnwrapOrErr(
12✔
2899
                                        fmt.Errorf("resolutions not found"),
12✔
2900
                                )
12✔
2901
                        if err != nil {
12✔
2902
                                log.Errorf("ChannelArbitrator(%v): unable to "+
×
2903
                                        "get resolutions: %v", c.cfg.ChanPoint,
×
2904
                                        err)
×
2905

×
2906
                                return
×
2907
                        }
×
2908

2909
                        // We make sure that the htlc resolutions are present
2910
                        // otherwise we would panic dereferencing the pointer.
2911
                        //
2912
                        // TODO(ziggie): Refactor ContractResolutions to use
2913
                        // options.
2914
                        if resolutions.HtlcResolutions == nil {
12✔
2915
                                log.Errorf("ChannelArbitrator(%v): htlc "+
×
2916
                                        "resolutions not found",
×
2917
                                        c.cfg.ChanPoint)
×
2918

×
2919
                                return
×
2920
                        }
×
2921

2922
                        log.Infof("ChannelArbitrator(%v): local force close "+
12✔
2923
                                "tx=%v confirmed", c.cfg.ChanPoint,
12✔
2924
                                closeTx.TxHash())
12✔
2925

12✔
2926
                        contractRes := &ContractResolutions{
12✔
2927
                                CommitHash:       closeTx.TxHash(),
12✔
2928
                                CommitResolution: resolutions.CommitResolution,
12✔
2929
                                HtlcResolutions:  *resolutions.HtlcResolutions,
12✔
2930
                                AnchorResolution: resolutions.AnchorResolution,
12✔
2931
                        }
12✔
2932

12✔
2933
                        // When processing a unilateral close event, we'll
12✔
2934
                        // transition to the ContractClosed state. We'll log
12✔
2935
                        // out the set of resolutions such that they are
12✔
2936
                        // available to fetch in that state, we'll also write
12✔
2937
                        // the commit set so we can reconstruct our chain
12✔
2938
                        // actions on restart.
12✔
2939
                        err = c.log.LogContractResolutions(contractRes)
12✔
2940
                        if err != nil {
12✔
2941
                                log.Errorf("Unable to write resolutions: %v",
×
2942
                                        err)
×
2943
                                return
×
2944
                        }
×
2945
                        err = c.log.InsertConfirmedCommitSet(
12✔
2946
                                &closeInfo.CommitSet,
12✔
2947
                        )
12✔
2948
                        if err != nil {
12✔
2949
                                log.Errorf("Unable to write commit set: %v",
×
2950
                                        err)
×
2951
                                return
×
2952
                        }
×
2953

2954
                        // After the set of resolutions are successfully
2955
                        // logged, we can safely close the channel. After this
2956
                        // succeeds we won't be getting chain events anymore,
2957
                        // so we must make sure we can recover on restart after
2958
                        // it is marked closed. If the next state transition
2959
                        // fails, we'll start up in the prior state again, and
2960
                        // we won't be getting chain events any longer. In this
2961
                        // case we must manually re-trigger the state
2962
                        // transition into StateContractClosed based on the
2963
                        // close status of the channel.
2964
                        err = c.cfg.MarkChannelClosed(
12✔
2965
                                closeInfo.ChannelCloseSummary,
12✔
2966
                                channeldb.ChanStatusLocalCloseInitiator,
12✔
2967
                        )
12✔
2968
                        if err != nil {
12✔
2969
                                log.Errorf("Unable to mark "+
×
2970
                                        "channel closed: %v", err)
×
2971
                                return
×
2972
                        }
×
2973

2974
                        // We'll now advance our state machine until it reaches
2975
                        // a terminal state.
2976
                        _, _, err = c.advanceState(
12✔
2977
                                uint32(closeInfo.SpendingHeight),
12✔
2978
                                localCloseTrigger, &closeInfo.CommitSet,
12✔
2979
                        )
12✔
2980
                        if err != nil {
13✔
2981
                                log.Errorf("Unable to advance state: %v", err)
1✔
2982
                        }
1✔
2983

2984
                // The remote party has broadcast the commitment on-chain.
2985
                // We'll examine our state to determine if we need to act at
2986
                // all.
2987
                case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure:
8✔
2988
                        log.Infof("ChannelArbitrator(%v): remote party has "+
8✔
2989
                                "closed channel out on-chain", c.cfg.ChanPoint)
8✔
2990

8✔
2991
                        // If we don't have a self output, and there are no
8✔
2992
                        // active HTLC's, then we can immediately mark the
8✔
2993
                        // contract as fully resolved and exit.
8✔
2994
                        contractRes := &ContractResolutions{
8✔
2995
                                CommitHash:       *uniClosure.SpenderTxHash,
8✔
2996
                                CommitResolution: uniClosure.CommitResolution,
8✔
2997
                                HtlcResolutions:  *uniClosure.HtlcResolutions,
8✔
2998
                                AnchorResolution: uniClosure.AnchorResolution,
8✔
2999
                        }
8✔
3000

8✔
3001
                        // When processing a unilateral close event, we'll
8✔
3002
                        // transition to the ContractClosed state. We'll log
8✔
3003
                        // out the set of resolutions such that they are
8✔
3004
                        // available to fetch in that state, we'll also write
8✔
3005
                        // the commit set so we can reconstruct our chain
8✔
3006
                        // actions on restart.
8✔
3007
                        err := c.log.LogContractResolutions(contractRes)
8✔
3008
                        if err != nil {
9✔
3009
                                log.Errorf("Unable to write resolutions: %v",
1✔
3010
                                        err)
1✔
3011
                                return
1✔
3012
                        }
1✔
3013
                        err = c.log.InsertConfirmedCommitSet(
7✔
3014
                                &uniClosure.CommitSet,
7✔
3015
                        )
7✔
3016
                        if err != nil {
7✔
3017
                                log.Errorf("Unable to write commit set: %v",
×
3018
                                        err)
×
3019
                                return
×
3020
                        }
×
3021

3022
                        // After the set of resolutions are successfully
3023
                        // logged, we can safely close the channel. After this
3024
                        // succeeds we won't be getting chain events anymore,
3025
                        // so we must make sure we can recover on restart after
3026
                        // it is marked closed. If the next state transition
3027
                        // fails, we'll start up in the prior state again, and
3028
                        // we won't be getting chain events any longer. In this
3029
                        // case we must manually re-trigger the state
3030
                        // transition into StateContractClosed based on the
3031
                        // close status of the channel.
3032
                        closeSummary := &uniClosure.ChannelCloseSummary
7✔
3033
                        err = c.cfg.MarkChannelClosed(
7✔
3034
                                closeSummary,
7✔
3035
                                channeldb.ChanStatusRemoteCloseInitiator,
7✔
3036
                        )
7✔
3037
                        if err != nil {
8✔
3038
                                log.Errorf("Unable to mark channel closed: %v",
1✔
3039
                                        err)
1✔
3040
                                return
1✔
3041
                        }
1✔
3042

3043
                        // We'll now advance our state machine until it reaches
3044
                        // a terminal state.
3045
                        _, _, err = c.advanceState(
6✔
3046
                                uint32(uniClosure.SpendingHeight),
6✔
3047
                                remoteCloseTrigger, &uniClosure.CommitSet,
6✔
3048
                        )
6✔
3049
                        if err != nil {
8✔
3050
                                log.Errorf("Unable to advance state: %v", err)
2✔
3051
                        }
2✔
3052

3053
                // The remote has breached the channel. As this is handled by
3054
                // the ChainWatcher and BreachArbitrator, we don't have to do
3055
                // anything in particular, so just advance our state and
3056
                // gracefully exit.
3057
                case breachInfo := <-c.cfg.ChainEvents.ContractBreach:
1✔
3058
                        log.Infof("ChannelArbitrator(%v): remote party has "+
1✔
3059
                                "breached channel!", c.cfg.ChanPoint)
1✔
3060

1✔
3061
                        // In the breach case, we'll only have anchor and
1✔
3062
                        // breach resolutions.
1✔
3063
                        contractRes := &ContractResolutions{
1✔
3064
                                CommitHash:       breachInfo.CommitHash,
1✔
3065
                                BreachResolution: breachInfo.BreachResolution,
1✔
3066
                                AnchorResolution: breachInfo.AnchorResolution,
1✔
3067
                        }
1✔
3068

1✔
3069
                        // We'll transition to the ContractClosed state and log
1✔
3070
                        // the set of resolutions such that they can be turned
1✔
3071
                        // into resolvers later on. We'll also insert the
1✔
3072
                        // CommitSet of the latest set of commitments.
1✔
3073
                        err := c.log.LogContractResolutions(contractRes)
1✔
3074
                        if err != nil {
1✔
3075
                                log.Errorf("Unable to write resolutions: %v",
×
3076
                                        err)
×
3077
                                return
×
3078
                        }
×
3079
                        err = c.log.InsertConfirmedCommitSet(
1✔
3080
                                &breachInfo.CommitSet,
1✔
3081
                        )
1✔
3082
                        if err != nil {
1✔
3083
                                log.Errorf("Unable to write commit set: %v",
×
3084
                                        err)
×
3085
                                return
×
3086
                        }
×
3087

3088
                        // The channel is finally marked pending closed here as
3089
                        // the BreachArbitrator and channel arbitrator have
3090
                        // persisted the relevant states.
3091
                        closeSummary := &breachInfo.CloseSummary
1✔
3092
                        err = c.cfg.MarkChannelClosed(
1✔
3093
                                closeSummary,
1✔
3094
                                channeldb.ChanStatusRemoteCloseInitiator,
1✔
3095
                        )
1✔
3096
                        if err != nil {
1✔
3097
                                log.Errorf("Unable to mark channel closed: %v",
×
3098
                                        err)
×
3099
                                return
×
3100
                        }
×
3101

3102
                        log.Infof("Breached channel=%v marked pending-closed",
1✔
3103
                                breachInfo.BreachResolution.FundingOutPoint)
1✔
3104

1✔
3105
                        // We'll advance our state machine until it reaches a
1✔
3106
                        // terminal state.
1✔
3107
                        _, _, err = c.advanceState(
1✔
3108
                                uint32(bestHeight), breachCloseTrigger,
1✔
3109
                                &breachInfo.CommitSet,
1✔
3110
                        )
1✔
3111
                        if err != nil {
1✔
3112
                                log.Errorf("Unable to advance state: %v", err)
×
3113
                        }
×
3114

3115
                // A new contract has just been resolved, we'll now check our
3116
                // log to see if all contracts have been resolved. If so, then
3117
                // we can exit as the contract is fully resolved.
3118
                case <-c.resolutionSignal:
3✔
3119
                        log.Infof("ChannelArbitrator(%v): a contract has been "+
3✔
3120
                                "fully resolved!", c.cfg.ChanPoint)
3✔
3121

3✔
3122
                        nextState, _, err := c.advanceState(
3✔
3123
                                uint32(bestHeight), chainTrigger, nil,
3✔
3124
                        )
3✔
3125
                        if err != nil {
3✔
3126
                                log.Errorf("Unable to advance state: %v", err)
×
3127
                        }
×
3128

3129
                        // If we don't have anything further to do after
3130
                        // advancing our state, then we'll exit.
3131
                        if nextState == StateFullyResolved {
6✔
3132
                                log.Infof("ChannelArbitrator(%v): all "+
3✔
3133
                                        "contracts fully resolved, exiting",
3✔
3134
                                        c.cfg.ChanPoint)
3✔
3135

3✔
3136
                                return
3✔
3137
                        }
3✔
3138

3139
                // We've just received a request to forcibly close out the
3140
                // channel. We'll advance our state machine to handle the request.
3141
                case closeReq := <-c.forceCloseReqs:
11✔
3142
                        log.Infof("ChannelArbitrator(%v): received force "+
11✔
3143
                                "close request", c.cfg.ChanPoint)
11✔
3144

11✔
3145
                        if c.state != StateDefault {
12✔
3146
                                select {
1✔
3147
                                case closeReq.closeTx <- nil:
1✔
3148
                                case <-c.quit:
×
3149
                                }
3150

3151
                                select {
1✔
3152
                                case closeReq.errResp <- errAlreadyForceClosed:
1✔
3153
                                case <-c.quit:
×
3154
                                }
3155

3156
                                continue
1✔
3157
                        }
3158

3159
                        nextState, closeTx, err := c.advanceState(
10✔
3160
                                uint32(bestHeight), userTrigger, nil,
10✔
3161
                        )
10✔
3162
                        if err != nil {
11✔
3163
                                log.Errorf("Unable to advance state: %v", err)
1✔
3164
                        }
1✔
3165

3166
                        select {
10✔
3167
                        case closeReq.closeTx <- closeTx:
10✔
3168
                        case <-c.quit:
×
3169
                                return
×
3170
                        }
3171

3172
                        select {
10✔
3173
                        case closeReq.errResp <- err:
10✔
3174
                        case <-c.quit:
×
3175
                                return
×
3176
                        }
3177

3178
                        // If we don't have anything further to do after
3179
                        // advancing our state, then we'll exit.
3180
                        if nextState == StateFullyResolved {
10✔
3181
                                log.Infof("ChannelArbitrator(%v): all "+
×
3182
                                        "contracts resolved, exiting",
×
3183
                                        c.cfg.ChanPoint)
×
3184
                                return
×
3185
                        }
×
3186

3187
                case <-c.quit:
39✔
3188
                        return
39✔
3189
                }
3190
        }
3191
}
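channelAttendant is long, but its skeleton is a single select loop in which one goroutine owns the arbitration state and serializes every event source against it. A stripped-down sketch of that shape (the channels here are stand-ins, not the real chain event subscriptions):

// Sketch only: the event-loop shape, with placeholder event channels.
package main

import "fmt"

func attendantLoop(blocks <-chan int32, closes <-chan string, quit <-chan struct{}) {
	for {
		select {
		case h := <-blocks:
			// Advance the state machine with a chain trigger.
			fmt.Println("new block at height", h)

		case ev := <-closes:
			// Log resolutions, mark the channel closed, then exit.
			fmt.Println("close event:", ev)
			return

		case <-quit:
			return
		}
	}
}

func main() {
	blocks := make(chan int32, 1)
	closes := make(chan string, 1)
	quit := make(chan struct{})

	blocks <- 800_000
	closes <- "coop close confirmed"
	attendantLoop(blocks, closes, quit)
}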
3192

3193
// handleBlockbeat processes a newly received blockbeat by advancing the
3194
// arbitrator's internal state using the received block height.
3195
func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error {
6✔
3196
        // Notify we've processed the block.
6✔
3197
        defer c.NotifyBlockProcessed(beat, nil)
6✔
3198

6✔
3199
        // Try to advance the state if we are in StateDefault.
6✔
3200
        if c.state == StateDefault {
11✔
3201
                // Now that a new block has arrived, we'll attempt to advance
5✔
3202
                // our state forward.
5✔
3203
                _, _, err := c.advanceState(
5✔
3204
                        uint32(beat.Height()), chainTrigger, nil,
5✔
3205
                )
5✔
3206
                if err != nil {
5✔
NEW
3207
                        return fmt.Errorf("unable to advance state: %w", err)
×
NEW
3208
                }
×
3209
        }
3210

3211
        // Launch all active resolvers when a new blockbeat is received.
3212
        c.launchResolvers()
6✔
3213

6✔
3214
        return nil
6✔
3215
}
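The blockbeat handler above has a strict ordering contract: every beat must be acknowledged even if the state transition fails, the state machine is only advanced from the default state, and resolvers are launched on every block. A toy sketch of that ordering (it does not implement the real chainio.Consumer interface):

// Sketch only: placeholder arbitrator showing the per-block ordering.
package main

import "fmt"

type arb struct {
	state     string
	processed []int32
}

// handleBlock mirrors handleBlockbeat above: acknowledge the block via defer,
// only attempt a transition from the default state, launch resolvers always.
func (a *arb) handleBlock(height int32) error {
	defer func() { a.processed = append(a.processed, height) }()

	if a.state == "default" {
		a.state = "advanced" // stand-in for advanceState(chainTrigger)
	}

	fmt.Println("launch resolvers at height", height)

	return nil
}

func main() {
	a := &arb{state: "default"}
	_ = a.handleBlock(100)
	_ = a.handleBlock(101)
	fmt.Println(a.state, a.processed) // advanced [100 101]
}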
3216

3217
// Name returns a human-readable string for this subsystem.
3218
//
3219
// NOTE: Part of chainio.Consumer interface.
3220
func (c *ChannelArbitrator) Name() string {
51✔
3221
        return fmt.Sprintf("ChannelArbitrator(%v)", c.cfg.ChanPoint)
51✔
3222
}
51✔
3223

3224
// checkLegacyBreach returns StateFullyResolved if the channel was closed with
3225
// a breach transaction before the channel arbitrator launched its own breach
3226
// resolver. StateContractClosed is returned if this is a modern breach close
3227
// with a breach resolver. StateError is returned if the log lookup failed.
3228
func (c *ChannelArbitrator) checkLegacyBreach() (ArbitratorState, error) {
2✔
3229
        // A previous version of the channel arbitrator would make the breach
2✔
3230
        // close skip to StateFullyResolved. If there are no contract
2✔
3231
        // resolutions in the bolt arbitrator log, then this is an older breach
2✔
3232
        // close. Otherwise, if there are resolutions, the state should advance
2✔
3233
        // to StateContractClosed.
2✔
3234
        _, err := c.log.FetchContractResolutions()
2✔
3235
        if err == errNoResolutions {
2✔
3236
                // This is an older breach close still in the database.
×
3237
                return StateFullyResolved, nil
×
3238
        } else if err != nil {
2✔
3239
                return StateError, err
×
3240
        }
×
3241

3242
        // This is a modern breach close with resolvers.
3243
        return StateContractClosed, nil
2✔
3244
}
3245

3246
// sweepRequest wraps the arguments used when calling `SweepInput`.
3247
type sweepRequest struct {
3248
        // input is the input to be swept.
3249
        input input.Input
3250

3251
        // params holds the sweeping parameters.
3252
        params sweep.Params
3253
}
3254

3255
// createSweepRequest creates an anchor sweeping request for a particular
3256
// version (local/remote/remote pending) of the commitment.
3257
func (c *ChannelArbitrator) createSweepRequest(
3258
        anchor *lnwallet.AnchorResolution, htlcs htlcSet, anchorPath string,
3259
        heightHint uint32) (sweepRequest, error) {
8✔
3260

8✔
3261
        // Use the chan id as the exclusive group. This prevents any of the
8✔
3262
        // anchors from being batched together.
8✔
3263
        exclusiveGroup := c.cfg.ShortChanID.ToUint64()
8✔
3264

8✔
3265
        // Find the deadline for this specific anchor.
8✔
3266
        deadline, value, err := c.findCommitmentDeadlineAndValue(
8✔
3267
                heightHint, htlcs,
8✔
3268
        )
8✔
3269
        if err != nil {
8✔
3270
                return sweepRequest{}, err
×
3271
        }
×
3272

3273
        // If we cannot find a deadline, it means there are no HTLCs at stake,
3274
        // which means we can relax our anchor sweeping conditions as we don't
3275
        // have any time-sensitive outputs to sweep. However, we need to
3276
        // register the anchor output with the sweeper so we are later able to
3277
        // bump the close fee.
3278
        if deadline.IsNone() {
11✔
3279
                log.Infof("ChannelArbitrator(%v): no HTLCs at stake, "+
3✔
3280
                        "sweeping anchor with default deadline",
3✔
3281
                        c.cfg.ChanPoint)
3✔
3282
        }
3✔
3283

3284
        witnessType := input.CommitmentAnchor
8✔
3285

8✔
3286
        // For taproot channels, we need to use the proper witness type.
8✔
3287
        if txscript.IsPayToTaproot(
8✔
3288
                anchor.AnchorSignDescriptor.Output.PkScript,
8✔
3289
        ) {
8✔
UNCOV
3290

×
UNCOV
3291
                witnessType = input.TaprootAnchorSweepSpend
×
UNCOV
3292
        }
×
3293

3294
        // Prepare anchor output for sweeping.
3295
        anchorInput := input.MakeBaseInput(
8✔
3296
                &anchor.CommitAnchor,
8✔
3297
                witnessType,
8✔
3298
                &anchor.AnchorSignDescriptor,
8✔
3299
                heightHint,
8✔
3300
                &input.TxInfo{
8✔
3301
                        Fee:    anchor.CommitFee,
8✔
3302
                        Weight: anchor.CommitWeight,
8✔
3303
                },
8✔
3304
        )
8✔
3305

8✔
3306
        // If we have a deadline, we'll use it to calculate the deadline
8✔
3307
        // height, otherwise default to none.
        deadlineDesc := "None"
        deadlineHeight := fn.MapOption(func(d int32) int32 {
                deadlineDesc = fmt.Sprintf("%d", d)

                return d + int32(heightHint)
        })(deadline)

        // Calculate the budget based on the value under protection, which is
        // the sum of all HTLCs on this commitment minus their own sweeping
        // budgets. The anchor output itself only carries a small value of 330
        // sats, so we also include it in the budget to pay for the CPFP
        // transaction.
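        // For example, if the value under protection is 1_000_000 sats and
        // the ratio-based budget works out to 500_000 sats, the final budget
        // offered to the sweeper will be 500_330 sats.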
        budget := calculateBudget(
                value, c.cfg.Budget.AnchorCPFPRatio, c.cfg.Budget.AnchorCPFP,
        ) + AnchorOutputValue

        log.Infof("ChannelArbitrator(%v): offering anchor from %s commitment "+
                "%v to sweeper with deadline=%v, budget=%v", c.cfg.ChanPoint,
                anchorPath, anchor.CommitAnchor, deadlineDesc, budget)

        // Sweep the anchor output with a confirmation target fee preference.
        // Because this is a CPFP operation, the sweep will only be attempted
        // once the current fee estimate for the confirmation target exceeds
        // the commit fee rate.
        return sweepRequest{
                input: &anchorInput,
                params: sweep.Params{
                        ExclusiveGroup: &exclusiveGroup,
                        Budget:         budget,
                        DeadlineHeight: deadlineHeight,
                },
        }, nil
}

// prepareAnchorSweeps creates a list of requests to be used by the sweeper for
// all possible commitment versions.
func (c *ChannelArbitrator) prepareAnchorSweeps(heightHint uint32,
        anchors *lnwallet.AnchorResolutions) ([]sweepRequest, error) {

        // requests holds all the possible anchor sweep requests. We can have
        // up to 3 different versions of commitments (local/remote/remote
        // dangling) to be CPFPed by the anchors.
        requests := make([]sweepRequest, 0, 3)

        // remotePendingReq holds the request for sweeping the anchor output on
        // the remote pending commitment. It's only set when there's an actual
        // pending remote commitment, and it's used to decide whether we need
        // to update the fee budget when sweeping the anchor output on the
        // local commitment.
        remotePendingReq := fn.None[sweepRequest]()

        // First, we check the remote pending commitment and optionally create
        // an anchor sweeping request.
        htlcs, ok := c.activeHTLCs[RemotePendingHtlcSet]
        if ok && anchors.RemotePending != nil {
                req, err := c.createSweepRequest(
                        anchors.RemotePending, htlcs, "remote pending",
                        heightHint,
                )
                if err != nil {
                        return nil, err
                }

                // Save the request.
                requests = append(requests, req)

                // Set the optional variable.
                remotePendingReq = fn.Some(req)
        }

        // Check the local commitment and optionally create an anchor sweeping
        // request. The params used in this request will be influenced by the
        // anchor sweeping request made from the pending remote commitment.
        htlcs, ok = c.activeHTLCs[LocalHtlcSet]
        if ok && anchors.Local != nil {
                req, err := c.createSweepRequest(
                        anchors.Local, htlcs, "local", heightHint,
                )
                if err != nil {
                        return nil, err
                }

                // If there's an anchor sweeping request from the pending
                // remote commitment, we will compare its budget against the
                // budget used here and choose the params with the larger
                // budget. The deadline when choosing the remote pending
                // params instead of the local ones will always be earlier
                // than or equal to the local deadline, because outgoing HTLCs
                // are resolved on the local commitment first before they are
                // removed from the remote one.
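                // For example, if the pending remote request carries a budget
                // of 600_330 sats while the local one only carries 500_330
                // sats, we keep the local anchor input but offer it with the
                // pending remote params instead.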
                remotePendingReq.WhenSome(func(s sweepRequest) {
                        if s.params.Budget <= req.params.Budget {
                                return
                        }

                        log.Infof("ChannelArbitrator(%v): replaced local "+
                                "anchor(%v) sweep params with pending remote "+
                                "anchor sweep params, \nold:[%v], \nnew:[%v]",
                                c.cfg.ChanPoint, anchors.Local.CommitAnchor,
                                req.params, s.params)

                        req.params = s.params
                })

                // Save the request.
                requests = append(requests, req)
        }

        // Check the remote commitment and create an anchor sweeping request
        // if needed.
        htlcs, ok = c.activeHTLCs[RemoteHtlcSet]
        if ok && anchors.Remote != nil {
                req, err := c.createSweepRequest(
                        anchors.Remote, htlcs, "remote", heightHint,
                )
                if err != nil {
                        return nil, err
                }

                requests = append(requests, req)
        }

        return requests, nil
}

// failIncomingDust resolves the incoming dust HTLCs because they do not have
// an output on the commitment transaction and cannot be resolved onchain. We
// mark them as failed here.
func (c *ChannelArbitrator) failIncomingDust(
        incomingDustHTLCs []channeldb.HTLC) error {

        for _, htlc := range incomingDustHTLCs {
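                // An incoming dust HTLC has no output on the commitment
                // transaction, which is encoded as a negative output index,
                // so anything else passed in here is rejected.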
                if !htlc.Incoming || htlc.OutputIndex >= 0 {
                        return fmt.Errorf("htlc with index %v is not incoming "+
                                "dust", htlc.OutputIndex)
                }

                key := models.CircuitKey{
                        ChanID: c.cfg.ShortChanID,
                        HtlcID: htlc.HtlcIndex,
                }

                // Mark this dust HTLC's final outcome as failed.
                chainArbCfg := c.cfg.ChainArbitratorConfig
                err := chainArbCfg.PutFinalHtlcOutcome(
                        key.ChanID, key.HtlcID, false,
                )
                if err != nil {
                        return err
                }

                // Send notification.
                chainArbCfg.HtlcNotifier.NotifyFinalHtlcEvent(
                        key,
                        channeldb.FinalHtlcInfo{
                                Settled:  false,
                                Offchain: false,
                        },
                )
        }

        return nil
}

// abandonForwards cancels back the incoming HTLCs for their corresponding
// outgoing HTLCs. We use a set here to avoid sending duplicate failure
// messages for the same HTLC. This also needs to be done for locally
// initiated outgoing HTLCs since they are special-cased in the switch.
func (c *ChannelArbitrator) abandonForwards(htlcs fn.Set[uint64]) error {
        log.Debugf("ChannelArbitrator(%v): cancelling back %v incoming "+
                "HTLC(s)", c.cfg.ChanPoint, len(htlcs))

        msgsToSend := make([]ResolutionMsg, 0, len(htlcs))
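
        // FailPermanentChannelFailure corresponds to BOLT 04's
        // permanent_channel_failure, signalling that this channel can no
        // longer forward the HTLC and the payment should not be retried
        // through it.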
        failureMsg := &lnwire.FailPermanentChannelFailure{}

        for idx := range htlcs {
                failMsg := ResolutionMsg{
                        SourceChan: c.cfg.ShortChanID,
                        HtlcIndex:  idx,
                        Failure:    failureMsg,
                }

                msgsToSend = append(msgsToSend, failMsg)
        }

        // Send the messages to the switch, if there are any.
        if len(msgsToSend) == 0 {
                return nil
        }

        log.Debugf("ChannelArbitrator(%v): sending resolution message=%v",
                c.cfg.ChanPoint, lnutils.SpewLogClosure(msgsToSend))

        err := c.cfg.DeliverResolutionMsg(msgsToSend...)
        if err != nil {
                log.Errorf("Unable to send resolution messages to switch: "+
                        "%v", err)
                return err
        }

        return nil
}