• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12012751795

25 Nov 2024 02:40PM UTC coverage: 49.835% (-9.2%) from 59.013%
12012751795

Pull #9303

github

yyforyongyu
lnwallet: add debug logs
Pull Request #9303: htlcswitch+routing: handle nil pointer dereference properly

20 of 23 new or added lines in 4 files covered. (86.96%)

25467 existing lines in 425 files now uncovered.

99835 of 200331 relevant lines covered (49.84%)

2.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.43
/contractcourt/chain_arbitrator.go
1
package contractcourt
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcutil"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/chainntnfs"
15
        "github.com/lightningnetwork/lnd/channeldb"
16
        "github.com/lightningnetwork/lnd/channeldb/models"
17
        "github.com/lightningnetwork/lnd/clock"
18
        "github.com/lightningnetwork/lnd/fn"
19
        "github.com/lightningnetwork/lnd/input"
20
        "github.com/lightningnetwork/lnd/kvdb"
21
        "github.com/lightningnetwork/lnd/labels"
22
        "github.com/lightningnetwork/lnd/lnwallet"
23
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
24
        "github.com/lightningnetwork/lnd/lnwire"
25
)
26

27
// ErrChainArbExiting signals that the chain arbitrator is shutting down.
// NOTE(review): presumably returned by operations that race with shutdown
// (the quit channel) — confirm against the rest of this file.
var ErrChainArbExiting = errors.New("ChainArbitrator exiting")
29

30
// ResolutionMsg is a message sent by resolvers to outside sub-systems once an
// outgoing contract has been fully resolved. For multi-hop contracts, if we
// resolve the outgoing contract, we'll also need to ensure that the incoming
// contract is resolved as well. We package the items required to resolve the
// incoming contracts within this message.
type ResolutionMsg struct {
	// SourceChan identifies the channel that this message is being sent
	// from. This is the channel's short channel ID.
	SourceChan lnwire.ShortChannelID

	// HtlcIndex is the index of the contract within the original
	// commitment trace.
	HtlcIndex uint64

	// Failure will be non-nil if the incoming contract should be canceled
	// all together. This can happen if the outgoing contract was dust, or
	// if the outgoing HTLC timed out.
	Failure lnwire.FailureMessage

	// PreImage will be non-nil if the incoming contract can successfully
	// be redeemed. This can happen if we learn of the preimage from the
	// outgoing HTLC on-chain.
	PreImage *[32]byte
}
54

55
// ChainArbitratorConfig is a configuration struct that contains all the
// function closures and interfaces that are required to arbitrate on-chain
// contracts for a particular chain.
type ChainArbitratorConfig struct {
	// ChainHash is the chain that this arbitrator is to operate within.
	ChainHash chainhash.Hash

	// IncomingBroadcastDelta is the delta that we'll use to decide when to
	// broadcast our commitment transaction if we have incoming htlcs. This
	// value should be set based on our current fee estimation of the
	// commitment transaction. We use this to determine when we should
	// broadcast instead of just the HTLC timeout, as we want to ensure
	// that the commitment transaction is already confirmed, by the time the
	// HTLC expires. Otherwise we may end up not settling the htlc on-chain
	// because the other party managed to time it out.
	IncomingBroadcastDelta uint32

	// OutgoingBroadcastDelta is the delta that we'll use to decide when to
	// broadcast our commitment transaction if there are active outgoing
	// htlcs. This value can be lower than the incoming broadcast delta.
	OutgoingBroadcastDelta uint32

	// NewSweepAddr is a function that returns a new address under control
	// by the wallet. We'll use this to sweep any no-delay outputs as a
	// result of unilateral channel closes.
	//
	// NOTE: This SHOULD return a p2wkh script.
	NewSweepAddr func() ([]byte, error)

	// PublishTx reliably broadcasts a transaction to the network. Once
	// this function exits without an error, then the transaction MUST
	// continually be rebroadcast if needed.
	PublishTx func(*wire.MsgTx, string) error

	// DeliverResolutionMsg is a function that will append an outgoing
	// message to the "out box" for a ChannelLink. This is used to cancel
	// backwards any HTLC's that are either dust, we're timing out, or
	// settling on-chain to the incoming link.
	DeliverResolutionMsg func(...ResolutionMsg) error

	// MarkLinkInactive is a function closure that the ChainArbitrator will
	// use to mark that active HTLC's shouldn't be attempted to be routed
	// over a particular channel. This function will be called when a
	// ChannelArbitrator decides that it needs to go to chain in order to
	// resolve contracts.
	//
	// TODO(roasbeef): rename, routing based
	MarkLinkInactive func(wire.OutPoint) error

	// ContractBreach is a function closure that the ChainArbitrator will
	// use to notify the BreachArbitrator about a contract breach. It should
	// only return a non-nil error when the BreachArbitrator has preserved
	// the necessary breach info for this channel point. Once the breach
	// resolution is persisted in the ChannelArbitrator, it will be safe
	// to mark the channel closed.
	ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error

	// IsOurAddress is a function that returns true if the passed address
	// is known to the underlying wallet. Otherwise, false should be
	// returned.
	IsOurAddress func(btcutil.Address) bool

	// IncubateOutputs sends either an incoming HTLC, an outgoing HTLC, or
	// both to the utxo nursery. Once this function returns, the nursery
	// should have safely persisted the outputs to disk, and should start
	// the process of incubation. This is used when a resolver wishes to
	// pass off the output to the nursery as we're only waiting on an
	// absolute/relative item block.
	IncubateOutputs func(wire.OutPoint,
		fn.Option[lnwallet.OutgoingHtlcResolution],
		fn.Option[lnwallet.IncomingHtlcResolution],
		uint32, fn.Option[int32]) error

	// PreimageDB is a global store of all known pre-images. We'll use this
	// to decide if we should broadcast a commitment transaction to claim
	// an HTLC on-chain.
	PreimageDB WitnessBeacon

	// Notifier is an instance of a chain notifier we'll use to watch for
	// certain on-chain events.
	Notifier chainntnfs.ChainNotifier

	// Mempool is a mempool watcher that allows us to watch for events
	// happening in the mempool.
	Mempool chainntnfs.MempoolWatcher

	// Signer is a signer backed by the active lnd node. This should be
	// capable of producing a signature as specified by a valid
	// SignDescriptor.
	Signer input.Signer

	// FeeEstimator will be used to return fee estimates.
	FeeEstimator chainfee.Estimator

	// ChainIO allows us to query the state of the current main chain.
	ChainIO lnwallet.BlockChainIO

	// DisableChannel disables a channel, resulting in it not being able to
	// forward payments.
	DisableChannel func(wire.OutPoint) error

	// Sweeper allows resolvers to sweep their final outputs.
	Sweeper UtxoSweeper

	// Registry is the invoice database that is used by resolvers to lookup
	// preimages and settle invoices.
	Registry Registry

	// NotifyClosedChannel is a function closure that the ChainArbitrator
	// will use to notify the ChannelNotifier about a newly closed channel.
	NotifyClosedChannel func(wire.OutPoint)

	// NotifyFullyResolvedChannel is a function closure that the
	// ChainArbitrator will use to notify the ChannelNotifier about a newly
	// resolved channel. The main difference to NotifyClosedChannel is that
	// in case of a local force close the NotifyClosedChannel is called when
	// the published commitment transaction confirms while
	// NotifyFullyResolvedChannel is only called when the channel is fully
	// resolved (which includes sweeping any time locked funds).
	NotifyFullyResolvedChannel func(point wire.OutPoint)

	// OnionProcessor is used to decode onion payloads for on-chain
	// resolution.
	OnionProcessor OnionProcessor

	// PaymentsExpirationGracePeriod indicates a time window we let the
	// other node to cancel an outgoing htlc that our node has initiated and
	// has timed out.
	PaymentsExpirationGracePeriod time.Duration

	// IsForwardedHTLC checks for a given htlc, identified by channel id and
	// htlcIndex, if it is a forwarded one.
	IsForwardedHTLC func(chanID lnwire.ShortChannelID, htlcIndex uint64) bool

	// Clock is the clock implementation that ChannelArbitrator uses.
	// It is useful for testing.
	Clock clock.Clock

	// SubscribeBreachComplete is used by the breachResolver to register a
	// subscription that notifies when the breach resolution process is
	// complete.
	SubscribeBreachComplete func(op *wire.OutPoint, c chan struct{}) (
		bool, error)

	// PutFinalHtlcOutcome stores the final outcome of an htlc in the
	// database.
	PutFinalHtlcOutcome func(chanId lnwire.ShortChannelID,
		htlcId uint64, settled bool) error

	// HtlcNotifier is an interface that htlc events are sent to.
	HtlcNotifier HtlcNotifier

	// Budget is the configured budget for the arbitrator.
	Budget BudgetConfig

	// QueryIncomingCircuit is used to find the outgoing HTLC's
	// corresponding incoming HTLC circuit. It queries the circuit map for
	// a given outgoing circuit key and returns the incoming circuit key.
	//
	// TODO(yy): this is a hacky way to get around the cycling import issue
	// as we cannot import `htlcswitch` here. A proper way is to define an
	// interface here that asks for method `LookupOpenCircuit`,
	// meanwhile, turn `PaymentCircuit` into an interface or bring it to a
	// lower package.
	QueryIncomingCircuit func(circuit models.CircuitKey) *models.CircuitKey

	// AuxLeafStore is an optional store that can be used to store auxiliary
	// leaves for certain custom channel types.
	AuxLeafStore fn.Option[lnwallet.AuxLeafStore]

	// AuxSigner is an optional signer that can be used to sign auxiliary
	// leaves for certain custom channel types.
	AuxSigner fn.Option[lnwallet.AuxSigner]

	// AuxResolver is an optional interface that can be used to modify the
	// way contracts are resolved.
	AuxResolver fn.Option[lnwallet.AuxContractResolver]
}
233

