lightningnetwork / lnd — build 11219354629
07 Oct 2024 03:56PM UTC — coverage: 58.585% (-0.2%) from 58.814%
Pull Request #9147: [Part 1|3] Introduce SQL Payment schema into LND
ziggie1984 (via github) — fixup! sqlc: migration up script for payments.
130227 of 222287 relevant lines covered (58.59%) — 29106.19 hits per line

Source file: /contractcourt/chain_arbitrator.go — 79.54% of lines covered
package contractcourt

import (
        "errors"
        "fmt"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/btcsuite/btcd/wire"
        "github.com/btcsuite/btcwallet/walletdb"
        "github.com/lightningnetwork/lnd/chainntnfs"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/channeldb/models"
        "github.com/lightningnetwork/lnd/clock"
        "github.com/lightningnetwork/lnd/fn"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/labels"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
        "github.com/lightningnetwork/lnd/lnwire"
)

// ErrChainArbExiting signals that the chain arbitrator is shutting down.
var ErrChainArbExiting = errors.New("ChainArbitrator exiting")

// ResolutionMsg is a message sent by resolvers to outside sub-systems once an
// outgoing contract has been fully resolved. For multi-hop contracts, if we
// resolve the outgoing contract, we'll also need to ensure that the incoming
// contract is resolved as well. We package the items required to resolve the
// incoming contracts within this message.
type ResolutionMsg struct {
        // SourceChan identifies the channel that this message is being sent
        // from. This is the channel's short channel ID.
        SourceChan lnwire.ShortChannelID

        // HtlcIndex is the index of the contract within the original
        // commitment trace.
        HtlcIndex uint64

        // Failure will be non-nil if the incoming contract should be canceled
        // altogether. This can happen if the outgoing contract was dust, or if
        // the outgoing HTLC timed out.
        Failure lnwire.FailureMessage

        // PreImage will be non-nil if the incoming contract can successfully
        // be redeemed. This can happen if we learn of the preimage from the
        // outgoing HTLC on-chain.
        PreImage *[32]byte
}
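
// The following sketch (not part of the original file) shows how a resolver
// might package a ResolutionMsg after learning a preimage on-chain; the
// short channel ID, HTLC index, and `cfg` handle are placeholders:
//
//	preimage := [32]byte{ /* learned from the on-chain sweep */ }
//	msg := ResolutionMsg{
//	        SourceChan: lnwire.NewShortChanIDFromInt(42),
//	        HtlcIndex:  0,
//	        PreImage:   &preimage,
//	}
//	// Hand the message to the switch so the incoming link can settle.
//	if err := cfg.DeliverResolutionMsg(msg); err != nil {
//	        // handle delivery failure
//	}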

// ChainArbitratorConfig is a configuration struct that contains all the
// function closures and interfaces that are required to arbitrate on-chain
// contracts for a particular chain.
type ChainArbitratorConfig struct {
        // ChainHash is the chain that this arbitrator is to operate within.
        ChainHash chainhash.Hash

        // IncomingBroadcastDelta is the delta that we'll use to decide when to
        // broadcast our commitment transaction if we have incoming htlcs. This
        // value should be set based on our current fee estimation of the
        // commitment transaction. We use this to determine when we should
        // broadcast instead of just the HTLC timeout, as we want to ensure
        // that the commitment transaction is already confirmed by the time the
        // HTLC expires. Otherwise we may end up not settling the htlc on-chain
        // because the other party managed to time it out.
        IncomingBroadcastDelta uint32

        // OutgoingBroadcastDelta is the delta that we'll use to decide when to
        // broadcast our commitment transaction if there are active outgoing
        // htlcs. This value can be lower than the incoming broadcast delta.
        OutgoingBroadcastDelta uint32

        // NewSweepAddr is a function that returns a new address under control
        // of the wallet. We'll use this to sweep any no-delay outputs as a
        // result of unilateral channel closes.
        //
        // NOTE: This SHOULD return a p2wkh script.
        NewSweepAddr func() ([]byte, error)

        // PublishTx reliably broadcasts a transaction to the network. Once
        // this function exits without an error, then the transaction MUST
        // continually be rebroadcast if needed.
        PublishTx func(*wire.MsgTx, string) error

        // DeliverResolutionMsg is a function that will append an outgoing
        // message to the "out box" for a ChannelLink. This is used to cancel
        // backwards any HTLC's that are either dust, we're timing out, or
        // settling on-chain to the incoming link.
        DeliverResolutionMsg func(...ResolutionMsg) error

        // MarkLinkInactive is a function closure that the ChainArbitrator will
        // use to mark that active HTLC's shouldn't be attempted to be routed
        // over a particular channel. This function will be called in the event
        // that a ChannelArbitrator decides that it needs to go to chain in
        // order to resolve contracts.
        //
        // TODO(roasbeef): rename, routing based
        MarkLinkInactive func(wire.OutPoint) error

        // ContractBreach is a function closure that the ChainArbitrator will
        // use to notify the BreachArbitrator about a contract breach. It should
        // only return a non-nil error when the BreachArbitrator has preserved
        // the necessary breach info for this channel point. Once the breach
        // resolution is persisted in the ChannelArbitrator, it will be safe
        // to mark the channel closed.
        ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error

        // IsOurAddress is a function that returns true if the passed address
        // is known to the underlying wallet. Otherwise, false should be
        // returned.
        IsOurAddress func(btcutil.Address) bool

        // IncubateOutputs sends either an incoming HTLC, an outgoing HTLC, or
        // both to the utxo nursery. Once this function returns, the nursery
        // should have safely persisted the outputs to disk, and should start
        // the process of incubation. This is used when a resolver wishes to
        // pass off the output to the nursery as we're only waiting on an
        // absolute/relative item block.
        IncubateOutputs func(wire.OutPoint,
                fn.Option[lnwallet.OutgoingHtlcResolution],
                fn.Option[lnwallet.IncomingHtlcResolution],
                uint32, fn.Option[int32]) error

        // PreimageDB is a global store of all known pre-images. We'll use this
        // to decide if we should broadcast a commitment transaction to claim
        // an HTLC on-chain.
        PreimageDB WitnessBeacon

        // Notifier is an instance of a chain notifier we'll use to watch for
        // certain on-chain events.
        Notifier chainntnfs.ChainNotifier

        // Mempool is a mempool watcher that allows us to watch for events
        // happening in the mempool.
        Mempool chainntnfs.MempoolWatcher

        // Signer is a signer backed by the active lnd node. This should be
        // capable of producing a signature as specified by a valid
        // SignDescriptor.
        Signer input.Signer

        // FeeEstimator will be used to return fee estimates.
        FeeEstimator chainfee.Estimator

        // ChainIO allows us to query the state of the current main chain.
        ChainIO lnwallet.BlockChainIO

        // DisableChannel disables a channel, resulting in it not being able to
        // forward payments.
        DisableChannel func(wire.OutPoint) error

        // Sweeper allows resolvers to sweep their final outputs.
        Sweeper UtxoSweeper

        // Registry is the invoice database that is used by resolvers to lookup
        // preimages and settle invoices.
        Registry Registry

        // NotifyClosedChannel is a function closure that the ChainArbitrator
        // will use to notify the ChannelNotifier about a newly closed channel.
        NotifyClosedChannel func(wire.OutPoint)

        // NotifyFullyResolvedChannel is a function closure that the
        // ChainArbitrator will use to notify the ChannelNotifier about a newly
        // resolved channel. The main difference to NotifyClosedChannel is that
        // in case of a local force close, NotifyClosedChannel is called when
        // the published commitment transaction confirms, while
        // NotifyFullyResolvedChannel is only called when the channel is fully
        // resolved (which includes sweeping any time locked funds).
        NotifyFullyResolvedChannel func(point wire.OutPoint)

        // OnionProcessor is used to decode onion payloads for on-chain
        // resolution.
        OnionProcessor OnionProcessor

        // PaymentsExpirationGracePeriod indicates a time window during which
        // we let the other node cancel an outgoing htlc that our node has
        // initiated and that has timed out.
        PaymentsExpirationGracePeriod time.Duration

        // IsForwardedHTLC checks for a given htlc, identified by channel id
        // and htlcIndex, whether it is a forwarded one.
        IsForwardedHTLC func(chanID lnwire.ShortChannelID, htlcIndex uint64) bool

        // Clock is the clock implementation that ChannelArbitrator uses.
        // It is useful for testing.
        Clock clock.Clock

        // SubscribeBreachComplete is used by the breachResolver to register a
        // subscription that notifies when the breach resolution process is
        // complete.
        SubscribeBreachComplete func(op *wire.OutPoint, c chan struct{}) (
                bool, error)

        // PutFinalHtlcOutcome stores the final outcome of an htlc in the
        // database.
        PutFinalHtlcOutcome func(chanId lnwire.ShortChannelID,
                htlcId uint64, settled bool) error

        // HtlcNotifier is an interface that htlc events are sent to.
        HtlcNotifier HtlcNotifier

        // Budget is the configured budget for the arbitrator.
        Budget BudgetConfig

        // QueryIncomingCircuit is used to find the outgoing HTLC's
        // corresponding incoming HTLC circuit. It queries the circuit map for
        // a given outgoing circuit key and returns the incoming circuit key.
        //
        // TODO(yy): this is a hacky way to get around the cycling import issue
        // as we cannot import `htlcswitch` here. A proper way is to define an
        // interface here that asks for method `LookupOpenCircuit`,
        // meanwhile, turn `PaymentCircuit` into an interface or bring it to a
        // lower package.
        QueryIncomingCircuit func(circuit models.CircuitKey) *models.CircuitKey

        // AuxLeafStore is an optional store that can be used to store auxiliary
        // leaves for certain custom channel types.
        AuxLeafStore fn.Option[lnwallet.AuxLeafStore]

        // AuxSigner is an optional signer that can be used to sign auxiliary
        // leaves for certain custom channel types.
        AuxSigner fn.Option[lnwallet.AuxSigner]
}
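
// A minimal sketch (not part of the original file) of how the optional aux
// fields above are typically populated and consumed; `myLeafStore` is a
// hypothetical lnwallet.AuxLeafStore implementation:
//
//	cfg := ChainArbitratorConfig{
//	        AuxLeafStore: fn.Some[lnwallet.AuxLeafStore](myLeafStore),
//	        AuxSigner:    fn.None[lnwallet.AuxSigner](),
//	}
//	// Consumers only act when a value is actually present:
//	cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
//	        // wire the store into the channel machinery
//	})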

// ChainArbitrator is a sub-system that oversees the on-chain resolution of all
// active channels, and channels that are in the "pending close" state. Within
// the contractcourt package, the ChainArbitrator manages a set of active
// ContractArbitrators. Each ContractArbitrator is responsible for watching
// the chain for any activity that affects the state of the channel, and also
// for monitoring each contract in order to determine if any on-chain activity
// is required. Outside sub-systems interact with the ChainArbitrator in order
// to forcibly exit a contract, update the set of live signals for each
// contract, and to receive reports on the state of contract resolution.
type ChainArbitrator struct {
        started int32 // To be used atomically.
        stopped int32 // To be used atomically.

        sync.Mutex

        // activeChannels is a map of all the active contracts that are still
        // open, and not fully resolved.
        activeChannels map[wire.OutPoint]*ChannelArbitrator

        // activeWatchers is a map of all the active chainWatchers for channels
        // that are still considered open.
        activeWatchers map[wire.OutPoint]*chainWatcher

        // cfg is the config struct for the arbitrator that contains all
        // methods and interfaces it needs to operate.
        cfg ChainArbitratorConfig

        // chanSource will be used by the ChainArbitrator to fetch all the
        // active channels that it must still watch over.
        chanSource *channeldb.DB

        quit chan struct{}

        wg sync.WaitGroup
}

// NewChainArbitrator returns a new instance of the ChainArbitrator using the
// passed config struct, and backing persistent database.
func NewChainArbitrator(cfg ChainArbitratorConfig,
        db *channeldb.DB) *ChainArbitrator {

        return &ChainArbitrator{
                cfg:            cfg,
                activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
                activeWatchers: make(map[wire.OutPoint]*chainWatcher),
                chanSource:     db,
                quit:           make(chan struct{}),
        }
}
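
// A minimal sketch (not part of the original file) of wiring the arbitrator
// at startup, assuming a populated ChainArbitratorConfig `cfg` and an open
// channeldb handle `db`:
//
//	arb := NewChainArbitrator(cfg, db)
//	if err := arb.Start(); err != nil {
//	        // handle startup failure
//	}
//	defer func() { _ = arb.Stop() }()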

// arbChannel is a wrapper around an open channel that channel arbitrators
// interact with.
type arbChannel struct {
        // channel is the in-memory channel state.
        channel *channeldb.OpenChannel

        // c references the chain arbitrator and is used by arbChannel
        // internally.
        c *ChainArbitrator
}

// NewAnchorResolutions returns the anchor resolutions for currently valid
// commitment transactions.
//
// NOTE: Part of the ArbChannel interface.
func (a *arbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions,
        error) {

        // Get a fresh copy of the database state to base the anchor resolutions
        // on. Unfortunately the channel instance that we have here isn't the
        // same instance that is used by the link.
        chanPoint := a.channel.FundingOutpoint

        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(
                nil, chanPoint,
        )
        if err != nil {
                return nil, err
        }

        var chanOpts []lnwallet.ChannelOpt
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
        })
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
        })

        chanMachine, err := lnwallet.NewLightningChannel(
                a.c.cfg.Signer, channel, nil, chanOpts...,
        )
        if err != nil {
                return nil, err
        }

        return chanMachine.NewAnchorResolutions()
}

// ForceCloseChan should force close the contract that this attendant is
// watching over. We'll use this when we decide that we need to go to chain. It
// should in addition tell the switch to remove the corresponding link, such
// that we won't accept any new updates. The returned summary contains all items
// needed to eventually resolve all outputs on chain.
//
// NOTE: Part of the ArbChannel interface.
func (a *arbChannel) ForceCloseChan() (*lnwallet.LocalForceCloseSummary, error) {
        // First, we mark the channel as borked. This ensures
        // that no new state transitions can happen, and also
        // that the link won't be loaded into the switch.
        if err := a.channel.MarkBorked(); err != nil {
                return nil, err
        }

        // With the channel marked as borked, we'll now remove
        // the link from the switch if it's there. If the link
        // is active, then this method will block until it
        // exits.
        chanPoint := a.channel.FundingOutpoint

        if err := a.c.cfg.MarkLinkInactive(chanPoint); err != nil {
                log.Errorf("unable to mark link inactive: %v", err)
        }

        // Now that we know the link can't mutate the channel
        // state, we'll read the target channel from disk
        // according to its channel point.
        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(
                nil, chanPoint,
        )
        if err != nil {
                return nil, err
        }

        var chanOpts []lnwallet.ChannelOpt
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
        })
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
        })

        // Finally, we'll force close the channel, completing
        // the force close workflow.
        chanMachine, err := lnwallet.NewLightningChannel(
                a.c.cfg.Signer, channel, nil, chanOpts...,
        )
        if err != nil {
                return nil, err
        }
        return chanMachine.ForceClose()
}

// newActiveChannelArbitrator creates a new instance of an active channel
// arbitrator given the state of the target channel.
func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
        c *ChainArbitrator, chanEvents *ChainEventSubscription) (*ChannelArbitrator, error) {

        // TODO(roasbeef): fetch best height (or pass in) so can ensure block
        // epoch delivers all the notifications to

        chanPoint := channel.FundingOutpoint

        log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", chanPoint)

        // Next we'll create the matching configuration struct that contains
        // all interfaces and methods the arbitrator needs to do its job.
        arbCfg := ChannelArbitratorConfig{
                ChanPoint:   chanPoint,
                Channel:     c.getArbChannel(channel),
                ShortChanID: channel.ShortChanID(),

                MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
                MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
                        statuses ...channeldb.ChannelStatus) error {

                        err := channel.CloseChannel(summary, statuses...)
                        if err != nil {
                                return err
                        }
                        c.cfg.NotifyClosedChannel(summary.ChanPoint)
                        return nil
                },
                IsPendingClose:        false,
                ChainArbitratorConfig: c.cfg,
                ChainEvents:           chanEvents,
                PutResolverReport: func(tx kvdb.RwTx,
                        report *channeldb.ResolverReport) error {

                        return c.chanSource.PutResolverReport(
                                tx, c.cfg.ChainHash, &chanPoint, report,
                        )
                },
                FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
                        chanStateDB := c.chanSource.ChannelStateDB()
                        return chanStateDB.FetchHistoricalChannel(&chanPoint)
                },
                FindOutgoingHTLCDeadline: func(
                        htlc channeldb.HTLC) fn.Option[int32] {

                        return c.FindOutgoingHTLCDeadline(
                                channel.ShortChanID(), htlc,
                        )
                },
        }

        // The final component needed is an arbitrator log that the arbitrator
        // will use to keep track of its internal state using a backed
        // persistent log.
        //
        // TODO(roasbeef); abstraction leak...
        //  * rework: adaptor method to set log scope w/ factory func
        chanLog, err := newBoltArbitratorLog(
                c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
        )
        if err != nil {
                return nil, err
        }

        arbCfg.MarkChannelResolved = func() error {
                if c.cfg.NotifyFullyResolvedChannel != nil {
                        c.cfg.NotifyFullyResolvedChannel(chanPoint)
                }

                return c.ResolveContract(chanPoint)
        }

        // Finally, we'll need to construct a series of htlc Sets based on all
        // currently known valid commitments.
        htlcSets := make(map[HtlcSetKey]htlcSet)
        htlcSets[LocalHtlcSet] = newHtlcSet(channel.LocalCommitment.Htlcs)
        htlcSets[RemoteHtlcSet] = newHtlcSet(channel.RemoteCommitment.Htlcs)

        pendingRemoteCommitment, err := channel.RemoteCommitChainTip()
        if err != nil && err != channeldb.ErrNoPendingCommit {
                return nil, err
        }
        if pendingRemoteCommitment != nil {
                htlcSets[RemotePendingHtlcSet] = newHtlcSet(
                        pendingRemoteCommitment.Commitment.Htlcs,
                )
        }

        return NewChannelArbitrator(
                arbCfg, htlcSets, chanLog,
        ), nil
}

// getArbChannel returns an open channel wrapper for use by channel arbitrators.
func (c *ChainArbitrator) getArbChannel(
        channel *channeldb.OpenChannel) *arbChannel {

        return &arbChannel{
                channel: channel,
                c:       c,
        }
}

// ResolveContract marks a contract as fully resolved within the database.
// This is only to be done once all contracts which were live on the channel
// before hitting the chain have been resolved.
func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error {
        log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)

        // First, we'll mark the channel as fully closed from the PoV of the
        // channel source.
        err := c.chanSource.ChannelStateDB().MarkChanFullyClosed(&chanPoint)
        if err != nil {
                log.Errorf("ChainArbitrator: unable to mark ChannelPoint(%v) "+
                        "fully closed: %v", chanPoint, err)
                return err
        }

        // Now that the channel has been marked as fully closed, we'll stop
        // both the channel arbitrator and chain watcher for this channel if
        // they're still active.
        var arbLog ArbitratorLog
        c.Lock()
        chainArb := c.activeChannels[chanPoint]
        delete(c.activeChannels, chanPoint)

        chainWatcher := c.activeWatchers[chanPoint]
        delete(c.activeWatchers, chanPoint)
        c.Unlock()

        if chainArb != nil {
                arbLog = chainArb.log

                if err := chainArb.Stop(); err != nil {
                        log.Warnf("unable to stop ChannelArbitrator(%v): %v",
                                chanPoint, err)
                }
        }
        if chainWatcher != nil {
                if err := chainWatcher.Stop(); err != nil {
                        log.Warnf("unable to stop ChainWatcher(%v): %v",
                                chanPoint, err)
                }
        }

        // Once this has been marked as resolved, we'll wipe the log that the
        // channel arbitrator was using to store its persistent state. We do
        // this after marking the channel resolved, as otherwise, the
        // arbitrator would be re-created, and think it was starting from the
        // default state.
        if arbLog != nil {
                if err := arbLog.WipeHistory(); err != nil {
                        return err
                }
        }

        return nil
}

// Start launches all goroutines that the ChainArbitrator needs to operate.
func (c *ChainArbitrator) Start() error {
        if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
                return nil
        }

        log.Infof("ChainArbitrator starting with config: budget=[%v]",
                &c.cfg.Budget)

        // First, we'll fetch all the channels that are still open, in order to
        // collect them within our set of active contracts.
        openChannels, err := c.chanSource.ChannelStateDB().FetchAllChannels()
        if err != nil {
                return err
        }

        if len(openChannels) > 0 {
                log.Infof("Creating ChannelArbitrators for %v active channels",
                        len(openChannels))
        }

        // For each open channel, we'll configure then launch a corresponding
        // ChannelArbitrator.
        for _, channel := range openChannels {
                chanPoint := channel.FundingOutpoint
                channel := channel

                // First, we'll create an active chainWatcher for this channel
                // to ensure that we detect any relevant on chain events.
                breachClosure := func(ret *lnwallet.BreachRetribution) error {
                        return c.cfg.ContractBreach(chanPoint, ret)
                }

                chainWatcher, err := newChainWatcher(
                        chainWatcherConfig{
                                chanState:           channel,
                                notifier:            c.cfg.Notifier,
                                signer:              c.cfg.Signer,
                                isOurAddr:           c.cfg.IsOurAddress,
                                contractBreach:      breachClosure,
                                extractStateNumHint: lnwallet.GetStateNumHint,
                        },
                )
                if err != nil {
                        return err
                }

                c.activeWatchers[chanPoint] = chainWatcher
                channelArb, err := newActiveChannelArbitrator(
                        channel, c, chainWatcher.SubscribeChannelEvents(),
                )
                if err != nil {
                        return err
                }

                c.activeChannels[chanPoint] = channelArb

                // Republish any closing transactions for this channel.
                err = c.republishClosingTxs(channel)
                if err != nil {
                        log.Errorf("Failed to republish closing txs for "+
                                "channel %v", chanPoint)
                }
        }

        // In addition to the channels that we know to be open, we'll also
        // launch arbitrators to finish resolving any channels that are in
        // the pending close state.
        closingChannels, err := c.chanSource.ChannelStateDB().FetchClosedChannels(
                true,
        )
        if err != nil {
                return err
        }

        if len(closingChannels) > 0 {
                log.Infof("Creating ChannelArbitrators for %v closing channels",
                        len(closingChannels))
        }

        // Next, for each channel in the closing state, we'll launch a
        // corresponding more restricted resolver, as we don't have to watch
        // the chain any longer, only resolve the contracts on the confirmed
        // commitment.
        //nolint:lll
        for _, closeChanInfo := range closingChannels {
                // We can leave off the CloseContract and ForceCloseChan
                // methods as the channel is already closed at this point.
                chanPoint := closeChanInfo.ChanPoint
                arbCfg := ChannelArbitratorConfig{
                        ChanPoint:             chanPoint,
                        ShortChanID:           closeChanInfo.ShortChanID,
                        ChainArbitratorConfig: c.cfg,
                        ChainEvents:           &ChainEventSubscription{},
                        IsPendingClose:        true,
                        ClosingHeight:         closeChanInfo.CloseHeight,
                        CloseType:             closeChanInfo.CloseType,
                        PutResolverReport: func(tx kvdb.RwTx,
                                report *channeldb.ResolverReport) error {

                                return c.chanSource.PutResolverReport(
                                        tx, c.cfg.ChainHash, &chanPoint, report,
                                )
                        },
                        FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
                                chanStateDB := c.chanSource.ChannelStateDB()
                                return chanStateDB.FetchHistoricalChannel(&chanPoint)
                        },
                        FindOutgoingHTLCDeadline: func(
                                htlc channeldb.HTLC) fn.Option[int32] {

                                return c.FindOutgoingHTLCDeadline(
                                        closeChanInfo.ShortChanID, htlc,
                                )
                        },
                }
                chanLog, err := newBoltArbitratorLog(
                        c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
                )
                if err != nil {
                        return err
                }
                arbCfg.MarkChannelResolved = func() error {
                        if c.cfg.NotifyFullyResolvedChannel != nil {
                                c.cfg.NotifyFullyResolvedChannel(chanPoint)
                        }

                        return c.ResolveContract(chanPoint)
                }

                // We create an empty map of HTLC's here since it's possible
                // that the channel is in StateDefault and updateActiveHTLCs is
                // called. We want to avoid writing to an empty map. Since the
                // channel is already in the process of being resolved, no new
                // HTLCs will be added.
                c.activeChannels[chanPoint] = NewChannelArbitrator(
                        arbCfg, make(map[HtlcSetKey]htlcSet), chanLog,
                )
        }

        // Now, we'll start all chain watchers in parallel to shorten start up
        // duration. In neutrino mode, this allows spend registrations to take
        // advantage of batch spend reporting, instead of doing a single rescan
        // per chain watcher.
        //
        // NOTE: After this point, we Stop the chain arb to ensure that any
        // lingering goroutines are cleaned up before exiting.
        watcherErrs := make(chan error, len(c.activeWatchers))
        var wg sync.WaitGroup
        for _, watcher := range c.activeWatchers {
                wg.Add(1)
                go func(w *chainWatcher) {
                        defer wg.Done()
                        select {
                        case watcherErrs <- w.Start():
                        case <-c.quit:
                                watcherErrs <- ErrChainArbExiting
                        }
                }(watcher)
        }

        // Once all chain watchers have been started, seal the err chan to
        // signal the end of the err stream.
        go func() {
                wg.Wait()
                close(watcherErrs)
        }()

        // stopAndLog is a helper function which shuts down the chain arb and
        // logs errors if they occur.
        stopAndLog := func() {
                if err := c.Stop(); err != nil {
                        log.Errorf("ChainArbitrator could not shutdown: %v", err)
                }
        }

        // Handle all errors returned from spawning our chain watchers. If any
        // of them failed, we will stop the chain arb to shutdown any active
        // goroutines.
        for err := range watcherErrs {
                if err != nil {
                        stopAndLog()
                        return err
                }
        }

        // Before we start all of our arbitrators, we do a preliminary state
        // lookup so that we can combine all of these lookups in a single db
        // transaction.
        var startStates map[wire.OutPoint]*chanArbStartState

        err = kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error {
                for _, arbitrator := range c.activeChannels {
                        startState, err := arbitrator.getStartState(tx)
                        if err != nil {
                                return err
                        }

                        startStates[arbitrator.cfg.ChanPoint] = startState
                }

                return nil
        }, func() {
                startStates = make(
                        map[wire.OutPoint]*chanArbStartState,
                        len(c.activeChannels),
                )
        })
        if err != nil {
                stopAndLog()
                return err
        }

        // Launch all the goroutines for each arbitrator so they can carry out
        // their duties.
        for _, arbitrator := range c.activeChannels {
                startState, ok := startStates[arbitrator.cfg.ChanPoint]
                if !ok {
                        stopAndLog()
                        return fmt.Errorf("arbitrator: %v has no start state",
                                arbitrator.cfg.ChanPoint)
                }

                if err := arbitrator.Start(startState); err != nil {
                        stopAndLog()
                        return err
                }
        }

        // Subscribe to a single stream of block epoch notifications that we
        // will dispatch to all active arbitrators.
        blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
        if err != nil {
                return err
        }

        // Start our goroutine which will dispatch blocks to each arbitrator.
        c.wg.Add(1)
        go func() {
                defer c.wg.Done()
                c.dispatchBlocks(blockEpoch)
        }()

        // TODO(roasbeef): eventually move all breach watching here

        return nil
}

// blockRecipient contains the information we need to dispatch a block to a
// channel arbitrator.
type blockRecipient struct {
        // chanPoint is the funding outpoint of the channel.
        chanPoint wire.OutPoint

        // blocks is the channel that new block heights are sent into. This
        // channel should be sufficiently buffered as to not block the sender.
        blocks chan<- int32

        // quit is closed if the receiving entity is shutting down.
        quit chan struct{}
}
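
// A minimal sketch (not part of the original file) of constructing a
// blockRecipient with the buffered block channel the comment above calls
// for; the buffer size and `chanPoint` are illustrative placeholders:
//
//	blocks := make(chan int32, 20)
//	r := blockRecipient{
//	        chanPoint: chanPoint, // hypothetical funding outpoint
//	        blocks:    blocks,
//	        quit:      make(chan struct{}),
//	}
//	_ = r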

// dispatchBlocks consumes a block epoch notification stream and dispatches
// blocks to each of the chain arb's active channel arbitrators. This function
// must be run in a goroutine.
func (c *ChainArbitrator) dispatchBlocks(
        blockEpoch *chainntnfs.BlockEpochEvent) {

        // getRecipients is a helper function which acquires the chain arb
        // lock and returns a set of block recipients which can be used to
        // dispatch blocks.
        getRecipients := func() []blockRecipient {
                c.Lock()
                blocks := make([]blockRecipient, 0, len(c.activeChannels))
                for _, channel := range c.activeChannels {
                        blocks = append(blocks, blockRecipient{
                                chanPoint: channel.cfg.ChanPoint,
                                blocks:    channel.blocks,
                                quit:      channel.quit,
                        })
                }
                c.Unlock()

                return blocks
        }

        // On exit, cancel our blocks subscription and close each block channel
        // so that the arbitrators know they will no longer be receiving blocks.
        defer func() {
                blockEpoch.Cancel()

                recipients := getRecipients()
                for _, recipient := range recipients {
                        close(recipient.blocks)
                }
        }()

        // Consume block epochs until we receive the instruction to shutdown.
        for {
                select {
                // Consume block epochs, exiting if our subscription is
                // terminated.
                case block, ok := <-blockEpoch.Epochs:
                        if !ok {
                                log.Trace("dispatchBlocks block epoch " +
                                        "cancelled")
                                return
                        }

                        // Get the set of currently active channels block
                        // subscription channels and dispatch the block to
                        // each.
                        for _, recipient := range getRecipients() {
                                select {
                                // Deliver the block to the arbitrator.
                                case recipient.blocks <- block.Height:

                                // If the recipient is shutting down, exit
                                // without delivering the block. This may be
                                // the case when two blocks are mined in quick
                                // succession, and the arbitrator resolves
                                // after the first block, and does not need to
                                // consume the second block.
                                case <-recipient.quit:
                                        log.Debugf("channel: %v exit without "+
                                                "receiving block: %v",
                                                recipient.chanPoint,
                                                block.Height)

                                // If the chain arb is shutting down, we don't
                                // need to deliver any more blocks (everything
                                // will be shutting down).
                                case <-c.quit:
                                        return
                                }
                        }

                // Exit if the chain arbitrator is shutting down.
                case <-c.quit:
                        return
                }
        }
}

// republishClosingTxs will load any stored cooperative or unilateral closing
// transactions and republish them. This helps ensure propagation of the
// transactions in the event that prior publications failed.
func (c *ChainArbitrator) republishClosingTxs(
        channel *channeldb.OpenChannel) error {

        // If the channel has had its unilateral close broadcasted already,
        // republish it in case it didn't propagate.
        if channel.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) {
                err := c.rebroadcast(
                        channel, channeldb.ChanStatusCommitBroadcasted,
                )
                if err != nil {
                        return err
                }
        }

        // If the channel has had its cooperative close broadcasted
        // already, republish it in case it didn't propagate.
        if channel.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) {
                err := c.rebroadcast(
                        channel, channeldb.ChanStatusCoopBroadcasted,
                )
                if err != nil {
                        return err
                }
        }

        return nil
}

// rebroadcast is a helper method which will republish the unilateral or
// cooperative close transaction of a channel in a particular state.
//
// NOTE: There is no risk to calling this method if the channel isn't in either
// CommitmentBroadcasted or CoopBroadcasted, but the logs will be misleading.
func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel,
        state channeldb.ChannelStatus) error {

        chanPoint := channel.FundingOutpoint

        var (
                closeTx *wire.MsgTx
                kind    string
                err     error
        )
        switch state {
        case channeldb.ChanStatusCommitBroadcasted:
                kind = "force"
                closeTx, err = channel.BroadcastedCommitment()

        case channeldb.ChanStatusCoopBroadcasted:
                kind = "coop"
                closeTx, err = channel.BroadcastedCooperative()

        default:
                return fmt.Errorf("unknown closing state: %v", state)
        }

        switch {
        // This can happen for channels that had their closing tx published
        // before we started storing it to disk.
        case err == channeldb.ErrNoCloseTx:
                log.Warnf("Channel %v is in state %v, but no %s closing tx "+
                        "to re-publish...", chanPoint, state, kind)
                return nil

        case err != nil:
                return err
        }

        log.Infof("Re-publishing %s close tx(%v) for channel %v",
                kind, closeTx.TxHash(), chanPoint)

        label := labels.MakeLabel(
                labels.LabelTypeChannelClose, &channel.ShortChannelID,
        )
        err = c.cfg.PublishTx(closeTx, label)
        if err != nil && err != lnwallet.ErrDoubleSpend {
                log.Warnf("Unable to broadcast %s close tx(%v): %v",
                        kind, closeTx.TxHash(), err)
        }

        return nil
}
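
// A minimal sketch (not part of the original file) of the label-then-publish
// pattern used above; `closeTx` and `channel` stand in for the values
// rebroadcast already has in scope:
//
//	label := labels.MakeLabel(
//	        labels.LabelTypeChannelClose, &channel.ShortChannelID,
//	)
//	if err := c.cfg.PublishTx(closeTx, label); err != nil &&
//	        err != lnwallet.ErrDoubleSpend {
//	        // ErrDoubleSpend is benign here: the network already
//	        // knows about this transaction.
//	}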

// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active
// channel arbitrators will be signalled to exit, and this method will block
// until they've all exited.
func (c *ChainArbitrator) Stop() error {
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
                return nil
        }

        log.Info("ChainArbitrator shutting down...")
        defer log.Debug("ChainArbitrator shutdown complete")

        close(c.quit)

        var (
                activeWatchers = make(map[wire.OutPoint]*chainWatcher)
                activeChannels = make(map[wire.OutPoint]*ChannelArbitrator)
        )

        // Copy the current set of active watchers and arbitrators to shutdown.
        // We don't want to hold the lock when shutting down each watcher or
        // arbitrator individually, as they may need to acquire this mutex.
        c.Lock()
        for chanPoint, watcher := range c.activeWatchers {
                activeWatchers[chanPoint] = watcher
        }
        for chanPoint, arbitrator := range c.activeChannels {
                activeChannels[chanPoint] = arbitrator
        }
        c.Unlock()

        for chanPoint, watcher := range activeWatchers {
                log.Tracef("Attempting to stop ChainWatcher(%v)",
                        chanPoint)

                if err := watcher.Stop(); err != nil {
                        log.Errorf("unable to stop watcher for "+
                                "ChannelPoint(%v): %v", chanPoint, err)
                }
        }
        for chanPoint, arbitrator := range activeChannels {
                log.Tracef("Attempting to stop ChannelArbitrator(%v)",
                        chanPoint)

                if err := arbitrator.Stop(); err != nil {
                        log.Errorf("unable to stop arbitrator for "+
                                "ChannelPoint(%v): %v", chanPoint, err)
                }
        }

        c.wg.Wait()

        return nil
}
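
// A minimal sketch (not part of the original file) of the copy-then-stop
// pattern used above, which avoids holding the mutex while calling Stop on
// components that may themselves need the same lock:
//
//	c.Lock()
//	snapshot := make([]*chainWatcher, 0, len(c.activeWatchers))
//	for _, w := range c.activeWatchers {
//	        snapshot = append(snapshot, w)
//	}
//	c.Unlock()
//	for _, w := range snapshot {
//	        _ = w.Stop() // safe: the lock is no longer held
//	}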

// ContractUpdate is a message that packages the latest set of active HTLCs on
// a commitment, and also identifies which commitment received a new set of
// HTLCs.
type ContractUpdate struct {
        // HtlcKey identifies which commitment the HTLCs below are present on.
        HtlcKey HtlcSetKey

        // Htlcs are the set of active HTLCs on the commitment identified by
        // the above HtlcKey.
        Htlcs []channeldb.HTLC
}
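
// A minimal sketch (not part of the original file) of packaging a
// ContractUpdate for the local commitment; the HTLC slice, `chainArb`, and
// `chanPoint` are placeholders:
//
//	update := &ContractUpdate{
//	        HtlcKey: LocalHtlcSet,
//	        Htlcs:   []channeldb.HTLC{ /* current active HTLCs */ },
//	}
//	// Typically handed to NotifyContractUpdate (defined below).
//	if err := chainArb.NotifyContractUpdate(chanPoint, update); err != nil {
//	        // the arbitrator may already be resolved
//	}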

// ContractSignals is used by outside subsystems to notify a channel arbitrator
// of its ShortChannelID.
type ContractSignals struct {
        // ShortChanID is the up to date short channel ID for a contract. This
        // can change either because the contract didn't yet have a stable
        // identifier when it was added, or in the case of a reorg.
        ShortChanID lnwire.ShortChannelID
}

// UpdateContractSignals sends a set of active, up to date contract signals to
// the ChannelArbitrator which has been assigned to the channel identified by
// the passed channel point.
func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
        signals *ContractSignals) error {

        log.Infof("Attempting to update ContractSignals for ChannelPoint(%v)",
                chanPoint)

        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return fmt.Errorf("unable to find arbitrator")
        }

        arbitrator.UpdateContractSignals(signals)

        return nil
}

// NotifyContractUpdate lets a channel arbitrator know that a new
// ContractUpdate is available. This calls the ChannelArbitrator's internal
// method NotifyContractUpdate which waits for a response on a done chan before
// returning. This method will return an error if the ChannelArbitrator is not
// in the activeChannels map. However, this only happens if the arbitrator is
// resolved and the related link would already be shut down.
func (c *ChainArbitrator) NotifyContractUpdate(chanPoint wire.OutPoint,
        update *ContractUpdate) error {

        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return fmt.Errorf("can't find arbitrator for %v", chanPoint)
        }

        arbitrator.notifyContractUpdate(update)
        return nil
}

// GetChannelArbitrator safely returns the channel arbitrator for a given
// channel outpoint.
func (c *ChainArbitrator) GetChannelArbitrator(chanPoint wire.OutPoint) (
        *ChannelArbitrator, error) {

        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return nil, fmt.Errorf("unable to find arbitrator")
        }

        return arbitrator, nil
}

// forceCloseReq is a request sent from an outside sub-system to the arbitrator
// that watches a particular channel to broadcast the commitment transaction,
// and enter the resolution phase of the channel.
type forceCloseReq struct {
        // errResp is a channel that will be sent upon either in the case of
        // force close success (nil error), or in the case of an error.
        //
        // NOTE: This channel MUST be buffered.
        errResp chan error

        // closeTx is a channel that carries the transaction which ultimately
        // closed out the channel.
        closeTx chan *wire.MsgTx
}

// ForceCloseContract attempts to force close the channel identified by the
// passed channel point. A force close will immediately terminate the contract,
// causing it to enter the resolution phase. If the force close was successful,
// then the force close transaction itself will be returned.
//
// TODO(roasbeef): just return the summary itself?
func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.MsgTx, error) {
        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return nil, fmt.Errorf("unable to find arbitrator")
        }

        log.Infof("Attempting to force close ChannelPoint(%v)", chanPoint)

        // Before closing, we'll attempt to send a disable update for the
        // channel. We do so before closing the channel as otherwise the current
        // edge policy won't be retrievable from the graph.
        if err := c.cfg.DisableChannel(chanPoint); err != nil {
                log.Warnf("Unable to disable channel %v on "+
                        "close: %v", chanPoint, err)
        }

        errChan := make(chan error, 1)
        respChan := make(chan *wire.MsgTx, 1)

        // With the channel found, and the request crafted, we'll send over a
        // force close request to the arbitrator that watches this channel.
        select {
        case arbitrator.forceCloseReqs <- &forceCloseReq{
                errResp: errChan,
                closeTx: respChan,
        }:
        case <-c.quit:
                return nil, ErrChainArbExiting
        }

        // We'll await two responses: the error response, and the transaction
        // that closed out the channel.
        select {
        case err := <-errChan:
                if err != nil {
                        return nil, err
                }
        case <-c.quit:
                return nil, ErrChainArbExiting
        }

        var closeTx *wire.MsgTx
        select {
        case closeTx = <-respChan:
        case <-c.quit:
                return nil, ErrChainArbExiting
        }

        return closeTx, nil
}

// WatchNewChannel sends the ChainArbitrator a message to create a
// ChannelArbitrator tasked with watching over a new channel. Once a new
// channel has finished its final funding flow, it should be registered with
// the ChainArbitrator so we can properly react to any on-chain events.
func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error {
        c.Lock()
        defer c.Unlock()

        chanPoint := newChan.FundingOutpoint

        log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)",
                chanPoint)

        // If we're already watching this channel, then we'll ignore this
        // request.
        if _, ok := c.activeChannels[chanPoint]; ok {
                return nil
        }

        // First, also create an active chainWatcher for this channel to ensure
        // that we detect any relevant on chain events.
        chainWatcher, err := newChainWatcher(
                chainWatcherConfig{
                        chanState: newChan,
                        notifier:  c.cfg.Notifier,
                        signer:    c.cfg.Signer,
                        isOurAddr: c.cfg.IsOurAddress,
                        contractBreach: func(
                                retInfo *lnwallet.BreachRetribution) error {

                                return c.cfg.ContractBreach(
                                        chanPoint, retInfo,
                                )
                        },
                        extractStateNumHint: lnwallet.GetStateNumHint,
                },
        )
        if err != nil {
                return err
        }

        c.activeWatchers[chanPoint] = chainWatcher

        // We'll also create a new channel arbitrator instance using this new
        // channel, and our internal state.
        channelArb, err := newActiveChannelArbitrator(
                newChan, c, chainWatcher.SubscribeChannelEvents(),
        )
        if err != nil {
                return err
        }

        // With the arbitrator created, we'll add it to our set of active
        // arbitrators, then launch it.
        c.activeChannels[chanPoint] = channelArb

        if err := channelArb.Start(nil); err != nil {
                return err
        }

        return chainWatcher.Start()
}

// SubscribeChannelEvents returns a new active subscription for the set of
// possible on-chain events for a particular channel. The struct can be used by
// callers to be notified whenever an event that changes the state of the
// channel on-chain occurs.
func (c *ChainArbitrator) SubscribeChannelEvents(
        chanPoint wire.OutPoint) (*ChainEventSubscription, error) {

        // First, we'll attempt to look up the active watcher for this channel.
        // If we can't find it, then we'll return an error back to the caller.
        c.Lock()
        watcher, ok := c.activeWatchers[chanPoint]
        c.Unlock()

        if !ok {
                return nil, fmt.Errorf("unable to find watcher for: %v",
                        chanPoint)
        }

        // With the watcher located, we'll request for it to create a new chain
        // event subscription client.
        return watcher.SubscribeChannelEvents(), nil
}
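
// A minimal sketch (not part of the original file) of obtaining a
// subscription; the event-channel fields on ChainEventSubscription are
// defined elsewhere in this package, so only the lookup itself is shown
// (`chainArb` and `chanPoint` are assumed to be in scope):
//
//	sub, err := chainArb.SubscribeChannelEvents(chanPoint)
//	if err != nil {
//	        return err // channel isn't being watched
//	}
//	_ = sub // select on the subscription's event channels as needed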

// FindOutgoingHTLCDeadline returns the deadline in absolute block height for
// the specified outgoing HTLC. For an outgoing HTLC, its deadline is defined
// by the timeout height of its corresponding incoming HTLC - this is the
// expiry height at which the remote peer can spend his/her outgoing HTLC via
// the timeout path.
func (c *ChainArbitrator) FindOutgoingHTLCDeadline(scid lnwire.ShortChannelID,
        outgoingHTLC channeldb.HTLC) fn.Option[int32] {

        // Find the outgoing HTLC's corresponding incoming HTLC in the circuit
        // map.
        rHash := outgoingHTLC.RHash
        circuit := models.CircuitKey{
                ChanID: scid,
                HtlcID: outgoingHTLC.HtlcIndex,
        }
        incomingCircuit := c.cfg.QueryIncomingCircuit(circuit)

        // If there's no incoming circuit found, we will use the default
        // deadline.
        if incomingCircuit == nil {
                log.Warnf("ChannelArbitrator(%v): incoming circuit key not "+
                        "found for rHash=%x, using default deadline instead",
                        scid, rHash)

                return fn.None[int32]()
        }

        // If this is a locally initiated HTLC, it means we are the first hop.
        // In this case, we can relax the deadline.
        if incomingCircuit.ChanID.IsDefault() {
                log.Infof("ChannelArbitrator(%v): using default deadline for "+
                        "locally initiated HTLC for rHash=%x", scid, rHash)

                return fn.None[int32]()
        }

        log.Debugf("Found incoming circuit %v for rHash=%x using outgoing "+
                "circuit %v", incomingCircuit, rHash, circuit)

        c.Lock()
        defer c.Unlock()

        // Iterate over all active channels to find the incoming HTLC specified
        // by its circuit key.
        for cp, channelArb := range c.activeChannels {
                // Skip if the SCID doesn't match.
                if channelArb.cfg.ShortChanID != incomingCircuit.ChanID {
                        continue
                }

                // Make sure the channel arbitrator has the latest view of its
                // active HTLCs.
                channelArb.updateActiveHTLCs()

                // Iterate all the known HTLCs to find the targeted incoming
                // HTLC.
                for _, htlcs := range channelArb.activeHTLCs {
                        for _, htlc := range htlcs.incomingHTLCs {
                                // Skip if the index doesn't match.
                                if htlc.HtlcIndex != incomingCircuit.HtlcID {
                                        continue
                                }

                                log.Debugf("ChannelArbitrator(%v): found "+
                                        "incoming HTLC in channel=%v using "+
                                        "rHash=%x, refundTimeout=%v", scid,
                                        cp, rHash, htlc.RefundTimeout)

                                return fn.Some(int32(htlc.RefundTimeout))
                        }
                }
        }

        // If there's no incoming HTLC found, yet we have the incoming circuit,
        // something is wrong - in this case, we return the none deadline.
        log.Errorf("ChannelArbitrator(%v): incoming HTLC not found for "+
                "rHash=%x, using default deadline instead", scid, rHash)

        return fn.None[int32]()
}
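
// A minimal sketch (not part of the original file) of consuming the returned
// fn.Option, using the same WhenSome accessor already used elsewhere in this
// file (`chainArb`, `scid`, and `htlc` are assumed to be in scope):
//
//	deadline := chainArb.FindOutgoingHTLCDeadline(scid, htlc)
//	deadline.WhenSome(func(height int32) {
//	        log.Debugf("HTLC deadline at height %d", height)
//	})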

// TODO(roasbeef): arbitration reports
//  * types: contested, waiting for success conf, etc