
lightningnetwork / lnd / build 11136034567

02 Oct 2024 01:06AM UTC coverage: 58.817% (+0.003%) from 58.814%

Push via github (web-flow): Merge pull request #8644 from Roasbeef/remove-sql-mutex-part-deux
("kvdb/postgres: remove global application level lock")

130416 of 221731 relevant lines covered (58.82%)
28306.61 hits per line

Source File: /contractcourt/channel_arbitrator.go (85.41% covered)
package contractcourt

import (
        "bytes"
        "context"
        "errors"
        "fmt"
        "math"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/btcsuite/btcd/txscript"
        "github.com/btcsuite/btcd/wire"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/channeldb/models"
        "github.com/lightningnetwork/lnd/fn"
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/invoices"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/labels"
        "github.com/lightningnetwork/lnd/lntypes"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/sweep"
)

var (
        // errAlreadyForceClosed is an error returned when we attempt to force
        // close a channel that's already in the process of doing so.
        errAlreadyForceClosed = errors.New("channel is already in the " +
                "process of being force closed")
)

const (
        // arbitratorBlockBufferSize is the size of the buffer we give to each
        // channel arbitrator.
        arbitratorBlockBufferSize = 20

        // AnchorOutputValue is the output value for the anchor output of an
        // anchor channel.
        // See BOLT 03 for more details:
        // https://github.com/lightning/bolts/blob/master/03-transactions.md
        AnchorOutputValue = btcutil.Amount(330)
)

// WitnessSubscription represents an intent to be notified once new witnesses
// are discovered by various active contract resolvers. A contract resolver may
// use this to be notified of when it can satisfy an incoming contract after we
// discover the witness for an outgoing contract.
type WitnessSubscription struct {
        // WitnessUpdates is a channel that newly discovered witnesses will be
        // sent over.
        //
        // TODO(roasbeef): couple with WitnessType?
        WitnessUpdates <-chan lntypes.Preimage

        // CancelSubscription is a function closure that should be used by a
        // client to cancel the subscription once they are no longer interested
        // in receiving new updates.
        CancelSubscription func()
}

// WitnessBeacon is a global beacon of witnesses. Contract resolvers will use
// this interface to look up witnesses (preimages typically) of contracts
// they're trying to resolve, add new preimages they resolve, and finally
// receive new updates each time a preimage is discovered.
//
// TODO(roasbeef): need to delete the pre-images once we've used them
// and have been sufficiently confirmed?
type WitnessBeacon interface {
        // SubscribeUpdates returns a channel that will be sent upon *each* time
        // a new preimage is discovered.
        SubscribeUpdates(chanID lnwire.ShortChannelID, htlc *channeldb.HTLC,
                payload *hop.Payload,
                nextHopOnionBlob []byte) (*WitnessSubscription, error)

        // LookupPreimage attempts to look up a preimage in the global cache.
        // True is returned for the second argument if the preimage is found.
        LookupPreimage(payhash lntypes.Hash) (lntypes.Preimage, bool)

        // AddPreimages adds a batch of newly discovered preimages to the global
        // cache, and also signals any subscribers of the newly discovered
        // witness.
        AddPreimages(preimages ...lntypes.Preimage) error
}
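
// A minimal usage sketch of this interface (illustrative only): a resolver
// first checks the cache and otherwise subscribes for future discoveries.
// It assumes a WitnessBeacon implementation `beacon` and the HTLC details
// (`payHash`, `chanID`, `htlc`, `payload`, `onionBlob`) are already in scope:
//
//	if pre, ok := beacon.LookupPreimage(payHash); ok {
//	        // Preimage already known, the incoming contract can be settled
//	        // immediately.
//	        _ = pre
//	} else {
//	        sub, err := beacon.SubscribeUpdates(chanID, &htlc, payload, onionBlob)
//	        if err != nil {
//	                return err
//	        }
//	        // Cancel once we no longer care about new witnesses.
//	        defer sub.CancelSubscription()
//
//	        // Block until the next discovered preimage is delivered.
//	        preimage := <-sub.WitnessUpdates
//	        _ = preimage
//	}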

// ArbChannel is an abstraction that allows the channel arbitrator to interact
// with an open channel.
type ArbChannel interface {
        // ForceCloseChan should force close the contract that this attendant
        // is watching over. We'll use this when we decide that we need to go
        // to chain. It should in addition tell the switch to remove the
        // corresponding link, such that we won't accept any new updates. The
        // returned summary contains all items needed to eventually resolve all
        // outputs on chain.
        ForceCloseChan() (*lnwallet.LocalForceCloseSummary, error)

        // NewAnchorResolutions returns the anchor resolutions for currently
        // valid commitment transactions.
        NewAnchorResolutions() (*lnwallet.AnchorResolutions, error)
}

// ChannelArbitratorConfig contains all the functionality that the
// ChannelArbitrator needs in order to properly arbitrate any contract dispute
// on chain.
type ChannelArbitratorConfig struct {
        // ChanPoint is the channel point that uniquely identifies this
        // channel.
        ChanPoint wire.OutPoint

        // Channel is the full channel data structure. For legacy channels, this
        // field may not always be set after a restart.
        Channel ArbChannel

        // ShortChanID describes the exact location of the channel within the
        // chain. We'll use this to address any messages that we need to send
        // to the switch during contract resolution.
        ShortChanID lnwire.ShortChannelID

        // ChainEvents is an active subscription to the chain watcher for this
        // channel to be notified of any on-chain activity related to this
        // channel.
        ChainEvents *ChainEventSubscription

        // MarkCommitmentBroadcasted should mark the channel as the commitment
        // being broadcast, and we are waiting for the commitment to confirm.
        MarkCommitmentBroadcasted func(*wire.MsgTx, lntypes.ChannelParty) error

        // MarkChannelClosed marks the channel closed in the database, with the
        // passed close summary. After this method successfully returns we can
        // no longer expect to receive chain events for this channel, and must
        // be able to recover from a failure without getting the close event
        // again. It takes an optional channel status which will update the
        // channel status in the record that we keep of historical channels.
        MarkChannelClosed func(*channeldb.ChannelCloseSummary,
                ...channeldb.ChannelStatus) error

        // IsPendingClose is a boolean indicating whether the channel is marked
        // as pending close in the database.
        IsPendingClose bool

        // ClosingHeight is the height at which the channel was closed. Note
        // that this value is only valid if IsPendingClose is true.
        ClosingHeight uint32

        // CloseType is the type of the close event in case IsPendingClose is
        // true. Otherwise this value is unset.
        CloseType channeldb.ClosureType

        // MarkChannelResolved is a function closure that serves to mark a
        // channel as "fully resolved". A channel itself can be considered
        // fully resolved once all active contracts have individually been
        // fully resolved.
        //
        // TODO(roasbeef): need RPC's to combine for pendingchannels RPC
        MarkChannelResolved func() error

        // PutResolverReport records a resolver report for the channel. If the
        // transaction provided is nil, the function should write the report
        // in a new transaction.
        PutResolverReport func(tx kvdb.RwTx,
                report *channeldb.ResolverReport) error

        // FetchHistoricalChannel retrieves the historical state of a channel.
        // This is mostly used to supplement the ContractResolvers with
        // additional information required for proper contract resolution.
        FetchHistoricalChannel func() (*channeldb.OpenChannel, error)

        // FindOutgoingHTLCDeadline returns the deadline in absolute block
        // height for the specified outgoing HTLC. For an outgoing HTLC, its
        // deadline is defined by the timeout height of its corresponding
        // incoming HTLC - this is the expiry height at which the remote peer
        // can spend their outgoing HTLC via the timeout path.
        FindOutgoingHTLCDeadline func(htlc channeldb.HTLC) fn.Option[int32]

        ChainArbitratorConfig
}

// ReportOutputType describes the type of output that is being reported
// on.
type ReportOutputType uint8

const (
        // ReportOutputIncomingHtlc is an incoming hash time locked contract on
        // the commitment tx.
        ReportOutputIncomingHtlc ReportOutputType = iota

        // ReportOutputOutgoingHtlc is an outgoing hash time locked contract on
        // the commitment tx.
        ReportOutputOutgoingHtlc

        // ReportOutputUnencumbered is an uncontested output on the commitment
        // transaction paying to us directly.
        ReportOutputUnencumbered

        // ReportOutputAnchor is an anchor output on the commitment tx.
        ReportOutputAnchor
)

// ContractReport provides a summary of a commitment tx output.
type ContractReport struct {
        // Outpoint is the final output that will be swept back to the wallet.
        Outpoint wire.OutPoint

        // Type indicates the type of the reported output.
        Type ReportOutputType

        // Amount is the final value that will be swept back to the wallet.
        Amount btcutil.Amount

        // MaturityHeight is the absolute block height that this output will
        // mature at.
        MaturityHeight uint32

        // Stage indicates whether the htlc is in the CLTV-timeout stage (1) or
        // the CSV-delay stage (2). A stage 1 htlc's maturity height will be set
        // to its expiry height, while a stage 2 htlc's maturity height will be
        // set to its confirmation height plus the maturity requirement.
        Stage uint32

        // LimboBalance is the total number of frozen coins within this
        // contract.
        LimboBalance btcutil.Amount

        // RecoveredBalance is the total value that has been successfully swept
        // back to the user's wallet.
        RecoveredBalance btcutil.Amount
}

// resolverReport creates a resolver report using some of the information in
// the contract report.
func (c *ContractReport) resolverReport(spendTx *chainhash.Hash,
        resolverType channeldb.ResolverType,
        outcome channeldb.ResolverOutcome) *channeldb.ResolverReport {

        return &channeldb.ResolverReport{
                OutPoint:        c.Outpoint,
                Amount:          c.Amount,
                ResolverType:    resolverType,
                ResolverOutcome: outcome,
                SpendTxID:       spendTx,
        }
}
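
// A minimal sketch of turning a ContractReport into a persisted resolver
// report (illustrative only). It assumes `report` is a *ContractReport, `cfg`
// is a ChannelArbitratorConfig, and `spendTxID`, `resolverType` and `outcome`
// were obtained elsewhere:
//
//	// Passing a nil tx asks PutResolverReport to open its own transaction,
//	// per the ChannelArbitratorConfig docs above.
//	rep := report.resolverReport(spendTxID, resolverType, outcome)
//	if err := cfg.PutResolverReport(nil, rep); err != nil {
//	        return err
//	}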

// htlcSet represents the set of active HTLCs on a given commitment
// transaction.
type htlcSet struct {
        // incomingHTLCs is a map of all incoming HTLCs on the target
        // commitment transaction. We may potentially go onchain to claim the
        // funds sent to us within this set.
        incomingHTLCs map[uint64]channeldb.HTLC

        // outgoingHTLCs is a map of all outgoing HTLCs on the target
        // commitment transaction. We may potentially go onchain to reclaim the
        // funds that are currently in limbo.
        outgoingHTLCs map[uint64]channeldb.HTLC
}

// newHtlcSet constructs a new HTLC set from a slice of HTLC's.
func newHtlcSet(htlcs []channeldb.HTLC) htlcSet {
        outHTLCs := make(map[uint64]channeldb.HTLC)
        inHTLCs := make(map[uint64]channeldb.HTLC)
        for _, htlc := range htlcs {
                if htlc.Incoming {
                        inHTLCs[htlc.HtlcIndex] = htlc
                        continue
                }

                outHTLCs[htlc.HtlcIndex] = htlc
        }

        return htlcSet{
                incomingHTLCs: inHTLCs,
                outgoingHTLCs: outHTLCs,
        }
}
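
// A minimal sketch of how newHtlcSet splits a mixed slice by direction
// (illustrative only); only the Incoming and HtlcIndex fields matter here:
//
//	set := newHtlcSet([]channeldb.HTLC{
//	        {HtlcIndex: 0, Incoming: true},
//	        {HtlcIndex: 1, Incoming: false},
//	})
//	// set.incomingHTLCs now holds index 0, set.outgoingHTLCs holds index 1.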

// HtlcSetKey is a two-tuple that uniquely identifies a set of HTLCs on a
// commitment transaction.
type HtlcSetKey struct {
        // IsRemote denotes if the HTLCs are on the remote commitment
        // transaction.
        IsRemote bool

        // IsPending denotes if the commitment transaction that the HTLCs are
        // on is pending (the higher of two unrevoked commitments).
        IsPending bool
}

var (
        // LocalHtlcSet is the HtlcSetKey used for local commitments.
        LocalHtlcSet = HtlcSetKey{IsRemote: false, IsPending: false}

        // RemoteHtlcSet is the HtlcSetKey used for remote commitments.
        RemoteHtlcSet = HtlcSetKey{IsRemote: true, IsPending: false}

        // RemotePendingHtlcSet is the HtlcSetKey used for dangling remote
        // commitment transactions.
        RemotePendingHtlcSet = HtlcSetKey{IsRemote: true, IsPending: true}
)

// String returns a human readable string describing the target HtlcSetKey.
func (h HtlcSetKey) String() string {
        switch h {
        case LocalHtlcSet:
                return "LocalHtlcSet"
        case RemoteHtlcSet:
                return "RemoteHtlcSet"
        case RemotePendingHtlcSet:
                return "RemotePendingHtlcSet"
        default:
                return "unknown HtlcSetKey"
        }
}
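
// A minimal sketch of how the well-known keys above index per-commitment HTLC
// sets, the same shape used for the activeHTLCs map defined further below
// (illustrative only; `localHTLCs` and `remoteHTLCs` are assumed in scope):
//
//	htlcSets := map[HtlcSetKey]htlcSet{
//	        LocalHtlcSet:  newHtlcSet(localHTLCs),
//	        RemoteHtlcSet: newHtlcSet(remoteHTLCs),
//	}
//	fmt.Println(LocalHtlcSet.String(),
//	        len(htlcSets[LocalHtlcSet].outgoingHTLCs))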

// ChannelArbitrator is the on-chain arbitrator for a particular channel. The
// struct will keep in sync with the current set of HTLCs on the commitment
// transaction. The job of the attendant is to go on-chain to either settle or
// cancel an HTLC as necessary iff: an HTLC times out, or we know the
// pre-image to an HTLC, but it wasn't settled by the link off-chain. The
// ChannelArbitrator will factor in an expected confirmation delta when
// broadcasting to ensure that we avoid any possibility of race conditions, and
// sweep the output(s) without contest.
type ChannelArbitrator struct {
        started int32 // To be used atomically.
        stopped int32 // To be used atomically.

        // startTimestamp is the time when this ChannelArbitrator was started.
        startTimestamp time.Time

        // log is a persistent log that the attendant will use to checkpoint
        // its next action, and the state of any unresolved contracts.
        log ArbitratorLog

        // activeHTLCs is the set of active incoming/outgoing HTLC's on all
        // currently valid commitment transactions.
        activeHTLCs map[HtlcSetKey]htlcSet

        // unmergedSet is used to update the activeHTLCs map in two callsites:
        // checkLocalChainActions and sweepAnchors. It contains the latest
        // updates from the link. It is not deleted from, its entries may be
        // replaced on subsequent calls to notifyContractUpdate.
        unmergedSet map[HtlcSetKey]htlcSet
        unmergedMtx sync.RWMutex

        // cfg contains all the functionality that the ChannelArbitrator requires
        // to do its duty.
        cfg ChannelArbitratorConfig

        // blocks is a channel that the arbitrator will receive new blocks on.
        // This channel should be buffered so that it does not block the
        // sender.
        blocks chan int32

        // signalUpdates is a channel that any new live signals for the channel
        // we're watching over will be sent.
        signalUpdates chan *signalUpdateMsg

        // activeResolvers is a slice of any active resolvers. This is used to
        // be able to signal them for shutdown in the case that we shutdown.
        activeResolvers []ContractResolver

        // activeResolversLock prevents simultaneous read and write to the
        // resolvers slice.
        activeResolversLock sync.RWMutex

        // resolutionSignal is a channel that will be sent upon by contract
        // resolvers once their contract has been fully resolved. With each
        // send, we'll check to see if the contract is fully resolved.
        resolutionSignal chan struct{}

        // forceCloseReqs is a channel that requests to forcibly close the
        // contract will be sent over.
        forceCloseReqs chan *forceCloseReq

        // state is the current state of the arbitrator. This state is examined
        // upon start up to decide which actions to take.
        state ArbitratorState

        wg   sync.WaitGroup
        quit chan struct{}
}

// NewChannelArbitrator returns a new instance of a ChannelArbitrator backed by
// the passed config struct.
func NewChannelArbitrator(cfg ChannelArbitratorConfig,
        htlcSets map[HtlcSetKey]htlcSet, log ArbitratorLog) *ChannelArbitrator {

        // Create a new map for unmerged HTLC's as we will overwrite the values
        // and want to avoid modifying activeHTLCs directly. This soft copying
        // is done to ensure that activeHTLCs isn't reset as an empty map later
        // on.
        unmerged := make(map[HtlcSetKey]htlcSet)
        unmerged[LocalHtlcSet] = htlcSets[LocalHtlcSet]
        unmerged[RemoteHtlcSet] = htlcSets[RemoteHtlcSet]

        // If the pending set exists, write that as well.
        if _, ok := htlcSets[RemotePendingHtlcSet]; ok {
                unmerged[RemotePendingHtlcSet] = htlcSets[RemotePendingHtlcSet]
        }

        return &ChannelArbitrator{
                log:              log,
                blocks:           make(chan int32, arbitratorBlockBufferSize),
                signalUpdates:    make(chan *signalUpdateMsg),
                resolutionSignal: make(chan struct{}),
                forceCloseReqs:   make(chan *forceCloseReq),
                activeHTLCs:      htlcSets,
                unmergedSet:      unmerged,
                cfg:              cfg,
                quit:             make(chan struct{}),
        }
}
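
// A minimal lifecycle sketch (illustrative only), assuming a populated
// ChannelArbitratorConfig `cfg`, an ArbitratorLog `arbLog`, and an htlcSets
// map built as shown above. Passing nil to Start makes the arbitrator load
// its start state from disk:
//
//	arb := NewChannelArbitrator(cfg, htlcSets, arbLog)
//	if err := arb.Start(nil); err != nil {
//	        return err
//	}
//	defer func() { _ = arb.Stop() }()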

// chanArbStartState contains the information from disk that we need to start
// up a channel arbitrator.
type chanArbStartState struct {
        currentState ArbitratorState
        commitSet    *CommitSet
}

// getStartState retrieves the information from disk that our channel arbitrator
// requires to start.
func (c *ChannelArbitrator) getStartState(tx kvdb.RTx) (*chanArbStartState,
        error) {

        // First, we'll read our last state from disk, so our internal state
        // machine can act accordingly.
        state, err := c.log.CurrentState(tx)
        if err != nil {
                return nil, err
        }

        // Next we'll fetch our confirmed commitment set. This will only exist
        // if the channel has been closed out on chain for modern nodes. For
        // older nodes, this won't be found at all, and will rely on the
        // existing written chain actions. Additionally, if this channel hasn't
        // logged any actions in the log, then this field won't be present.
        commitSet, err := c.log.FetchConfirmedCommitSet(tx)
        if err != nil && err != errNoCommitSet && err != errScopeBucketNoExist {
                return nil, err
        }

        return &chanArbStartState{
                currentState: state,
                commitSet:    commitSet,
        }, nil
}
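
// A minimal sketch of pre-loading the start state before Start (illustrative
// only), e.g. when the caller already holds a read transaction `tx` of type
// kvdb.RTx; Start skips the disk lookup when given a non-nil state:
//
//	startState, err := arb.getStartState(tx)
//	if err != nil {
//	        return err
//	}
//	if err := arb.Start(startState); err != nil {
//	        return err
//	}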

// Start starts all the goroutines that the ChannelArbitrator needs to operate.
// It takes a start state, which will be looked up on disk if it is not
// provided.
func (c *ChannelArbitrator) Start(state *chanArbStartState) error {
        if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
                return nil
        }
        c.startTimestamp = c.cfg.Clock.Now()

        // If the state passed in is nil, we look it up now.
        if state == nil {
                var err error
                state, err = c.getStartState(nil)
                if err != nil {
                        return err
                }
        }

        log.Debugf("Starting ChannelArbitrator(%v), htlc_set=%v, state=%v",
                c.cfg.ChanPoint, lnutils.SpewLogClosure(c.activeHTLCs),
                state.currentState)

        // Set our state from our starting state.
        c.state = state.currentState

        _, bestHeight, err := c.cfg.ChainIO.GetBestBlock()
        if err != nil {
                return err
        }

        // If the channel has been marked pending close in the database, and we
        // haven't transitioned the state machine to StateContractClosed (or a
        // succeeding state), then a state transition most likely failed. We'll
        // try to recover from this by manually advancing the state by setting
        // the corresponding close trigger.
        trigger := chainTrigger
        triggerHeight := uint32(bestHeight)
        if c.cfg.IsPendingClose {
                switch c.state {
                case StateDefault:
                        fallthrough
                case StateBroadcastCommit:
                        fallthrough
                case StateCommitmentBroadcasted:
                        switch c.cfg.CloseType {

                        case channeldb.CooperativeClose:
                                trigger = coopCloseTrigger

                        case channeldb.BreachClose:
                                trigger = breachCloseTrigger

                        case channeldb.LocalForceClose:
                                trigger = localCloseTrigger

                        case channeldb.RemoteForceClose:
                                trigger = remoteCloseTrigger
                        }

                        log.Warnf("ChannelArbitrator(%v): detected stalled "+
                                "state=%v for closed channel",
                                c.cfg.ChanPoint, c.state)
                }

                triggerHeight = c.cfg.ClosingHeight
        }

        log.Infof("ChannelArbitrator(%v): starting state=%v, trigger=%v, "+
                "triggerHeight=%v", c.cfg.ChanPoint, c.state, trigger,
                triggerHeight)

        // We'll now attempt to advance our state forward based on the current
        // on-chain state, and our set of active contracts.
        startingState := c.state
        nextState, _, err := c.advanceState(
                triggerHeight, trigger, state.commitSet,
        )
        if err != nil {
                switch err {

                // If we detect that we tried to fetch resolutions, but failed,
                // this channel was marked closed in the database before
                // resolutions were successfully written. In this case there is
                // not much we can do, so we don't return the error.
                case errScopeBucketNoExist:
                        fallthrough
                case errNoResolutions:
                        log.Warnf("ChannelArbitrator(%v): detected closed "+
                                "channel with no contract resolutions written.",
                                c.cfg.ChanPoint)

                default:
                        return err
                }
        }

        // If we started and ended at the awaiting full resolution state, then
        // we'll relaunch our set of unresolved contracts.
        if startingState == StateWaitingFullResolution &&
                nextState == StateWaitingFullResolution {

                // In order to relaunch the resolvers, we'll need to fetch the
                // set of HTLCs that were present in the commitment transaction
                // at the time it was confirmed. commitSet.ConfCommitKey can't
                // be nil at this point since we're in
                // StateWaitingFullResolution. We can only be in
                // StateWaitingFullResolution after we've transitioned from
                // StateContractClosed which can only be triggered by the local
                // or remote close trigger. This trigger is only fired when we
                // receive a chain event from the chain watcher that the
                // commitment has been confirmed on chain, and before we
                // advance our state step, we call InsertConfirmedCommitSet.
                err := c.relaunchResolvers(state.commitSet, triggerHeight)
                if err != nil {
                        return err
                }
        }

        c.wg.Add(1)
        go c.channelAttendant(bestHeight)
        return nil
}

// maybeAugmentTaprootResolvers will update the contract resolution information
// for taproot channels. This ensures that all the resolvers have the latest
// resolution, which may also include data such as the control block and tap
// tweaks.
func maybeAugmentTaprootResolvers(chanType channeldb.ChannelType,
        resolver ContractResolver,
        contractResolutions *ContractResolutions) {

        if !chanType.IsTaproot() {
                return
        }

        // The on disk resolutions contain all the ctrl block
        // information, so we'll set that now for the relevant
        // resolvers.
        switch r := resolver.(type) {
        case *commitSweepResolver:
                if contractResolutions.CommitResolution != nil {
                        //nolint:lll
                        r.commitResolution = *contractResolutions.CommitResolution
                }
        case *htlcOutgoingContestResolver:
                //nolint:lll
                htlcResolutions := contractResolutions.HtlcResolutions.OutgoingHTLCs
                for _, htlcRes := range htlcResolutions {
                        htlcRes := htlcRes

                        if r.htlcResolution.ClaimOutpoint ==
                                htlcRes.ClaimOutpoint {

                                r.htlcResolution = htlcRes
                        }
                }

        case *htlcTimeoutResolver:
                //nolint:lll
                htlcResolutions := contractResolutions.HtlcResolutions.OutgoingHTLCs
                for _, htlcRes := range htlcResolutions {
                        htlcRes := htlcRes

                        if r.htlcResolution.ClaimOutpoint ==
                                htlcRes.ClaimOutpoint {

                                r.htlcResolution = htlcRes
                        }
                }

        case *htlcIncomingContestResolver:
                //nolint:lll
                htlcResolutions := contractResolutions.HtlcResolutions.IncomingHTLCs
                for _, htlcRes := range htlcResolutions {
                        htlcRes := htlcRes

                        if r.htlcResolution.ClaimOutpoint ==
                                htlcRes.ClaimOutpoint {

                                r.htlcResolution = htlcRes
                        }
                }
        case *htlcSuccessResolver:
                //nolint:lll
                htlcResolutions := contractResolutions.HtlcResolutions.IncomingHTLCs
                for _, htlcRes := range htlcResolutions {
                        htlcRes := htlcRes

                        if r.htlcResolution.ClaimOutpoint ==
                                htlcRes.ClaimOutpoint {

                                r.htlcResolution = htlcRes
                        }
                }
        }
}

// relaunchResolvers relaunches the set of resolvers for unresolved contracts
// in order to provide them with information that's not immediately available
// upon starting the ChannelArbitrator. This information should ideally be
// stored in the database, so this only serves as an intermediate work-around
// to prevent a migration.
func (c *ChannelArbitrator) relaunchResolvers(commitSet *CommitSet,
        heightHint uint32) error {

        // We'll now query our log to see if there are any active unresolved
        // contracts. If this is the case, then we'll relaunch all contract
        // resolvers.
        unresolvedContracts, err := c.log.FetchUnresolvedContracts()
        if err != nil {
                return err
        }

        // Retrieve the commitment tx hash from the log.
        contractResolutions, err := c.log.FetchContractResolutions()
        if err != nil {
                log.Errorf("unable to fetch contract resolutions: %v",
                        err)
                return err
        }
        commitHash := contractResolutions.CommitHash

        // In prior versions of lnd, the information needed to supplement the
        // resolvers (in most cases, the full amount of the HTLC) was found in
        // the chain action map, which is now deprecated.  As a result, if the
        // commitSet is nil (an older node with unresolved HTLCs at time of
        // upgrade), then we'll use the chain action information in place. The
        // chain actions may exclude some information, but we cannot recover it
        // for these older nodes at the moment.
        var confirmedHTLCs []channeldb.HTLC
        if commitSet != nil {
                confirmedHTLCs = commitSet.HtlcSets[*commitSet.ConfCommitKey]
        } else {
                chainActions, err := c.log.FetchChainActions()
                if err != nil {
                        log.Errorf("unable to fetch chain actions: %v", err)
                        return err
                }
                for _, htlcs := range chainActions {
                        confirmedHTLCs = append(confirmedHTLCs, htlcs...)
                }
        }

        // Reconstruct the htlc outpoints and data from the chain action log.
        // The purpose of the constructed htlc map is to supplement the
        // resolvers restored from the database with extra data. Ideally this
        // data is stored as part of the resolver in the log. This is a
        // workaround to prevent a db migration. We use all available htlc sets
        // here in order to ensure we have complete coverage.
        htlcMap := make(map[wire.OutPoint]*channeldb.HTLC)
        for _, htlc := range confirmedHTLCs {
                htlc := htlc
                outpoint := wire.OutPoint{
                        Hash:  commitHash,
                        Index: uint32(htlc.OutputIndex),
                }
                htlcMap[outpoint] = &htlc
        }

        // We'll also fetch the historical state of this channel, as it should
        // have been marked as closed by now, and supplement it to each resolver
        // such that we can properly resolve our pending contracts.
        var chanState *channeldb.OpenChannel
        chanState, err = c.cfg.FetchHistoricalChannel()
        switch {
        // If we don't find this channel, then it may be the case that it
        // was closed before we started to retain the final state
        // information for open channels.
        case err == channeldb.ErrNoHistoricalBucket:
                fallthrough
        case err == channeldb.ErrChannelNotFound:
                log.Warnf("ChannelArbitrator(%v): unable to fetch historical "+
                        "state", c.cfg.ChanPoint)

        case err != nil:
                return err
        }

        log.Infof("ChannelArbitrator(%v): relaunching %v contract "+
                "resolvers", c.cfg.ChanPoint, len(unresolvedContracts))

        for i := range unresolvedContracts {
                resolver := unresolvedContracts[i]

                if chanState != nil {
                        resolver.SupplementState(chanState)

                        // For taproot channels, we'll need to also make sure
                        // the control block information was set properly.
                        maybeAugmentTaprootResolvers(
                                chanState.ChanType, resolver,
                                contractResolutions,
                        )
                }

                unresolvedContracts[i] = resolver

                htlcResolver, ok := resolver.(htlcContractResolver)
                if !ok {
                        continue
                }

                htlcPoint := htlcResolver.HtlcPoint()
                htlc, ok := htlcMap[htlcPoint]
                if !ok {
                        return fmt.Errorf(
                                "htlc resolver %T unavailable", resolver,
                        )
                }

                htlcResolver.Supplement(*htlc)

                // If this is an outgoing HTLC, we will also need to supplement
                // the resolver with the expiry block height of its
                // corresponding incoming HTLC.
                if !htlc.Incoming {
                        deadline := c.cfg.FindOutgoingHTLCDeadline(*htlc)
                        htlcResolver.SupplementDeadline(deadline)
                }
        }

        // The anchor resolver is stateless and can always be re-instantiated.
        if contractResolutions.AnchorResolution != nil {
                anchorResolver := newAnchorResolver(
                        contractResolutions.AnchorResolution.AnchorSignDescriptor,
                        contractResolutions.AnchorResolution.CommitAnchor,
                        heightHint, c.cfg.ChanPoint,
                        ResolverConfig{
                                ChannelArbitratorConfig: c.cfg,
                        },
                )

                anchorResolver.SupplementState(chanState)

                unresolvedContracts = append(unresolvedContracts, anchorResolver)

                // TODO(roasbeef): this isn't re-launched?
        }

        c.launchResolvers(unresolvedContracts, true)

        return nil
}

// Report returns htlc reports for the active resolvers.
func (c *ChannelArbitrator) Report() []*ContractReport {
        c.activeResolversLock.RLock()
        defer c.activeResolversLock.RUnlock()

        var reports []*ContractReport
        for _, resolver := range c.activeResolvers {
                r, ok := resolver.(reportingContractResolver)
                if !ok {
                        continue
                }

                report := r.report()
                if report == nil {
                        continue
                }

                reports = append(reports, report)
        }

        return reports
}
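
// A minimal sketch of consuming the reports (illustrative only), e.g. to
// surface limbo and recovered balances for a pending-channels style view;
// `arb` is assumed to be a started ChannelArbitrator:
//
//	for _, rep := range arb.Report() {
//	        fmt.Printf("output=%v stage=%v limbo=%v recovered=%v\n",
//	                rep.Outpoint, rep.Stage, rep.LimboBalance,
//	                rep.RecoveredBalance)
//	}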

// Stop signals the ChannelArbitrator for a graceful shutdown.
func (c *ChannelArbitrator) Stop() error {
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
                return nil
        }

        log.Debugf("Stopping ChannelArbitrator(%v)", c.cfg.ChanPoint)

        if c.cfg.ChainEvents.Cancel != nil {
                go c.cfg.ChainEvents.Cancel()
        }

        c.activeResolversLock.RLock()
        for _, activeResolver := range c.activeResolvers {
                activeResolver.Stop()
        }
        c.activeResolversLock.RUnlock()

        close(c.quit)
        c.wg.Wait()

        return nil
}

// transitionTrigger is an enum that denotes exactly *why* a state transition
// was initiated. This is useful as depending on the initial trigger, we may
// skip certain states as those actions are expected to have already taken
// place as a result of the external trigger.
type transitionTrigger uint8

const (
        // chainTrigger is a transition trigger that has been attempted due to
        // changing on-chain conditions such as a block which times out HTLC's
        // being attached.
        chainTrigger transitionTrigger = iota

        // userTrigger is a transition trigger driven by user action. Examples
        // of such a trigger include a user requesting a force closure of the
        // channel.
        userTrigger

        // remoteCloseTrigger is a transition trigger driven by the remote
        // peer's commitment being confirmed.
        remoteCloseTrigger

        // localCloseTrigger is a transition trigger driven by our commitment
        // being confirmed.
        localCloseTrigger

        // coopCloseTrigger is a transition trigger driven by a cooperative
        // close transaction being confirmed.
        coopCloseTrigger

        // breachCloseTrigger is a transition trigger driven by a remote breach
        // being confirmed. In this case the channel arbitrator will wait for
        // the BreachArbitrator to finish and then clean up gracefully.
        breachCloseTrigger
)

// String returns a human readable string describing the passed
// transitionTrigger.
func (t transitionTrigger) String() string {
        switch t {
        case chainTrigger:
                return "chainTrigger"

        case remoteCloseTrigger:
                return "remoteCloseTrigger"

        case userTrigger:
                return "userTrigger"

        case localCloseTrigger:
                return "localCloseTrigger"

        case coopCloseTrigger:
                return "coopCloseTrigger"

        case breachCloseTrigger:
                return "breachCloseTrigger"

        default:
                return "unknown trigger"
        }
}

// stateStep is a helper method that examines our internal state, and attempts
// the appropriate state transition if necessary. The next state we transition
// to is returned. Additionally, if the next transition results in a commitment
// broadcast, the commitment transaction itself is returned.
func (c *ChannelArbitrator) stateStep(
        triggerHeight uint32, trigger transitionTrigger,
        confCommitSet *CommitSet) (ArbitratorState, *wire.MsgTx, error) {

        var (
                nextState ArbitratorState
                closeTx   *wire.MsgTx
        )
        switch c.state {

        // If we're in the default state, then we'll check our set of actions
        // to see if while we were down, conditions have changed.
        case StateDefault:
                log.Debugf("ChannelArbitrator(%v): new block (height=%v) "+
                        "examining active HTLC's", c.cfg.ChanPoint,
                        triggerHeight)

                // As a new block has been connected to the end of the main
                // chain, we'll check to see if we need to make any on-chain
                // claims on behalf of the channel contract that we're
                // arbitrating for. If a commitment has confirmed, then we'll
                // use the set snapshot from the chain, otherwise we'll use our
                // current set.
                var htlcs map[HtlcSetKey]htlcSet
                if confCommitSet != nil {
                        htlcs = confCommitSet.toActiveHTLCSets()
                } else {
                        // Update the set of activeHTLCs so
                        // checkLocalChainActions has an up-to-date view of the
                        // commitments.
                        c.updateActiveHTLCs()
                        htlcs = c.activeHTLCs
                }
                chainActions, err := c.checkLocalChainActions(
                        triggerHeight, trigger, htlcs, false,
                )
                if err != nil {
                        return StateDefault, nil, err
                }

                // If there are no actions to be made, then we'll remain in the
                // default state. If this isn't a self initiated event (we're
                // checking due to a chain update), then we'll exit now.
                if len(chainActions) == 0 && trigger == chainTrigger {
                        log.Debugf("ChannelArbitrator(%v): no actions for "+
                                "chain trigger, terminating", c.cfg.ChanPoint)

                        return StateDefault, closeTx, nil
                }

                // Otherwise, we'll log that we checked the HTLC actions as the
                // commitment transaction has already been broadcast.
                log.Tracef("ChannelArbitrator(%v): logging chain_actions=%v",
                        c.cfg.ChanPoint, lnutils.SpewLogClosure(chainActions))

                // Depending on the type of trigger, we'll either "tunnel"
                // through to a farther state, or just proceed linearly to the
                // next state.
                switch trigger {

                // If this is a chain trigger, then we'll go straight to the
                // next state, as we still need to broadcast the commitment
                // transaction.
                case chainTrigger:
                        fallthrough
                case userTrigger:
                        nextState = StateBroadcastCommit

                // If the trigger is a cooperative close being confirmed, then
                // we can go straight to StateFullyResolved, as there won't be
                // any contracts to resolve.
                case coopCloseTrigger:
                        nextState = StateFullyResolved

                // Otherwise, if this state advance was triggered by a
                // commitment being confirmed on chain, then we'll jump
                // straight to the state where the contract has already been
                // closed, and we will inspect the set of unresolved contracts.
                case localCloseTrigger:
                        log.Errorf("ChannelArbitrator(%v): unexpected local "+
                                "commitment confirmed while in StateDefault",
                                c.cfg.ChanPoint)
                        fallthrough
                case remoteCloseTrigger:
                        nextState = StateContractClosed

                case breachCloseTrigger:
                        nextContractState, err := c.checkLegacyBreach()
                        if nextContractState == StateError {
                                return nextContractState, nil, err
                        }

                        nextState = nextContractState
                }

        // If we're in this state, then we've decided to broadcast the
        // commitment transaction. We enter this state either due to an outside
        // sub-system, or because an on-chain action has been triggered.
        case StateBroadcastCommit:
                // Under normal operation, we can only enter
                // StateBroadcastCommit via a user or chain trigger. On restart,
                // this state may be reexecuted after closing the channel, but
                // failing to commit to StateContractClosed or
                // StateFullyResolved. In that case, one of the four close
                // triggers will be presented, signifying that we should skip
                // rebroadcasting, and go straight to resolving the on-chain
                // contract or marking the channel resolved.
                switch trigger {
                case localCloseTrigger, remoteCloseTrigger:
                        log.Infof("ChannelArbitrator(%v): detected %s "+
                                "close after closing channel, fast-forwarding "+
                                "to %s to resolve contract",
                                c.cfg.ChanPoint, trigger, StateContractClosed)
                        return StateContractClosed, closeTx, nil

                case breachCloseTrigger:
                        nextContractState, err := c.checkLegacyBreach()
                        if nextContractState == StateError {
                                log.Infof("ChannelArbitrator(%v): unable to "+
                                        "advance breach close resolution: %v",
                                        c.cfg.ChanPoint, nextContractState)
                                return StateError, closeTx, err
                        }

                        log.Infof("ChannelArbitrator(%v): detected %s close "+
                                "after closing channel, fast-forwarding to %s"+
                                " to resolve contract", c.cfg.ChanPoint,
                                trigger, nextContractState)

                        return nextContractState, closeTx, nil

                case coopCloseTrigger:
                        log.Infof("ChannelArbitrator(%v): detected %s "+
                                "close after closing channel, fast-forwarding "+
                                "to %s to resolve contract",
                                c.cfg.ChanPoint, trigger, StateFullyResolved)
                        return StateFullyResolved, closeTx, nil
                }

                log.Infof("ChannelArbitrator(%v): force closing "+
                        "chan", c.cfg.ChanPoint)

                // Now that we have all the actions decided for the set of
                // HTLC's, we'll broadcast the commitment transaction, and
                // signal the link to exit.

                // We'll tell the switch that it should remove the link for
                // this channel, in addition to fetching the force close
                // summary needed to close this channel on chain.
                closeSummary, err := c.cfg.Channel.ForceCloseChan()
                if err != nil {
                        log.Errorf("ChannelArbitrator(%v): unable to "+
                                "force close: %v", c.cfg.ChanPoint, err)

                        // We tried to force close (HTLC may be expiring from
                        // our PoV, etc), but we think we've lost data. In this
                        // case, we'll not force close, but terminate the state
                        // machine here to wait to see what confirms on chain.
                        if errors.Is(err, lnwallet.ErrForceCloseLocalDataLoss) {
                                log.Error("ChannelArbitrator(%v): broadcast "+
                                        "failed due to local data loss, "+
                                        "waiting for on chain confirmation...",
                                        c.cfg.ChanPoint)

                                return StateBroadcastCommit, nil, nil
                        }

                        return StateError, closeTx, err
                }
                closeTx = closeSummary.CloseTx

                // Before publishing the transaction, we store it to the
                // database, such that we can re-publish later in case it
                // didn't propagate. We initiated the force close, so we
                // mark broadcast with local initiator set to true.
                err = c.cfg.MarkCommitmentBroadcasted(closeTx, lntypes.Local)
                if err != nil {
                        log.Errorf("ChannelArbitrator(%v): unable to "+
                                "mark commitment broadcasted: %v",
                                c.cfg.ChanPoint, err)
                        return StateError, closeTx, err
                }

                // With the close transaction in hand, broadcast the
                // transaction to the network, thereby entering the post
                // channel resolution state.
                log.Infof("Broadcasting force close transaction %v, "+
                        "ChannelPoint(%v): %v", closeTx.TxHash(),
                        c.cfg.ChanPoint, lnutils.SpewLogClosure(closeTx))

                // At this point, we'll now broadcast the commitment
                // transaction itself.
                label := labels.MakeLabel(
                        labels.LabelTypeChannelClose, &c.cfg.ShortChanID,
                )
                if err := c.cfg.PublishTx(closeTx, label); err != nil {
                        log.Errorf("ChannelArbitrator(%v): unable to broadcast "+
                                "close tx: %v", c.cfg.ChanPoint, err)

                        // This makes sure we don't fail at startup if the
                        // commitment transaction has too low fees to make it
                        // into mempool. The rebroadcaster makes sure this
                        // transaction is republished regularly until confirmed
                        // or replaced.
                        if !errors.Is(err, lnwallet.ErrDoubleSpend) &&
                                !errors.Is(err, lnwallet.ErrMempoolFee) {

                                return StateError, closeTx, err
                        }
                }

                // We go to the StateCommitmentBroadcasted state, where we'll
                // be waiting for the commitment to be confirmed.
                nextState = StateCommitmentBroadcasted

        // In this state we have broadcasted our own commitment, and will need
        // to wait for a commitment (not necessarily the one we broadcasted!)
        // to be confirmed.
        case StateCommitmentBroadcasted:
                switch trigger {

                // We are waiting for a commitment to be confirmed.
                case chainTrigger, userTrigger:
                        // The commitment transaction has been broadcast, but it
                        // doesn't necessarily need to be the commitment
                        // transaction version that is going to be confirmed. To
                        // be sure that any of those versions can be anchored
                        // down, we now submit all anchor resolutions to the
                        // sweeper. The sweeper will keep trying to sweep all of
                        // them.
                        //
                        // Note that the sweeper is idempotent. If we ever
                        // happen to end up at this point in the code again, no
                        // harm is done by re-offering the anchors to the
                        // sweeper.
                        anchors, err := c.cfg.Channel.NewAnchorResolutions()
                        if err != nil {
                                return StateError, closeTx, err
                        }

                        err = c.sweepAnchors(anchors, triggerHeight)
                        if err != nil {
                                return StateError, closeTx, err
                        }

                        nextState = StateCommitmentBroadcasted

                // If this state advance was triggered by any of the
                // commitments being confirmed, then we'll jump to the state
                // where the contract has been closed.
                case localCloseTrigger, remoteCloseTrigger:
                        nextState = StateContractClosed

                // If a coop close was confirmed, jump straight to the fully
                // resolved state.
                case coopCloseTrigger:
                        nextState = StateFullyResolved

                case breachCloseTrigger:
                        nextContractState, err := c.checkLegacyBreach()
                        if nextContractState == StateError {
                                return nextContractState, closeTx, err
                        }

                        nextState = nextContractState
                }

                log.Infof("ChannelArbitrator(%v): trigger %v moving from "+
                        "state %v to %v", c.cfg.ChanPoint, trigger, c.state,
                        nextState)

        // If we're in this state, then the contract has been fully closed to
        // outside sub-systems, so we'll process the prior set of on-chain
        // contract actions and launch a set of resolvers.
        case StateContractClosed:
                // First, we'll fetch our chain actions, and both sets of
                // resolutions so we can process them.
                contractResolutions, err := c.log.FetchContractResolutions()
                if err != nil {
                        log.Errorf("unable to fetch contract resolutions: %v",
                                err)
                        return StateError, closeTx, err
                }

                // If the resolution is empty, and we have no HTLCs at all to
                // send to, then we're done here. We don't need to launch any
                // resolvers, and can go straight to our final state.
                if contractResolutions.IsEmpty() && confCommitSet.IsEmpty() {
                        log.Infof("ChannelArbitrator(%v): contract "+
                                "resolutions empty, marking channel as fully resolved!",
                                c.cfg.ChanPoint)
                        nextState = StateFullyResolved
                        break
                }

                // Now that we know we'll need to act, we'll process all the
                // resolvers, then create the structures we need to resolve all
                // outstanding contracts.
                resolvers, pktsToSend, err := c.prepContractResolutions(
                        contractResolutions, triggerHeight, trigger,
                        confCommitSet,
                )
                if err != nil {
                        log.Errorf("ChannelArbitrator(%v): unable to "+
                                "resolve contracts: %v", c.cfg.ChanPoint, err)
                        return StateError, closeTx, err
                }

                // With the commitment broadcast, we'll then send over all
                // messages we can send immediately.
                if len(pktsToSend) != 0 {
                        log.Debugf("ChannelArbitrator(%v): sending "+
                                "resolution message=%v", c.cfg.ChanPoint,
                                lnutils.SpewLogClosure(pktsToSend))

                        err := c.cfg.DeliverResolutionMsg(pktsToSend...)
                        if err != nil {
14✔
1229
                                log.Errorf("unable to send pkts: %v", err)
×
1230
                                return StateError, closeTx, err
×
1231
                        }
×
1232
                }
1233

1234
                log.Debugf("ChannelArbitrator(%v): inserting %v contract "+
16✔
1235
                        "resolvers", c.cfg.ChanPoint, len(resolvers))
16✔
1236

16✔
1237
                err = c.log.InsertUnresolvedContracts(nil, resolvers...)
16✔
1238
                if err != nil {
16✔
1239
                        return StateError, closeTx, err
×
1240
                }
×
1241

1242
                // Finally, we'll launch all the required contract resolvers.
1243
                // Once they're all resolved, we're no longer needed.
1244
                c.launchResolvers(resolvers, false)
16✔
1245

16✔
1246
                nextState = StateWaitingFullResolution
16✔
1247

1248
        // We'll remain in this state, returning it on each check, until
1249
        // all contracts are fully resolved.
1250
        case StateWaitingFullResolution:
21✔
1251
                log.Infof("ChannelArbitrator(%v): still awaiting contract "+
21✔
1252
                        "resolution", c.cfg.ChanPoint)
21✔
1253

21✔
1254
                unresolved, err := c.log.FetchUnresolvedContracts()
21✔
1255
                if err != nil {
21✔
1256
                        return StateError, closeTx, err
×
1257
                }
×
1258

1259
                // If we have no unresolved contracts, then we can move to the
1260
                // final state.
1261
                if len(unresolved) == 0 {
37✔
1262
                        nextState = StateFullyResolved
16✔
1263
                        break
16✔
1264
                }
1265

1266
                // Otherwise, we still have unresolved contracts, so we'll
1267
                // stay alive to oversee their resolution.
1268
                nextState = StateWaitingFullResolution
9✔
1269

9✔
1270
                // Add debug logs.
9✔
1271
                for _, r := range unresolved {
18✔
1272
                        log.Debugf("ChannelArbitrator(%v): still have "+
9✔
1273
                                "unresolved contract: %T", c.cfg.ChanPoint, r)
9✔
1274
                }
9✔
1275

1276
        // If we start as fully resolved, then we'll end as fully resolved.
1277
        case StateFullyResolved:
26✔
1278
                // To ensure that the state of the contract in persistent
26✔
1279
                // storage is properly reflected, we'll mark the contract as
26✔
1280
                // fully resolved now.
26✔
1281
                nextState = StateFullyResolved
26✔
1282

26✔
1283
                log.Infof("ChannelPoint(%v) has been fully resolved "+
26✔
1284
                        "on-chain at height=%v", c.cfg.ChanPoint, triggerHeight)
26✔
1285

26✔
1286
                if err := c.cfg.MarkChannelResolved(); err != nil {
26✔
1287
                        log.Errorf("unable to mark channel resolved: %v", err)
×
1288
                        return StateError, closeTx, err
×
1289
                }
×
1290
        }
1291

1292
        log.Tracef("ChannelArbitrator(%v): next_state=%v", c.cfg.ChanPoint,
136✔
1293
                nextState)
136✔
1294

136✔
1295
        return nextState, closeTx, nil
136✔
1296
}
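// Editor's note: the outline below is an illustrative aid and is not part of
// channel_arbitrator.go. For a local force close with HTLCs outstanding, the
// switch above typically walks these transitions, one stateStep call at a
// time (StateDefault and StateBroadcastCommit are handled earlier in the
// file; this happy-path ordering is an assumption for illustration only):
//
//    StateDefault
//      -> StateBroadcastCommit
//      -> StateCommitmentBroadcasted  // waiting for a commitment to confirm
//      -> StateContractClosed         // resolvers created and launched
//      -> StateWaitingFullResolution  // re-entered until resolvers finish
//      -> StateFullyResolved          // terminal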
1297

1298
// sweepAnchors offers all given anchor resolutions to the sweeper. It requests
1299
// sweeping at the minimum fee rate. This fee rate can be upped manually by the
1300
// user via the BumpFee rpc.
1301
func (c *ChannelArbitrator) sweepAnchors(anchors *lnwallet.AnchorResolutions,
1302
        heightHint uint32) error {
22✔
1303

22✔
1304
        // Use the chan id as the exclusive group. This prevents any of the
22✔
1305
        // anchors from being batched together.
22✔
1306
        exclusiveGroup := c.cfg.ShortChanID.ToUint64()
22✔
1307

22✔
1308
        // sweepWithDeadline is a helper closure that takes an anchor
22✔
1309
        // resolution and sweeps it with its corresponding deadline.
22✔
1310
        sweepWithDeadline := func(anchor *lnwallet.AnchorResolution,
22✔
1311
                htlcs htlcSet, anchorPath string) error {
31✔
1312

9✔
1313
                // Find the deadline for this specific anchor.
9✔
1314
                deadline, value, err := c.findCommitmentDeadlineAndValue(
9✔
1315
                        heightHint, htlcs,
9✔
1316
                )
9✔
1317
                if err != nil {
9✔
1318
                        return err
×
1319
                }
×
1320

1321
                // If we cannot find a deadline, it means there are no HTLCs at
1322
                // stake, which means we can relax our anchor sweeping
1323
                // conditions as we don't have any time sensitive outputs to
1324
                // sweep. However we need to register the anchor output with the
1325
                // sweeper so we are later able to bump the close fee.
1326
                if deadline.IsNone() {
14✔
1327
                        log.Infof("ChannelArbitrator(%v): no HTLCs at stake, "+
5✔
1328
                                "sweeping anchor with default deadline",
5✔
1329
                                c.cfg.ChanPoint)
5✔
1330
                }
5✔
1331

1332
                witnessType := input.CommitmentAnchor
9✔
1333

9✔
1334
                // For taproot channels, we need to use the proper witness
9✔
1335
                // type.
9✔
1336
                if txscript.IsPayToTaproot(
9✔
1337
                        anchor.AnchorSignDescriptor.Output.PkScript,
9✔
1338
                ) {
13✔
1339

4✔
1340
                        witnessType = input.TaprootAnchorSweepSpend
4✔
1341
                }
4✔
1342

1343
                // Prepare anchor output for sweeping.
1344
                anchorInput := input.MakeBaseInput(
9✔
1345
                        &anchor.CommitAnchor,
9✔
1346
                        witnessType,
9✔
1347
                        &anchor.AnchorSignDescriptor,
9✔
1348
                        heightHint,
9✔
1349
                        &input.TxInfo{
9✔
1350
                                Fee:    anchor.CommitFee,
9✔
1351
                                Weight: anchor.CommitWeight,
9✔
1352
                        },
9✔
1353
                )
9✔
1354

9✔
1355
                // If we have a deadline, we'll use it to calculate the
9✔
1356
                // deadline height, otherwise default to none.
9✔
1357
                deadlineDesc := "None"
9✔
1358
                deadlineHeight := fn.MapOption(func(d int32) int32 {
17✔
1359
                        deadlineDesc = fmt.Sprintf("%d", d)
8✔
1360

8✔
1361
                        return d + int32(heightHint)
8✔
1362
                })(deadline)
8✔
1363

1364
                // Calculate the budget based on the value under protection,
1365
                // which is the sum of all HTLCs on this commitment subtracted
1366
                // by their budgets.
1367
                // The anchor output in itself has a small output value of 330
1368
                // sats so we also include it in the budget to pay for the
1369
                // cpfp transaction.
1370
                budget := calculateBudget(
9✔
1371
                        value, c.cfg.Budget.AnchorCPFPRatio,
9✔
1372
                        c.cfg.Budget.AnchorCPFP,
9✔
1373
                ) + AnchorOutputValue
9✔
1374

9✔
1375
                log.Infof("ChannelArbitrator(%v): offering anchor from %s "+
9✔
1376
                        "commitment %v to sweeper with deadline=%v, budget=%v",
9✔
1377
                        c.cfg.ChanPoint, anchorPath, anchor.CommitAnchor,
9✔
1378
                        deadlineDesc, budget)
9✔
1379

9✔
1380
                // Sweep anchor output with a confirmation target fee
9✔
1381
                // preference. Because this is a cpfp-operation, the anchor
9✔
1382
                // will only be attempted to sweep when the current fee
9✔
1383
                // estimate for the confirmation target exceeds the commit fee
9✔
1384
                // rate.
9✔
1385
                _, err = c.cfg.Sweeper.SweepInput(
9✔
1386
                        &anchorInput,
9✔
1387
                        sweep.Params{
9✔
1388
                                ExclusiveGroup: &exclusiveGroup,
9✔
1389
                                Budget:         budget,
9✔
1390
                                DeadlineHeight: deadlineHeight,
9✔
1391
                        },
9✔
1392
                )
9✔
1393
                if err != nil {
9✔
1394
                        return err
×
1395
                }
×
1396

1397
                return nil
9✔
1398
        }
1399

1400
        // Update the set of activeHTLCs so that the sweeping routine has an
1401
        // up-to-date view of the set of commitments.
1402
        c.updateActiveHTLCs()
22✔
1403

22✔
1404
        // Sweep anchors based on different HTLC sets. Notice the HTLC sets may
22✔
1405
        // differ across commitments, thus their deadline values could vary.
22✔
1406
        for htlcSet, htlcs := range c.activeHTLCs {
67✔
1407
                switch {
45✔
1408
                case htlcSet == LocalHtlcSet && anchors.Local != nil:
6✔
1409
                        err := sweepWithDeadline(anchors.Local, htlcs, "local")
6✔
1410
                        if err != nil {
6✔
1411
                                return err
×
1412
                        }
×
1413

1414
                case htlcSet == RemoteHtlcSet && anchors.Remote != nil:
6✔
1415
                        err := sweepWithDeadline(
6✔
1416
                                anchors.Remote, htlcs, "remote",
6✔
1417
                        )
6✔
1418
                        if err != nil {
6✔
1419
                                return err
×
1420
                        }
×
1421

1422
                case htlcSet == RemotePendingHtlcSet &&
1423
                        anchors.RemotePending != nil:
1✔
1424

1✔
1425
                        err := sweepWithDeadline(
1✔
1426
                                anchors.RemotePending, htlcs, "remote pending",
1✔
1427
                        )
1✔
1428
                        if err != nil {
1✔
1429
                                return err
×
1430
                        }
×
1431
                }
1432
        }
1433

1434
        return nil
22✔
1435
}
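// Editor's note: the snippet below is an illustrative sketch and is not part
// of channel_arbitrator.go. It only shows the shape of the anchor CPFP budget
// used above; ratioOrCap is a hypothetical stand-in for this package's
// calculateBudget helper, whose exact capping rules are defined elsewhere in
// the file.
//
//    // ratioOrCap returns value*ratio, capped at maxBudget when it is set.
//    func ratioOrCap(value btcutil.Amount, ratio float64,
//        maxBudget btcutil.Amount) btcutil.Amount {
//
//        budget := btcutil.Amount(float64(value) * ratio)
//        if maxBudget != 0 && budget > maxBudget {
//            budget = maxBudget
//        }
//        return budget
//    }
//
//    // With 1,000,000 sats of time-sensitive HTLC value and a 0.5 CPFP
//    // ratio, the sweep budget is 500,000 sats plus the 330-sat anchor
//    // output itself.
//    budget := ratioOrCap(1_000_000, 0.5, 0) + AnchorOutputValue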
1436

1437
// findCommitmentDeadlineAndValue finds the deadline (relative block height)
1438
// for a commitment transaction by extracting the minimum CLTV from its HTLCs.
1439
// From our PoV, the deadline delta is defined to be the smaller of,
1440
//   - half of the least CLTV from outgoing HTLCs' corresponding incoming
1441
//     HTLCs,  or,
1442
//   - half of the least CLTV from incoming HTLCs if the preimage is available.
1443
//
1444
// We use half of the CLTV value to ensure that we have enough time to sweep
1445
// the second-level HTLCs.
1446
//
1447
// It also finds the total value that is time-sensitive, which is the sum of
1448
// all the outgoing HTLCs plus incoming HTLCs whose preimages are known. It
1449
// then returns the value left after subtracting the budget used for sweeping
1450
// the time-sensitive HTLCs.
1451
//
1452
// NOTE: when the deadline turns out to be 0 blocks, we will replace it with 1
1453
// block because our fee estimator doesn't allow a 0 conf target. This also
1454
// means we've fallen behind and should increase our fee to make the transaction
1455
// confirmed asap.
1456
func (c *ChannelArbitrator) findCommitmentDeadlineAndValue(heightHint uint32,
1457
        htlcs htlcSet) (fn.Option[int32], btcutil.Amount, error) {
14✔
1458

14✔
1459
        deadlineMinHeight := uint32(math.MaxUint32)
14✔
1460
        totalValue := btcutil.Amount(0)
14✔
1461

14✔
1462
        // First, iterate through the outgoingHTLCs to find the lowest CLTV
14✔
1463
        // value.
14✔
1464
        for _, htlc := range htlcs.outgoingHTLCs {
27✔
1465
                // Skip if the HTLC is dust.
13✔
1466
                if htlc.OutputIndex < 0 {
20✔
1467
                        log.Debugf("ChannelArbitrator(%v): skipped deadline "+
7✔
1468
                                "for dust htlc=%x",
7✔
1469
                                c.cfg.ChanPoint, htlc.RHash[:])
7✔
1470

7✔
1471
                        continue
7✔
1472
                }
1473

1474
                value := htlc.Amt.ToSatoshis()
10✔
1475

10✔
1476
                // Find the expiry height for this outgoing HTLC's incoming
10✔
1477
                // HTLC.
10✔
1478
                deadlineOpt := c.cfg.FindOutgoingHTLCDeadline(htlc)
10✔
1479

10✔
1480
                // The deadline defaults to the current deadlineMinHeight,
10✔
1481
                // and it's overwritten when it's not none.
10✔
1482
                deadline := deadlineMinHeight
10✔
1483
                deadlineOpt.WhenSome(func(d int32) {
18✔
1484
                        deadline = uint32(d)
8✔
1485

8✔
1486
                        // We only consider the value is under protection when
8✔
1487
                        // it's time-sensitive.
8✔
1488
                        totalValue += value
8✔
1489
                })
8✔
1490

1491
                if deadline < deadlineMinHeight {
18✔
1492
                        deadlineMinHeight = deadline
8✔
1493

8✔
1494
                        log.Tracef("ChannelArbitrator(%v): outgoing HTLC has "+
8✔
1495
                                "deadline=%v, value=%v", c.cfg.ChanPoint,
8✔
1496
                                deadlineMinHeight, value)
8✔
1497
                }
8✔
1498
        }
1499

1500
        // Then go through the incomingHTLCs and update the minHeight when the
1501
        // conditions are met.
1502
        for _, htlc := range htlcs.incomingHTLCs {
27✔
1503
                // Skip if the HTLC is dust.
13✔
1504
                if htlc.OutputIndex < 0 {
14✔
1505
                        log.Debugf("ChannelArbitrator(%v): skipped deadline "+
1✔
1506
                                "for dust htlc=%x",
1✔
1507
                                c.cfg.ChanPoint, htlc.RHash[:])
1✔
1508

1✔
1509
                        continue
1✔
1510
                }
1511

1512
                // Since it's an HTLC sent to us, check if we have preimage for
1513
                // this HTLC.
1514
                preimageAvailable, err := c.isPreimageAvailable(htlc.RHash)
12✔
1515
                if err != nil {
12✔
1516
                        return fn.None[int32](), 0, err
×
1517
                }
×
1518

1519
                if !preimageAvailable {
18✔
1520
                        continue
6✔
1521
                }
1522

1523
                value := htlc.Amt.ToSatoshis()
10✔
1524
                totalValue += value
10✔
1525

10✔
1526
                if htlc.RefundTimeout < deadlineMinHeight {
19✔
1527
                        deadlineMinHeight = htlc.RefundTimeout
9✔
1528

9✔
1529
                        log.Tracef("ChannelArbitrator(%v): incoming HTLC has "+
9✔
1530
                                "deadline=%v, amt=%v", c.cfg.ChanPoint,
9✔
1531
                                deadlineMinHeight, value)
9✔
1532
                }
9✔
1533
        }
1534

1535
        // Calculate the deadline. There are two cases to be handled here,
1536
        //   - when the deadlineMinHeight never gets updated, which could
1537
        //     happen when we have no outgoing HTLCs, and, for incoming HTLCs,
1538
        //       * either we have none, or,
1539
        //       * none of the HTLCs are preimageAvailable.
1540
        //   - when our deadlineMinHeight is no greater than the heightHint,
1541
        //     which means we are behind our schedule.
1542
        var deadline uint32
14✔
1543
        switch {
14✔
1544
        // When we couldn't find a deadline height from our HTLCs, we will fall
1545
        // back to the default value as there's no time pressure here.
1546
        case deadlineMinHeight == math.MaxUint32:
6✔
1547
                return fn.None[int32](), 0, nil
6✔
1548

1549
        // When the deadline is passed, we will fall back to the smallest conf
1550
        // target (1 block).
1551
        case deadlineMinHeight <= heightHint:
1✔
1552
                log.Warnf("ChannelArbitrator(%v): deadline is passed with "+
1✔
1553
                        "deadlineMinHeight=%d, heightHint=%d",
1✔
1554
                        c.cfg.ChanPoint, deadlineMinHeight, heightHint)
1✔
1555
                deadline = 1
1✔
1556

1557
        // Use half of the deadline delta, and leave the other half to be used
1558
        // to sweep the HTLCs.
1559
        default:
11✔
1560
                deadline = (deadlineMinHeight - heightHint) / 2
11✔
1561
        }
1562

1563
        // Calculate the value left after subtracting the budget used for
1564
        // sweeping the time-sensitive HTLCs.
1565
        valueLeft := totalValue - calculateBudget(
12✔
1566
                totalValue, c.cfg.Budget.DeadlineHTLCRatio,
12✔
1567
                c.cfg.Budget.DeadlineHTLC,
12✔
1568
        )
12✔
1569

12✔
1570
        log.Debugf("ChannelArbitrator(%v): calculated valueLeft=%v, "+
12✔
1571
                "deadline=%d, using deadlineMinHeight=%d, heightHint=%d",
12✔
1572
                c.cfg.ChanPoint, valueLeft, deadline, deadlineMinHeight,
12✔
1573
                heightHint)
12✔
1574

12✔
1575
        return fn.Some(int32(deadline)), valueLeft, nil
12✔
1576
}
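// Editor's note: illustrative sketch only, not part of channel_arbitrator.go.
// A worked example of the halving rule implemented above: with the current
// height at 1000 and the most urgent HTLC expiring at height 1040, the
// commitment (and anchor) sweep gets half of the remaining delta, leaving the
// other half for the second-level HTLC sweeps.
//
//    heightHint := uint32(1000)
//    deadlineMinHeight := uint32(1040)
//
//    // (1040 - 1000) / 2 = 20 blocks for the commitment/anchor sweep.
//    deadline := (deadlineMinHeight - heightHint) / 2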
1577

1578
// launchResolvers updates the activeResolvers list and starts the resolvers.
1579
func (c *ChannelArbitrator) launchResolvers(resolvers []ContractResolver,
1580
        immediate bool) {
17✔
1581

17✔
1582
        c.activeResolversLock.Lock()
17✔
1583
        defer c.activeResolversLock.Unlock()
17✔
1584

17✔
1585
        c.activeResolvers = resolvers
17✔
1586
        for _, contract := range resolvers {
27✔
1587
                c.wg.Add(1)
10✔
1588
                go c.resolveContract(contract, immediate)
10✔
1589
        }
10✔
1590
}
1591

1592
// advanceState is the main driver of our state machine. This method is an
1593
// iterative function which repeatedly attempts to advance the internal state
1594
// of the channel arbitrator. The state will be advanced until we reach a
1595
// redundant transition, meaning that the state transition is a noop. The final
1596
// param is a callback that allows the caller to execute an arbitrary action
1597
// after each state transition.
1598
func (c *ChannelArbitrator) advanceState(
1599
        triggerHeight uint32, trigger transitionTrigger,
1600
        confCommitSet *CommitSet) (ArbitratorState, *wire.MsgTx, error) {
93✔
1601

93✔
1602
        var (
93✔
1603
                priorState   ArbitratorState
93✔
1604
                forceCloseTx *wire.MsgTx
93✔
1605
        )
93✔
1606

93✔
1607
        // We'll continue to advance our state forward until the state we
93✔
1608
        // transition to is the same state that we started at.
93✔
1609
        for {
272✔
1610
                priorState = c.state
179✔
1611
                log.Debugf("ChannelArbitrator(%v): attempting state step with "+
179✔
1612
                        "trigger=%v from state=%v", c.cfg.ChanPoint, trigger,
179✔
1613
                        priorState)
179✔
1614

179✔
1615
                nextState, closeTx, err := c.stateStep(
179✔
1616
                        triggerHeight, trigger, confCommitSet,
179✔
1617
                )
179✔
1618
                if err != nil {
187✔
1619
                        log.Errorf("ChannelArbitrator(%v): unable to advance "+
8✔
1620
                                "state: %v", c.cfg.ChanPoint, err)
8✔
1621
                        return priorState, nil, err
8✔
1622
                }
8✔
1623

1624
                if forceCloseTx == nil && closeTx != nil {
195✔
1625
                        forceCloseTx = closeTx
20✔
1626
                }
20✔
1627

1628
                // Our termination transition is a noop transition. If we get
1629
                // our prior state back as the next state, then we'll
1630
                // terminate.
1631
                if nextState == priorState {
261✔
1632
                        log.Debugf("ChannelArbitrator(%v): terminating at "+
86✔
1633
                                "state=%v", c.cfg.ChanPoint, nextState)
86✔
1634
                        return nextState, forceCloseTx, nil
86✔
1635
                }
86✔
1636

1637
                // As the prior state was successfully executed, we can now
1638
                // commit the next state. This ensures that we will re-execute
1639
                // the prior state if anything fails.
1640
                if err := c.log.CommitState(nextState); err != nil {
96✔
1641
                        log.Errorf("ChannelArbitrator(%v): unable to commit "+
3✔
1642
                                "next state(%v): %v", c.cfg.ChanPoint,
3✔
1643
                                nextState, err)
3✔
1644
                        return priorState, nil, err
3✔
1645
                }
3✔
1646
                c.state = nextState
90✔
1647
        }
1648
}
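// Editor's note: illustrative sketch only, not part of channel_arbitrator.go.
// Callers drive the state machine by handing advanceState a trigger, and the
// loop above repeats stateStep until the state stops changing. A new-block
// notification, for example, would be handled roughly like this (variable
// names are hypothetical):
//
//    nextState, _, err := c.advanceState(
//        uint32(blockHeight), chainTrigger, nil,
//    )
//    if err != nil {
//        log.Errorf("ChannelArbitrator(%v): unable to advance state: %v",
//            c.cfg.ChanPoint, err)
//    }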
1649

1650
// ChainAction is an enum that encompasses all possible on-chain actions
1651
// we'll take for a set of HTLC's.
1652
type ChainAction uint8
1653

1654
const (
1655
        // NoAction is the min chainAction type, indicating that no action
1656
        // needs to be taken for a given HTLC.
1657
        NoAction ChainAction = 0
1658

1659
        // HtlcTimeoutAction indicates that the HTLC will timeout soon. As a
1660
        // result, we should get ready to sweep it on chain after the timeout.
1661
        HtlcTimeoutAction = 1
1662

1663
        // HtlcClaimAction indicates that we should claim the HTLC on chain
1664
        // before its timeout period.
1665
        HtlcClaimAction = 2
1666

1667
        // HtlcFailNowAction indicates that we should fail an outgoing HTLC
1668
        // immediately by cancelling it backwards as it has no corresponding
1669
        // output in our commitment transaction.
1670
        HtlcFailNowAction = 3
1671

1672
        // HtlcOutgoingWatchAction indicates that we can't yet timeout this
1673
        // HTLC, but we had to go to chain in order to resolve an existing
1674
        // HTLC.  In this case, we'll either: time it out once it expires, or
1675
        // will learn the pre-image if the remote party claims the output. In
1676
        // this case, we'll add the pre-image to our global store.
1677
        HtlcOutgoingWatchAction = 4
1678

1679
        // HtlcIncomingWatchAction indicates that we don't yet have the
1680
        // pre-image to claim the incoming HTLC, but we had to go to chain in order
1681
        // to resolve an existing HTLC. In this case, we'll either: let the
1682
        // other party time it out, or eventually learn of the pre-image, in
1683
        // which case we'll claim on chain.
1684
        HtlcIncomingWatchAction = 5
1685

1686
        // HtlcIncomingDustFinalAction indicates that we should mark an incoming
1687
        // dust htlc as final because it can't be claimed on-chain.
1688
        HtlcIncomingDustFinalAction = 6
1689
)
1690

1691
// String returns a human readable string describing a chain action.
1692
func (c ChainAction) String() string {
×
1693
        switch c {
×
1694
        case NoAction:
×
1695
                return "NoAction"
×
1696

1697
        case HtlcTimeoutAction:
×
1698
                return "HtlcTimeoutAction"
×
1699

1700
        case HtlcClaimAction:
×
1701
                return "HtlcClaimAction"
×
1702

1703
        case HtlcFailNowAction:
×
1704
                return "HtlcFailNowAction"
×
1705

1706
        case HtlcOutgoingWatchAction:
×
1707
                return "HtlcOutgoingWatchAction"
×
1708

1709
        case HtlcIncomingWatchAction:
×
1710
                return "HtlcIncomingWatchAction"
×
1711

1712
        case HtlcIncomingDustFinalAction:
×
1713
                return "HtlcIncomingDustFinalAction"
×
1714

1715
        default:
×
1716
                return "<unknown action>"
×
1717
        }
1718
}
1719

1720
// ChainActionMap is a map of a chain action, to the set of HTLC's that need to
1721
// be acted upon for a given action type.
1722
type ChainActionMap map[ChainAction][]channeldb.HTLC
1723

1724
// Merge merges the passed chain actions with the target chain action map.
1725
func (c ChainActionMap) Merge(actions ChainActionMap) {
80✔
1726
        for chainAction, htlcs := range actions {
97✔
1727
                c[chainAction] = append(c[chainAction], htlcs...)
17✔
1728
        }
17✔
1729
}
1730

1731
// shouldGoOnChain takes into account the absolute timeout of the HTLC, if the
1732
// confirmation delta that we need is close, and returns a bool indicating if
1733
// we should go on chain to claim.  We do this rather than waiting up until the
1734
// last minute as we want to ensure that when we *need* (HTLC is timed out) to
1735
// sweep, the commitment is already confirmed.
1736
func (c *ChannelArbitrator) shouldGoOnChain(htlc channeldb.HTLC,
1737
        broadcastDelta, currentHeight uint32) bool {
31✔
1738

31✔
1739
        // We'll calculate the broadcast cut off for this HTLC. This is the
31✔
1740
        // height that (based on our current fee estimation) we should
31✔
1741
        // broadcast in order to ensure the commitment transaction is confirmed
31✔
1742
        // before the HTLC fully expires.
31✔
1743
        broadcastCutOff := htlc.RefundTimeout - broadcastDelta
31✔
1744

31✔
1745
        log.Tracef("ChannelArbitrator(%v): examining outgoing contract: "+
31✔
1746
                "expiry=%v, cutoff=%v, height=%v", c.cfg.ChanPoint, htlc.RefundTimeout,
31✔
1747
                broadcastCutOff, currentHeight)
31✔
1748

31✔
1749
        // TODO(roasbeef): take into account default HTLC delta, don't need to
31✔
1750
        // broadcast immediately
31✔
1751
        //  * can then batch with SINGLE | ANYONECANPAY
31✔
1752

31✔
1753
        // We should go on-chain for this HTLC iff we're within our broadcast
31✔
1754
        // cutoff window.
31✔
1755
        if currentHeight < broadcastCutOff {
55✔
1756
                return false
24✔
1757
        }
24✔
1758

1759
        // In case of incoming htlc we should go to chain.
1760
        if htlc.Incoming {
15✔
1761
                return true
4✔
1762
        }
4✔
1763

1764
        // For htlcs that are the result of our initiated payments we give some grace
1765
        // period before force closing the channel. During this time we expect
1766
        // both nodes to connect and give a chance to the other node to send its
1767
        // updates and cancel the htlc.
1768
        // This shouldn't add any security risk as there is no incoming htlc to
1769
        // fulfill in this case, and the expectation is that when the channel is
1770
        // active the other node will send update_fail_htlc to remove the htlc
1771
        // without closing the channel. It is up to the user to force close the
1772
        // channel if the peer misbehaves and doesn't send the update_fail_htlc.
1773
        // It is useful when this node is most of the time not online and is
1774
        // likely to miss the time slot where the htlc may be cancelled.
1775
        isForwarded := c.cfg.IsForwardedHTLC(c.cfg.ShortChanID, htlc.HtlcIndex)
11✔
1776
        upTime := c.cfg.Clock.Now().Sub(c.startTimestamp)
11✔
1777
        return isForwarded || upTime > c.cfg.PaymentsExpirationGracePeriod
11✔
1778
}
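// Editor's note: illustrative sketch only, not part of channel_arbitrator.go.
// A worked example of the cutoff above: an outgoing HTLC that expires at
// height 800,000 with an OutgoingBroadcastDelta of 10 blocks should be taken
// on-chain once the current height reaches 799,990 (subject to the
// incoming/grace-period checks that follow in the function).
//
//    refundTimeout := uint32(800_000)
//    broadcastDelta := uint32(10)
//    currentHeight := uint32(799_995)
//
//    broadcastCutOff := refundTimeout - broadcastDelta // 799_990
//    pastCutOff := currentHeight >= broadcastCutOff    // true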
1779

1780
// checkCommitChainActions is called for each new block connected to the end of
1781
// the main chain. Given the new block height, this new method will examine all
1782
// active HTLC's, and determine if we need to go on-chain to claim any of them.
1783
// A map of action -> []htlc is returned, detailing what action (if any) should
1784
// be performed for each HTLC. For timed out HTLC's, once the commitment has
1785
// been sufficiently confirmed, the HTLC's should be canceled backwards. For
1786
// redeemed HTLC's, we should send the pre-image back to the incoming link.
1787
func (c *ChannelArbitrator) checkCommitChainActions(height uint32,
1788
        trigger transitionTrigger, htlcs htlcSet) (ChainActionMap, error) {
80✔
1789

80✔
1790
        // TODO(roasbeef): would need to lock channel? channel totem?
80✔
1791
        //  * race condition if adding and we broadcast, etc
80✔
1792
        //  * or would make each instance sync?
80✔
1793

80✔
1794
        log.Debugf("ChannelArbitrator(%v): checking commit chain actions at "+
80✔
1795
                "height=%v, in_htlc_count=%v, out_htlc_count=%v",
80✔
1796
                c.cfg.ChanPoint, height,
80✔
1797
                len(htlcs.incomingHTLCs), len(htlcs.outgoingHTLCs))
80✔
1798

80✔
1799
        actionMap := make(ChainActionMap)
80✔
1800

80✔
1801
        // First, we'll make an initial pass over the set of incoming and
80✔
1802
        // outgoing HTLC's to decide if we need to go on chain at all.
80✔
1803
        haveChainActions := false
80✔
1804
        for _, htlc := range htlcs.outgoingHTLCs {
91✔
1805
                // We'll need to go on-chain for an outgoing HTLC if it was
11✔
1806
                // never resolved downstream, and it's "close" to timing out.
11✔
1807
                //
11✔
1808
                // TODO(yy): If there's no corresponding incoming HTLC, it
11✔
1809
                // means we are the first hop, hence the payer. This is a
11✔
1810
                // tricky case - unlike a forwarding hop, we don't have an
11✔
1811
                // incoming HTLC that will time out, which means as long as we
11✔
1812
                // can learn the preimage, we can settle the invoice (before it
11✔
1813
                // expires?).
11✔
1814
                toChain := c.shouldGoOnChain(
11✔
1815
                        htlc, c.cfg.OutgoingBroadcastDelta, height,
11✔
1816
                )
11✔
1817

11✔
1818
                if toChain {
15✔
1819
                        // Convert to int64 in case of overflow.
4✔
1820
                        remainingBlocks := int64(htlc.RefundTimeout) -
4✔
1821
                                int64(height)
4✔
1822

4✔
1823
                        log.Infof("ChannelArbitrator(%v): go to chain for "+
4✔
1824
                                "outgoing htlc %x: timeout=%v, amount=%v, "+
4✔
1825
                                "blocks_until_expiry=%v, broadcast_delta=%v",
4✔
1826
                                c.cfg.ChanPoint, htlc.RHash[:],
4✔
1827
                                htlc.RefundTimeout, htlc.Amt, remainingBlocks,
4✔
1828
                                c.cfg.OutgoingBroadcastDelta,
4✔
1829
                        )
4✔
1830
                }
4✔
1831

1832
                haveChainActions = haveChainActions || toChain
11✔
1833
        }
1834

1835
        for _, htlc := range htlcs.incomingHTLCs {
89✔
1836
                // We'll need to go on-chain to pull an incoming HTLC iff we
9✔
1837
                // know the pre-image and it's close to timing out. We need to
9✔
1838
                // ensure that we claim the funds that are rightfully ours
9✔
1839
                // on-chain.
9✔
1840
                preimageAvailable, err := c.isPreimageAvailable(htlc.RHash)
9✔
1841
                if err != nil {
9✔
1842
                        return nil, err
×
1843
                }
×
1844

1845
                if !preimageAvailable {
16✔
1846
                        continue
7✔
1847
                }
1848

1849
                toChain := c.shouldGoOnChain(
6✔
1850
                        htlc, c.cfg.IncomingBroadcastDelta, height,
6✔
1851
                )
6✔
1852

6✔
1853
                if toChain {
10✔
1854
                        // Convert to int64 in case of overflow.
4✔
1855
                        remainingBlocks := int64(htlc.RefundTimeout) -
4✔
1856
                                int64(height)
4✔
1857

4✔
1858
                        log.Infof("ChannelArbitrator(%v): go to chain for "+
4✔
1859
                                "incoming htlc %x: timeout=%v, amount=%v, "+
4✔
1860
                                "blocks_until_expiry=%v, broadcast_delta=%v",
4✔
1861
                                c.cfg.ChanPoint, htlc.RHash[:],
4✔
1862
                                htlc.RefundTimeout, htlc.Amt, remainingBlocks,
4✔
1863
                                c.cfg.IncomingBroadcastDelta,
4✔
1864
                        )
4✔
1865
                }
4✔
1866

1867
                haveChainActions = haveChainActions || toChain
6✔
1868
        }
1869

1870
        // If we don't have any actions to make, then we'll return an empty
1871
        // action map. We only do this if this was a chain trigger though, as
1872
        // if we're going to broadcast the commitment (or the remote party did)
1873
        // we're *forced* to act on each HTLC.
1874
        if !haveChainActions && trigger == chainTrigger {
126✔
1875
                log.Tracef("ChannelArbitrator(%v): no actions to take at "+
46✔
1876
                        "height=%v", c.cfg.ChanPoint, height)
46✔
1877
                return actionMap, nil
46✔
1878
        }
46✔
1879

1880
        // Now that we know we'll need to go on-chain, we'll examine all of our
1881
        // active outgoing HTLC's to see if we either need to: sweep them after
1882
        // a timeout (then cancel backwards), cancel them backwards
1883
        // immediately, or watch them as they're still active contracts.
1884
        for _, htlc := range htlcs.outgoingHTLCs {
48✔
1885
                switch {
10✔
1886
                // If the HTLC is dust, then we can cancel it backwards
1887
                // immediately as there's no matching contract to arbitrate
1888
                // on-chain. We know the HTLC is dust, if the OutputIndex
1889
                // negative.
1890
                case htlc.OutputIndex < 0:
6✔
1891
                        log.Tracef("ChannelArbitrator(%v): immediately "+
6✔
1892
                                "failing dust htlc=%x", c.cfg.ChanPoint,
6✔
1893
                                htlc.RHash[:])
6✔
1894

6✔
1895
                        actionMap[HtlcFailNowAction] = append(
6✔
1896
                                actionMap[HtlcFailNowAction], htlc,
6✔
1897
                        )
6✔
1898

1899
                // If we don't need to immediately act on this HTLC, then we'll
1900
                // mark it still "live". After we broadcast, we'll monitor it
1901
                // until the HTLC times out to see if we can also redeem it
1902
                // on-chain.
1903
                case !c.shouldGoOnChain(htlc, c.cfg.OutgoingBroadcastDelta,
1904
                        height,
1905
                ):
8✔
1906
                        // TODO(roasbeef): also need to be able to query
8✔
1907
                        // circuit map to see if HTLC hasn't been fully
8✔
1908
                        // resolved
8✔
1909
                        //
8✔
1910
                        //  * can't fail incoming until if outgoing not yet
8✔
1911
                        //  failed
8✔
1912

8✔
1913
                        log.Tracef("ChannelArbitrator(%v): watching chain to "+
8✔
1914
                                "decide action for outgoing htlc=%x",
8✔
1915
                                c.cfg.ChanPoint, htlc.RHash[:])
8✔
1916

8✔
1917
                        actionMap[HtlcOutgoingWatchAction] = append(
8✔
1918
                                actionMap[HtlcOutgoingWatchAction], htlc,
8✔
1919
                        )
8✔
1920

1921
                // Otherwise, we'll update our actionMap to mark that we need
1922
                // to sweep this HTLC on-chain
1923
                default:
4✔
1924
                        log.Tracef("ChannelArbitrator(%v): going on-chain to "+
4✔
1925
                                "timeout htlc=%x", c.cfg.ChanPoint, htlc.RHash[:])
4✔
1926

4✔
1927
                        actionMap[HtlcTimeoutAction] = append(
4✔
1928
                                actionMap[HtlcTimeoutAction], htlc,
4✔
1929
                        )
4✔
1930
                }
1931
        }
1932

1933
        // Similarly, for each incoming HTLC, now that we need to go on-chain,
1934
        // we'll either: sweep it immediately if we know the pre-image, or
1935
        // observe the output on-chain if we don't. In this last case, we'll
1936
        // either learn of it eventually from the outgoing HTLC, or the sender
1937
        // will timeout the HTLC.
1938
        for _, htlc := range htlcs.incomingHTLCs {
46✔
1939
                // If the HTLC is dust, there is no action to be taken.
8✔
1940
                if htlc.OutputIndex < 0 {
14✔
1941
                        log.Debugf("ChannelArbitrator(%v): no resolution "+
6✔
1942
                                "needed for incoming dust htlc=%x",
6✔
1943
                                c.cfg.ChanPoint, htlc.RHash[:])
6✔
1944

6✔
1945
                        actionMap[HtlcIncomingDustFinalAction] = append(
6✔
1946
                                actionMap[HtlcIncomingDustFinalAction], htlc,
6✔
1947
                        )
6✔
1948

6✔
1949
                        continue
6✔
1950
                }
1951

1952
                log.Tracef("ChannelArbitrator(%v): watching chain to decide "+
6✔
1953
                        "action for incoming htlc=%x", c.cfg.ChanPoint,
6✔
1954
                        htlc.RHash[:])
6✔
1955

6✔
1956
                actionMap[HtlcIncomingWatchAction] = append(
6✔
1957
                        actionMap[HtlcIncomingWatchAction], htlc,
6✔
1958
                )
6✔
1959
        }
1960

1961
        return actionMap, nil
38✔
1962
}
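// Editor's note: illustrative summary only, not part of channel_arbitrator.go.
// The classification performed above, once we know we have to act, in table
// form:
//
//    outgoing, dust (OutputIndex < 0)       -> HtlcFailNowAction
//    outgoing, not yet near its timeout     -> HtlcOutgoingWatchAction
//    outgoing, within the broadcast cutoff  -> HtlcTimeoutAction
//    incoming, dust                         -> HtlcIncomingDustFinalAction
//    incoming, non-dust                     -> HtlcIncomingWatchAction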
1963

1964
// isPreimageAvailable returns whether the hash preimage is available in either
1965
// the preimage cache or the invoice database.
1966
func (c *ChannelArbitrator) isPreimageAvailable(hash lntypes.Hash) (bool,
1967
        error) {
21✔
1968

21✔
1969
        // Start by checking the preimage cache for preimages of
21✔
1970
        // forwarded HTLCs.
21✔
1971
        _, preimageAvailable := c.cfg.PreimageDB.LookupPreimage(
21✔
1972
                hash,
21✔
1973
        )
21✔
1974
        if preimageAvailable {
33✔
1975
                return true, nil
12✔
1976
        }
12✔
1977

1978
        // Then check if we have an invoice that can be settled by this HTLC.
1979
        //
1980
        // TODO(joostjager): Check that there are still more blocks remaining
1981
        // than the invoice cltv delta. We don't want to go to chain only to
1982
        // have the incoming contest resolver decide that we don't want to
1983
        // settle this invoice.
1984
        invoice, err := c.cfg.Registry.LookupInvoice(context.Background(), hash)
13✔
1985
        switch {
13✔
1986
        case err == nil:
4✔
1987
        case errors.Is(err, invoices.ErrInvoiceNotFound) ||
1988
                errors.Is(err, invoices.ErrNoInvoicesCreated):
13✔
1989

13✔
1990
                return false, nil
13✔
1991
        default:
×
1992
                return false, err
×
1993
        }
1994

1995
        preimageAvailable = invoice.Terms.PaymentPreimage != nil
4✔
1996

4✔
1997
        return preimageAvailable, nil
4✔
1998
}
1999

2000
// checkLocalChainActions is similar to checkCommitChainActions, but it also
2001
// examines the set of HTLCs on the remote party's commitment. This allows us
2002
// to ensure we're able to satisfy the HTLC timeout constraints for incoming vs
2003
// outgoing HTLCs.
2004
func (c *ChannelArbitrator) checkLocalChainActions(
2005
        height uint32, trigger transitionTrigger,
2006
        activeHTLCs map[HtlcSetKey]htlcSet,
2007
        commitsConfirmed bool) (ChainActionMap, error) {
74✔
2008

74✔
2009
        // First, we'll check our local chain actions as normal. This will only
74✔
2010
        // examine HTLCs on our local commitment (timeout or settle).
74✔
2011
        localCommitActions, err := c.checkCommitChainActions(
74✔
2012
                height, trigger, activeHTLCs[LocalHtlcSet],
74✔
2013
        )
74✔
2014
        if err != nil {
74✔
2015
                return nil, err
×
2016
        }
×
2017

2018
        // Next, we'll examine the remote commitment (and maybe a dangling one)
2019
        // to see if the set difference of our HTLCs is non-empty. If so, then
2020
        // we may need to cancel back some HTLCs if we decide to go to chain.
2021
        remoteDanglingActions := c.checkRemoteDanglingActions(
74✔
2022
                height, activeHTLCs, commitsConfirmed,
74✔
2023
        )
74✔
2024

74✔
2025
        // Finally, we'll merge the two set of chain actions.
74✔
2026
        localCommitActions.Merge(remoteDanglingActions)
74✔
2027

74✔
2028
        return localCommitActions, nil
74✔
2029
}
2030

2031
// checkRemoteDanglingActions examines the set of remote commitments for any
2032
// HTLCs that are close to timing out. If we find any, then we'll return a set
2033
// of chain actions for HTLCs that are on our commitment, but not theirs to
2034
// cancel immediately.
2035
func (c *ChannelArbitrator) checkRemoteDanglingActions(
2036
        height uint32, activeHTLCs map[HtlcSetKey]htlcSet,
2037
        commitsConfirmed bool) ChainActionMap {
74✔
2038

74✔
2039
        var (
74✔
2040
                pendingRemoteHTLCs []channeldb.HTLC
74✔
2041
                localHTLCs         = make(map[uint64]struct{})
74✔
2042
                remoteHTLCs        = make(map[uint64]channeldb.HTLC)
74✔
2043
                actionMap          = make(ChainActionMap)
74✔
2044
        )
74✔
2045

74✔
2046
        // First, we'll construct two sets of the outgoing HTLCs: those on our
74✔
2047
        // local commitment, and those that are on the remote commitment(s).
74✔
2048
        for htlcSetKey, htlcs := range activeHTLCs {
198✔
2049
                if htlcSetKey.IsRemote {
192✔
2050
                        for _, htlc := range htlcs.outgoingHTLCs {
88✔
2051
                                remoteHTLCs[htlc.HtlcIndex] = htlc
20✔
2052
                        }
20✔
2053
                } else {
60✔
2054
                        for _, htlc := range htlcs.outgoingHTLCs {
70✔
2055
                                localHTLCs[htlc.HtlcIndex] = struct{}{}
10✔
2056
                        }
10✔
2057
                }
2058
        }
2059

2060
        // With both sets constructed, we'll now compute the set difference of
2061
        // our two sets of HTLCs. This'll give us the HTLCs that exist on the
2062
        // remote commitment transaction, but not on ours.
2063
        for htlcIndex, htlc := range remoteHTLCs {
94✔
2064
                if _, ok := localHTLCs[htlcIndex]; ok {
26✔
2065
                        continue
6✔
2066
                }
2067

2068
                pendingRemoteHTLCs = append(pendingRemoteHTLCs, htlc)
18✔
2069
        }
2070

2071
        // Finally, we'll examine all the pending remote HTLCs for those that
2072
        // have expired. If we find any, then we'll recommend that they be
2073
        // failed now so we can free up the incoming HTLC.
2074
        for _, htlc := range pendingRemoteHTLCs {
92✔
2075
                // We'll now check if we need to go to chain in order to cancel
18✔
2076
                // the incoming HTLC.
18✔
2077
                goToChain := c.shouldGoOnChain(htlc, c.cfg.OutgoingBroadcastDelta,
18✔
2078
                        height,
18✔
2079
                )
18✔
2080

18✔
2081
                // If we don't need to go to chain, and no commitments have
18✔
2082
                // been confirmed, then we can move on. Otherwise, if
18✔
2083
                // commitments have been confirmed, then we need to cancel back
18✔
2084
                // *all* of the pending remote HTLCs.
18✔
2085
                if !goToChain && !commitsConfirmed {
27✔
2086
                        continue
9✔
2087
                }
2088

2089
                log.Infof("ChannelArbitrator(%v): fail dangling htlc=%x from "+
13✔
2090
                        "local/remote commitments diff",
13✔
2091
                        c.cfg.ChanPoint, htlc.RHash[:])
13✔
2092

13✔
2093
                actionMap[HtlcFailNowAction] = append(
13✔
2094
                        actionMap[HtlcFailNowAction], htlc,
13✔
2095
                )
13✔
2096
        }
2097

2098
        return actionMap
74✔
2099
}
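// Editor's note: illustrative sketch only, not part of channel_arbitrator.go.
// The dangling-HTLC detection above is a plain set difference keyed by HTLC
// index: anything on a remote commitment that is missing from our local
// commitment is a candidate to be failed back.
//
//    localHTLCs := map[uint64]struct{}{1: {}, 2: {}}
//    remoteHTLCs := map[uint64]struct{}{1: {}, 2: {}, 3: {}}
//
//    var dangling []uint64
//    for idx := range remoteHTLCs {
//        if _, ok := localHTLCs[idx]; !ok {
//            dangling = append(dangling, idx) // -> [3]
//        }
//    }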
2100

2101
// checkRemoteChainActions examines the two possible remote commitment chains
2102
// and returns the set of chain actions we need to carry out if the remote
2103
// commitment (non pending) confirms. The pendingConf indicates if the pending
2104
// remote commitment confirmed. This is similar to checkCommitChainActions, but
2105
// we'll immediately fail any HTLCs on the pending remote commit, but not the
2106
// remote commit (or the other way around).
2107
func (c *ChannelArbitrator) checkRemoteChainActions(
2108
        height uint32, trigger transitionTrigger,
2109
        activeHTLCs map[HtlcSetKey]htlcSet,
2110
        pendingConf bool) (ChainActionMap, error) {
10✔
2111

10✔
2112
        // First, we'll examine all the normal chain actions on the remote
10✔
2113
        // commitment that confirmed.
10✔
2114
        confHTLCs := activeHTLCs[RemoteHtlcSet]
10✔
2115
        if pendingConf {
12✔
2116
                confHTLCs = activeHTLCs[RemotePendingHtlcSet]
2✔
2117
        }
2✔
2118
        remoteCommitActions, err := c.checkCommitChainActions(
10✔
2119
                height, trigger, confHTLCs,
10✔
2120
        )
10✔
2121
        if err != nil {
10✔
2122
                return nil, err
×
2123
        }
×
2124

2125
        // With these actions computed, we'll now check the diff of the HTLCs on
2126
        // the commitments, and cancel back any that are on the pending but not
2127
        // the non-pending.
2128
        remoteDiffActions := c.checkRemoteDiffActions(
10✔
2129
                activeHTLCs, pendingConf,
10✔
2130
        )
10✔
2131

10✔
2132
        // Finally, we'll merge all the chain actions and the final set of
10✔
2133
        // chain actions.
10✔
2134
        remoteCommitActions.Merge(remoteDiffActions)
10✔
2135
        return remoteCommitActions, nil
10✔
2136
}
2137

2138
// checkRemoteDiffActions checks the set difference of the HTLCs on the remote
2139
// confirmed commit and remote dangling commit for HTLCs that we need to cancel
2140
// back. If we find any HTLCs on the remote pending but not the remote, then
2141
// we'll mark them to be failed immediately.
2142
func (c *ChannelArbitrator) checkRemoteDiffActions(
2143
        activeHTLCs map[HtlcSetKey]htlcSet,
2144
        pendingConf bool) ChainActionMap {
10✔
2145

10✔
2146
        // First, we'll partition the HTLCs into those that are present on the
10✔
2147
        // confirmed commitment, and those on the dangling commitment.
10✔
2148
        confHTLCs := activeHTLCs[RemoteHtlcSet]
10✔
2149
        danglingHTLCs := activeHTLCs[RemotePendingHtlcSet]
10✔
2150
        if pendingConf {
12✔
2151
                confHTLCs = activeHTLCs[RemotePendingHtlcSet]
2✔
2152
                danglingHTLCs = activeHTLCs[RemoteHtlcSet]
2✔
2153
        }
2✔
2154

2155
        // Next, we'll create a set of all the HTLCs on the confirmed commitment.
2156
        remoteHtlcs := make(map[uint64]struct{})
10✔
2157
        for _, htlc := range confHTLCs.outgoingHTLCs {
15✔
2158
                remoteHtlcs[htlc.HtlcIndex] = struct{}{}
5✔
2159
        }
5✔
2160

2161
        // With the remote HTLCs assembled, we'll mark any HTLCs only on the
2162
        // remote dangling commitment to be failed asap.
2163
        actionMap := make(ChainActionMap)
10✔
2164
        for _, htlc := range danglingHTLCs.outgoingHTLCs {
18✔
2165
                if _, ok := remoteHtlcs[htlc.HtlcIndex]; ok {
12✔
2166
                        continue
4✔
2167
                }
2168

2169
                preimageAvailable, err := c.isPreimageAvailable(htlc.RHash)
4✔
2170
                if err != nil {
4✔
2171
                        log.Errorf("ChannelArbitrator(%v): failed to query "+
×
2172
                                "preimage for dangling htlc=%x from remote "+
×
2173
                                "commitments diff", c.cfg.ChanPoint,
×
2174
                                htlc.RHash[:])
×
2175

×
2176
                        continue
×
2177
                }
2178

2179
                if preimageAvailable {
4✔
2180
                        continue
×
2181
                }
2182

2183
                actionMap[HtlcFailNowAction] = append(
4✔
2184
                        actionMap[HtlcFailNowAction], htlc,
4✔
2185
                )
4✔
2186

4✔
2187
                log.Infof("ChannelArbitrator(%v): fail dangling htlc=%x from "+
4✔
2188
                        "remote commitments diff",
4✔
2189
                        c.cfg.ChanPoint, htlc.RHash[:])
4✔
2190
        }
2191

2192
        return actionMap
10✔
2193
}
2194

2195
// constructChainActions returns the set of actions that should be taken for
2196
// confirmed HTLCs at the specified height. Our actions will depend on the set
2197
// of HTLCs that were active across all channels at the time of channel
2198
// closure.
2199
func (c *ChannelArbitrator) constructChainActions(confCommitSet *CommitSet,
2200
        height uint32, trigger transitionTrigger) (ChainActionMap, error) {
16✔
2201

16✔
2202
        // If we've reached this point and have no confirmed commitment set,
16✔
2203
        // then this is an older node that had a pending close channel before
16✔
2204
        // the CommitSet was introduced. In this case, we'll just return the
16✔
2205
        // existing ChainActionMap they had on disk.
16✔
2206
        if confCommitSet == nil {
16✔
2207
                return c.log.FetchChainActions()
×
2208
        }
×
2209

2210
        // Otherwise, we have the full commitment set written to disk, and can
2211
        // proceed as normal.
2212
        htlcSets := confCommitSet.toActiveHTLCSets()
16✔
2213
        switch *confCommitSet.ConfCommitKey {
16✔
2214

2215
        // If the local commitment transaction confirmed, then we'll examine
2216
        // that as well as their commitments to the set of chain actions.
2217
        case LocalHtlcSet:
10✔
2218
                return c.checkLocalChainActions(
10✔
2219
                        height, trigger, htlcSets, true,
10✔
2220
                )
10✔
2221

2222
        // If the remote commitment confirmed, then we'll grab all the chain
2223
        // actions for the remote commit, and check the pending commit for any
2224
        // HTLCS we need to handle immediately (dust).
2225
        case RemoteHtlcSet:
8✔
2226
                return c.checkRemoteChainActions(
8✔
2227
                        height, trigger, htlcSets, false,
8✔
2228
                )
8✔
2229

2230
        // Otherwise, the remote pending commitment confirmed, so we'll examine
2231
        // the HTLCs on that unrevoked dangling commitment.
2232
        case RemotePendingHtlcSet:
2✔
2233
                return c.checkRemoteChainActions(
2✔
2234
                        height, trigger, htlcSets, true,
2✔
2235
                )
2✔
2236
        }
2237

2238
        return nil, fmt.Errorf("unable to locate chain actions")
×
2239
}
2240

2241
// prepContractResolutions is called either in the case that we decide we need
2242
// to go to chain, or the remote party goes to chain. Given a set of actions we
2243
// need to take for each HTLC, this method will return a set of contract
2244
// resolvers that will resolve the contracts on-chain if needed, and also a set
2245
// of packets to send to the htlcswitch in order to ensure all incoming HTLC's
2246
// are properly resolved.
2247
func (c *ChannelArbitrator) prepContractResolutions(
2248
        contractResolutions *ContractResolutions, height uint32,
2249
        trigger transitionTrigger,
2250
        confCommitSet *CommitSet) ([]ContractResolver, []ResolutionMsg, error) {
16✔
2251

16✔
2252
        // First, we'll reconstruct a fresh set of chain actions as the set of
16✔
2253
        // actions we need to act on may differ based on whether it was our
16✔
2254
        // commitment, or their commitment, that hit the chain.
16✔
2255
        htlcActions, err := c.constructChainActions(
16✔
2256
                confCommitSet, height, trigger,
16✔
2257
        )
16✔
2258
        if err != nil {
16✔
2259
                return nil, nil, err
×
2260
        }
×
2261

2262
        // We'll also fetch the historical state of this channel, as it should
2263
        // have been marked as closed by now, and supplement it to each resolver
2264
        // such that we can properly resolve our pending contracts.
2265
        var chanState *channeldb.OpenChannel
16✔
2266
        chanState, err = c.cfg.FetchHistoricalChannel()
16✔
2267
        switch {
16✔
2268
        // If we don't find this channel, then it may be the case that it
2269
        // was closed before we started to retain the final state
2270
        // information for open channels.
2271
        case err == channeldb.ErrNoHistoricalBucket:
×
2272
                fallthrough
×
2273
        case err == channeldb.ErrChannelNotFound:
×
2274
                log.Warnf("ChannelArbitrator(%v): unable to fetch historical "+
×
2275
                        "state", c.cfg.ChanPoint)
×
2276

2277
        case err != nil:
×
2278
                return nil, nil, err
×
2279
        }
2280

2281
        // There may be a class of HTLC's which we can fail back immediately;
2282
        // for those we'll prepare a slice of packets to add to our outbox. Any
2283
        // packets we need to send will be cancels.
2284
        var (
16✔
2285
                msgsToSend []ResolutionMsg
16✔
2286
        )
16✔
2287

16✔
2288
        incomingResolutions := contractResolutions.HtlcResolutions.IncomingHTLCs
16✔
2289
        outgoingResolutions := contractResolutions.HtlcResolutions.OutgoingHTLCs
16✔
2290

16✔
2291
        // We'll use these two maps to quickly look up an active HTLC with its
16✔
2292
        // matching HTLC resolution.
16✔
2293
        outResolutionMap := make(map[wire.OutPoint]lnwallet.OutgoingHtlcResolution)
16✔
2294
        inResolutionMap := make(map[wire.OutPoint]lnwallet.IncomingHtlcResolution)
16✔
2295
        for i := 0; i < len(incomingResolutions); i++ {
20✔
2296
                inRes := incomingResolutions[i]
4✔
2297
                inResolutionMap[inRes.HtlcPoint()] = inRes
4✔
2298
        }
4✔
2299
        for i := 0; i < len(outgoingResolutions); i++ {
21✔
2300
                outRes := outgoingResolutions[i]
5✔
2301
                outResolutionMap[outRes.HtlcPoint()] = outRes
5✔
2302
        }
5✔
2303

2304
        // We'll create the resolver kit that we'll be cloning for each
2305
        // resolver so they each can do their duty.
2306
        resolverCfg := ResolverConfig{
16✔
2307
                ChannelArbitratorConfig: c.cfg,
16✔
2308
                Checkpoint: func(res ContractResolver,
16✔
2309
                        reports ...*channeldb.ResolverReport) error {
22✔
2310

6✔
2311
                        return c.log.InsertUnresolvedContracts(reports, res)
6✔
2312
                },
6✔
2313
        }
2314

2315
        commitHash := contractResolutions.CommitHash
16✔
2316
        failureMsg := &lnwire.FailPermanentChannelFailure{}
16✔
2317

16✔
2318
        var htlcResolvers []ContractResolver
16✔
2319

16✔
2320
        // We instantiate an anchor resolver if the commitment tx has an
16✔
2321
        // anchor.
16✔
2322
        if contractResolutions.AnchorResolution != nil {
22✔
2323
                anchorResolver := newAnchorResolver(
6✔
2324
                        contractResolutions.AnchorResolution.AnchorSignDescriptor,
6✔
2325
                        contractResolutions.AnchorResolution.CommitAnchor,
6✔
2326
                        height, c.cfg.ChanPoint, resolverCfg,
6✔
2327
                )
6✔
2328
                anchorResolver.SupplementState(chanState)
6✔
2329

6✔
2330
                htlcResolvers = append(htlcResolvers, anchorResolver)
6✔
2331
        }
6✔
2332

2333
        // If this is a breach close, we'll create a breach resolver, determine
2334
        // the htlc's to fail back, and exit. This is done because the other
2335
        // steps taken for non-breach-closes do not matter for breach-closes.
2336
        if contractResolutions.BreachResolution != nil {
22✔
2337
                breachResolver := newBreachResolver(resolverCfg)
6✔
2338
                htlcResolvers = append(htlcResolvers, breachResolver)
6✔
2339

6✔
2340
                // Using the CommitSet, we'll fail back all outgoing HTLC's
6✔
2341
                // that exist on either of the remote commitments. The map is
6✔
2342
                // used to deduplicate any shared htlc's.
6✔
2343
                remoteOutgoing := make(map[uint64]channeldb.HTLC)
6✔
2344
                for htlcSetKey, htlcs := range confCommitSet.HtlcSets {
12✔
2345
                        if !htlcSetKey.IsRemote {
10✔
2346
                                continue
4✔
2347
                        }
2348

2349
                        for _, htlc := range htlcs {
12✔
2350
                                if htlc.Incoming {
11✔
2351
                                        continue
5✔
2352
                                }
2353

2354
                                remoteOutgoing[htlc.HtlcIndex] = htlc
5✔
2355
                        }
2356
                }
2357

2358
                // Now we'll loop over the map and create ResolutionMsgs for
2359
                // each of them.
2360
                for _, htlc := range remoteOutgoing {
11✔
2361
                        failMsg := ResolutionMsg{
5✔
2362
                                SourceChan: c.cfg.ShortChanID,
5✔
2363
                                HtlcIndex:  htlc.HtlcIndex,
5✔
2364
                                Failure:    failureMsg,
5✔
2365
                        }
5✔
2366

5✔
2367
                        msgsToSend = append(msgsToSend, failMsg)
5✔
2368
                }
5✔
2369

2370
                return htlcResolvers, msgsToSend, nil
6✔
2371
        }
2372

2373
        // For each HTLC, we'll either act immediately, meaning we'll instantly
2374
        // fail the HTLC, or we'll act only once the transaction has been
2375
        // confirmed, in which case we'll need an HTLC resolver.
2376
        for htlcAction, htlcs := range htlcActions {
29✔
2377
                switch htlcAction {
15✔
2378

2379
                // If we can fail an HTLC immediately (an outgoing HTLC with no
2380
                // contract), then we'll assemble an HTLC fail packet to send.
2381
                case HtlcFailNowAction:
13✔
2382
                        for _, htlc := range htlcs {
26✔
2383
                                failMsg := ResolutionMsg{
13✔
2384
                                        SourceChan: c.cfg.ShortChanID,
13✔
2385
                                        HtlcIndex:  htlc.HtlcIndex,
13✔
2386
                                        Failure:    failureMsg,
13✔
2387
                                }
13✔
2388

13✔
2389
                                msgsToSend = append(msgsToSend, failMsg)
13✔
2390
                        }
13✔
2391

2392
                // If we can claim this HTLC, we'll create an HTLC resolver to
2393
                // claim the HTLC (second-level or directly) using the pre-image we know.
2394
                case HtlcClaimAction:
×
2395
                        for _, htlc := range htlcs {
×
2396
                                htlc := htlc
×
2397

×
2398
                                htlcOp := wire.OutPoint{
×
2399
                                        Hash:  commitHash,
×
2400
                                        Index: uint32(htlc.OutputIndex),
×
2401
                                }
×
2402

×
2403
                                resolution, ok := inResolutionMap[htlcOp]
×
2404
                                if !ok {
×
2405
                                        // TODO(roasbeef): panic?
×
2406
                                        log.Errorf("ChannelArbitrator(%v) unable to find "+
×
2407
                                                "incoming resolution: %v",
×
2408
                                                c.cfg.ChanPoint, htlcOp)
×
2409
                                        continue
×
2410
                                }
2411

2412
                                resolver := newSuccessResolver(
×
2413
                                        resolution, height, htlc, resolverCfg,
×
2414
                                )
×
2415
                                if chanState != nil {
×
2416
                                        resolver.SupplementState(chanState)
×
2417
                                }
×
2418
                                htlcResolvers = append(htlcResolvers, resolver)
×
2419
                        }
2420

2421
                // If we can timeout the HTLC directly, then we'll create the
2422
                // proper resolver to do so, who will then cancel the packet
2423
                // backwards.
2424
                case HtlcTimeoutAction:
4✔
2425
                        for _, htlc := range htlcs {
8✔
2426
                                htlc := htlc
4✔
2427

4✔
2428
                                htlcOp := wire.OutPoint{
4✔
2429
                                        Hash:  commitHash,
4✔
2430
                                        Index: uint32(htlc.OutputIndex),
4✔
2431
                                }
4✔
2432

4✔
2433
                                resolution, ok := outResolutionMap[htlcOp]
4✔
2434
                                if !ok {
4✔
2435
                                        log.Errorf("ChannelArbitrator(%v) unable to find "+
×
2436
                                                "outgoing resolution: %v", c.cfg.ChanPoint, htlcOp)
×
2437
                                        continue
×
2438
                                }
2439

2440
                                resolver := newTimeoutResolver(
4✔
2441
                                        resolution, height, htlc, resolverCfg,
4✔
2442
                                )
4✔
2443
                                if chanState != nil {
8✔
2444
                                        resolver.SupplementState(chanState)
4✔
2445
                                }
4✔
2446

2447
                                // For outgoing HTLCs, we will also need to
2448
                                // supplement the resolver with the expiry
2449
                                // block height of its corresponding incoming
2450
                                // HTLC.
2451
                                deadline := c.cfg.FindOutgoingHTLCDeadline(htlc)
4✔
2452
                                resolver.SupplementDeadline(deadline)
4✔
2453

4✔
2454
                                htlcResolvers = append(htlcResolvers, resolver)
4✔
2455
                        }
2456

2457
                // If this is an incoming HTLC, but we can't act yet, then
2458
                // we'll create an incoming resolver to redeem the HTLC if we
2459
                // learn of the pre-image, or let the remote party time out.
2460
                case HtlcIncomingWatchAction:
4✔
2461
                        for _, htlc := range htlcs {
8✔
2462
                                htlc := htlc
4✔
2463

4✔
2464
                                htlcOp := wire.OutPoint{
4✔
2465
                                        Hash:  commitHash,
4✔
2466
                                        Index: uint32(htlc.OutputIndex),
4✔
2467
                                }
4✔
2468

4✔
2469
                                // TODO(roasbeef): need to handle incoming dust...
4✔
2470

4✔
2471
                                // TODO(roasbeef): can't be negative!!!
4✔
2472
                                resolution, ok := inResolutionMap[htlcOp]
4✔
2473
                                if !ok {
4✔
2474
                                        log.Errorf("ChannelArbitrator(%v) unable to find "+
×
2475
                                                "incoming resolution: %v",
×
2476
                                                c.cfg.ChanPoint, htlcOp)
×
2477
                                        continue
×
2478
                                }
2479

2480
                                resolver := newIncomingContestResolver(
4✔
2481
                                        resolution, height, htlc,
4✔
2482
                                        resolverCfg,
4✔
2483
                                )
4✔
2484
                                if chanState != nil {
8✔
2485
                                        resolver.SupplementState(chanState)
4✔
2486
                                }
4✔
2487
                                htlcResolvers = append(htlcResolvers, resolver)
4✔
2488
                        }
2489

2490
                // We've lost an htlc because it isn't manifested on the
2491
                // commitment transaction that closed the channel.
2492
                case HtlcIncomingDustFinalAction:
5✔
2493
                        for _, htlc := range htlcs {
10✔
2494
                                htlc := htlc
5✔
2495

5✔
2496
                                key := models.CircuitKey{
5✔
2497
                                        ChanID: c.cfg.ShortChanID,
5✔
2498
                                        HtlcID: htlc.HtlcIndex,
5✔
2499
                                }
5✔
2500

5✔
2501
                                // Mark this dust htlc as final failed.
5✔
2502
                                chainArbCfg := c.cfg.ChainArbitratorConfig
5✔
2503
                                err := chainArbCfg.PutFinalHtlcOutcome(
5✔
2504
                                        key.ChanID, key.HtlcID, false,
5✔
2505
                                )
5✔
2506
                                if err != nil {
5✔
2507
                                        return nil, nil, err
×
2508
                                }
×
2509

2510
                                // Send notification.
2511
                                chainArbCfg.HtlcNotifier.NotifyFinalHtlcEvent(
5✔
2512
                                        key,
5✔
2513
                                        channeldb.FinalHtlcInfo{
5✔
2514
                                                Settled:  false,
5✔
2515
                                                Offchain: false,
5✔
2516
                                        },
5✔
2517
                                )
5✔
2518
                        }
2519

2520
                // Finally, if this is an outgoing HTLC we've sent, then we'll
2521
                // launch a resolver to watch for the pre-image (and settle
2522
                // backwards), or just timeout.
2523
                case HtlcOutgoingWatchAction:
5✔
2524
                        for _, htlc := range htlcs {
10✔
2525
                                htlc := htlc
5✔
2526

5✔
2527
                                htlcOp := wire.OutPoint{
5✔
2528
                                        Hash:  commitHash,
5✔
2529
                                        Index: uint32(htlc.OutputIndex),
5✔
2530
                                }
5✔
2531

5✔
2532
                                resolution, ok := outResolutionMap[htlcOp]
5✔
2533
                                if !ok {
5✔
2534
                                        log.Errorf("ChannelArbitrator(%v) unable to find "+
×
2535
                                                "outgoing resolution: %v",
×
2536
                                                c.cfg.ChanPoint, htlcOp)
×
2537
                                        continue
×
2538
                                }
2539

2540
                                resolver := newOutgoingContestResolver(
5✔
2541
                                        resolution, height, htlc, resolverCfg,
5✔
2542
                                )
5✔
2543
                                if chanState != nil {
10✔
2544
                                        resolver.SupplementState(chanState)
5✔
2545
                                }
5✔
2546

2547
                                // For outgoing HTLCs, we will also need to
2548
                                // supplement the resolver with the expiry
2549
                                // block height of its corresponding incoming
2550
                                // HTLC.
2551
                                deadline := c.cfg.FindOutgoingHTLCDeadline(htlc)
5✔
2552
                                resolver.SupplementDeadline(deadline)
5✔
2553

5✔
2554
                                htlcResolvers = append(htlcResolvers, resolver)
5✔
2555
                        }
2556
                }
2557
        }
2558

2559
        // If this was a unilateral closure, then we'll also create a
2560
        // resolver to sweep our commitment output (but only if it wasn't
2561
        // trimmed).
2562
        if contractResolutions.CommitResolution != nil {
18✔
2563
                resolver := newCommitSweepResolver(
4✔
2564
                        *contractResolutions.CommitResolution, height,
4✔
2565
                        c.cfg.ChanPoint, resolverCfg,
4✔
2566
                )
4✔
2567
                if chanState != nil {
8✔
2568
                        resolver.SupplementState(chanState)
4✔
2569
                }
4✔
2570
                htlcResolvers = append(htlcResolvers, resolver)
4✔
2571
        }
2572

2573
        return htlcResolvers, msgsToSend, nil
14✔
2574
}
2575
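
// The snippet below is an illustrative sketch, not lnd code: the
// launchAllResolvers name is hypothetical. It shows roughly how the
// resolvers returned by prepContractResolutions are driven, i.e. each one
// is handed to resolveContract in its own goroutine and left to run until
// the contract it represents is fully resolved.
func (c *ChannelArbitrator) launchAllResolvers(resolvers []ContractResolver,
        immediate bool) {

        for _, resolver := range resolvers {
                c.wg.Add(1)
                go c.resolveContract(resolver, immediate)
        }
}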

2576
// replaceResolver replaces a resolver in the list of active resolvers. If
2577
// the resolver to be replaced is not found, it returns an error.
2578
func (c *ChannelArbitrator) replaceResolver(oldResolver,
2579
        newResolver ContractResolver) error {
5✔
2580

5✔
2581
        c.activeResolversLock.Lock()
5✔
2582
        defer c.activeResolversLock.Unlock()
5✔
2583

5✔
2584
        oldKey := oldResolver.ResolverKey()
5✔
2585
        for i, r := range c.activeResolvers {
10✔
2586
                if bytes.Equal(r.ResolverKey(), oldKey) {
10✔
2587
                        c.activeResolvers[i] = newResolver
5✔
2588
                        return nil
5✔
2589
                }
5✔
2590
        }
2591

2592
        return errors.New("resolver to be replaced not found")
×
2593
}
2594

2595
// resolveContract is a goroutine tasked with fully resolving an unresolved
2596
// contract. Either the initial contract will be resolved after a single step,
2597
// or the contract will itself create another contract to be resolved. In
2598
// either case, once the contract has been fully resolved, we'll signal back to
2599
// the main goroutine so it can properly keep track of the set of unresolved
2600
// contracts.
2601
//
2602
// NOTE: This MUST be run as a goroutine.
2603
func (c *ChannelArbitrator) resolveContract(currentContract ContractResolver,
2604
        immediate bool) {
10✔
2605

10✔
2606
        defer c.wg.Done()
10✔
2607

10✔
2608
        log.Debugf("ChannelArbitrator(%v): attempting to resolve %T",
10✔
2609
                c.cfg.ChanPoint, currentContract)
10✔
2610

10✔
2611
        // Until the contract is fully resolved, we'll continue to iteratively
10✔
2612
        // resolve the contract one step at a time.
10✔
2613
        for !currentContract.IsResolved() {
21✔
2614
                log.Debugf("ChannelArbitrator(%v): contract %T not yet resolved",
11✔
2615
                        c.cfg.ChanPoint, currentContract)
11✔
2616

11✔
2617
                select {
11✔
2618

2619
                // If we've been signalled to quit, then we'll exit early.
2620
                case <-c.quit:
×
2621
                        return
×
2622

2623
                default:
11✔
2624
                        // Otherwise, we'll attempt to resolve the current
11✔
2625
                        // contract.
11✔
2626
                        nextContract, err := currentContract.Resolve(immediate)
11✔
2627
                        if err != nil {
16✔
2628
                                if err == errResolverShuttingDown {
9✔
2629
                                        return
4✔
2630
                                }
4✔
2631

2632
                                log.Errorf("ChannelArbitrator(%v): unable to "+
5✔
2633
                                        "progress %T: %v",
5✔
2634
                                        c.cfg.ChanPoint, currentContract, err)
5✔
2635
                                return
5✔
2636
                        }
2637

2638
                        switch {
10✔
2639
                        // If this contract produced another, then this means
2640
                        // the current contract was only able to be partially
2641
                        // resolved in this step. So we'll do a contract swap
2642
                        // within our logs: the new contract will take the
2643
                        // place of the old one.
2644
                        case nextContract != nil:
5✔
2645
                                log.Debugf("ChannelArbitrator(%v): swapping "+
5✔
2646
                                        "out contract %T for %T ",
5✔
2647
                                        c.cfg.ChanPoint, currentContract,
5✔
2648
                                        nextContract)
5✔
2649

5✔
2650
                                // Swap contract in log.
5✔
2651
                                err := c.log.SwapContract(
5✔
2652
                                        currentContract, nextContract,
5✔
2653
                                )
5✔
2654
                                if err != nil {
5✔
2655
                                        log.Errorf("unable to add recurse "+
×
2656
                                                "contract: %v", err)
×
2657
                                }
×
2658

2659
                                // Swap contract in resolvers list. This is to
2660
                                // make sure that reports are queried from the
2661
                                // new resolver.
2662
                                err = c.replaceResolver(
5✔
2663
                                        currentContract, nextContract,
5✔
2664
                                )
5✔
2665
                                if err != nil {
5✔
2666
                                        log.Errorf("unable to replace "+
×
2667
                                                "contract: %v", err)
×
2668
                                }
×
2669

2670
                                // As this contract produced another, we'll
2671
                                // re-assign, so we can continue our resolution
2672
                                // loop.
2673
                                currentContract = nextContract
5✔
2674

2675
                        // If this contract is actually fully resolved, then
2676
                        // we'll mark it as such within the database.
2677
                        case currentContract.IsResolved():
9✔
2678
                                log.Debugf("ChannelArbitrator(%v): marking "+
9✔
2679
                                        "contract %T fully resolved",
9✔
2680
                                        c.cfg.ChanPoint, currentContract)
9✔
2681

9✔
2682
                                err := c.log.ResolveContract(currentContract)
9✔
2683
                                if err != nil {
9✔
2684
                                        log.Errorf("unable to resolve contract: %v",
×
2685
                                                err)
×
2686
                                }
×
2687

2688
                                // Now that the contract has been resolved,
2689
                                // we'll signal to the main goroutine.
2690
                                select {
9✔
2691
                                case c.resolutionSignal <- struct{}{}:
8✔
2692
                                case <-c.quit:
5✔
2693
                                        return
5✔
2694
                                }
2695
                        }
2696

2697
                }
2698
        }
2699
}
2700

2701
// signalUpdateMsg is a struct that carries fresh signals to the
2702
// ChannelArbitrator. We need to receive a message like this each time the
2703
// channel becomes active, as its internal state may change.
2704
type signalUpdateMsg struct {
2705
        // newSignals is the set of new active signals to be sent to the
2706
        // arbitrator.
2707
        newSignals *ContractSignals
2708

2709
        // doneChan is a channel that will be closed once the arbitrator has
2710
        // attached the new signals.
2711
        doneChan chan struct{}
2712
}
2713

2714
// UpdateContractSignals updates the set of signals the ChannelArbitrator needs
2715
// to receive from a channel in real-time in order to keep in sync with the
2716
// latest state of the contract.
2717
func (c *ChannelArbitrator) UpdateContractSignals(newSignals *ContractSignals) {
15✔
2718
        done := make(chan struct{})
15✔
2719

15✔
2720
        select {
15✔
2721
        case c.signalUpdates <- &signalUpdateMsg{
2722
                newSignals: newSignals,
2723
                doneChan:   done,
2724
        }:
15✔
2725
        case <-c.quit:
×
2726
        }
2727

2728
        select {
15✔
2729
        case <-done:
15✔
2730
        case <-c.quit:
×
2731
        }
2732
}
2733
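
// As a usage illustration only (the function name and call site here are
// hypothetical, not part of lnd), a caller such as the link layer would
// invoke UpdateContractSignals whenever the channel (re)activates with a
// possibly updated short channel ID:
func exampleOnChannelActive(arb *ChannelArbitrator,
        scid lnwire.ShortChannelID) {

        arb.UpdateContractSignals(&ContractSignals{
                ShortChanID: scid,
        })
}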

2734
// notifyContractUpdate updates the ChannelArbitrator's unmerged mappings such
2735
// that it can later be merged with activeHTLCs when calling
2736
// checkLocalChainActions or sweepAnchors. These are the only two places that
2737
// activeHTLCs is used.
2738
func (c *ChannelArbitrator) notifyContractUpdate(upd *ContractUpdate) {
16✔
2739
        c.unmergedMtx.Lock()
16✔
2740
        defer c.unmergedMtx.Unlock()
16✔
2741

16✔
2742
        // Update the mapping.
16✔
2743
        c.unmergedSet[upd.HtlcKey] = newHtlcSet(upd.Htlcs)
16✔
2744

16✔
2745
        log.Tracef("ChannelArbitrator(%v): fresh set of htlcs=%v",
16✔
2746
                c.cfg.ChanPoint, lnutils.SpewLogClosure(upd))
16✔
2747
}
16✔
2748

2749
// updateActiveHTLCs merges the unmerged set of HTLCs from the link with
2750
// activeHTLCs.
2751
func (c *ChannelArbitrator) updateActiveHTLCs() {
77✔
2752
        c.unmergedMtx.RLock()
77✔
2753
        defer c.unmergedMtx.RUnlock()
77✔
2754

77✔
2755
        // Update the mapping.
77✔
2756
        c.activeHTLCs[LocalHtlcSet] = c.unmergedSet[LocalHtlcSet]
77✔
2757
        c.activeHTLCs[RemoteHtlcSet] = c.unmergedSet[RemoteHtlcSet]
77✔
2758

77✔
2759
        // If the pending set exists, update that as well.
77✔
2760
        if _, ok := c.unmergedSet[RemotePendingHtlcSet]; ok {
90✔
2761
                pendingSet := c.unmergedSet[RemotePendingHtlcSet]
13✔
2762
                c.activeHTLCs[RemotePendingHtlcSet] = pendingSet
13✔
2763
        }
13✔
2764
}
2765
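
// Usage illustration only (exampleNotifyRemoteHtlcs is hypothetical, not
// part of lnd): the channel link pushes its latest HTLC view for a given
// commitment via notifyContractUpdate, and that view is folded into
// activeHTLCs the next time updateActiveHTLCs runs.
func exampleNotifyRemoteHtlcs(arb *ChannelArbitrator,
        htlcs []channeldb.HTLC) {

        arb.notifyContractUpdate(&ContractUpdate{
                HtlcKey: RemoteHtlcSet,
                Htlcs:   htlcs,
        })
}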

2766
// channelAttendant is the primary goroutine that acts as the judicial
2767
// arbitrator between our channel state, the remote channel peer, and the
2768
// blockchain (our judge). This goroutine will ensure that we faithfully execute
2769
// all clauses of our contract in the case that we need to go on-chain for a
2770
// dispute. Currently, two such conditions warrant our intervention: when an
2771
// outgoing HTLC is about to timeout, and when we know the pre-image for an
2772
// incoming HTLC, but it hasn't yet been settled off-chain. In these cases,
2773
// we'll: broadcast our commitment, cancel/settle any HTLC's backwards after
2774
// sufficient confirmation, and finally send our set of outputs to the UTXO
2775
// Nursery for incubation, and ultimate sweeping.
2776
//
2777
// NOTE: This MUST be run as a goroutine.
2778
func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
51✔
2779

51✔
2780
        // TODO(roasbeef): tell top chain arb we're done
51✔
2781
        defer func() {
99✔
2782
                c.wg.Done()
48✔
2783
        }()
48✔
2784

2785
        for {
160✔
2786
                select {
109✔
2787

2788
                // A new block has arrived; we'll examine all the active HTLC's
2789
                // to see if any of them have expired, and also update our
2790
                // view of the best current height.
2791
                case blockHeight, ok := <-c.blocks:
19✔
2792
                        if !ok {
23✔
2793
                                return
4✔
2794
                        }
4✔
2795
                        bestHeight = blockHeight
19✔
2796

19✔
2797
                        // If we're not in the default state, then we can
19✔
2798
                        // ignore this signal as we're waiting for contract
19✔
2799
                        // resolution.
19✔
2800
                        if c.state != StateDefault {
32✔
2801
                                continue
13✔
2802
                        }
2803

2804
                        // Now that a new block has arrived, we'll attempt to
2805
                        // advance our state forward.
2806
                        nextState, _, err := c.advanceState(
10✔
2807
                                uint32(bestHeight), chainTrigger, nil,
10✔
2808
                        )
10✔
2809
                        if err != nil {
10✔
2810
                                log.Errorf("Unable to advance state: %v", err)
×
2811
                        }
×
2812

2813
                        // If as a result of this trigger, the contract is
2814
                        // fully resolved, then well exit.
2815
                        if nextState == StateFullyResolved {
10✔
2816
                                return
×
2817
                        }
×
2818

2819
                // A new signal update was just sent. This indicates that the
2820
                // channel under watch is now live, and may modify its internal
2821
                // state, so we'll get the most up to date signals to we can
2822
                // properly do our job.
2823
                case signalUpdate := <-c.signalUpdates:
15✔
2824
                        log.Tracef("ChannelArbitrator(%v): got new signal "+
15✔
2825
                                "update!", c.cfg.ChanPoint)
15✔
2826

15✔
2827
                        // We'll update the ShortChannelID.
15✔
2828
                        c.cfg.ShortChanID = signalUpdate.newSignals.ShortChanID
15✔
2829

15✔
2830
                        // Now that the signal has been updated, we'll now
15✔
2831
                        // close the done channel to signal to the caller we've
15✔
2832
                        // registered the new ShortChannelID.
15✔
2833
                        close(signalUpdate.doneChan)
15✔
2834

2835
                // We've cooperatively closed the channel, so we're no longer
2836
                // needed. We'll mark the channel as resolved and exit.
2837
                case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure:
6✔
2838
                        log.Infof("ChannelArbitrator(%v) marking channel "+
6✔
2839
                                "cooperatively closed", c.cfg.ChanPoint)
6✔
2840

6✔
2841
                        err := c.cfg.MarkChannelClosed(
6✔
2842
                                closeInfo.ChannelCloseSummary,
6✔
2843
                                channeldb.ChanStatusCoopBroadcasted,
6✔
2844
                        )
6✔
2845
                        if err != nil {
6✔
2846
                                log.Errorf("Unable to mark channel closed: "+
×
2847
                                        "%v", err)
×
2848
                                return
×
2849
                        }
×
2850

2851
                        // We'll now advance our state machine until it reaches
2852
                        // a terminal state, and the channel is marked resolved.
2853
                        _, _, err = c.advanceState(
6✔
2854
                                closeInfo.CloseHeight, coopCloseTrigger, nil,
6✔
2855
                        )
6✔
2856
                        if err != nil {
7✔
2857
                                log.Errorf("Unable to advance state: %v", err)
1✔
2858
                                return
1✔
2859
                        }
1✔
2860

2861
                // We have broadcasted our commitment, and it is now confirmed
2862
                // on-chain.
2863
                case closeInfo := <-c.cfg.ChainEvents.LocalUnilateralClosure:
16✔
2864
                        log.Infof("ChannelArbitrator(%v): local on-chain "+
16✔
2865
                                "channel close", c.cfg.ChanPoint)
16✔
2866

16✔
2867
                        if c.state != StateCommitmentBroadcasted {
21✔
2868
                                log.Errorf("ChannelArbitrator(%v): unexpected "+
5✔
2869
                                        "local on-chain channel close",
5✔
2870
                                        c.cfg.ChanPoint)
5✔
2871
                        }
5✔
2872
                        closeTx := closeInfo.CloseTx
16✔
2873

16✔
2874
                        contractRes := &ContractResolutions{
16✔
2875
                                CommitHash:       closeTx.TxHash(),
16✔
2876
                                CommitResolution: closeInfo.CommitResolution,
16✔
2877
                                HtlcResolutions:  *closeInfo.HtlcResolutions,
16✔
2878
                                AnchorResolution: closeInfo.AnchorResolution,
16✔
2879
                        }
16✔
2880

16✔
2881
                        // When processing a unilateral close event, we'll
16✔
2882
                        // transition to the ContractClosed state. We'll log
16✔
2883
                        // out the set of resolutions such that they are
16✔
2884
                        // available to fetch in that state, we'll also write
16✔
2885
                        // the commit set so we can reconstruct our chain
16✔
2886
                        // actions on restart.
16✔
2887
                        err := c.log.LogContractResolutions(contractRes)
16✔
2888
                        if err != nil {
16✔
2889
                                log.Errorf("Unable to write resolutions: %v",
×
2890
                                        err)
×
2891
                                return
×
2892
                        }
×
2893
                        err = c.log.InsertConfirmedCommitSet(
16✔
2894
                                &closeInfo.CommitSet,
16✔
2895
                        )
16✔
2896
                        if err != nil {
16✔
2897
                                log.Errorf("Unable to write commit set: %v",
×
2898
                                        err)
×
2899
                                return
×
2900
                        }
×
2901

2902
                        // After the set of resolutions are successfully
2903
                        // logged, we can safely close the channel. After this
2904
                        // succeeds we won't be getting chain events anymore,
2905
                        // so we must make sure we can recover on restart after
2906
                        // it is marked closed. If the next state transition
2907
                        // fails, we'll start up in the prior state again, and
2908
                        // we will no longer be getting chain events. In this
2909
                        // case we must manually re-trigger the state
2910
                        // transition into StateContractClosed based on the
2911
                        // close status of the channel.
2912
                        err = c.cfg.MarkChannelClosed(
16✔
2913
                                closeInfo.ChannelCloseSummary,
16✔
2914
                                channeldb.ChanStatusLocalCloseInitiator,
16✔
2915
                        )
16✔
2916
                        if err != nil {
16✔
2917
                                log.Errorf("Unable to mark "+
×
2918
                                        "channel closed: %v", err)
×
2919
                                return
×
2920
                        }
×
2921

2922
                        // We'll now advance our state machine until it reaches
2923
                        // a terminal state.
2924
                        _, _, err = c.advanceState(
16✔
2925
                                uint32(closeInfo.SpendingHeight),
16✔
2926
                                localCloseTrigger, &closeInfo.CommitSet,
16✔
2927
                        )
16✔
2928
                        if err != nil {
17✔
2929
                                log.Errorf("Unable to advance state: %v", err)
1✔
2930
                        }
1✔
2931

2932
                // The remote party has broadcast the commitment on-chain.
2933
                // We'll examine our state to determine if we need to act at
2934
                // all.
2935
                case uniClosure := <-c.cfg.ChainEvents.RemoteUnilateralClosure:
12✔
2936
                        log.Infof("ChannelArbitrator(%v): remote party has "+
12✔
2937
                                "closed channel out on-chain", c.cfg.ChanPoint)
12✔
2938

12✔
2939
                        // If we don't have a self output, and there are no
12✔
2940
                        // active HTLC's, then we can immediately mark the
12✔
2941
                        // contract as fully resolved and exit.
12✔
2942
                        contractRes := &ContractResolutions{
12✔
2943
                                CommitHash:       *uniClosure.SpenderTxHash,
12✔
2944
                                CommitResolution: uniClosure.CommitResolution,
12✔
2945
                                HtlcResolutions:  *uniClosure.HtlcResolutions,
12✔
2946
                                AnchorResolution: uniClosure.AnchorResolution,
12✔
2947
                        }
12✔
2948

12✔
2949
                        // When processing a unilateral close event, we'll
12✔
2950
                        // transition to the ContractClosed state. We'll log
12✔
2951
                        // out the set of resolutions such that they are
12✔
2952
                        // available to fetch in that state, we'll also write
12✔
2953
                        // the commit set so we can reconstruct our chain
12✔
2954
                        // actions on restart.
12✔
2955
                        err := c.log.LogContractResolutions(contractRes)
12✔
2956
                        if err != nil {
13✔
2957
                                log.Errorf("Unable to write resolutions: %v",
1✔
2958
                                        err)
1✔
2959
                                return
1✔
2960
                        }
1✔
2961
                        err = c.log.InsertConfirmedCommitSet(
11✔
2962
                                &uniClosure.CommitSet,
11✔
2963
                        )
11✔
2964
                        if err != nil {
11✔
2965
                                log.Errorf("Unable to write commit set: %v",
×
2966
                                        err)
×
2967
                                return
×
2968
                        }
×
2969

2970
                        // After the set of resolutions are successfully
2971
                        // logged, we can safely close the channel. After this
2972
                        // succeeds we won't be getting chain events anymore,
2973
                        // so we must make sure we can recover on restart after
2974
                        // it is marked closed. If the next state transition
2975
                        // fails, we'll start up in the prior state again, and
2976
                        // we will no longer be getting chain events. In this
2977
                        // case we must manually re-trigger the state
2978
                        // transition into StateContractClosed based on the
2979
                        // close status of the channel.
2980
                        closeSummary := &uniClosure.ChannelCloseSummary
11✔
2981
                        err = c.cfg.MarkChannelClosed(
11✔
2982
                                closeSummary,
11✔
2983
                                channeldb.ChanStatusRemoteCloseInitiator,
11✔
2984
                        )
11✔
2985
                        if err != nil {
12✔
2986
                                log.Errorf("Unable to mark channel closed: %v",
1✔
2987
                                        err)
1✔
2988
                                return
1✔
2989
                        }
1✔
2990

2991
                        // We'll now advance our state machine until it reaches
2992
                        // a terminal state.
2993
                        _, _, err = c.advanceState(
10✔
2994
                                uint32(uniClosure.SpendingHeight),
10✔
2995
                                remoteCloseTrigger, &uniClosure.CommitSet,
10✔
2996
                        )
10✔
2997
                        if err != nil {
12✔
2998
                                log.Errorf("Unable to advance state: %v", err)
2✔
2999
                        }
2✔
3000

3001
                // The remote has breached the channel. As this is handled by
3002
                // the ChainWatcher and BreachArbitrator, we don't have to do
3003
                // anything in particular, so just advance our state and
3004
                // gracefully exit.
3005
                case breachInfo := <-c.cfg.ChainEvents.ContractBreach:
5✔
3006
                        log.Infof("ChannelArbitrator(%v): remote party has "+
5✔
3007
                                "breached channel!", c.cfg.ChanPoint)
5✔
3008

5✔
3009
                        // In the breach case, we'll only have anchor and
5✔
3010
                        // breach resolutions.
5✔
3011
                        contractRes := &ContractResolutions{
5✔
3012
                                CommitHash:       breachInfo.CommitHash,
5✔
3013
                                BreachResolution: breachInfo.BreachResolution,
5✔
3014
                                AnchorResolution: breachInfo.AnchorResolution,
5✔
3015
                        }
5✔
3016

5✔
3017
                        // We'll transition to the ContractClosed state and log
5✔
3018
                        // the set of resolutions such that they can be turned
5✔
3019
                        // into resolvers later on. We'll also insert the
5✔
3020
                        // CommitSet of the latest set of commitments.
5✔
3021
                        err := c.log.LogContractResolutions(contractRes)
5✔
3022
                        if err != nil {
5✔
3023
                                log.Errorf("Unable to write resolutions: %v",
×
3024
                                        err)
×
3025
                                return
×
3026
                        }
×
3027
                        err = c.log.InsertConfirmedCommitSet(
5✔
3028
                                &breachInfo.CommitSet,
5✔
3029
                        )
5✔
3030
                        if err != nil {
5✔
3031
                                log.Errorf("Unable to write commit set: %v",
×
3032
                                        err)
×
3033
                                return
×
3034
                        }
×
3035

3036
                        // The channel is finally marked pending closed here as
3037
                        // the BreachArbitrator and channel arbitrator have
3038
                        // persisted the relevant states.
3039
                        closeSummary := &breachInfo.CloseSummary
5✔
3040
                        err = c.cfg.MarkChannelClosed(
5✔
3041
                                closeSummary,
5✔
3042
                                channeldb.ChanStatusRemoteCloseInitiator,
5✔
3043
                        )
5✔
3044
                        if err != nil {
5✔
3045
                                log.Errorf("Unable to mark channel closed: %v",
×
3046
                                        err)
×
3047
                                return
×
3048
                        }
×
3049

3050
                        log.Infof("Breached channel=%v marked pending-closed",
5✔
3051
                                breachInfo.BreachResolution.FundingOutPoint)
5✔
3052

5✔
3053
                        // We'll advance our state machine until it reaches a
5✔
3054
                        // terminal state.
5✔
3055
                        _, _, err = c.advanceState(
5✔
3056
                                uint32(bestHeight), breachCloseTrigger,
5✔
3057
                                &breachInfo.CommitSet,
5✔
3058
                        )
5✔
3059
                        if err != nil {
5✔
3060
                                log.Errorf("Unable to advance state: %v", err)
×
3061
                        }
×
3062

3063
                // A new contract has just been resolved, we'll now check our
3064
                // log to see if all contracts have been resolved. If so, then
3065
                // we can exit as the contract is fully resolved.
3066
                case <-c.resolutionSignal:
8✔
3067
                        log.Infof("ChannelArbitrator(%v): a contract has been "+
8✔
3068
                                "fully resolved!", c.cfg.ChanPoint)
8✔
3069

8✔
3070
                        nextState, _, err := c.advanceState(
8✔
3071
                                uint32(bestHeight), chainTrigger, nil,
8✔
3072
                        )
8✔
3073
                        if err != nil {
8✔
3074
                                log.Errorf("Unable to advance state: %v", err)
×
3075
                        }
×
3076

3077
                        // If we don't have anything further to do after
3078
                        // advancing our state, then we'll exit.
3079
                        if nextState == StateFullyResolved {
11✔
3080
                                log.Infof("ChannelArbitrator(%v): all "+
3✔
3081
                                        "contracts fully resolved, exiting",
3✔
3082
                                        c.cfg.ChanPoint)
3✔
3083

3✔
3084
                                return
3✔
3085
                        }
3✔
3086

3087
                // We've just received a request to forcibly close out the
3088
                // channel. We'll attempt to advance our state machine and
                // hand any resulting close transaction back to the caller.
3089
                case closeReq := <-c.forceCloseReqs:
15✔
3090
                        log.Infof("ChannelArbitrator(%v): received force "+
15✔
3091
                                "close request", c.cfg.ChanPoint)
15✔
3092

15✔
3093
                        if c.state != StateDefault {
19✔
3094
                                select {
4✔
3095
                                case closeReq.closeTx <- nil:
4✔
3096
                                case <-c.quit:
×
3097
                                }
3098

3099
                                select {
4✔
3100
                                case closeReq.errResp <- errAlreadyForceClosed:
4✔
3101
                                case <-c.quit:
×
3102
                                }
3103

3104
                                continue
4✔
3105
                        }
3106

3107
                        nextState, closeTx, err := c.advanceState(
14✔
3108
                                uint32(bestHeight), userTrigger, nil,
14✔
3109
                        )
14✔
3110
                        if err != nil {
19✔
3111
                                log.Errorf("Unable to advance state: %v", err)
5✔
3112
                        }
5✔
3113

3114
                        select {
14✔
3115
                        case closeReq.closeTx <- closeTx:
14✔
3116
                        case <-c.quit:
×
3117
                                return
×
3118
                        }
3119

3120
                        select {
14✔
3121
                        case closeReq.errResp <- err:
14✔
3122
                        case <-c.quit:
×
3123
                                return
×
3124
                        }
3125

3126
                        // If we don't have anything further to do after
3127
                        // advancing our state, then we'll exit.
3128
                        if nextState == StateFullyResolved {
14✔
3129
                                log.Infof("ChannelArbitrator(%v): all "+
×
3130
                                        "contracts resolved, exiting",
×
3131
                                        c.cfg.ChanPoint)
×
3132
                                return
×
3133
                        }
×
3134

3135
                case <-c.quit:
42✔
3136
                        return
42✔
3137
                }
3138
        }
3139
}
3140

3141
// checkLegacyBreach returns StateFullyResolved if the channel was closed with
3142
// a breach transaction before the channel arbitrator launched its own breach
3143
// resolver. StateContractClosed is returned if this is a modern breach close
3144
// with a breach resolver. StateError is returned if the log lookup failed.
3145
func (c *ChannelArbitrator) checkLegacyBreach() (ArbitratorState, error) {
6✔
3146
        // A previous version of the channel arbitrator would make the breach
6✔
3147
        // close skip to StateFullyResolved. If there are no contract
6✔
3148
        // resolutions in the bolt arbitrator log, then this is an older breach
6✔
3149
        // close. Otherwise, if there are resolutions, the state should advance
6✔
3150
        // to StateContractClosed.
6✔
3151
        _, err := c.log.FetchContractResolutions()
6✔
3152
        if err == errNoResolutions {
6✔
3153
                // This is an older breach close still in the database.
×
3154
                return StateFullyResolved, nil
×
3155
        } else if err != nil {
6✔
3156
                return StateError, err
×
3157
        }
×
3158

3159
        // This is a modern breach close with resolvers.
3160
        return StateContractClosed, nil
6✔
3161
}