234
// ChainArbitrator is a sub-system that oversees the on-chain resolution of
// all active channels, and channels that are in the "pending close" state.
// Within the contractcourt package, the ChainArbitrator manages a set of
// active ContractArbitrators. Each ContractArbitrator is responsible for
// watching the chain for any activity that affects the state of the channel,
// and also for monitoring each contract in order to determine if any on-chain
// activity is required. Outside sub-systems interact with the ChainArbitrator
// in order to forcibly exit a contract, update the set of live signals for
// each contract, and to receive reports on the state of contract resolution.
type ChainArbitrator struct {
	// started/stopped guard the Start/Stop lifecycle; they are flipped
	// with atomic compare-and-swap so each transition happens only once.
	started int32 // To be used atomically.
	stopped int32 // To be used atomically.

	// The embedded mutex guards the activeChannels and activeWatchers
	// maps below.
	sync.Mutex

	// activeChannels is a map of all the active contracts that are still
	// open, and not fully resolved.
	activeChannels map[wire.OutPoint]*ChannelArbitrator

	// activeWatchers is a map of all the active chainWatchers for channels
	// that are still considered open.
	activeWatchers map[wire.OutPoint]*chainWatcher

	// cfg is the config struct for the arbitrator that contains all
	// methods and interface it needs to operate.
	cfg ChainArbitratorConfig

	// chanSource will be used by the ChainArbitrator to fetch all the
	// active channels that it must still watch over.
	chanSource *channeldb.DB

	// quit is closed to signal shutdown to the arbitrator's goroutines.
	quit chan struct{}

	// wg tracks goroutines launched by the ChainArbitrator so shutdown
	// can wait for them to exit.
	wg sync.WaitGroup
}
269

270
// NewChainArbitrator returns a new instance of the ChainArbitrator using the
271
// passed config struct, and backing persistent database.
272
func NewChainArbitrator(cfg ChainArbitratorConfig,
273
        db *channeldb.DB) *ChainArbitrator {
4✔
274

4✔
275
        return &ChainArbitrator{
4✔
276
                cfg:            cfg,
4✔
277
                activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
4✔
278
                activeWatchers: make(map[wire.OutPoint]*chainWatcher),
4✔
279
                chanSource:     db,
4✔
280
                quit:           make(chan struct{}),
4✔
281
        }
4✔
282
}
4✔
283

284
// arbChannel is a wrapper around an open channel that channel arbitrators
// interact with. Its methods carry "Part of the ArbChannel interface" notes,
// so it serves as the concrete ArbChannel implementation for this package.
type arbChannel struct {
	// channel is the in-memory channel state.
	channel *channeldb.OpenChannel

	// c references the chain arbitrator and is used by arbChannel
	// internally to reach the channel database and the shared config.
	c *ChainArbitrator
}
294

295
// NewAnchorResolutions returns the anchor resolutions for currently valid
296
// commitment transactions.
297
//
298
// NOTE: Part of the ArbChannel interface.
299
func (a *arbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions,
300
        error) {
4✔
301

4✔
302
        // Get a fresh copy of the database state to base the anchor resolutions
4✔
303
        // on. Unfortunately the channel instance that we have here isn't the
4✔
304
        // same instance that is used by the link.
4✔
305
        chanPoint := a.channel.FundingOutpoint
4✔
306

4✔
307
        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(
4✔
308
                nil, chanPoint,
4✔
309
        )
4✔
310
        if err != nil {
4✔
311
                return nil, err
×
312
        }
×
313

314
        var chanOpts []lnwallet.ChannelOpt
4✔
315
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
4✔
316
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
×
317
        })
