lightningnetwork / lnd, build 12312390362

13 Dec 2024 08:44AM UTC coverage: 57.458% (+8.5%) from 48.92%

Pull #9343 (github) by ellemouton:
fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is reworked such that the context
provided by its new main WithCtx method is cancelled in sync with the
cancellation of a parent context or the closing of its quit channel. Tests
are added to assert the behaviour. In order for the close of the quit
channel to be consistent with the cancellation of the derived context, the
quit channel _must_ be kept internal to the ContextGuard so that callers are
only able to close the channel via the exposed Quit method, which will then
take care to first cancel any derived contexts that depend on the quit
channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests

101853 of 177264 relevant lines covered (57.46%)

24972.93 hits per line
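The cancellation ordering described in the commit message can be made concrete with a small sketch. The names below are illustrative assumptions rather than the actual fn/v2 API: the key property is that the guard's quit path cancels every derived context before the close of the quit channel becomes observable.

package fnexample

import (
        "context"
        "sync"
)

// contextGuard is a hypothetical stand-in for the reworked ContextGuard.
type contextGuard struct {
        mu      sync.Mutex
        quit    chan struct{}
        cancels []context.CancelFunc
}

func newContextGuard() *contextGuard {
        return &contextGuard{quit: make(chan struct{})}
}

// withCtx derives a context that is cancelled either when the parent is
// cancelled or when the guard quits.
func (g *contextGuard) withCtx(parent context.Context) context.Context {
        ctx, cancel := context.WithCancel(parent)
        g.mu.Lock()
        g.cancels = append(g.cancels, cancel)
        g.mu.Unlock()

        return ctx
}

// quitAll first cancels every derived context, then closes the internal
// quit channel, so the two signals stay in sync for callers.
func (g *contextGuard) quitAll() {
        g.mu.Lock()
        defer g.mu.Unlock()

        for _, cancel := range g.cancels {
                cancel()
        }
        close(g.quit)
}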

Source File: /contractcourt/chain_arbitrator.go (39.37% covered)
package contractcourt

import (
        "errors"
        "fmt"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/btcsuite/btcd/wire"
        "github.com/btcsuite/btcwallet/walletdb"
        "github.com/lightningnetwork/lnd/chainntnfs"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/clock"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/labels"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
        "github.com/lightningnetwork/lnd/lnwire"
)

// ErrChainArbExiting signals that the chain arbitrator is shutting down.
var ErrChainArbExiting = errors.New("ChainArbitrator exiting")

// ResolutionMsg is a message sent by resolvers to outside sub-systems once an
// outgoing contract has been fully resolved. For multi-hop contracts, if we
// resolve the outgoing contract, we'll also need to ensure that the incoming
// contract is resolved as well. We package the items required to resolve the
// incoming contracts within this message.
type ResolutionMsg struct {
        // SourceChan identifies the channel that this message is being sent
        // from. This is the channel's short channel ID.
        SourceChan lnwire.ShortChannelID

        // HtlcIndex is the index of the contract within the original
        // commitment trace.
        HtlcIndex uint64

        // Failure will be non-nil if the incoming contract should be canceled
        // altogether. This can happen if the outgoing contract was dust, or
        // if the outgoing HTLC timed out.
        Failure lnwire.FailureMessage

        // PreImage will be non-nil if the incoming contract can successfully
        // be redeemed. This can happen if we learn of the preimage from the
        // outgoing HTLC on-chain.
        PreImage *[32]byte
}

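To illustrate the two mutually exclusive outcomes above, a resolver populates either Failure or PreImage before handing the message off via the config's DeliverResolutionMsg closure. This is a hypothetical sketch; the helper name and parameters are assumptions, not code from this file:

// deliverSettle is a hypothetical helper showing how a resolver might report
// a redeemable incoming contract once the preimage is learned on-chain.
func deliverSettle(cfg *ChainArbitratorConfig, scid lnwire.ShortChannelID,
        htlcIndex uint64, preimage [32]byte) error {

        msg := ResolutionMsg{
                SourceChan: scid,
                HtlcIndex:  htlcIndex,
                // PreImage is set and Failure is left nil, so the incoming
                // HTLC will be settled rather than cancelled backwards.
                PreImage: &preimage,
        }

        return cfg.DeliverResolutionMsg(msg)
}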
// ChainArbitratorConfig is a configuration struct that contains all the
// function closures and interfaces required to arbitrate on-chain
// contracts for a particular chain.
type ChainArbitratorConfig struct {
        // ChainHash is the chain that this arbitrator is to operate within.
        ChainHash chainhash.Hash

        // IncomingBroadcastDelta is the delta that we'll use to decide when to
        // broadcast our commitment transaction if we have incoming htlcs. This
        // value should be set based on our current fee estimation of the
        // commitment transaction. We use this to determine when we should
        // broadcast instead of just the HTLC timeout, as we want to ensure
        // that the commitment transaction is already confirmed by the time the
        // HTLC expires. Otherwise we may end up not settling the htlc on-chain
        // because the other party managed to time it out.
        IncomingBroadcastDelta uint32

        // OutgoingBroadcastDelta is the delta that we'll use to decide when to
        // broadcast our commitment transaction if there are active outgoing
        // htlcs. This value can be lower than the incoming broadcast delta.
        OutgoingBroadcastDelta uint32

        // NewSweepAddr is a function that returns a new address under control
        // by the wallet. We'll use this to sweep any no-delay outputs as a
        // result of unilateral channel closes.
        //
        // NOTE: This SHOULD return a p2wkh script.
        NewSweepAddr func() ([]byte, error)

        // PublishTx reliably broadcasts a transaction to the network. Once
        // this function exits without an error, the transaction MUST
        // continually be rebroadcast if needed.
        PublishTx func(*wire.MsgTx, string) error

        // DeliverResolutionMsg is a function that will append an outgoing
        // message to the "out box" for a ChannelLink. This is used to cancel
        // backwards to the incoming link any HTLCs that are either dust, that
        // we're timing out, or that we're settling on-chain.
        DeliverResolutionMsg func(...ResolutionMsg) error

        // MarkLinkInactive is a function closure that the ChainArbitrator will
        // use to mark that active HTLCs shouldn't be attempted to be routed
        // over a particular channel. This function will be called in the event
        // that a ChannelArbitrator decides that it needs to go to chain in
        // order to resolve contracts.
        //
        // TODO(roasbeef): rename, routing based
        MarkLinkInactive func(wire.OutPoint) error

        // ContractBreach is a function closure that the ChainArbitrator will
        // use to notify the BreachArbitrator about a contract breach. It should
        // only return a non-nil error when the BreachArbitrator has preserved
        // the necessary breach info for this channel point. Once the breach
        // resolution is persisted in the ChannelArbitrator, it will be safe
        // to mark the channel closed.
        ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error

        // IsOurAddress is a function that returns true if the passed address
        // is known to the underlying wallet. Otherwise, false should be
        // returned.
        IsOurAddress func(btcutil.Address) bool

        // IncubateOutputs sends either an incoming HTLC, an outgoing HTLC, or
        // both to the utxo nursery. Once this function returns, the nursery
        // should have safely persisted the outputs to disk, and should start
        // the process of incubation. This is used when a resolver wishes to
        // pass off the output to the nursery as we're only waiting on an
        // absolute/relative item block.
        IncubateOutputs func(wire.OutPoint,
                fn.Option[lnwallet.OutgoingHtlcResolution],
                fn.Option[lnwallet.IncomingHtlcResolution],
                uint32, fn.Option[int32]) error

        // PreimageDB is a global store of all known pre-images. We'll use this
        // to decide if we should broadcast a commitment transaction to claim
        // an HTLC on-chain.
        PreimageDB WitnessBeacon

        // Notifier is an instance of a chain notifier we'll use to watch for
        // certain on-chain events.
        Notifier chainntnfs.ChainNotifier

        // Mempool is a mempool watcher that allows us to watch for events
        // happening in the mempool.
        Mempool chainntnfs.MempoolWatcher

        // Signer is a signer backed by the active lnd node. This should be
        // capable of producing a signature as specified by a valid
        // SignDescriptor.
        Signer input.Signer

        // FeeEstimator will be used to return fee estimates.
        FeeEstimator chainfee.Estimator

        // ChainIO allows us to query the state of the current main chain.
        ChainIO lnwallet.BlockChainIO

        // DisableChannel disables a channel, resulting in it not being able to
        // forward payments.
        DisableChannel func(wire.OutPoint) error

        // Sweeper allows resolvers to sweep their final outputs.
        Sweeper UtxoSweeper

        // Registry is the invoice database that is used by resolvers to lookup
        // preimages and settle invoices.
        Registry Registry

        // NotifyClosedChannel is a function closure that the ChainArbitrator
        // will use to notify the ChannelNotifier about a newly closed channel.
        NotifyClosedChannel func(wire.OutPoint)

        // NotifyFullyResolvedChannel is a function closure that the
        // ChainArbitrator will use to notify the ChannelNotifier about a newly
        // resolved channel. The main difference to NotifyClosedChannel is that
        // in case of a local force close the NotifyClosedChannel is called when
        // the published commitment transaction confirms, while
        // NotifyFullyResolvedChannel is only called when the channel is fully
        // resolved (which includes sweeping any time locked funds).
        NotifyFullyResolvedChannel func(point wire.OutPoint)

        // OnionProcessor is used to decode onion payloads for on-chain
        // resolution.
        OnionProcessor OnionProcessor

        // PaymentsExpirationGracePeriod indicates a time window during which
        // we let the other node cancel an outgoing htlc that our node has
        // initiated and that has timed out.
        PaymentsExpirationGracePeriod time.Duration

        // IsForwardedHTLC checks for a given htlc, identified by channel id and
        // htlcIndex, if it is a forwarded one.
        IsForwardedHTLC func(chanID lnwire.ShortChannelID, htlcIndex uint64) bool

        // Clock is the clock implementation that ChannelArbitrator uses.
        // It is useful for testing.
        Clock clock.Clock

        // SubscribeBreachComplete is used by the breachResolver to register a
        // subscription that notifies when the breach resolution process is
        // complete.
        SubscribeBreachComplete func(op *wire.OutPoint, c chan struct{}) (
                bool, error)

        // PutFinalHtlcOutcome stores the final outcome of an htlc in the
        // database.
        PutFinalHtlcOutcome func(chanId lnwire.ShortChannelID,
                htlcId uint64, settled bool) error

        // HtlcNotifier is an interface that htlc events are sent to.
        HtlcNotifier HtlcNotifier

        // Budget is the configured budget for the arbitrator.
        Budget BudgetConfig

        // QueryIncomingCircuit is used to find the outgoing HTLC's
        // corresponding incoming HTLC circuit. It queries the circuit map for
        // a given outgoing circuit key and returns the incoming circuit key.
        //
        // TODO(yy): this is a hacky way to get around the cycling import issue
        // as we cannot import `htlcswitch` here. A proper way is to define an
        // interface here that asks for method `LookupOpenCircuit`,
        // meanwhile, turn `PaymentCircuit` into an interface or bring it to a
        // lower package.
        QueryIncomingCircuit func(circuit models.CircuitKey) *models.CircuitKey

        // AuxLeafStore is an optional store that can be used to store auxiliary
        // leaves for certain custom channel types.
        AuxLeafStore fn.Option[lnwallet.AuxLeafStore]

        // AuxSigner is an optional signer that can be used to sign auxiliary
        // leaves for certain custom channel types.
        AuxSigner fn.Option[lnwallet.AuxSigner]

        // AuxResolver is an optional interface that can be used to modify the
        // way contracts are resolved.
        AuxResolver fn.Option[lnwallet.AuxContractResolver]
}

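For orientation, here is a deliberately minimal sketch of populating a few of the closures above. Real initialization in lnd wires these to the wallet, switch, and notifier sub-systems; the stub bodies and the use of the btcd chaincfg package are assumptions for illustration only:

cfg := ChainArbitratorConfig{
        ChainHash: *chaincfg.MainNetParams.GenesisHash,
        PublishTx: func(tx *wire.MsgTx, label string) error {
                // Hand the transaction off to the node's broadcaster.
                return nil
        },
        IsOurAddress: func(addr btcutil.Address) bool {
                // Ask the backing wallet whether it controls addr.
                return false
        },
        DisableChannel: func(op wire.OutPoint) error {
                // Signal the gossiper to mark the channel as disabled.
                return nil
        },
}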
// ChainArbitrator is a sub-system that oversees the on-chain resolution of
// all active channels, and also those that are in the "pending close" state.
// Within the contractcourt package, the ChainArbitrator manages a set of
// active ContractArbitrators. Each ContractArbitrator is responsible for
// watching the chain for any activity that affects the state of the channel,
// and also for monitoring each contract in order to determine if any on-chain
// activity is required. Outside sub-systems interact with the ChainArbitrator
// in order to forcibly exit a contract, update the set of live signals for
// each contract, and to receive reports on the state of contract resolution.
type ChainArbitrator struct {
        started int32 // To be used atomically.
        stopped int32 // To be used atomically.

        sync.Mutex

        // activeChannels is a map of all the active contracts that are still
        // open, and not fully resolved.
        activeChannels map[wire.OutPoint]*ChannelArbitrator

        // activeWatchers is a map of all the active chainWatchers for channels
        // that are still considered open.
        activeWatchers map[wire.OutPoint]*chainWatcher

        // cfg is the config struct for the arbitrator that contains all
        // methods and interfaces it needs to operate.
        cfg ChainArbitratorConfig

        // chanSource will be used by the ChainArbitrator to fetch all the
        // active channels that it must still watch over.
        chanSource *channeldb.DB

        quit chan struct{}

        wg sync.WaitGroup
}

// NewChainArbitrator returns a new instance of the ChainArbitrator using the
// passed config struct, and backing persistent database.
func NewChainArbitrator(cfg ChainArbitratorConfig,
        db *channeldb.DB) *ChainArbitrator {

        return &ChainArbitrator{
                cfg:            cfg,
                activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
                activeWatchers: make(map[wire.OutPoint]*chainWatcher),
                chanSource:     db,
                quit:           make(chan struct{}),
        }
}

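A hedged usage sketch of the lifecycle this constructor implies, together with the Start and Stop methods defined later in this file (cfg and db are assumed to be initialized elsewhere):

arb := NewChainArbitrator(cfg, db)
if err := arb.Start(); err != nil {
        return err
}
defer func() {
        if err := arb.Stop(); err != nil {
                log.Errorf("unable to stop ChainArbitrator: %v", err)
        }
}()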
// arbChannel is a wrapper around an open channel that channel arbitrators
// interact with.
type arbChannel struct {
        // channel is the in-memory channel state.
        channel *channeldb.OpenChannel

        // c references the chain arbitrator and is used by arbChannel
        // internally.
        c *ChainArbitrator
}

// NewAnchorResolutions returns the anchor resolutions for currently valid
// commitment transactions.
//
// NOTE: Part of the ArbChannel interface.
func (a *arbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions,
        error) {

        // Get a fresh copy of the database state to base the anchor resolutions
        // on. Unfortunately the channel instance that we have here isn't the
        // same instance that is used by the link.
        chanPoint := a.channel.FundingOutpoint

        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(chanPoint)
        if err != nil {
                return nil, err
        }

        var chanOpts []lnwallet.ChannelOpt
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
        })
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
        })
        a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
        })

        chanMachine, err := lnwallet.NewLightningChannel(
                a.c.cfg.Signer, channel, nil, chanOpts...,
        )
        if err != nil {
                return nil, err
        }

        return chanMachine.NewAnchorResolutions()
}

// ForceCloseChan should force close the contract that this attendant is
// watching over. We'll use this when we decide that we need to go to chain. It
// should in addition tell the switch to remove the corresponding link, such
// that we won't accept any new updates.
//
// NOTE: Part of the ArbChannel interface.
func (a *arbChannel) ForceCloseChan() (*wire.MsgTx, error) {
        // First, we mark the channel as borked. This ensures
        // that no new state transitions can happen, and also
        // that the link won't be loaded into the switch.
        if err := a.channel.MarkBorked(); err != nil {
                return nil, err
        }

        // With the channel marked as borked, we'll now remove
        // the link from the switch if it's there. If the link
        // is active, then this method will block until it
        // exits.
        chanPoint := a.channel.FundingOutpoint

        if err := a.c.cfg.MarkLinkInactive(chanPoint); err != nil {
                log.Errorf("unable to mark link inactive: %v", err)
        }

        // Now that we know the link can't mutate the channel
        // state, we'll read the target channel from disk
        // according to its channel point.
        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(chanPoint)
        if err != nil {
                return nil, err
        }

        var chanOpts []lnwallet.ChannelOpt
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
        })
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
        })
        a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
        })

        // Finally, we'll force close the channel, completing
        // the force close workflow.
        chanMachine, err := lnwallet.NewLightningChannel(
                a.c.cfg.Signer, channel, nil, chanOpts...,
        )
        if err != nil {
                return nil, err
        }

        closeSummary, err := chanMachine.ForceClose(
                lnwallet.WithSkipContractResolutions(),
        )
        if err != nil {
                return nil, err
        }

        return closeSummary.CloseTx, nil
}
395
// newActiveChannelArbitrator creates a new instance of an active channel
396
// arbitrator given the state of the target channel.
397
func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
398
        c *ChainArbitrator, chanEvents *ChainEventSubscription) (*ChannelArbitrator, error) {
11✔
399

11✔
400
        // TODO(roasbeef): fetch best height (or pass in) so can ensure block
11✔
401
        // epoch delivers all the notifications to
11✔
402

11✔
403
        chanPoint := channel.FundingOutpoint
11✔
404

11✔
405
        log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", chanPoint)
11✔
406

11✔
407
        // Next we'll create the matching configuration struct that contains
11✔
408
        // all interfaces and methods the arbitrator needs to do its job.
11✔
409
        arbCfg := ChannelArbitratorConfig{
11✔
410
                ChanPoint:   chanPoint,
11✔
411
                Channel:     c.getArbChannel(channel),
11✔
412
                ShortChanID: channel.ShortChanID(),
11✔
413

11✔
414
                MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
11✔
415
                MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
11✔
416
                        statuses ...channeldb.ChannelStatus) error {
11✔
417

×
418
                        err := channel.CloseChannel(summary, statuses...)
×
419
                        if err != nil {
×
420
                                return err
×
421
                        }
×
422
                        c.cfg.NotifyClosedChannel(summary.ChanPoint)
×
423
                        return nil
×
424
                },
425
                IsPendingClose:        false,
426
                ChainArbitratorConfig: c.cfg,
427
                ChainEvents:           chanEvents,
428
                PutResolverReport: func(tx kvdb.RwTx,
429
                        report *channeldb.ResolverReport) error {
×
430

×
431
                        return c.chanSource.PutResolverReport(
×
432
                                tx, c.cfg.ChainHash, &chanPoint, report,
×
433
                        )
×
434
                },
×
435
                FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
×
436
                        chanStateDB := c.chanSource.ChannelStateDB()
×
437
                        return chanStateDB.FetchHistoricalChannel(&chanPoint)
×
438
                },
×
439
                FindOutgoingHTLCDeadline: func(
440
                        htlc channeldb.HTLC) fn.Option[int32] {
×
441

×
442
                        return c.FindOutgoingHTLCDeadline(
×
443
                                channel.ShortChanID(), htlc,
×
444
                        )
×
445
                },
×
446
        }
447

448
        // The final component needed is an arbitrator log that the arbitrator
449
        // will use to keep track of its internal state using a backed
450
        // persistent log.
451
        //
452
        // TODO(roasbeef); abstraction leak...
453
        //  * rework: adaptor method to set log scope w/ factory func
454
        chanLog, err := newBoltArbitratorLog(
11✔
455
                c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
11✔
456
        )
11✔
457
        if err != nil {
11✔
458
                return nil, err
×
459
        }
×
460

461
        arbCfg.MarkChannelResolved = func() error {
11✔
462
                if c.cfg.NotifyFullyResolvedChannel != nil {
×
463
                        c.cfg.NotifyFullyResolvedChannel(chanPoint)
×
464
                }
×
465

466
                return c.ResolveContract(chanPoint)
×
467
        }
468

469
        // Finally, we'll need to construct a series of htlc Sets based on all
470
        // currently known valid commitments.
471
        htlcSets := make(map[HtlcSetKey]htlcSet)
11✔
472
        htlcSets[LocalHtlcSet] = newHtlcSet(channel.LocalCommitment.Htlcs)
11✔
473
        htlcSets[RemoteHtlcSet] = newHtlcSet(channel.RemoteCommitment.Htlcs)
11✔
474

11✔
475
        pendingRemoteCommitment, err := channel.RemoteCommitChainTip()
11✔
476
        if err != nil && err != channeldb.ErrNoPendingCommit {
11✔
477
                return nil, err
×
478
        }
×
479
        if pendingRemoteCommitment != nil {
11✔
480
                htlcSets[RemotePendingHtlcSet] = newHtlcSet(
×
481
                        pendingRemoteCommitment.Commitment.Htlcs,
×
482
                )
×
483
        }
×
484

485
        return NewChannelArbitrator(
11✔
486
                arbCfg, htlcSets, chanLog,
11✔
487
        ), nil
11✔
488
}
489

490
// getArbChannel returns an open channel wrapper for use by channel arbitrators.
491
func (c *ChainArbitrator) getArbChannel(
492
        channel *channeldb.OpenChannel) *arbChannel {
11✔
493

11✔
494
        return &arbChannel{
11✔
495
                channel: channel,
11✔
496
                c:       c,
11✔
497
        }
11✔
498
}
11✔
499

// ResolveContract marks a contract as fully resolved within the database.
// This is only to be done once all contracts which were live on the channel
// before hitting the chain have been resolved.
func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error {
        log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)

        // First, we'll mark the channel as fully closed from the PoV of
        // the channel source.
        err := c.chanSource.ChannelStateDB().MarkChanFullyClosed(&chanPoint)
        if err != nil {
                log.Errorf("ChainArbitrator: unable to mark ChannelPoint(%v) "+
                        "fully closed: %v", chanPoint, err)
                return err
        }

        // Now that the channel has been marked as fully closed, we'll stop
        // both the channel arbitrator and chain watcher for this channel if
        // they're still active.
        var arbLog ArbitratorLog
        c.Lock()
        chainArb := c.activeChannels[chanPoint]
        delete(c.activeChannels, chanPoint)

        chainWatcher := c.activeWatchers[chanPoint]
        delete(c.activeWatchers, chanPoint)
        c.Unlock()

        if chainArb != nil {
                arbLog = chainArb.log

                if err := chainArb.Stop(); err != nil {
                        log.Warnf("unable to stop ChannelArbitrator(%v): %v",
                                chanPoint, err)
                }
        }
        if chainWatcher != nil {
                if err := chainWatcher.Stop(); err != nil {
                        log.Warnf("unable to stop ChainWatcher(%v): %v",
                                chanPoint, err)
                }
        }

        // Once this has been marked as resolved, we'll wipe the log that the
        // channel arbitrator was using to store its persistent state. We do
        // this after marking the channel resolved, as otherwise, the
        // arbitrator would be re-created, and think it was starting from the
        // default state.
        if arbLog != nil {
                if err := arbLog.WipeHistory(); err != nil {
                        return err
                }
        }

        return nil
}

// Start launches all goroutines that the ChainArbitrator needs to operate.
func (c *ChainArbitrator) Start() error {
        if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
                return nil
        }

        log.Infof("ChainArbitrator starting with config: budget=[%v]",
                &c.cfg.Budget)

        // First, we'll fetch all the channels that are still open, in order to
        // collect them within our set of active contracts.
        openChannels, err := c.chanSource.ChannelStateDB().FetchAllChannels()
        if err != nil {
                return err
        }

        if len(openChannels) > 0 {
                log.Infof("Creating ChannelArbitrators for %v active channels",
                        len(openChannels))
        }

        // For each open channel, we'll configure then launch a corresponding
        // ChannelArbitrator.
        for _, channel := range openChannels {
                chanPoint := channel.FundingOutpoint
                channel := channel

                // First, we'll create an active chainWatcher for this channel
                // to ensure that we detect any relevant on chain events.
                breachClosure := func(ret *lnwallet.BreachRetribution) error {
                        return c.cfg.ContractBreach(chanPoint, ret)
                }

                chainWatcher, err := newChainWatcher(
                        chainWatcherConfig{
                                chanState:           channel,
                                notifier:            c.cfg.Notifier,
                                signer:              c.cfg.Signer,
                                isOurAddr:           c.cfg.IsOurAddress,
                                contractBreach:      breachClosure,
                                extractStateNumHint: lnwallet.GetStateNumHint,
                                auxLeafStore:        c.cfg.AuxLeafStore,
                                auxResolver:         c.cfg.AuxResolver,
                        },
                )
                if err != nil {
                        return err
                }

                c.activeWatchers[chanPoint] = chainWatcher
                channelArb, err := newActiveChannelArbitrator(
                        channel, c, chainWatcher.SubscribeChannelEvents(),
                )
                if err != nil {
                        return err
                }

                c.activeChannels[chanPoint] = channelArb

                // Republish any closing transactions for this channel.
                err = c.republishClosingTxs(channel)
                if err != nil {
                        log.Errorf("Failed to republish closing txs for "+
                                "channel %v", chanPoint)
                }
        }

        // In addition to the channels that we know to be open, we'll also
        // launch arbitrators to finish resolving any channels that are in
        // the pending close state.
        closingChannels, err := c.chanSource.ChannelStateDB().FetchClosedChannels(
                true,
        )
        if err != nil {
                return err
        }

        if len(closingChannels) > 0 {
                log.Infof("Creating ChannelArbitrators for %v closing channels",
                        len(closingChannels))
        }

        // Next, for each channel in the closing state, we'll launch a
        // corresponding more restricted resolver, as we don't have to watch
        // the chain any longer, only resolve the contracts on the confirmed
        // commitment.
        //nolint:ll
        for _, closeChanInfo := range closingChannels {
                // We can leave off the CloseContract and ForceCloseChan
                // methods as the channel is already closed at this point.
                chanPoint := closeChanInfo.ChanPoint
                arbCfg := ChannelArbitratorConfig{
                        ChanPoint:             chanPoint,
                        ShortChanID:           closeChanInfo.ShortChanID,
                        ChainArbitratorConfig: c.cfg,
                        ChainEvents:           &ChainEventSubscription{},
                        IsPendingClose:        true,
                        ClosingHeight:         closeChanInfo.CloseHeight,
                        CloseType:             closeChanInfo.CloseType,
                        PutResolverReport: func(tx kvdb.RwTx,
                                report *channeldb.ResolverReport) error {

                                return c.chanSource.PutResolverReport(
                                        tx, c.cfg.ChainHash, &chanPoint, report,
                                )
                        },
                        FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
                                chanStateDB := c.chanSource.ChannelStateDB()
                                return chanStateDB.FetchHistoricalChannel(&chanPoint)
                        },
                        FindOutgoingHTLCDeadline: func(
                                htlc channeldb.HTLC) fn.Option[int32] {

                                return c.FindOutgoingHTLCDeadline(
                                        closeChanInfo.ShortChanID, htlc,
                                )
                        },
                }
                chanLog, err := newBoltArbitratorLog(
                        c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
                )
                if err != nil {
                        return err
                }
                arbCfg.MarkChannelResolved = func() error {
                        if c.cfg.NotifyFullyResolvedChannel != nil {
                                c.cfg.NotifyFullyResolvedChannel(chanPoint)
                        }

                        return c.ResolveContract(chanPoint)
                }

                // We create an empty map of HTLC's here since it's possible
                // that the channel is in StateDefault and updateActiveHTLCs is
                // called. We want to avoid writing to an empty map. Since the
                // channel is already in the process of being resolved, no new
                // HTLCs will be added.
                c.activeChannels[chanPoint] = NewChannelArbitrator(
                        arbCfg, make(map[HtlcSetKey]htlcSet), chanLog,
                )
        }

        // Now, we'll start all chain watchers in parallel to shorten start up
        // duration. In neutrino mode, this allows spend registrations to take
        // advantage of batch spend reporting, instead of doing a single rescan
        // per chain watcher.
        //
        // NOTE: After this point, we Stop the chain arb to ensure that any
        // lingering goroutines are cleaned up before exiting.
        watcherErrs := make(chan error, len(c.activeWatchers))
        var wg sync.WaitGroup
        for _, watcher := range c.activeWatchers {
                wg.Add(1)
                go func(w *chainWatcher) {
                        defer wg.Done()
                        select {
                        case watcherErrs <- w.Start():
                        case <-c.quit:
                                watcherErrs <- ErrChainArbExiting
                        }
                }(watcher)
        }

        // Once all chain watchers have been started, seal the err chan to
        // signal the end of the err stream.
        go func() {
                wg.Wait()
                close(watcherErrs)
        }()

        // stopAndLog is a helper function which shuts down the chain arb and
        // logs errors if they occur.
        stopAndLog := func() {
                if err := c.Stop(); err != nil {
                        log.Errorf("ChainArbitrator could not shutdown: %v", err)
                }
        }

        // Handle all errors returned from spawning our chain watchers. If any
        // of them failed, we will stop the chain arb to shutdown any active
        // goroutines.
        for err := range watcherErrs {
                if err != nil {
                        stopAndLog()
                        return err
                }
        }

        // Before we start all of our arbitrators, we do a preliminary state
        // lookup so that we can combine all of these lookups in a single db
        // transaction.
        var startStates map[wire.OutPoint]*chanArbStartState

        err = kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error {
                for _, arbitrator := range c.activeChannels {
                        startState, err := arbitrator.getStartState(tx)
                        if err != nil {
                                return err
                        }

                        startStates[arbitrator.cfg.ChanPoint] = startState
                }

                return nil
        }, func() {
                startStates = make(
                        map[wire.OutPoint]*chanArbStartState,
                        len(c.activeChannels),
                )
        })
        if err != nil {
                stopAndLog()
                return err
        }

        // Launch all the goroutines for each arbitrator so they can carry out
        // their duties.
        for _, arbitrator := range c.activeChannels {
                startState, ok := startStates[arbitrator.cfg.ChanPoint]
                if !ok {
                        stopAndLog()
                        return fmt.Errorf("arbitrator: %v has no start state",
                                arbitrator.cfg.ChanPoint)
                }

                if err := arbitrator.Start(startState); err != nil {
                        stopAndLog()
                        return err
                }
        }

        // Subscribe to a single stream of block epoch notifications that we
        // will dispatch to all active arbitrators.
        blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
        if err != nil {
                return err
        }

        // Start our goroutine which will dispatch blocks to each arbitrator.
        c.wg.Add(1)
        go func() {
                defer c.wg.Done()
                c.dispatchBlocks(blockEpoch)
        }()

        // TODO(roasbeef): eventually move all breach watching here

        return nil
}

// blockRecipient contains the information we need to dispatch a block to a
// channel arbitrator.
type blockRecipient struct {
        // chanPoint is the funding outpoint of the channel.
        chanPoint wire.OutPoint

        // blocks is the channel that new block heights are sent into. This
        // channel should be sufficiently buffered as to not block the sender.
        blocks chan<- int32

        // quit is closed if the receiving entity is shutting down.
        quit chan struct{}
}

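To illustrate the buffering requirement called out in the blocks field comment, a dispatcher-side recipient might be assembled as follows; the buffer size is an arbitrary illustrative value:

// heights is buffered so that the dispatcher's send does not block on a
// slow arbitrator; the arbitrator drains the channel in its own goroutine.
heights := make(chan int32, 20)
recipient := blockRecipient{
        chanPoint: chanPoint,
        blocks:    heights,
        quit:      make(chan struct{}),
}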
// dispatchBlocks consumes a block epoch notification stream and dispatches
// blocks to each of the chain arb's active channel arbitrators. This function
// must be run in a goroutine.
func (c *ChainArbitrator) dispatchBlocks(
        blockEpoch *chainntnfs.BlockEpochEvent) {

        // getRecipients is a helper function which acquires the chain arb
        // lock and returns a set of block recipients which can be used to
        // dispatch blocks.
        getRecipients := func() []blockRecipient {
                c.Lock()
                blocks := make([]blockRecipient, 0, len(c.activeChannels))
                for _, channel := range c.activeChannels {
                        blocks = append(blocks, blockRecipient{
                                chanPoint: channel.cfg.ChanPoint,
                                blocks:    channel.blocks,
                                quit:      channel.quit,
                        })
                }
                c.Unlock()

                return blocks
        }

        // On exit, cancel our blocks subscription and close each block channel
        // so that the arbitrators know they will no longer be receiving blocks.
        defer func() {
                blockEpoch.Cancel()

                recipients := getRecipients()
                for _, recipient := range recipients {
                        close(recipient.blocks)
                }
        }()

        // Consume block epochs until we receive the instruction to shutdown.
        for {
                select {
                // Consume block epochs, exiting if our subscription is
                // terminated.
                case block, ok := <-blockEpoch.Epochs:
                        if !ok {
                                log.Trace("dispatchBlocks block epoch " +
                                        "cancelled")
                                return
                        }

                        // Get the set of currently active channels block
                        // subscription channels and dispatch the block to
                        // each.
                        for _, recipient := range getRecipients() {
                                select {
                                // Deliver the block to the arbitrator.
                                case recipient.blocks <- block.Height:

                                // If the recipient is shutting down, exit
                                // without delivering the block. This may be
                                // the case when two blocks are mined in quick
                                // succession, and the arbitrator resolves
                                // after the first block, and does not need to
                                // consume the second block.
                                case <-recipient.quit:
                                        log.Debugf("channel: %v exit without "+
                                                "receiving block: %v",
                                                recipient.chanPoint,
                                                block.Height)

                                // If the chain arb is shutting down, we don't
                                // need to deliver any more blocks (everything
                                // will be shutting down).
                                case <-c.quit:
                                        return
                                }
                        }

                // Exit if the chain arbitrator is shutting down.
                case <-c.quit:
                        return
                }
        }
}

// republishClosingTxs will load any stored cooperative or unilateral closing
// transactions and republish them. This helps ensure propagation of the
// transactions in the event that prior publications failed.
func (c *ChainArbitrator) republishClosingTxs(
        channel *channeldb.OpenChannel) error {

        // If the channel has had its unilateral close broadcasted already,
        // republish it in case it didn't propagate.
        if channel.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) {
                err := c.rebroadcast(
                        channel, channeldb.ChanStatusCommitBroadcasted,
                )
                if err != nil {
                        return err
                }
        }

        // If the channel has had its cooperative close broadcasted
        // already, republish it in case it didn't propagate.
        if channel.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) {
                err := c.rebroadcast(
                        channel, channeldb.ChanStatusCoopBroadcasted,
                )
                if err != nil {
                        return err
                }
        }

        return nil
}

// rebroadcast is a helper method which will republish the unilateral or
// cooperative close transaction of a channel in a particular state.
//
// NOTE: There is no risk to calling this method if the channel isn't in either
// CommitmentBroadcasted or CoopBroadcasted, but the logs will be misleading.
func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel,
        state channeldb.ChannelStatus) error {

        chanPoint := channel.FundingOutpoint

        var (
                closeTx *wire.MsgTx
                kind    string
                err     error
        )
        switch state {
        case channeldb.ChanStatusCommitBroadcasted:
                kind = "force"
                closeTx, err = channel.BroadcastedCommitment()

        case channeldb.ChanStatusCoopBroadcasted:
                kind = "coop"
                closeTx, err = channel.BroadcastedCooperative()

        default:
                return fmt.Errorf("unknown closing state: %v", state)
        }

        switch {
        // This can happen for channels that had their closing tx published
        // before we started storing it to disk.
        case err == channeldb.ErrNoCloseTx:
                log.Warnf("Channel %v is in state %v, but no %s closing tx "+
                        "to re-publish...", chanPoint, state, kind)
                return nil

        case err != nil:
                return err
        }

        log.Infof("Re-publishing %s close tx(%v) for channel %v",
                kind, closeTx.TxHash(), chanPoint)

        label := labels.MakeLabel(
                labels.LabelTypeChannelClose, &channel.ShortChannelID,
        )
        err = c.cfg.PublishTx(closeTx, label)
        if err != nil && err != lnwallet.ErrDoubleSpend {
                log.Warnf("Unable to broadcast %s close tx(%v): %v",
                        kind, closeTx.TxHash(), err)
        }

        return nil
}

// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active
// channel arbitrators will be signalled to exit, and this method will block
// until they've all exited.
func (c *ChainArbitrator) Stop() error {
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
                return nil
        }

        log.Info("ChainArbitrator shutting down...")
        defer log.Debug("ChainArbitrator shutdown complete")

        close(c.quit)

        var (
                activeWatchers = make(map[wire.OutPoint]*chainWatcher)
                activeChannels = make(map[wire.OutPoint]*ChannelArbitrator)
        )

        // Copy the current set of active watchers and arbitrators to shutdown.
        // We don't want to hold the lock when shutting down each watcher or
        // arbitrator individually, as they may need to acquire this mutex.
        c.Lock()
        for chanPoint, watcher := range c.activeWatchers {
                activeWatchers[chanPoint] = watcher
        }
        for chanPoint, arbitrator := range c.activeChannels {
                activeChannels[chanPoint] = arbitrator
        }
        c.Unlock()

        for chanPoint, watcher := range activeWatchers {
                log.Tracef("Attempting to stop ChainWatcher(%v)",
                        chanPoint)

                if err := watcher.Stop(); err != nil {
                        log.Errorf("unable to stop watcher for "+
                                "ChannelPoint(%v): %v", chanPoint, err)
                }
        }
        for chanPoint, arbitrator := range activeChannels {
                log.Tracef("Attempting to stop ChannelArbitrator(%v)",
                        chanPoint)

                if err := arbitrator.Stop(); err != nil {
                        log.Errorf("unable to stop arbitrator for "+
                                "ChannelPoint(%v): %v", chanPoint, err)
                }
        }

        c.wg.Wait()

        return nil
}

// ContractUpdate is a message that packages the latest set of active HTLCs on
// a commitment, and also identifies which commitment received a new set of
// HTLCs.
type ContractUpdate struct {
        // HtlcKey identifies which commitment the HTLCs below are present on.
        HtlcKey HtlcSetKey

        // Htlcs are the set of active HTLCs on the commitment identified by
        // the above HtlcKey.
        Htlcs []channeldb.HTLC
}

// ContractSignals is used by outside subsystems to notify a channel arbitrator
// of its ShortChannelID.
type ContractSignals struct {
        // ShortChanID is the up to date short channel ID for a contract. This
        // can change if the contract didn't yet have a stable identifier when
        // it was added, or in the case of a reorg.
        ShortChanID lnwire.ShortChannelID
}

// UpdateContractSignals sends a set of active, up to date contract signals to
// the ChannelArbitrator which has been assigned to the channel identified by
// the passed channel point.
func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
        signals *ContractSignals) error {

        log.Infof("Attempting to update ContractSignals for ChannelPoint(%v)",
                chanPoint)

        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return fmt.Errorf("unable to find arbitrator")
        }

        arbitrator.UpdateContractSignals(signals)

        return nil
}

// NotifyContractUpdate lets a channel arbitrator know that a new
// ContractUpdate is available. This calls the ChannelArbitrator's internal
// method NotifyContractUpdate which waits for a response on a done chan before
// returning. This method will return an error if the ChannelArbitrator is not
// in the activeChannels map. However, this only happens if the arbitrator is
// resolved and the related link would already be shut down.
func (c *ChainArbitrator) NotifyContractUpdate(chanPoint wire.OutPoint,
        update *ContractUpdate) error {

        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return fmt.Errorf("can't find arbitrator for %v", chanPoint)
        }

        arbitrator.notifyContractUpdate(update)
        return nil
}

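A hypothetical caller-side sketch tying the two methods above to the message types defined earlier; chainArb, chanPoint, newHtlcs, and confirmedSCID are assumed to be in scope:

// After a new commitment is signed, tell the arbitrator which HTLC set
// changed.
update := &ContractUpdate{
        HtlcKey: RemoteHtlcSet,
        Htlcs:   newHtlcs, // []channeldb.HTLC from the fresh commitment.
}
if err := chainArb.NotifyContractUpdate(chanPoint, update); err != nil {
        return err
}

// Once the channel is confirmed, hand the arbitrator its final short
// channel ID.
signals := &ContractSignals{
        ShortChanID: confirmedSCID,
}
if err := chainArb.UpdateContractSignals(chanPoint, signals); err != nil {
        return err
}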
// GetChannelArbitrator safely returns the channel arbitrator for a given
// channel outpoint.
func (c *ChainArbitrator) GetChannelArbitrator(chanPoint wire.OutPoint) (
        *ChannelArbitrator, error) {

        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return nil, fmt.Errorf("unable to find arbitrator")
        }

        return arbitrator, nil
}

// forceCloseReq is a request sent from an outside sub-system to the arbitrator
// that watches a particular channel to broadcast the commitment transaction,
// and enter the resolution phase of the channel.
type forceCloseReq struct {
        // errResp is a channel that will be sent upon either in the case of
        // force close success (nil error), or in the case of an error.
        //
        // NOTE: This channel MUST be buffered.
        errResp chan error

        // closeTx is a channel that carries the transaction which ultimately
        // closed out the channel.
        closeTx chan *wire.MsgTx
}

// ForceCloseContract attempts to force close the channel identified by the
// passed channel point. A force close will immediately terminate the contract,
// causing it to enter the resolution phase. If the force close was successful,
// then the force close transaction itself will be returned.
//
// TODO(roasbeef): just return the summary itself?
func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.MsgTx, error) {
        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return nil, fmt.Errorf("unable to find arbitrator")
        }

        log.Infof("Attempting to force close ChannelPoint(%v)", chanPoint)

        // Before closing, we'll attempt to send a disable update for the
        // channel. We do so before closing the channel as otherwise the current
        // edge policy won't be retrievable from the graph.
        if err := c.cfg.DisableChannel(chanPoint); err != nil {
                log.Warnf("Unable to disable channel %v on "+
                        "close: %v", chanPoint, err)
        }

        errChan := make(chan error, 1)
        respChan := make(chan *wire.MsgTx, 1)

        // With the channel found, and the request crafted, we'll send over a
        // force close request to the arbitrator that watches this channel.
        select {
        case arbitrator.forceCloseReqs <- &forceCloseReq{
                errResp: errChan,
                closeTx: respChan,
        }:
        case <-c.quit:
                return nil, ErrChainArbExiting
        }

        // We'll await two responses: the error response, and the transaction
        // that closed out the channel.
        select {
        case err := <-errChan:
                if err != nil {
                        return nil, err
                }
        case <-c.quit:
                return nil, ErrChainArbExiting
        }

        var closeTx *wire.MsgTx
        select {
        case closeTx = <-respChan:
        case <-c.quit:
                return nil, ErrChainArbExiting
        }

        return closeTx, nil
}

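An illustrative caller sketch for the method above, roughly what an RPC handler might do; chainArb and chanPoint are assumed to be in scope:

closeTx, err := chainArb.ForceCloseContract(chanPoint)
if err != nil {
        return fmt.Errorf("unable to force close %v: %w", chanPoint, err)
}
log.Infof("Channel %v force closed by tx %v", chanPoint, closeTx.TxHash())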
1193
// WatchNewChannel sends the ChainArbitrator a message to create a
1194
// ChannelArbitrator tasked with watching over a new channel. Once a new
1195
// channel has finished its final funding flow, it should be registered with
1196
// the ChainArbitrator so we can properly react to any on-chain events.
1197
func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error {
×
1198
        c.Lock()
×
1199
        defer c.Unlock()
×
1200

×
1201
        chanPoint := newChan.FundingOutpoint
×
1202

×
1203
        log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)",
×
1204
                chanPoint)
×
1205

×
1206
        // If we're already watching this channel, then we'll ignore this
×
1207
        // request.
×
1208
        if _, ok := c.activeChannels[chanPoint]; ok {
×
1209
                return nil
×
1210
        }
×
1211

1212
        // First, also create an active chainWatcher for this channel to ensure
1213
        // that we detect any relevant on chain events.
1214
        chainWatcher, err := newChainWatcher(
×
1215
                chainWatcherConfig{
×
1216
                        chanState: newChan,
×
1217
                        notifier:  c.cfg.Notifier,
×
1218
                        signer:    c.cfg.Signer,
×
1219
                        isOurAddr: c.cfg.IsOurAddress,
×
1220
                        contractBreach: func(
×
1221
                                retInfo *lnwallet.BreachRetribution) error {
×
1222

×
1223
                                return c.cfg.ContractBreach(
×
1224
                                        chanPoint, retInfo,
×
1225
                                )
×
1226
                        },
×
1227
                        extractStateNumHint: lnwallet.GetStateNumHint,
1228
                        auxLeafStore:        c.cfg.AuxLeafStore,
1229
                        auxResolver:         c.cfg.AuxResolver,
1230
                },
1231
        )
1232
        if err != nil {
×
1233
                return err
×
1234
        }
×
1235

1236
        c.activeWatchers[chanPoint] = chainWatcher
×
1237

×
1238
        // We'll also create a new channel arbitrator instance using this new
×
1239
        // channel, and our internal state.
×
1240
        channelArb, err := newActiveChannelArbitrator(
×
1241
                newChan, c, chainWatcher.SubscribeChannelEvents(),
×
1242
        )
×
1243
        if err != nil {
×
1244
                return err
×
1245
        }
×
1246

1247
        // With the arbitrator created, we'll add it to our set of active
1248
        // arbitrators, then launch it.
1249
        c.activeChannels[chanPoint] = channelArb
×
1250

×
1251
        if err := channelArb.Start(nil); err != nil {
×
1252
                return err
×
1253
        }
×
1254

1255
        return chainWatcher.Start()
×
1256
}
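
// registerAndSubscribe is an illustrative sketch, not part of the original
// file: once a channel completes its final funding flow, the caller hands
// it to the ChainArbitrator and, in the same step, opens an event
// subscription so on-chain state changes for the new channel are observed
// from the start.
func registerAndSubscribe(c *ChainArbitrator,
        newChan *channeldb.OpenChannel) (*ChainEventSubscription, error) {

        // Register the channel so a chainWatcher and ChannelArbitrator are
        // created and started for it.
        if err := c.WatchNewChannel(newChan); err != nil {
                return nil, fmt.Errorf("unable to watch ChannelPoint(%v): "+
                        "%w", newChan.FundingOutpoint, err)
        }

        // The watcher is now tracked in activeWatchers, so the subscription
        // lookup below will succeed.
        return c.SubscribeChannelEvents(newChan.FundingOutpoint)
}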

// SubscribeChannelEvents returns a new active subscription for the set of
// possible on-chain events for a particular channel. The struct can be used
// by callers to be notified whenever an event that changes the state of the
// channel on-chain occurs.
func (c *ChainArbitrator) SubscribeChannelEvents(
        chanPoint wire.OutPoint) (*ChainEventSubscription, error) {

        // First, we'll attempt to look up the active watcher for this
        // channel. If we can't find it, then we'll return an error back to
        // the caller.
        c.Lock()
        watcher, ok := c.activeWatchers[chanPoint]
        c.Unlock()

        if !ok {
                return nil, fmt.Errorf("unable to find watcher for: %v",
                        chanPoint)
        }

        // With the watcher located, we'll request that it create a new chain
        // event subscription client.
        return watcher.SubscribeChannelEvents(), nil
}
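
// awaitCoopClose is a sketch, not part of the original file, of consuming a
// subscription returned by SubscribeChannelEvents. It assumes the
// ChainEventSubscription exposes a CooperativeClosure event channel and a
// Cancel callback; both are defined by this package's chain watcher but are
// not shown in this excerpt.
func awaitCoopClose(sub *ChainEventSubscription, quit <-chan struct{}) {
        // Always tear down the subscription so the watcher can release the
        // client's resources.
        defer sub.Cancel()

        select {
        case closeInfo := <-sub.CooperativeClosure:
                log.Infof("Channel cooperatively closed: %v", closeInfo)

        case <-quit:
        }
}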

// FindOutgoingHTLCDeadline returns the deadline in absolute block height for
// the specified outgoing HTLC. For an outgoing HTLC, its deadline is defined
// by the timeout height of its corresponding incoming HTLC - this is the
// expiry height at which the remote peer can spend their outgoing HTLC via
// the timeout path.
func (c *ChainArbitrator) FindOutgoingHTLCDeadline(scid lnwire.ShortChannelID,
        outgoingHTLC channeldb.HTLC) fn.Option[int32] {

        // Find the outgoing HTLC's corresponding incoming HTLC in the
        // circuit map.
        rHash := outgoingHTLC.RHash
        circuit := models.CircuitKey{
                ChanID: scid,
                HtlcID: outgoingHTLC.HtlcIndex,
        }
        incomingCircuit := c.cfg.QueryIncomingCircuit(circuit)

        // If there's no incoming circuit found, we will use the default
        // deadline.
        if incomingCircuit == nil {
                log.Warnf("ChannelArbitrator(%v): incoming circuit key not "+
                        "found for rHash=%x, using default deadline instead",
                        scid, rHash)

                return fn.None[int32]()
        }

        // If this is a locally initiated HTLC, it means we are the first
        // hop. In this case, we can relax the deadline.
        if incomingCircuit.ChanID.IsDefault() {
                log.Infof("ChannelArbitrator(%v): using default deadline for "+
                        "locally initiated HTLC for rHash=%x", scid, rHash)

                return fn.None[int32]()
        }

        log.Debugf("Found incoming circuit %v for rHash=%x using outgoing "+
                "circuit %v", incomingCircuit, rHash, circuit)

        c.Lock()
        defer c.Unlock()

        // Iterate over all active channels to find the incoming HTLC
        // specified by its circuit key.
        for cp, channelArb := range c.activeChannels {
                // Skip if the SCID doesn't match.
                if channelArb.cfg.ShortChanID != incomingCircuit.ChanID {
                        continue
                }

                // Make sure the channel arbitrator has the latest view of
                // its active HTLCs.
                channelArb.updateActiveHTLCs()

                // Iterate all the known HTLCs to find the targeted incoming
                // HTLC.
                for _, htlcs := range channelArb.activeHTLCs {
                        for _, htlc := range htlcs.incomingHTLCs {
                                // Skip if the index doesn't match.
                                if htlc.HtlcIndex != incomingCircuit.HtlcID {
                                        continue
                                }

                                log.Debugf("ChannelArbitrator(%v): found "+
                                        "incoming HTLC in channel=%v using "+
                                        "rHash=%x, refundTimeout=%v", scid,
                                        cp, rHash, htlc.RefundTimeout)

                                return fn.Some(int32(htlc.RefundTimeout))
                        }
                }
        }

        // If there's no incoming HTLC found, yet we have the incoming
        // circuit, something is wrong - in this case, we return a None
        // deadline.
        log.Errorf("ChannelArbitrator(%v): incoming HTLC not found for "+
                "rHash=%x, using default deadline instead", scid, rHash)

        return fn.None[int32]()
}
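
// deadlineOrDefault is a minimal sketch, not part of the original file,
// showing how a caller might consume the fn.Option returned above: use the
// incoming HTLC's timeout height when one was found, otherwise fall back to
// a caller-chosen default delta. The helper, its parameters, and the
// 144-block delta are assumptions for illustration; fn.Option's UnwrapOr
// accessor is assumed from the fn package.
func deadlineOrDefault(c *ChainArbitrator, scid lnwire.ShortChannelID,
        htlc channeldb.HTLC, bestHeight int32) int32 {

        // Roughly one day's worth of blocks; purely illustrative.
        const defaultDeadlineDelta = 144

        return c.FindOutgoingHTLCDeadline(scid, htlc).UnwrapOr(
                bestHeight + defaultDeadlineDelta,
        )
}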

// TODO(roasbeef): arbitration reports
//  * types: contested, waiting for success conf, etc