×
318
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
4✔
319
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
×
320
        })
×
321
        a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
4✔
322
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
×
323
        })
×
324

325
        chanMachine, err := lnwallet.NewLightningChannel(
4✔
326
                a.c.cfg.Signer, channel, nil, chanOpts...,
4✔
327
        )
4✔
328
        if err != nil {
4✔
329
                return nil, err
×
330
        }
×
331

332
        return chanMachine.NewAnchorResolutions()
4✔
333
}
334

335
// ForceCloseChan should force close the contract that this attendant is
336
// watching over. We'll use this when we decide that we need to go to chain. It
337
// should in addition tell the switch to remove the corresponding link, such
338
// that we won't accept any new updates.
339
//
340
// NOTE: Part of the ArbChannel interface.
341
func (a *arbChannel) ForceCloseChan() (*wire.MsgTx, error) {
4✔
342
        // First, we mark the channel as borked, this ensure
4✔
343
        // that no new state transitions can happen, and also
4✔
344
        // that the link won't be loaded into the switch.
4✔
345
        if err := a.channel.MarkBorked(); err != nil {
4✔
346
                return nil, err
×
347
        }
×
348

349
        // With the channel marked as borked, we'll now remove
350
        // the link from the switch if its there. If the link
351
        // is active, then this method will block until it
352
        // exits.
353
        chanPoint := a.channel.FundingOutpoint
4✔
354

4✔
355
        if err := a.c.cfg.MarkLinkInactive(chanPoint); err != nil {
4✔
356
                log.Errorf("unable to mark link inactive: %v", err)
×
357
        }
×
358

359
        // Now that we know the link can't mutate the channel
360
        // state, we'll read the channel from disk the target
361
        // channel according to its channel point.
362
        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(
4✔
363
                nil, chanPoint,
4✔
364
        )
4✔
365
        if err != nil {
4✔
366
                return nil, err
×
367
        }
×
368

369
        var chanOpts []lnwallet.ChannelOpt
4✔
370
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
4✔
371
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
×
372
        })
×
373
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
4✔
374
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
×
375
        })
×
376
        a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
4✔
377
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
×
378
        })
×
379

380
        // Finally, we'll force close the channel completing
381
        // the force close workflow.
382
        chanMachine, err := lnwallet.NewLightningChannel(
4✔
383
                a.c.cfg.Signer, channel, nil, chanOpts...,
4✔
384
        )
4✔
385
        if err != nil {
4✔
386
                return nil, err
×
387
        }
×
388

389
        closeSummary, err := chanMachine.ForceClose(
4✔
390
                lnwallet.WithSkipContractResolutions(),
4✔
391
        )
4✔
392
        if err != nil {
4✔
393
                return nil, err
×
394
        }
×
395

396
        return closeSummary.CloseTx, nil
4✔
397
}
398

399
// newActiveChannelArbitrator creates a new instance of an active channel
// arbitrator given the state of the target channel. It wires up the
// arbitrator's config closures, creates its persistent log, and seeds the
// initial HTLC sets from the currently known commitments.
func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
	c *ChainArbitrator, chanEvents *ChainEventSubscription) (*ChannelArbitrator, error) {

	// TODO(roasbeef): fetch best height (or pass in) so can ensure block
	// epoch delivers all the notifications to

	chanPoint := channel.FundingOutpoint

	log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", chanPoint)

	// Next we'll create the matching configuration struct that contains
	// all interfaces and methods the arbitrator needs to do its job.
	arbCfg := ChannelArbitratorConfig{
		ChanPoint:   chanPoint,
		Channel:     c.getArbChannel(channel),
		ShortChanID: channel.ShortChanID(),

		MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
		// MarkChannelClosed persists the close summary, then notifies
		// the registered subscriber of the newly closed channel.
		MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
			statuses ...channeldb.ChannelStatus) error {

			err := channel.CloseChannel(summary, statuses...)
			if err != nil {
				return err
			}
			c.cfg.NotifyClosedChannel(summary.ChanPoint)
			return nil
		},
		IsPendingClose:        false,
		ChainArbitratorConfig: c.cfg,
		ChainEvents:           chanEvents,
		// PutResolverReport persists a resolver report keyed by this
		// channel's chain hash and channel point.
		PutResolverReport: func(tx kvdb.RwTx,
			report *channeldb.ResolverReport) error {

			return c.chanSource.PutResolverReport(
				tx, c.cfg.ChainHash, &chanPoint, report,
			)
		},
		FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
			chanStateDB := c.chanSource.ChannelStateDB()
			return chanStateDB.FetchHistoricalChannel(&chanPoint)
		},
		FindOutgoingHTLCDeadline: func(
			htlc channeldb.HTLC) fn.Option[int32] {

			return c.FindOutgoingHTLCDeadline(
				channel.ShortChanID(), htlc,
			)
		},
	}

	// The final component needed is an arbitrator log that the arbitrator
	// will use to keep track of its internal state using a backed
	// persistent log.
	//
	// TODO(roasbeef); abstraction leak...
	//  * rework: adaptor method to set log scope w/ factory func
	chanLog, err := newBoltArbitratorLog(
		c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
	)
	if err != nil {
		return nil, err
	}

	// MarkChannelResolved notifies the (optional) fully-resolved
	// subscriber, then resolves the contract in the database. It is
	// assigned after the log has been created above.
	arbCfg.MarkChannelResolved = func() error {
		if c.cfg.NotifyFullyResolvedChannel != nil {
			c.cfg.NotifyFullyResolvedChannel(chanPoint)
		}

		return c.ResolveContract(chanPoint)
	}

	// Finally, we'll need to construct a series of htlc Sets based on all
	// currently known valid commitments.
	htlcSets := make(map[HtlcSetKey]htlcSet)
	htlcSets[LocalHtlcSet] = newHtlcSet(channel.LocalCommitment.Htlcs)
	htlcSets[RemoteHtlcSet] = newHtlcSet(channel.RemoteCommitment.Htlcs)

	// A pending (unrevoked) remote commitment may also exist; its absence
	// (ErrNoPendingCommit) is expected and not treated as an error.
	pendingRemoteCommitment, err := channel.RemoteCommitChainTip()
	if err != nil && err != channeldb.ErrNoPendingCommit {
		return nil, err
	}
	if pendingRemoteCommitment != nil {
		htlcSets[RemotePendingHtlcSet] = newHtlcSet(
			pendingRemoteCommitment.Commitment.Htlcs,
		)
	}

	return NewChannelArbitrator(
		arbCfg, htlcSets, chanLog,
	), nil
}
493

494
// getArbChannel returns an open channel wrapper for use by channel arbitrators.
495
func (c *ChainArbitrator) getArbChannel(
496
        channel *channeldb.OpenChannel) *arbChannel {
4✔
497

4✔
498
        return &arbChannel{
4✔
499
                channel: channel,
4✔
500
                c:       c,
4✔
501
        }
4✔
502
}
4✔
503

504
// ResolveContract marks a contract as fully resolved within the database.
// This is only to be done once all contracts which were live on the channel
// before hitting the chain have been resolved. It also tears down the
// channel's arbitrator and chain watcher (if still active) and wipes the
// arbitrator's persistent log.
func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error {
	log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)

	// First, we'll mark the channel as fully closed from the PoV of the
	// channel source.
	err := c.chanSource.ChannelStateDB().MarkChanFullyClosed(&chanPoint)
	if err != nil {
		log.Errorf("ChainArbitrator: unable to mark ChannelPoint(%v) "+
			"fully closed: %v", chanPoint, err)
		return err
	}

	// Now that the channel has been marked as fully closed, we'll stop
	// both the channel arbitrator and chain watcher for this channel if
	// they're still active. The lock is held only while mutating the
	// maps; the (potentially blocking) Stop calls happen outside it.
	var arbLog ArbitratorLog
	c.Lock()
	chainArb := c.activeChannels[chanPoint]
	delete(c.activeChannels, chanPoint)

	chainWatcher := c.activeWatchers[chanPoint]
	delete(c.activeWatchers, chanPoint)
	c.Unlock()

	if chainArb != nil {
		arbLog = chainArb.log

		// Stop failures are logged but do not abort resolution.
		if err := chainArb.Stop(); err != nil {
			log.Warnf("unable to stop ChannelArbitrator(%v): %v",
				chanPoint, err)
		}
	}
	if chainWatcher != nil {
		if err := chainWatcher.Stop(); err != nil {
			log.Warnf("unable to stop ChainWatcher(%v): %v",
				chanPoint, err)
		}
	}

	// Once this has been marked as resolved, we'll wipe the log that the
	// channel arbitrator was using to store its persistent state. We do
	// this after marking the channel resolved, as otherwise, the
	// arbitrator would be re-created, and think it was starting from the
	// default state.
	if arbLog != nil {
		if err := arbLog.WipeHistory(); err != nil {
			return err
		}
	}

	return nil
}
559

560
// Start launches all goroutines that the ChainArbitrator needs to operate.
// It restores arbitrators for all open channels, creates restricted
// arbitrators for channels pending close, starts every chain watcher in
// parallel, and finally launches the block dispatcher goroutine. It is
// idempotent: repeated calls after the first are no-ops.
func (c *ChainArbitrator) Start() error {
	// Guard against double-start; only the first caller proceeds.
	if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
		return nil
	}

	log.Infof("ChainArbitrator starting with config: budget=[%v]",
		&c.cfg.Budget)

	// First, we'll fetch all the channels that are still open, in order to
	// collect them within our set of active contracts.
	openChannels, err := c.chanSource.ChannelStateDB().FetchAllChannels()
	if err != nil {
		return err
	}

	if len(openChannels) > 0 {
		log.Infof("Creating ChannelArbitrators for %v active channels",
			len(openChannels))
	}

	// For each open channel, we'll configure then launch a corresponding
	// ChannelArbitrator.
	for _, channel := range openChannels {
		chanPoint := channel.FundingOutpoint
		// Re-scope the loop variable so the breachClosure below
		// captures this iteration's channel (pre-Go 1.22 semantics).
		channel := channel

		// First, we'll create an active chainWatcher for this channel
		// to ensure that we detect any relevant on chain events.
		breachClosure := func(ret *lnwallet.BreachRetribution) error {
			return c.cfg.ContractBreach(chanPoint, ret)
		}

		chainWatcher, err := newChainWatcher(
			chainWatcherConfig{
				chanState:           channel,
				notifier:            c.cfg.Notifier,
				signer:              c.cfg.Signer,
				isOurAddr:           c.cfg.IsOurAddress,
				contractBreach:      breachClosure,
				extractStateNumHint: lnwallet.GetStateNumHint,
				auxLeafStore:        c.cfg.AuxLeafStore,
				auxResolver:         c.cfg.AuxResolver,
			},
		)
		if err != nil {
			return err
		}

		c.activeWatchers[chanPoint] = chainWatcher
		channelArb, err := newActiveChannelArbitrator(
			channel, c, chainWatcher.SubscribeChannelEvents(),
		)
		if err != nil {
			return err
		}

		c.activeChannels[chanPoint] = channelArb

		// Republish any closing transactions for this channel.
		// Failure here is logged but non-fatal: the close tx may
		// simply be rebroadcast on a later restart.
		err = c.republishClosingTxs(channel)
		if err != nil {
			log.Errorf("Failed to republish closing txs for "+
				"channel %v", chanPoint)
		}
	}

	// In addition to the channels that we know to be open, we'll also
	// launch arbitrators to finish resolving any channels that are in
	// the pending close state.
	closingChannels, err := c.chanSource.ChannelStateDB().FetchClosedChannels(
		true,
	)
	if err != nil {
		return err
	}

	if len(closingChannels) > 0 {
		log.Infof("Creating ChannelArbitrators for %v closing channels",
			len(closingChannels))
	}

	// Next, for each channel in the closing state, we'll launch a
	// corresponding more restricted resolver, as we don't have to watch
	// the chain any longer, only resolve the contracts on the confirmed
	// commitment.
	//nolint:lll
	for _, closeChanInfo := range closingChannels {
		// We can leave off the CloseContract and ForceCloseChan
		// methods as the channel is already closed at this point.
		chanPoint := closeChanInfo.ChanPoint
		arbCfg := ChannelArbitratorConfig{
			ChanPoint:             chanPoint,
			ShortChanID:           closeChanInfo.ShortChanID,
			ChainArbitratorConfig: c.cfg,
			ChainEvents:           &ChainEventSubscription{},
			IsPendingClose:        true,
			ClosingHeight:         closeChanInfo.CloseHeight,
			CloseType:             closeChanInfo.CloseType,
			PutResolverReport: func(tx kvdb.RwTx,
				report *channeldb.ResolverReport) error {

				return c.chanSource.PutResolverReport(
					tx, c.cfg.ChainHash, &chanPoint, report,
				)
			},
			FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
				chanStateDB := c.chanSource.ChannelStateDB()
				return chanStateDB.FetchHistoricalChannel(&chanPoint)
			},
			FindOutgoingHTLCDeadline: func(
				htlc channeldb.HTLC) fn.Option[int32] {

				return c.FindOutgoingHTLCDeadline(
					closeChanInfo.ShortChanID, htlc,
				)
			},
		}
		chanLog, err := newBoltArbitratorLog(
			c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
		)
		if err != nil {
			return err
		}
		arbCfg.MarkChannelResolved = func() error {
			if c.cfg.NotifyFullyResolvedChannel != nil {
				c.cfg.NotifyFullyResolvedChannel(chanPoint)
			}

			return c.ResolveContract(chanPoint)
		}

		// We create an empty map of HTLC's here since it's possible
		// that the channel is in StateDefault and updateActiveHTLCs is
		// called. We want to avoid writing to an empty map. Since the
		// channel is already in the process of being resolved, no new
		// HTLCs will be added.
		c.activeChannels[chanPoint] = NewChannelArbitrator(
			arbCfg, make(map[HtlcSetKey]htlcSet), chanLog,
		)
	}

	// Now, we'll start all chain watchers in parallel to shorten start up
	// duration. In neutrino mode, this allows spend registrations to take
	// advantage of batch spend reporting, instead of doing a single rescan
	// per chain watcher.
	//
	// NOTE: After this point, we Stop the chain arb to ensure that any
	// lingering goroutines are cleaned up before exiting.
	watcherErrs := make(chan error, len(c.activeWatchers))
	var wg sync.WaitGroup
	for _, watcher := range c.activeWatchers {
		wg.Add(1)
		go func(w *chainWatcher) {
			defer wg.Done()
			select {
			case watcherErrs <- w.Start():
			case <-c.quit:
				// The channel is buffered to hold every
				// watcher's result, so this send cannot block.
				watcherErrs <- ErrChainArbExiting
			}
		}(watcher)
	}

	// Once all chain watchers have been started, seal the err chan to
	// signal the end of the err stream.
	go func() {
		wg.Wait()
		close(watcherErrs)
	}()

	// stopAndLog is a helper function which shuts down the chain arb and
	// logs errors if they occur.
	stopAndLog := func() {
		if err := c.Stop(); err != nil {
			log.Errorf("ChainArbitrator could not shutdown: %v", err)
		}
	}

	// Handle all errors returned from spawning our chain watchers. If any
	// of them failed, we will stop the chain arb to shutdown any active
	// goroutines.
	for err := range watcherErrs {
		if err != nil {
			stopAndLog()
			return err
		}
	}

	// Before we start all of our arbitrators, we do a preliminary state
	// lookup so that we can combine all of these lookups in a single db
	// transaction.
	var startStates map[wire.OutPoint]*chanArbStartState

	// NOTE: the second closure passed to kvdb.View is the reset function,
	// which (re)allocates startStates so the transaction can be safely
	// retried from scratch by the db backend.
	err = kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error {
		for _, arbitrator := range c.activeChannels {
			startState, err := arbitrator.getStartState(tx)
			if err != nil {
				return err
			}

			startStates[arbitrator.cfg.ChanPoint] = startState
		}

		return nil
	}, func() {
		startStates = make(
			map[wire.OutPoint]*chanArbStartState,
			len(c.activeChannels),
		)
	})
	if err != nil {
		stopAndLog()
		return err
	}

	// Launch all the goroutines for each arbitrator so they can carry out
	// their duties.
	for _, arbitrator := range c.activeChannels {
		startState, ok := startStates[arbitrator.cfg.ChanPoint]
		if !ok {
			stopAndLog()
			return fmt.Errorf("arbitrator: %v has no start state",
				arbitrator.cfg.ChanPoint)
		}

		if err := arbitrator.Start(startState); err != nil {
			stopAndLog()
			return err
		}
	}

	// Subscribe to a single stream of block epoch notifications that we
	// will dispatch to all active arbitrators.
	blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
	if err != nil {
		return err
	}

	// Start our goroutine which will dispatch blocks to each arbitrator.
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		c.dispatchBlocks(blockEpoch)
	}()

	// TODO(roasbeef): eventually move all breach watching here

	return nil
}
809

810
// blockRecipient contains the information we need to dispatch a block to a
// channel arbitrator. It is a snapshot of the arbitrator's delivery channels
// taken under the chain arbitrator's lock.
type blockRecipient struct {
	// chanPoint is the funding outpoint of the channel.
	chanPoint wire.OutPoint

	// blocks is the channel that new block heights are sent into. This
	// channel should be sufficiently buffered as to not block the sender.
	blocks chan<- int32

	// quit is closed if the receiving entity is shutting down.
	quit chan struct{}
}
823

824
// dispatchBlocks consumes a block epoch notification stream and dispatches
// blocks to each of the chain arb's active channel arbitrators. This function
// must be run in a goroutine. It returns when the epoch stream is cancelled
// or the chain arbitrator shuts down; on exit it closes every recipient's
// block channel to signal that no further blocks will be delivered.
func (c *ChainArbitrator) dispatchBlocks(
	blockEpoch *chainntnfs.BlockEpochEvent) {

	// getRecipients is a helper function which acquires the chain arb
	// lock and returns a set of block recipients which can be used to
	// dispatch blocks.
	getRecipients := func() []blockRecipient {
		c.Lock()
		blocks := make([]blockRecipient, 0, len(c.activeChannels))
		for _, channel := range c.activeChannels {
			blocks = append(blocks, blockRecipient{
				chanPoint: channel.cfg.ChanPoint,
				blocks:    channel.blocks,
				quit:      channel.quit,
			})
		}
		c.Unlock()

		return blocks
	}

	// On exit, cancel our blocks subscription and close each block channel
	// so that the arbitrators know they will no longer be receiving blocks.
	defer func() {
		blockEpoch.Cancel()

		recipients := getRecipients()
		for _, recipient := range recipients {
			close(recipient.blocks)
		}
	}()

	// Consume block epochs until we receive the instruction to shutdown.
	for {
		select {
		// Consume block epochs, exiting if our subscription is
		// terminated.
		case block, ok := <-blockEpoch.Epochs:
			if !ok {
				log.Trace("dispatchBlocks block epoch " +
					"cancelled")
				return
			}

			// Get the set of currently active channels block
			// subscription channels and dispatch the block to
			// each.
			for _, recipient := range getRecipients() {
				select {
				// Deliver the block to the arbitrator.
				case recipient.blocks <- block.Height:

				// If the recipient is shutting down, skip it
				// without delivering the block and move on to
				// the next recipient. This may be the case
				// when two blocks are mined in quick
				// succession, and the arbitrator resolves
				// after the first block, and does not need to
				// consume the second block.
				case <-recipient.quit:
					log.Debugf("channel: %v exit without "+
						"receiving block: %v",
						recipient.chanPoint,
						block.Height)

				// If the chain arb is shutting down, we don't
				// need to deliver any more blocks (everything
				// will be shutting down).
				case <-c.quit:
					return
				}
			}

		// Exit if the chain arbitrator is shutting down.
		case <-c.quit:
			return
		}
	}
}
905

906
// republishClosingTxs will load any stored cooperative or unilateral closing
907
// transactions and republish them. This helps ensure propagation of the
908
// transactions in the event that prior publications failed.
909
func (c *ChainArbitrator) republishClosingTxs(
910
        channel *channeldb.OpenChannel) error {
4✔
911

4✔
912
        // If the channel has had its unilateral close broadcasted already,
4✔
913
        // republish it in case it didn't propagate.
4✔
914
        if channel.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) {
8✔
915
                err := c.rebroadcast(
4✔
916
                        channel, channeldb.ChanStatusCommitBroadcasted,
4✔
917
                )
4✔
918
                if err != nil {
4✔
919
                        return err
×
920
                }
×
921
        }
922

923
        // If the channel has had its cooperative close broadcasted
924
        // already, republish it in case it didn't propagate.
925
        if channel.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) {
4✔
UNCOV
926
                err := c.rebroadcast(
×
UNCOV
927
                        channel, channeldb.ChanStatusCoopBroadcasted,
×
UNCOV
928
                )
×
UNCOV
929
                if err != nil {
×
930
                        return err
×
931
                }
×
932
        }
933

934
        return nil
4✔
935
}
936

937
// rebroadcast is a helper method which will republish the unilateral or
938
// cooperative close transaction or a channel in a particular state.
939
//
940
// NOTE: There is no risk to calling this method if the channel isn't in either
941
// CommitmentBroadcasted or CoopBroadcasted, but the logs will be misleading.
942
func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel,
943
        state channeldb.ChannelStatus) error {
4✔
944

4✔
945
        chanPoint := channel.FundingOutpoint
4✔
946

4✔
947
        var (
4✔
948
                closeTx *wire.MsgTx
4✔
949
                kind    string
4✔
950
                err     error
4✔
951
        )
4✔
952
        switch state {
4✔
953
        case channeldb.ChanStatusCommitBroadcasted:
4✔
954
                kind = "force"
4✔
955
                closeTx, err = channel.BroadcastedCommitment()
4✔
956

UNCOV
957
        case channeldb.ChanStatusCoopBroadcasted:
×
UNCOV
958
                kind = "coop"
×
UNCOV
959
                closeTx, err = channel.BroadcastedCooperative()
×
960

961
        default:
×
962
                return fmt.Errorf("unknown closing state: %v", state)
×
963
        }
964

965
        switch {
4✔
966
        // This can happen for channels that had their closing tx published
967
        // before we started storing it to disk.
968
        case err == channeldb.ErrNoCloseTx:
×
969
                log.Warnf("Channel %v is in state %v, but no %s closing tx "+
×
970
                        "to re-publish...", chanPoint, state, kind)
×
971
                return nil
×
972

973
        case err != nil:
×
974
                return err
×
975
        }
976

977
        log.Infof("Re-publishing %s close tx(%v) for channel %v",
4✔
978
                kind, closeTx.TxHash(), chanPoint)
4✔
979

4✔
980
        label := labels.MakeLabel(
4✔
981
                labels.LabelTypeChannelClose, &channel.ShortChannelID,
4✔
982
        )
4✔
983
        err = c.cfg.PublishTx(closeTx, label)
4✔
984
        if err != nil && err != lnwallet.ErrDoubleSpend {
4✔
985
                log.Warnf("Unable to broadcast %s close tx(%v): %v",
×
986
                        kind, closeTx.TxHash(), err)
×
987
        }
×
988

989
        return nil
4✔
990
}
991

992
// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active
// channel arbitrators will be signalled to exit, and this method will block
// until they've all exited. It is idempotent: repeated calls after the first
// are no-ops.
func (c *ChainArbitrator) Stop() error {
	// Guard against double-stop; only the first caller proceeds.
	if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
		return nil
	}

	log.Info("ChainArbitrator shutting down...")
	defer log.Debug("ChainArbitrator shutdown complete")

	// Signal all goroutines (e.g. the block dispatcher) to exit.
	close(c.quit)

	var (
		activeWatchers = make(map[wire.OutPoint]*chainWatcher)
		activeChannels = make(map[wire.OutPoint]*ChannelArbitrator)
	)

	// Copy the current set of active watchers and arbitrators to shutdown.
	// We don't want to hold the lock when shutting down each watcher or
	// arbitrator individually, as they may need to acquire this mutex.
	c.Lock()
	for chanPoint, watcher := range c.activeWatchers {
		activeWatchers[chanPoint] = watcher
	}
	for chanPoint, arbitrator := range c.activeChannels {
		activeChannels[chanPoint] = arbitrator
	}
	c.Unlock()

	// Stop watchers before arbitrators so no further chain events are
	// produced while the arbitrators wind down.
	for chanPoint, watcher := range activeWatchers {
		log.Tracef("Attempting to stop ChainWatcher(%v)",
			chanPoint)

		if err := watcher.Stop(); err != nil {
			log.Errorf("unable to stop watcher for "+
				"ChannelPoint(%v): %v", chanPoint, err)
		}
	}
	for chanPoint, arbitrator := range activeChannels {
		log.Tracef("Attempting to stop ChannelArbitrator(%v)",
			chanPoint)

		if err := arbitrator.Stop(); err != nil {
			log.Errorf("unable to stop arbitrator for "+
				"ChannelPoint(%v): %v", chanPoint, err)
		}
	}

	// Wait for the remaining goroutines tracked by our WaitGroup (the
	// block dispatcher) to finish before returning.
	c.wg.Wait()

	return nil
}
1045

1046
// ContractUpdate is a message that packages the latest set of active HTLCs on
// a commitment, and also identifies which commitment received a new set of
// HTLCs.
type ContractUpdate struct {
	// HtlcKey identifies which commitment the HTLCs below are present on.
	HtlcKey HtlcSetKey

	// Htlcs are the set of active HTLCs on the commitment identified by
	// the above HtlcKey.
	Htlcs []channeldb.HTLC
}
1057

1058
// ContractSignals is used by outside subsystems to notify a channel arbitrator
// of its ShortChannelID.
type ContractSignals struct {
	// ShortChanID is the up to date short channel ID for a contract. This
	// can change either if the contract didn't yet have a stable
	// identifier when it was added, or in the case of a reorg.
	ShortChanID lnwire.ShortChannelID
}
1066

1067
// UpdateContractSignals sends a set of active, up to date contract signals to
1068
// the ChannelArbitrator which is has been assigned to the channel infield by
1069
// the passed channel point.
1070
func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
1071
        signals *ContractSignals) error {
4✔
1072

4✔
1073
        log.Infof("Attempting to update ContractSignals for ChannelPoint(%v)",
4✔
1074
                chanPoint)
4✔
1075

4✔
1076
        c.Lock()
4✔
1077
        arbitrator, ok := c.activeChannels[chanPoint]
4✔
1078
        c.Unlock()
4✔
1079
        if !ok {
4✔
1080
                return fmt.Errorf("unable to find arbitrator")
×
1081
        }
×
1082

1083
        arbitrator.UpdateContractSignals(signals)
4✔
1084

4✔
1085
        return nil
4✔
1086
}
1087

1088
// NotifyContractUpdate lets a channel arbitrator know that a new
1089
// ContractUpdate is available. This calls the ChannelArbitrator's internal
1090
// method NotifyContractUpdate which waits for a response on a done chan before
1091
// returning. This method will return an error if the ChannelArbitrator is not
1092
// in the activeChannels map. However, this only happens if the arbitrator is
1093
// resolved and the related link would already be shut down.
1094
func (c *ChainArbitrator) NotifyContractUpdate(chanPoint wire.OutPoint,
1095
        update *ContractUpdate) error {
4✔
1096

4✔
1097
        c.Lock()
4✔
1098
        arbitrator, ok := c.activeChannels[chanPoint]
4✔
1099
        c.Unlock()
4✔
1100
        if !ok {
4✔
1101
                return fmt.Errorf("can't find arbitrator for %v", chanPoint)
×
1102
        }
×
1103

1104
        arbitrator.notifyContractUpdate(update)
4✔
1105
        return nil
4✔
1106
}
1107

1108
// GetChannelArbitrator safely returns the channel arbitrator for a given
1109
// channel outpoint.
1110
func (c *ChainArbitrator) GetChannelArbitrator(chanPoint wire.OutPoint) (
1111
        *ChannelArbitrator, error) {
4✔
1112

4✔
1113
        c.Lock()
4✔
1114
        arbitrator, ok := c.activeChannels[chanPoint]
4✔
1115
        c.Unlock()
4✔
1116
        if !ok {
4✔
1117
                return nil, fmt.Errorf("unable to find arbitrator")
×
1118
        }
×
1119

1120
        return arbitrator, nil
4✔
1121
}
1122

1123
// forceCloseReq is a request sent from an outside sub-system to the arbitrator
// that watches a particular channel to broadcast the commitment transaction,
// and enter the resolution phase of the channel.
type forceCloseReq struct {
	// errResp is a channel that will be sent upon either in the case of
	// force close success (nil error), or in the case of an error.
	//
	// NOTE: This channel MUST be buffered.
	errResp chan error

	// closeTx is a channel that carries the transaction which ultimately
	// closed out the channel.
	closeTx chan *wire.MsgTx
}
1137

1138
// ForceCloseContract attempts to force close the channel identified by the
// passed channel point. A force close will immediately terminate the contract,
// causing it to enter the resolution phase. If the force close was successful,
// then the force close transaction itself will be returned.
//
// TODO(roasbeef): just return the summary itself?
func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.MsgTx, error) {
	// Look up the arbitrator for this channel under the lock; releasing
	// the lock before the (potentially slow) close handshake below.
	c.Lock()
	arbitrator, ok := c.activeChannels[chanPoint]
	c.Unlock()
	if !ok {
		return nil, fmt.Errorf("unable to find arbitrator")
	}

	log.Infof("Attempting to force close ChannelPoint(%v)", chanPoint)

	// Before closing, we'll attempt to send a disable update for the
	// channel. We do so before closing the channel as otherwise the current
	// edge policy won't be retrievable from the graph.
	if err := c.cfg.DisableChannel(chanPoint); err != nil {
		// A failed disable is logged but does not abort the close.
		log.Warnf("Unable to disable channel %v on "+
			"close: %v", chanPoint, err)
	}

	// Both channels are buffered so the arbitrator's replies never block,
	// even if we bail out early on quit.
	errChan := make(chan error, 1)
	respChan := make(chan *wire.MsgTx, 1)

	// With the channel found, and the request crafted, we'll send over a
	// force close request to the arbitrator that watches this channel.
	select {
	case arbitrator.forceCloseReqs <- &forceCloseReq{
		errResp: errChan,
		closeTx: respChan,
	}:
	case <-c.quit:
		return nil, ErrChainArbExiting
	}

	// We'll await two responses: the error response, and the transaction
	// that closed out the channel.
	select {
	case err := <-errChan:
		if err != nil {
			return nil, err
		}
	case <-c.quit:
		return nil, ErrChainArbExiting
	}

	var closeTx *wire.MsgTx
	select {
	case closeTx = <-respChan:
	case <-c.quit:
		return nil, ErrChainArbExiting
	}

	return closeTx, nil
}
1196

1197
// WatchNewChannel sends the ChainArbitrator a message to create a
// ChannelArbitrator tasked with watching over a new channel. Once a new
// channel has finished its final funding flow, it should be registered with
// the ChainArbitrator so we can properly react to any on-chain events.
func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error {
	// NOTE(review): the lock is held for the entire registration,
	// including starting the arbitrator and watcher below — confirm this
	// cannot deadlock against components that acquire this mutex during
	// their Start.
	c.Lock()
	defer c.Unlock()

	chanPoint := newChan.FundingOutpoint

	log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)",
		chanPoint)

	// If we're already watching this channel, then we'll ignore this
	// request.
	if _, ok := c.activeChannels[chanPoint]; ok {
		return nil
	}

	// First, also create an active chainWatcher for this channel to ensure
	// that we detect any relevant on chain events.
	chainWatcher, err := newChainWatcher(
		chainWatcherConfig{
			chanState: newChan,
			notifier:  c.cfg.Notifier,
			signer:    c.cfg.Signer,
			isOurAddr: c.cfg.IsOurAddress,
			contractBreach: func(
				retInfo *lnwallet.BreachRetribution) error {

				return c.cfg.ContractBreach(
					chanPoint, retInfo,
				)
			},
			extractStateNumHint: lnwallet.GetStateNumHint,
			auxLeafStore:        c.cfg.AuxLeafStore,
			auxResolver:         c.cfg.AuxResolver,
		},
	)
	if err != nil {
		return err
	}

	c.activeWatchers[chanPoint] = chainWatcher

	// We'll also create a new channel arbitrator instance using this new
	// channel, and our internal state.
	channelArb, err := newActiveChannelArbitrator(
		newChan, c, chainWatcher.SubscribeChannelEvents(),
	)
	if err != nil {
		return err
	}

	// With the arbitrator created, we'll add it to our set of active
	// arbitrators, then launch it. A nil start state means the arbitrator
	// begins from its default state.
	c.activeChannels[chanPoint] = channelArb

	if err := channelArb.Start(nil); err != nil {
		return err
	}

	return chainWatcher.Start()
}
1261

1262
// SubscribeChannelEvents returns a new active subscription for the set of
1263
// possible on-chain events for a particular channel. The struct can be used by
1264
// callers to be notified whenever an event that changes the state of the
1265
// channel on-chain occurs.
1266
func (c *ChainArbitrator) SubscribeChannelEvents(
1267
        chanPoint wire.OutPoint) (*ChainEventSubscription, error) {
4✔
1268

4✔
1269
        // First, we'll attempt to look up the active watcher for this channel.
4✔
1270
        // If we can't find it, then we'll return an error back to the caller.
4✔
1271
        c.Lock()
4✔
1272
        watcher, ok := c.activeWatchers[chanPoint]
4✔
1273
        c.Unlock()
4✔
1274

4✔
1275
        if !ok {
4✔
1276
                return nil, fmt.Errorf("unable to find watcher for: %v",
×
1277
                        chanPoint)
×
1278
        }
×
1279

1280
        // With the watcher located, we'll request for it to create a new chain
1281
        // event subscription client.
1282
        return watcher.SubscribeChannelEvents(), nil
4✔
1283
}
1284

1285
// FindOutgoingHTLCDeadline returns the deadline in absolute block height for
1286
// the specified outgoing HTLC. For an outgoing HTLC, its deadline is defined
1287
// by the timeout height of its corresponding incoming HTLC - this is the
1288
// expiry height the that remote peer can spend his/her outgoing HTLC via the
1289
// timeout path.
1290
func (c *ChainArbitrator) FindOutgoingHTLCDeadline(scid lnwire.ShortChannelID,
1291
        outgoingHTLC channeldb.HTLC) fn.Option[int32] {
4✔
1292

4✔
1293
        // Find the outgoing HTLC's corresponding incoming HTLC in the circuit
4✔
1294
        // map.
4✔
1295
        rHash := outgoingHTLC.RHash
4✔
1296
        circuit := models.CircuitKey{
4✔
1297
                ChanID: scid,
4✔
1298
                HtlcID: outgoingHTLC.HtlcIndex,
4✔
1299
        }
4✔
1300
        incomingCircuit := c.cfg.QueryIncomingCircuit(circuit)
4✔
1301

4✔
1302
        // If there's no incoming circuit found, we will use the default
4✔
1303
        // deadline.
4✔
1304
        if incomingCircuit == nil {
8✔
1305
                log.Warnf("ChannelArbitrator(%v): incoming circuit key not "+
4✔
1306
                        "found for rHash=%x, using default deadline instead",
4✔
1307
                        scid, rHash)
4✔
1308

4✔
1309
                return fn.None[int32]()
4✔
1310
        }
4✔
1311

1312
        // If this is a locally initiated HTLC, it means we are the first hop.
1313
        // In this case, we can relax the deadline.
1314
        if incomingCircuit.ChanID.IsDefault() {
8✔
1315
                log.Infof("ChannelArbitrator(%v): using default deadline for "+
4✔
1316
                        "locally initiated HTLC for rHash=%x", scid, rHash)
4✔
1317

4✔
1318
                return fn.None[int32]()
4✔
1319
        }
4✔
1320

1321
        log.Debugf("Found incoming circuit %v for rHash=%x using outgoing "+
4✔
1322
                "circuit %v", incomingCircuit, rHash, circuit)
4✔
1323

4✔
1324
        c.Lock()
4✔
1325
        defer c.Unlock()
4✔
1326

4✔
1327
        // Iterate over all active channels to find the incoming HTLC specified
4✔
1328
        // by its circuit key.
4✔
1329
        for cp, channelArb := range c.activeChannels {
8✔
1330
                // Skip if the SCID doesn't match.
4✔
1331
                if channelArb.cfg.ShortChanID != incomingCircuit.ChanID {
8✔
1332
                        continue
4✔
1333
                }
1334

1335
                // Make sure the channel arbitrator has the latest view of its
1336
                // active HTLCs.
1337
                channelArb.updateActiveHTLCs()
4✔
1338

4✔
1339
                // Iterate all the known HTLCs to find the targeted incoming
4✔
1340
                // HTLC.
4✔
1341
                for _, htlcs := range channelArb.activeHTLCs {
8✔
1342
                        for _, htlc := range htlcs.incomingHTLCs {
8✔
1343
                                // Skip if the index doesn't match.
4✔
1344
                                if htlc.HtlcIndex != incomingCircuit.HtlcID {
8✔
1345
                                        continue
4✔
1346
                                }
1347

1348
                                log.Debugf("ChannelArbitrator(%v): found "+
4✔
1349
                                        "incoming HTLC in channel=%v using "+
4✔
1350
                                        "rHash=%x, refundTimeout=%v", scid,
4✔
1351
                                        cp, rHash, htlc.RefundTimeout)
4✔
1352

4✔
1353
                                return fn.Some(int32(htlc.RefundTimeout))
4✔
1354
                        }
1355
                }
1356
        }
1357

1358
        // If there's no incoming HTLC found, yet we have the incoming circuit,
1359
        // something is wrong - in this case, we return the none deadline.
1360
        log.Errorf("ChannelArbitrator(%v): incoming HTLC not found for "+
4✔
1361
                "rHash=%x, using default deadline instead", scid, rHash)
4✔
1362

4✔
1363
        return fn.None[int32]()
4✔
1364
}
1365

1366
// TODO(roasbeef): arbitration reports
1367
//  * types: contested, waiting for success conf, etc
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc