• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12986279612

27 Jan 2025 09:51AM UTC coverage: 57.652% (-1.1%) from 58.788%
12986279612

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19578 existing lines in 256 files now uncovered.

103448 of 179434 relevant lines covered (57.65%)

24884.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

36.04
/contractcourt/chain_arbitrator.go
1
package contractcourt
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcutil"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/chainio"
15
        "github.com/lightningnetwork/lnd/chainntnfs"
16
        "github.com/lightningnetwork/lnd/channeldb"
17
        "github.com/lightningnetwork/lnd/clock"
18
        "github.com/lightningnetwork/lnd/fn/v2"
19
        "github.com/lightningnetwork/lnd/graph/db/models"
20
        "github.com/lightningnetwork/lnd/input"
21
        "github.com/lightningnetwork/lnd/kvdb"
22
        "github.com/lightningnetwork/lnd/labels"
23
        "github.com/lightningnetwork/lnd/lnwallet"
24
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
25
        "github.com/lightningnetwork/lnd/lnwire"
26
)
27

28
// ErrChainArbExiting signals that the chain arbitrator is shutting down.
29
var ErrChainArbExiting = errors.New("ChainArbitrator exiting")
30

31
// ResolutionMsg is a message sent by resolvers to outside sub-systems once an
32
// outgoing contract has been fully resolved. For multi-hop contracts, if we
33
// resolve the outgoing contract, we'll also need to ensure that the incoming
34
// contract is resolved as well. We package the items required to resolve the
35
// incoming contracts within this message.
36
type ResolutionMsg struct {
37
        // SourceChan identifies the channel that this message is being sent
38
        // from. This is the channel's short channel ID.
39
        SourceChan lnwire.ShortChannelID
40

41
        // HtlcIndex is the index of the contract within the original
42
        // commitment trace.
43
        HtlcIndex uint64
44

45
        // Failure will be non-nil if the incoming contract should be canceled
46
        // altogether. This can happen if the outgoing contract was dust, or
47
        // if the outgoing HTLC timed out.
48
        Failure lnwire.FailureMessage
49

50
        // PreImage will be non-nil if the incoming contract can successfully
51
        // be redeemed. This can happen if we learn of the preimage from the
52
        // outgoing HTLC on-chain.
53
        PreImage *[32]byte
54
}
55

56
// ChainArbitratorConfig is a configuration struct that contains all the
57
// function closures and interfaces that are required to arbitrate on-chain
58
// contracts for a particular chain.
59
type ChainArbitratorConfig struct {
60
        // ChainHash is the chain that this arbitrator is to operate within.
61
        ChainHash chainhash.Hash
62

63
        // IncomingBroadcastDelta is the delta that we'll use to decide when to
64
        // broadcast our commitment transaction if we have incoming htlcs. This
65
        // value should be set based on our current fee estimation of the
66
        // commitment transaction. We use this to determine when we should
67
        // broadcast instead of just the HTLC timeout, as we want to ensure
68
        // that the commitment transaction is already confirmed, by the time the
69
        // HTLC expires. Otherwise we may end up not settling the htlc on-chain
70
        // because the other party managed to time it out.
71
        IncomingBroadcastDelta uint32
72

73
        // OutgoingBroadcastDelta is the delta that we'll use to decide when to
74
        // broadcast our commitment transaction if there are active outgoing
75
        // htlcs. This value can be lower than the incoming broadcast delta.
76
        OutgoingBroadcastDelta uint32
77

78
        // NewSweepAddr is a function that returns a new address under control
79
        // by the wallet. We'll use this to sweep any no-delay outputs as a
80
        // result of unilateral channel closes.
81
        //
82
        // NOTE: This SHOULD return a p2wkh script.
83
        NewSweepAddr func() ([]byte, error)
84

85
        // PublishTx reliably broadcasts a transaction to the network. Once
86
        // this function exits without an error, then the transaction MUST
87
        // continually be rebroadcast if needed.
88
        PublishTx func(*wire.MsgTx, string) error
89

90
        // DeliverResolutionMsg is a function that will append an outgoing
91
        // message to the "out box" for a ChannelLink. This is used to cancel
92
        // backwards any HTLC's that are either dust, we're timing out, or
93
        // settling on-chain to the incoming link.
94
        DeliverResolutionMsg func(...ResolutionMsg) error
95

96
        // MarkLinkInactive is a function closure that the ChainArbitrator will
97
        // use to mark that active HTLC's shouldn't be attempted to be routed
98
        // over a particular channel. This function will be called when a
99
        // ChannelArbitrator decides that it needs to go to chain in order to
100
        // resolve contracts.
101
        //
102
        // TODO(roasbeef): rename, routing based
103
        MarkLinkInactive func(wire.OutPoint) error
104

105
        // ContractBreach is a function closure that the ChainArbitrator will
106
        // use to notify the BreachArbitrator about a contract breach. It should
107
        // only return a non-nil error when the BreachArbitrator has preserved
108
        // the necessary breach info for this channel point. Once the breach
109
        // resolution is persisted in the ChannelArbitrator, it will be safe
110
        // to mark the channel closed.
111
        ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error
112

113
        // IsOurAddress is a function that returns true if the passed address
114
        // is known to the underlying wallet. Otherwise, false should be
115
        // returned.
116
        IsOurAddress func(btcutil.Address) bool
117

118
        // IncubateOutputs sends either an incoming HTLC, an outgoing HTLC, or
119
        // both to the utxo nursery. Once this function returns, the nursery
120
        // should have safely persisted the outputs to disk, and should start
121
        // the process of incubation. This is used when a resolver wishes to
122
        // pass off the output to the nursery as we're only waiting on an
123
        // absolute/relative item block.
124
        IncubateOutputs func(wire.OutPoint,
125
                fn.Option[lnwallet.OutgoingHtlcResolution],
126
                fn.Option[lnwallet.IncomingHtlcResolution],
127
                uint32, fn.Option[int32]) error
128

129
        // PreimageDB is a global store of all known pre-images. We'll use this
130
        // to decide if we should broadcast a commitment transaction to claim
131
        // an HTLC on-chain.
132
        PreimageDB WitnessBeacon
133

134
        // Notifier is an instance of a chain notifier we'll use to watch for
135
        // certain on-chain events.
136
        Notifier chainntnfs.ChainNotifier
137

138
        // Mempool is a mempool watcher that allows us to watch for events
139
        // happening in the mempool.
140
        Mempool chainntnfs.MempoolWatcher
141

142
        // Signer is a signer backed by the active lnd node. This should be
143
        // capable of producing a signature as specified by a valid
144
        // SignDescriptor.
145
        Signer input.Signer
146

147
        // FeeEstimator will be used to return fee estimates.
148
        FeeEstimator chainfee.Estimator
149

150
        // ChainIO allows us to query the state of the current main chain.
151
        ChainIO lnwallet.BlockChainIO
152

153
        // DisableChannel disables a channel, resulting in it not being able to
154
        // forward payments.
155
        DisableChannel func(wire.OutPoint) error
156

157
        // Sweeper allows resolvers to sweep their final outputs.
158
        Sweeper UtxoSweeper
159

160
        // Registry is the invoice database that is used by resolvers to lookup
161
        // preimages and settle invoices.
162
        Registry Registry
163

164
        // NotifyClosedChannel is a function closure that the ChainArbitrator
165
        // will use to notify the ChannelNotifier about a newly closed channel.
166
        NotifyClosedChannel func(wire.OutPoint)
167

168
        // NotifyFullyResolvedChannel is a function closure that the
169
        // ChainArbitrator will use to notify the ChannelNotifier about a newly
170
        // resolved channel. The main difference to NotifyClosedChannel is that
171
        // in case of a local force close the NotifyClosedChannel is called when
172
        // the published commitment transaction confirms while
173
        // NotifyFullyResolvedChannel is only called when the channel is fully
174
        // resolved (which includes sweeping any time locked funds).
175
        NotifyFullyResolvedChannel func(point wire.OutPoint)
176

177
        // OnionProcessor is used to decode onion payloads for on-chain
178
        // resolution.
179
        OnionProcessor OnionProcessor
180

181
        // PaymentsExpirationGracePeriod indicates a time window we allow the
182
        // other node to cancel an outgoing htlc that our node has initiated and
183
        // has timed out.
184
        PaymentsExpirationGracePeriod time.Duration
185

186
        // IsForwardedHTLC checks for a given htlc, identified by channel id and
187
        // htlcIndex, if it is a forwarded one.
188
        IsForwardedHTLC func(chanID lnwire.ShortChannelID, htlcIndex uint64) bool
189

190
        // Clock is the clock implementation that ChannelArbitrator uses.
191
        // It is useful for testing.
192
        Clock clock.Clock
193

194
        // SubscribeBreachComplete is used by the breachResolver to register a
195
        // subscription that notifies when the breach resolution process is
196
        // complete.
197
        SubscribeBreachComplete func(op *wire.OutPoint, c chan struct{}) (
198
                bool, error)
199

200
        // PutFinalHtlcOutcome stores the final outcome of an htlc in the
201
        // database.
202
        PutFinalHtlcOutcome func(chanId lnwire.ShortChannelID,
203
                htlcId uint64, settled bool) error
204

205
        // HtlcNotifier is an interface that htlc events are sent to.
206
        HtlcNotifier HtlcNotifier
207

208
        // Budget is the configured budget for the arbitrator.
209
        Budget BudgetConfig
210

211
        // QueryIncomingCircuit is used to find the outgoing HTLC's
212
        // corresponding incoming HTLC circuit. It queries the circuit map for
213
        // a given outgoing circuit key and returns the incoming circuit key.
214
        //
215
        // TODO(yy): this is a hacky way to get around the cycling import issue
216
        // as we cannot import `htlcswitch` here. A proper way is to define an
217
        // interface here that asks for method `LookupOpenCircuit`,
218
        // meanwhile, turn `PaymentCircuit` into an interface or bring it to a
219
        // lower package.
220
        QueryIncomingCircuit func(circuit models.CircuitKey) *models.CircuitKey
221

222
        // AuxLeafStore is an optional store that can be used to store auxiliary
223
        // leaves for certain custom channel types.
224
        AuxLeafStore fn.Option[lnwallet.AuxLeafStore]
225

226
        // AuxSigner is an optional signer that can be used to sign auxiliary
227
        // leaves for certain custom channel types.
228
        AuxSigner fn.Option[lnwallet.AuxSigner]
229

230
        // AuxResolver is an optional interface that can be used to modify the
231
        // way contracts are resolved.
232
        AuxResolver fn.Option[lnwallet.AuxContractResolver]
233
}
234

235
// ChainArbitrator is a sub-system that oversees the on-chain resolution of all
236
// active channels and those that are in the "pending close" state. Within the
237
// contractcourt package, the ChainArbitrator manages a set of active
238
// ChannelArbitrators. Each ChannelArbitrator is responsible for watching
239
// the chain for any activity that affects the state of the channel, and also
240
// for monitoring each contract in order to determine if any on-chain activity is
241
// required. Outside sub-systems interact with the ChainArbitrator in order to
242
// forcibly exit a contract, update the set of live signals for each contract,
243
// and to receive reports on the state of contract resolution.
244
type ChainArbitrator struct {
245
        started int32 // To be used atomically.
246
        stopped int32 // To be used atomically.
247

248
        // Embed the blockbeat consumer struct to get access to the method
249
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
250
        chainio.BeatConsumer
251

252
        sync.Mutex
253

254
        // activeChannels is a map of all the active contracts that are still
255
        // open, and not fully resolved.
256
        activeChannels map[wire.OutPoint]*ChannelArbitrator
257

258
        // activeWatchers is a map of all the active chainWatchers for channels
259
        // that are still considered open.
260
        activeWatchers map[wire.OutPoint]*chainWatcher
261

262
        // cfg is the config struct for the arbitrator that contains all
263
        // methods and interface it needs to operate.
264
        cfg ChainArbitratorConfig
265

266
        // chanSource will be used by the ChainArbitrator to fetch all the
267
        // active channels that it must still watch over.
268
        chanSource *channeldb.DB
269

270
        // beat is the current best known blockbeat.
271
        beat chainio.Blockbeat
272

273
        quit chan struct{}
274

275
        wg sync.WaitGroup
276
}
277

278
// NewChainArbitrator returns a new instance of the ChainArbitrator using the
279
// passed config struct, and backing persistent database.
280
func NewChainArbitrator(cfg ChainArbitratorConfig,
281
        db *channeldb.DB) *ChainArbitrator {
2✔
282

2✔
283
        c := &ChainArbitrator{
2✔
284
                cfg:            cfg,
2✔
285
                activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
2✔
286
                activeWatchers: make(map[wire.OutPoint]*chainWatcher),
2✔
287
                chanSource:     db,
2✔
288
                quit:           make(chan struct{}),
2✔
289
        }
2✔
290

2✔
291
        // Mount the block consumer.
2✔
292
        c.BeatConsumer = chainio.NewBeatConsumer(c.quit, c.Name())
2✔
293

2✔
294
        return c
2✔
295
}
2✔
296

297
// Compile-time check for the chainio.Consumer interface.
298
var _ chainio.Consumer = (*ChainArbitrator)(nil)
299

300
// arbChannel is a wrapper around an open channel that channel arbitrators
301
// interact with.
302
type arbChannel struct {
303
        // channel is the in-memory channel state.
304
        channel *channeldb.OpenChannel
305

306
        // c references the chain arbitrator and is used by arbChannel
307
        // internally.
308
        c *ChainArbitrator
309
}
310

311
// NewAnchorResolutions returns the anchor resolutions for currently valid
312
// commitment transactions.
313
//
314
// NOTE: Part of the ArbChannel interface.
315
func (a *arbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions,
UNCOV
316
        error) {
×
UNCOV
317

×
UNCOV
318
        // Get a fresh copy of the database state to base the anchor resolutions
×
UNCOV
319
        // on. Unfortunately the channel instance that we have here isn't the
×
UNCOV
320
        // same instance that is used by the link.
×
UNCOV
321
        chanPoint := a.channel.FundingOutpoint
×
UNCOV
322

×
UNCOV
323
        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(chanPoint)
×
UNCOV
324
        if err != nil {
×
325
                return nil, err
×
326
        }
×
327

UNCOV
328
        var chanOpts []lnwallet.ChannelOpt
×
UNCOV
329
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
×
330
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
×
331
        })
×
UNCOV
332
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
×
333
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
×
334
        })
×
UNCOV
335
        a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
×
336
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
×
337
        })
×
338

UNCOV
339
        chanMachine, err := lnwallet.NewLightningChannel(
×
UNCOV
340
                a.c.cfg.Signer, channel, nil, chanOpts...,
×
UNCOV
341
        )
×
UNCOV
342
        if err != nil {
×
343
                return nil, err
×
344
        }
×
345

UNCOV
346
        return chanMachine.NewAnchorResolutions()
×
347
}
348

349
// ForceCloseChan should force close the contract that this attendant is
350
// watching over. We'll use this when we decide that we need to go to chain. It
351
// should in addition tell the switch to remove the corresponding link, such
352
// that we won't accept any new updates.
353
//
354
// NOTE: Part of the ArbChannel interface.
UNCOV
355
func (a *arbChannel) ForceCloseChan() (*wire.MsgTx, error) {
×
UNCOV
356
        // First, we mark the channel as borked, this ensure
×
UNCOV
357
        // that no new state transitions can happen, and also
×
UNCOV
358
        // that the link won't be loaded into the switch.
×
UNCOV
359
        if err := a.channel.MarkBorked(); err != nil {
×
360
                return nil, err
×
361
        }
×
362

363
        // With the channel marked as borked, we'll now remove
364
        // the link from the switch if its there. If the link
365
        // is active, then this method will block until it
366
        // exits.
UNCOV
367
        chanPoint := a.channel.FundingOutpoint
×
UNCOV
368

×
UNCOV
369
        if err := a.c.cfg.MarkLinkInactive(chanPoint); err != nil {
×
370
                log.Errorf("unable to mark link inactive: %v", err)
×
371
        }
×
372

373
        // Now that we know the link can't mutate the channel
374
        // state, we'll read the channel from disk the target
375
        // channel according to its channel point.
UNCOV
376
        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(chanPoint)
×
UNCOV
377
        if err != nil {
×
378
                return nil, err
×
379
        }
×
380

UNCOV
381
        var chanOpts []lnwallet.ChannelOpt
×
UNCOV
382
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
×
383
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
×
384
        })
×
UNCOV
385
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
×
386
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
×
387
        })
×
UNCOV
388
        a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
×
389
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
×
390
        })
×
391

392
        // Finally, we'll force close the channel completing
393
        // the force close workflow.
UNCOV
394
        chanMachine, err := lnwallet.NewLightningChannel(
×
UNCOV
395
                a.c.cfg.Signer, channel, nil, chanOpts...,
×
UNCOV
396
        )
×
UNCOV
397
        if err != nil {
×
398
                return nil, err
×
399
        }
×
400

UNCOV
401
        closeSummary, err := chanMachine.ForceClose(
×
UNCOV
402
                lnwallet.WithSkipContractResolutions(),
×
UNCOV
403
        )
×
UNCOV
404
        if err != nil {
×
405
                return nil, err
×
406
        }
×
407

UNCOV
408
        return closeSummary.CloseTx, nil
×
409
}
410

411
// newActiveChannelArbitrator creates a new instance of an active channel
412
// arbitrator given the state of the target channel.
413
func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
414
        c *ChainArbitrator, chanEvents *ChainEventSubscription) (*ChannelArbitrator, error) {
11✔
415

11✔
416
        // TODO(roasbeef): fetch best height (or pass in) so can ensure block
11✔
417
        // epoch delivers all the notifications to
11✔
418

11✔
419
        chanPoint := channel.FundingOutpoint
11✔
420

11✔
421
        log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", chanPoint)
11✔
422

11✔
423
        // Next we'll create the matching configuration struct that contains
11✔
424
        // all interfaces and methods the arbitrator needs to do its job.
11✔
425
        arbCfg := ChannelArbitratorConfig{
11✔
426
                ChanPoint:   chanPoint,
11✔
427
                Channel:     c.getArbChannel(channel),
11✔
428
                ShortChanID: channel.ShortChanID(),
11✔
429

11✔
430
                MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
11✔
431
                MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
11✔
432
                        statuses ...channeldb.ChannelStatus) error {
11✔
UNCOV
433

×
UNCOV
434
                        err := channel.CloseChannel(summary, statuses...)
×
UNCOV
435
                        if err != nil {
×
436
                                return err
×
437
                        }
×
UNCOV
438
                        c.cfg.NotifyClosedChannel(summary.ChanPoint)
×
UNCOV
439
                        return nil
×
440
                },
441
                IsPendingClose:        false,
442
                ChainArbitratorConfig: c.cfg,
443
                ChainEvents:           chanEvents,
444
                PutResolverReport: func(tx kvdb.RwTx,
UNCOV
445
                        report *channeldb.ResolverReport) error {
×
UNCOV
446

×
UNCOV
447
                        return c.chanSource.PutResolverReport(
×
UNCOV
448
                                tx, c.cfg.ChainHash, &chanPoint, report,
×
UNCOV
449
                        )
×
UNCOV
450
                },
×
UNCOV
451
                FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
×
UNCOV
452
                        chanStateDB := c.chanSource.ChannelStateDB()
×
UNCOV
453
                        return chanStateDB.FetchHistoricalChannel(&chanPoint)
×
UNCOV
454
                },
×
455
                FindOutgoingHTLCDeadline: func(
UNCOV
456
                        htlc channeldb.HTLC) fn.Option[int32] {
×
UNCOV
457

×
UNCOV
458
                        return c.FindOutgoingHTLCDeadline(
×
UNCOV
459
                                channel.ShortChanID(), htlc,
×
UNCOV
460
                        )
×
UNCOV
461
                },
×
462
        }
463

464
        // The final component needed is an arbitrator log that the arbitrator
465
        // will use to keep track of its internal state using a backed
466
        // persistent log.
467
        //
468
        // TODO(roasbeef); abstraction leak...
469
        //  * rework: adaptor method to set log scope w/ factory func
470
        chanLog, err := newBoltArbitratorLog(
11✔
471
                c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
11✔
472
        )
11✔
473
        if err != nil {
11✔
474
                return nil, err
×
475
        }
×
476

477
        arbCfg.MarkChannelResolved = func() error {
11✔
UNCOV
478
                if c.cfg.NotifyFullyResolvedChannel != nil {
×
UNCOV
479
                        c.cfg.NotifyFullyResolvedChannel(chanPoint)
×
UNCOV
480
                }
×
481

UNCOV
482
                return c.ResolveContract(chanPoint)
×
483
        }
484

485
        // Finally, we'll need to construct a series of htlc Sets based on all
486
        // currently known valid commitments.
487
        htlcSets := make(map[HtlcSetKey]htlcSet)
11✔
488
        htlcSets[LocalHtlcSet] = newHtlcSet(channel.LocalCommitment.Htlcs)
11✔
489
        htlcSets[RemoteHtlcSet] = newHtlcSet(channel.RemoteCommitment.Htlcs)
11✔
490

11✔
491
        pendingRemoteCommitment, err := channel.RemoteCommitChainTip()
11✔
492
        if err != nil && err != channeldb.ErrNoPendingCommit {
11✔
493
                return nil, err
×
494
        }
×
495
        if pendingRemoteCommitment != nil {
11✔
496
                htlcSets[RemotePendingHtlcSet] = newHtlcSet(
×
497
                        pendingRemoteCommitment.Commitment.Htlcs,
×
498
                )
×
499
        }
×
500

501
        return NewChannelArbitrator(
11✔
502
                arbCfg, htlcSets, chanLog,
11✔
503
        ), nil
11✔
504
}
505

506
// getArbChannel returns an open channel wrapper for use by channel arbitrators.
507
func (c *ChainArbitrator) getArbChannel(
508
        channel *channeldb.OpenChannel) *arbChannel {
11✔
509

11✔
510
        return &arbChannel{
11✔
511
                channel: channel,
11✔
512
                c:       c,
11✔
513
        }
11✔
514
}
11✔
515

516
// ResolveContract marks a contract as fully resolved within the database.
517
// This is only to be done once all contracts which were live on the channel
518
// before hitting the chain have been resolved.
519
func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error {
2✔
520
        log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)
2✔
521

2✔
522
        // First, we'll we'll mark the channel as fully closed from the PoV of
2✔
523
        // the channel source.
2✔
524
        err := c.chanSource.ChannelStateDB().MarkChanFullyClosed(&chanPoint)
2✔
525
        if err != nil {
2✔
526
                log.Errorf("ChainArbitrator: unable to mark ChannelPoint(%v) "+
×
527
                        "fully closed: %v", chanPoint, err)
×
528
                return err
×
529
        }
×
530

531
        // Now that the channel has been marked as fully closed, we'll stop
532
        // both the channel arbitrator and chain watcher for this channel if
533
        // they're still active.
534
        var arbLog ArbitratorLog
2✔
535
        c.Lock()
2✔
536
        chainArb := c.activeChannels[chanPoint]
2✔
537
        delete(c.activeChannels, chanPoint)
2✔
538

2✔
539
        chainWatcher := c.activeWatchers[chanPoint]
2✔
540
        delete(c.activeWatchers, chanPoint)
2✔
541
        c.Unlock()
2✔
542

2✔
543
        if chainArb != nil {
3✔
544
                arbLog = chainArb.log
1✔
545

1✔
546
                if err := chainArb.Stop(); err != nil {
1✔
547
                        log.Warnf("unable to stop ChannelArbitrator(%v): %v",
×
548
                                chanPoint, err)
×
549
                }
×
550
        }
551
        if chainWatcher != nil {
3✔
552
                if err := chainWatcher.Stop(); err != nil {
1✔
553
                        log.Warnf("unable to stop ChainWatcher(%v): %v",
×
554
                                chanPoint, err)
×
555
                }
×
556
        }
557

558
        // Once this has been marked as resolved, we'll wipe the log that the
559
        // channel arbitrator was using to store its persistent state. We do
560
        // this after marking the channel resolved, as otherwise, the
561
        // arbitrator would be re-created, and think it was starting from the
562
        // default state.
563
        if arbLog != nil {
3✔
564
                if err := arbLog.WipeHistory(); err != nil {
1✔
565
                        return err
×
566
                }
×
567
        }
568

569
        return nil
2✔
570
}
571

572
// Start launches all goroutines that the ChainArbitrator needs to operate.
573
func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
2✔
574
        if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
2✔
575
                return nil
×
576
        }
×
577

578
        // Set the current beat.
579
        c.beat = beat
2✔
580

2✔
581
        // First, we'll fetch all the channels that are still open, in order to
2✔
582
        // collect them within our set of active contracts.
2✔
583
        if err := c.loadOpenChannels(); err != nil {
2✔
584
                return err
×
585
        }
×
586

587
        // In addition to the channels that we know to be open, we'll also
588
        // launch arbitrators to finishing resolving any channels that are in
589
        // the pending close state.
590
        if err := c.loadPendingCloseChannels(); err != nil {
2✔
591
                return err
×
592
        }
×
593

594
        // Now, we'll start all chain watchers in parallel to shorten start up
595
        // duration. In neutrino mode, this allows spend registrations to take
596
        // advantage of batch spend reporting, instead of doing a single rescan
597
        // per chain watcher.
598
        //
599
        // NOTE: After this point, we Stop the chain arb to ensure that any
600
        // lingering goroutines are cleaned up before exiting.
601
        watcherErrs := make(chan error, len(c.activeWatchers))
2✔
602
        var wg sync.WaitGroup
2✔
603
        for _, watcher := range c.activeWatchers {
13✔
604
                wg.Add(1)
11✔
605
                go func(w *chainWatcher) {
22✔
606
                        defer wg.Done()
11✔
607
                        select {
11✔
608
                        case watcherErrs <- w.Start():
11✔
609
                        case <-c.quit:
×
610
                                watcherErrs <- ErrChainArbExiting
×
611
                        }
612
                }(watcher)
613
        }
614

615
        // Once all chain watchers have been started, seal the err chan to
616
        // signal the end of the err stream.
617
        go func() {
4✔
618
                wg.Wait()
2✔
619
                close(watcherErrs)
2✔
620
        }()
2✔
621

622
        // stopAndLog is a helper function which shuts down the chain arb and
623
        // logs errors if they occur.
624
        stopAndLog := func() {
2✔
625
                if err := c.Stop(); err != nil {
×
626
                        log.Errorf("ChainArbitrator could not shutdown: %v", err)
×
627
                }
×
628
        }
629

630
        // Handle all errors returned from spawning our chain watchers. If any
631
        // of them failed, we will stop the chain arb to shutdown any active
632
        // goroutines.
633
        for err := range watcherErrs {
13✔
634
                if err != nil {
11✔
635
                        stopAndLog()
×
636
                        return err
×
637
                }
×
638
        }
639

640
        // Before we start all of our arbitrators, we do a preliminary state
641
        // lookup so that we can combine all of these lookups in a single db
642
        // transaction.
643
        var startStates map[wire.OutPoint]*chanArbStartState
2✔
644

2✔
645
        err := kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error {
4✔
646
                for _, arbitrator := range c.activeChannels {
13✔
647
                        startState, err := arbitrator.getStartState(tx)
11✔
648
                        if err != nil {
11✔
649
                                return err
×
650
                        }
×
651

652
                        startStates[arbitrator.cfg.ChanPoint] = startState
11✔
653
                }
654

655
                return nil
2✔
656
        }, func() {
2✔
657
                startStates = make(
2✔
658
                        map[wire.OutPoint]*chanArbStartState,
2✔
659
                        len(c.activeChannels),
2✔
660
                )
2✔
661
        })
2✔
662
        if err != nil {
2✔
663
                stopAndLog()
×
664
                return err
×
665
        }
×
666

667
        // Launch all the goroutines for each arbitrator so they can carry out
668
        // their duties.
669
        for _, arbitrator := range c.activeChannels {
13✔
670
                startState, ok := startStates[arbitrator.cfg.ChanPoint]
11✔
671
                if !ok {
11✔
672
                        stopAndLog()
×
673
                        return fmt.Errorf("arbitrator: %v has no start state",
×
674
                                arbitrator.cfg.ChanPoint)
×
675
                }
×
676

677
                if err := arbitrator.Start(startState, c.beat); err != nil {
11✔
678
                        stopAndLog()
×
679
                        return err
×
680
                }
×
681
        }
682

683
        // Start our goroutine which will dispatch blocks to each arbitrator.
684
        c.wg.Add(1)
2✔
685
        go func() {
4✔
686
                defer c.wg.Done()
2✔
687
                c.dispatchBlocks()
2✔
688
        }()
2✔
689

690
        log.Infof("ChainArbitrator starting at height %d with %d chain "+
2✔
691
                "watchers, %d channel arbitrators, and budget config=[%v]",
2✔
692
                c.beat.Height(), len(c.activeWatchers), len(c.activeChannels),
2✔
693
                &c.cfg.Budget)
2✔
694

2✔
695
        // TODO(roasbeef): eventually move all breach watching here
2✔
696

2✔
697
        return nil
2✔
698
}
699

700
// dispatchBlocks consumes a block epoch notification stream and dispatches
701
// blocks to each of the chain arb's active channel arbitrators. This function
702
// must be run in a goroutine.
703
func (c *ChainArbitrator) dispatchBlocks() {
2✔
704
        // Consume block epochs until we receive the instruction to shutdown.
2✔
705
        for {
4✔
706
                select {
2✔
707
                // Consume block epochs, exiting if our subscription is
708
                // terminated.
UNCOV
709
                case beat := <-c.BlockbeatChan:
×
UNCOV
710
                        // Set the current blockbeat.
×
UNCOV
711
                        c.beat = beat
×
UNCOV
712

×
UNCOV
713
                        // Send this blockbeat to all the active channels and
×
UNCOV
714
                        // wait for them to finish processing it.
×
UNCOV
715
                        c.handleBlockbeat(beat)
×
716

717
                // Exit if the chain arbitrator is shutting down.
718
                case <-c.quit:
2✔
719
                        return
2✔
720
                }
721
        }
722
}
723

724
// handleBlockbeat sends the blockbeat to all active channel arbitrator in
725
// parallel and wait for them to finish processing it.
UNCOV
726
func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) {
×
UNCOV
727
        // Read the active channels in a lock.
×
UNCOV
728
        c.Lock()
×
UNCOV
729

×
UNCOV
730
        // Create a slice to record active channel arbitrator.
×
UNCOV
731
        channels := make([]chainio.Consumer, 0, len(c.activeChannels))
×
UNCOV
732
        watchers := make([]chainio.Consumer, 0, len(c.activeWatchers))
×
UNCOV
733

×
UNCOV
734
        // Copy the active channels to the slice.
×
UNCOV
735
        for _, channel := range c.activeChannels {
×
UNCOV
736
                channels = append(channels, channel)
×
UNCOV
737
        }
×
738

UNCOV
739
        for _, watcher := range c.activeWatchers {
×
UNCOV
740
                watchers = append(watchers, watcher)
×
UNCOV
741
        }
×
742

UNCOV
743
        c.Unlock()
×
UNCOV
744

×
UNCOV
745
        // Iterate all the copied watchers and send the blockbeat to them.
×
UNCOV
746
        err := chainio.DispatchConcurrent(beat, watchers)
×
UNCOV
747
        if err != nil {
×
748
                log.Errorf("Notify blockbeat for chainWatcher failed: %v", err)
×
749
        }
×
750

751
        // Iterate all the copied channels and send the blockbeat to them.
752
        //
753
        // NOTE: This method will timeout if the processing of blocks of the
754
        // subsystems is too long (60s).
UNCOV
755
        err = chainio.DispatchConcurrent(beat, channels)
×
UNCOV
756
        if err != nil {
×
757
                log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v",
×
758
                        err)
×
759
        }
×
760

761
        // Notify the chain arbitrator has processed the block.
UNCOV
762
        c.NotifyBlockProcessed(beat, err)
×
763
}
764

765
// republishClosingTxs will load any stored cooperative or unilateral closing
766
// transactions and republish them. This helps ensure propagation of the
767
// transactions in the event that prior publications failed.
768
func (c *ChainArbitrator) republishClosingTxs(
769
        channel *channeldb.OpenChannel) error {
11✔
770

11✔
771
        // If the channel has had its unilateral close broadcasted already,
11✔
772
        // republish it in case it didn't propagate.
11✔
773
        if channel.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) {
16✔
774
                err := c.rebroadcast(
5✔
775
                        channel, channeldb.ChanStatusCommitBroadcasted,
5✔
776
                )
5✔
777
                if err != nil {
5✔
778
                        return err
×
779
                }
×
780
        }
781

782
        // If the channel has had its cooperative close broadcasted
783
        // already, republish it in case it didn't propagate.
784
        if channel.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) {
16✔
785
                err := c.rebroadcast(
5✔
786
                        channel, channeldb.ChanStatusCoopBroadcasted,
5✔
787
                )
5✔
788
                if err != nil {
5✔
789
                        return err
×
790
                }
×
791
        }
792

793
        return nil
11✔
794
}
795

796
// rebroadcast is a helper method which will republish the unilateral or
797
// cooperative close transaction or a channel in a particular state.
798
//
799
// NOTE: There is no risk to calling this method if the channel isn't in either
800
// CommitmentBroadcasted or CoopBroadcasted, but the logs will be misleading.
801
func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel,
802
        state channeldb.ChannelStatus) error {
10✔
803

10✔
804
        chanPoint := channel.FundingOutpoint
10✔
805

10✔
806
        var (
10✔
807
                closeTx *wire.MsgTx
10✔
808
                kind    string
10✔
809
                err     error
10✔
810
        )
10✔
811
        switch state {
10✔
812
        case channeldb.ChanStatusCommitBroadcasted:
5✔
813
                kind = "force"
5✔
814
                closeTx, err = channel.BroadcastedCommitment()
5✔
815

816
        case channeldb.ChanStatusCoopBroadcasted:
5✔
817
                kind = "coop"
5✔
818
                closeTx, err = channel.BroadcastedCooperative()
5✔
819

820
        default:
×
821
                return fmt.Errorf("unknown closing state: %v", state)
×
822
        }
823

824
        switch {
10✔
825
        // This can happen for channels that had their closing tx published
826
        // before we started storing it to disk.
827
        case err == channeldb.ErrNoCloseTx:
×
828
                log.Warnf("Channel %v is in state %v, but no %s closing tx "+
×
829
                        "to re-publish...", chanPoint, state, kind)
×
830
                return nil
×
831

832
        case err != nil:
×
833
                return err
×
834
        }
835

836
        log.Infof("Re-publishing %s close tx(%v) for channel %v",
10✔
837
                kind, closeTx.TxHash(), chanPoint)
10✔
838

10✔
839
        label := labels.MakeLabel(
10✔
840
                labels.LabelTypeChannelClose, &channel.ShortChannelID,
10✔
841
        )
10✔
842
        err = c.cfg.PublishTx(closeTx, label)
10✔
843
        if err != nil && err != lnwallet.ErrDoubleSpend {
10✔
844
                log.Warnf("Unable to broadcast %s close tx(%v): %v",
×
845
                        kind, closeTx.TxHash(), err)
×
846
        }
×
847

848
        return nil
10✔
849
}
850

851
// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active
852
// channel arbitrators will be signalled to exit, and this method will block
853
// until they've all exited.
854
func (c *ChainArbitrator) Stop() error {
2✔
855
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
2✔
856
                return nil
×
857
        }
×
858

859
        log.Info("ChainArbitrator shutting down...")
2✔
860
        defer log.Debug("ChainArbitrator shutdown complete")
2✔
861

2✔
862
        close(c.quit)
2✔
863

2✔
864
        var (
2✔
865
                activeWatchers = make(map[wire.OutPoint]*chainWatcher)
2✔
866
                activeChannels = make(map[wire.OutPoint]*ChannelArbitrator)
2✔
867
        )
2✔
868

2✔
869
        // Copy the current set of active watchers and arbitrators to shutdown.
2✔
870
        // We don't want to hold the lock when shutting down each watcher or
2✔
871
        // arbitrator individually, as they may need to acquire this mutex.
2✔
872
        c.Lock()
2✔
873
        for chanPoint, watcher := range c.activeWatchers {
12✔
874
                activeWatchers[chanPoint] = watcher
10✔
875
        }
10✔
876
        for chanPoint, arbitrator := range c.activeChannels {
12✔
877
                activeChannels[chanPoint] = arbitrator
10✔
878
        }
10✔
879
        c.Unlock()
2✔
880

2✔
881
        for chanPoint, watcher := range activeWatchers {
12✔
882
                log.Tracef("Attempting to stop ChainWatcher(%v)",
10✔
883
                        chanPoint)
10✔
884

10✔
885
                if err := watcher.Stop(); err != nil {
10✔
886
                        log.Errorf("unable to stop watcher for "+
×
887
                                "ChannelPoint(%v): %v", chanPoint, err)
×
888
                }
×
889
        }
890
        for chanPoint, arbitrator := range activeChannels {
12✔
891
                log.Tracef("Attempting to stop ChannelArbitrator(%v)",
10✔
892
                        chanPoint)
10✔
893

10✔
894
                if err := arbitrator.Stop(); err != nil {
10✔
895
                        log.Errorf("unable to stop arbitrator for "+
×
896
                                "ChannelPoint(%v): %v", chanPoint, err)
×
897
                }
×
898
        }
899

900
        c.wg.Wait()
2✔
901

2✔
902
        return nil
2✔
903
}
904

905
// ContractUpdate is a message packages the latest set of active HTLCs on a
906
// commitment, and also identifies which commitment received a new set of
907
// HTLCs.
908
type ContractUpdate struct {
909
        // HtlcKey identifies which commitment the HTLCs below are present on.
910
        HtlcKey HtlcSetKey
911

912
        // Htlcs are the of active HTLCs on the commitment identified by the
913
        // above HtlcKey.
914
        Htlcs []channeldb.HTLC
915
}
916

917
// ContractSignals is used by outside subsystems to notify a channel arbitrator
918
// of its ShortChannelID.
919
type ContractSignals struct {
920
        // ShortChanID is the up to date short channel ID for a contract. This
921
        // can change either if when the contract was added it didn't yet have
922
        // a stable identifier, or in the case of a reorg.
923
        ShortChanID lnwire.ShortChannelID
924
}
925

926
// UpdateContractSignals sends a set of active, up to date contract signals to
927
// the ChannelArbitrator which is has been assigned to the channel infield by
928
// the passed channel point.
929
func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
UNCOV
930
        signals *ContractSignals) error {
×
UNCOV
931

×
UNCOV
932
        log.Infof("Attempting to update ContractSignals for ChannelPoint(%v)",
×
UNCOV
933
                chanPoint)
×
UNCOV
934

×
UNCOV
935
        c.Lock()
×
UNCOV
936
        arbitrator, ok := c.activeChannels[chanPoint]
×
UNCOV
937
        c.Unlock()
×
UNCOV
938
        if !ok {
×
939
                return fmt.Errorf("unable to find arbitrator")
×
940
        }
×
941

UNCOV
942
        arbitrator.UpdateContractSignals(signals)
×
UNCOV
943

×
UNCOV
944
        return nil
×
945
}
946

947
// NotifyContractUpdate lets a channel arbitrator know that a new
948
// ContractUpdate is available. This calls the ChannelArbitrator's internal
949
// method NotifyContractUpdate which waits for a response on a done chan before
950
// returning. This method will return an error if the ChannelArbitrator is not
951
// in the activeChannels map. However, this only happens if the arbitrator is
952
// resolved and the related link would already be shut down.
953
func (c *ChainArbitrator) NotifyContractUpdate(chanPoint wire.OutPoint,
UNCOV
954
        update *ContractUpdate) error {
×
UNCOV
955

×
UNCOV
956
        c.Lock()
×
UNCOV
957
        arbitrator, ok := c.activeChannels[chanPoint]
×
UNCOV
958
        c.Unlock()
×
UNCOV
959
        if !ok {
×
960
                return fmt.Errorf("can't find arbitrator for %v", chanPoint)
×
961
        }
×
962

UNCOV
963
        arbitrator.notifyContractUpdate(update)
×
UNCOV
964
        return nil
×
965
}
966

967
// GetChannelArbitrator safely returns the channel arbitrator for a given
968
// channel outpoint.
969
func (c *ChainArbitrator) GetChannelArbitrator(chanPoint wire.OutPoint) (
UNCOV
970
        *ChannelArbitrator, error) {
×
UNCOV
971

×
UNCOV
972
        c.Lock()
×
UNCOV
973
        arbitrator, ok := c.activeChannels[chanPoint]
×
UNCOV
974
        c.Unlock()
×
UNCOV
975
        if !ok {
×
976
                return nil, fmt.Errorf("unable to find arbitrator")
×
977
        }
×
978

UNCOV
979
        return arbitrator, nil
×
980
}
981

982
// forceCloseReq is a request sent from an outside sub-system to the arbitrator
983
// that watches a particular channel to broadcast the commitment transaction,
984
// and enter the resolution phase of the channel.
985
type forceCloseReq struct {
986
        // errResp is a channel that will be sent upon either in the case of
987
        // force close success (nil error), or in the case on an error.
988
        //
989
        // NOTE; This channel MUST be buffered.
990
        errResp chan error
991

992
        // closeTx is a channel that carries the transaction which ultimately
993
        // closed out the channel.
994
        closeTx chan *wire.MsgTx
995
}
996

997
// ForceCloseContract attempts to force close the channel infield by the passed
998
// channel point. A force close will immediately terminate the contract,
999
// causing it to enter the resolution phase. If the force close was successful,
1000
// then the force close transaction itself will be returned.
1001
//
1002
// TODO(roasbeef): just return the summary itself?
UNCOV
1003
func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.MsgTx, error) {
×
UNCOV
1004
        c.Lock()
×
UNCOV
1005
        arbitrator, ok := c.activeChannels[chanPoint]
×
UNCOV
1006
        c.Unlock()
×
UNCOV
1007
        if !ok {
×
1008
                return nil, fmt.Errorf("unable to find arbitrator")
×
1009
        }
×
1010

UNCOV
1011
        log.Infof("Attempting to force close ChannelPoint(%v)", chanPoint)
×
UNCOV
1012

×
UNCOV
1013
        // Before closing, we'll attempt to send a disable update for the
×
UNCOV
1014
        // channel. We do so before closing the channel as otherwise the current
×
UNCOV
1015
        // edge policy won't be retrievable from the graph.
×
UNCOV
1016
        if err := c.cfg.DisableChannel(chanPoint); err != nil {
×
1017
                log.Warnf("Unable to disable channel %v on "+
×
1018
                        "close: %v", chanPoint, err)
×
1019
        }
×
1020

UNCOV
1021
        errChan := make(chan error, 1)
×
UNCOV
1022
        respChan := make(chan *wire.MsgTx, 1)
×
UNCOV
1023

×
UNCOV
1024
        // With the channel found, and the request crafted, we'll send over a
×
UNCOV
1025
        // force close request to the arbitrator that watches this channel.
×
UNCOV
1026
        select {
×
1027
        case arbitrator.forceCloseReqs <- &forceCloseReq{
1028
                errResp: errChan,
1029
                closeTx: respChan,
UNCOV
1030
        }:
×
1031
        case <-c.quit:
×
1032
                return nil, ErrChainArbExiting
×
1033
        }
1034

1035
        // We'll await two responses: the error response, and the transaction
1036
        // that closed out the channel.
UNCOV
1037
        select {
×
UNCOV
1038
        case err := <-errChan:
×
UNCOV
1039
                if err != nil {
×
UNCOV
1040
                        return nil, err
×
UNCOV
1041
                }
×
1042
        case <-c.quit:
×
1043
                return nil, ErrChainArbExiting
×
1044
        }
1045

UNCOV
1046
        var closeTx *wire.MsgTx
×
UNCOV
1047
        select {
×
UNCOV
1048
        case closeTx = <-respChan:
×
1049
        case <-c.quit:
×
1050
                return nil, ErrChainArbExiting
×
1051
        }
1052

UNCOV
1053
        return closeTx, nil
×
1054
}
1055

1056
// WatchNewChannel sends the ChainArbitrator a message to create a
1057
// ChannelArbitrator tasked with watching over a new channel. Once a new
1058
// channel has finished its final funding flow, it should be registered with
1059
// the ChainArbitrator so we can properly react to any on-chain events.
UNCOV
1060
func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error {
×
UNCOV
1061
        c.Lock()
×
UNCOV
1062
        defer c.Unlock()
×
UNCOV
1063

×
UNCOV
1064
        chanPoint := newChan.FundingOutpoint
×
UNCOV
1065

×
UNCOV
1066
        log.Infof("Creating new chainWatcher and ChannelArbitrator for "+
×
UNCOV
1067
                "ChannelPoint(%v)", chanPoint)
×
UNCOV
1068

×
UNCOV
1069
        // If we're already watching this channel, then we'll ignore this
×
UNCOV
1070
        // request.
×
UNCOV
1071
        if _, ok := c.activeChannels[chanPoint]; ok {
×
1072
                return nil
×
1073
        }
×
1074

1075
        // First, also create an active chainWatcher for this channel to ensure
1076
        // that we detect any relevant on chain events.
UNCOV
1077
        chainWatcher, err := newChainWatcher(
×
UNCOV
1078
                chainWatcherConfig{
×
UNCOV
1079
                        chanState: newChan,
×
UNCOV
1080
                        notifier:  c.cfg.Notifier,
×
UNCOV
1081
                        signer:    c.cfg.Signer,
×
UNCOV
1082
                        isOurAddr: c.cfg.IsOurAddress,
×
UNCOV
1083
                        contractBreach: func(
×
UNCOV
1084
                                retInfo *lnwallet.BreachRetribution) error {
×
UNCOV
1085

×
UNCOV
1086
                                return c.cfg.ContractBreach(
×
UNCOV
1087
                                        chanPoint, retInfo,
×
UNCOV
1088
                                )
×
UNCOV
1089
                        },
×
1090
                        extractStateNumHint: lnwallet.GetStateNumHint,
1091
                        auxLeafStore:        c.cfg.AuxLeafStore,
1092
                        auxResolver:         c.cfg.AuxResolver,
1093
                },
1094
        )
UNCOV
1095
        if err != nil {
×
1096
                return err
×
1097
        }
×
1098

UNCOV
1099
        c.activeWatchers[chanPoint] = chainWatcher
×
UNCOV
1100

×
UNCOV
1101
        // We'll also create a new channel arbitrator instance using this new
×
UNCOV
1102
        // channel, and our internal state.
×
UNCOV
1103
        channelArb, err := newActiveChannelArbitrator(
×
UNCOV
1104
                newChan, c, chainWatcher.SubscribeChannelEvents(),
×
UNCOV
1105
        )
×
UNCOV
1106
        if err != nil {
×
1107
                return err
×
1108
        }
×
1109

1110
        // With the arbitrator created, we'll add it to our set of active
1111
        // arbitrators, then launch it.
UNCOV
1112
        c.activeChannels[chanPoint] = channelArb
×
UNCOV
1113

×
UNCOV
1114
        if err := channelArb.Start(nil, c.beat); err != nil {
×
1115
                return err
×
1116
        }
×
1117

UNCOV
1118
        return chainWatcher.Start()
×
1119
}
1120

1121
// SubscribeChannelEvents returns a new active subscription for the set of
1122
// possible on-chain events for a particular channel. The struct can be used by
1123
// callers to be notified whenever an event that changes the state of the
1124
// channel on-chain occurs.
1125
func (c *ChainArbitrator) SubscribeChannelEvents(
UNCOV
1126
        chanPoint wire.OutPoint) (*ChainEventSubscription, error) {
×
UNCOV
1127

×
UNCOV
1128
        // First, we'll attempt to look up the active watcher for this channel.
×
UNCOV
1129
        // If we can't find it, then we'll return an error back to the caller.
×
UNCOV
1130
        c.Lock()
×
UNCOV
1131
        watcher, ok := c.activeWatchers[chanPoint]
×
UNCOV
1132
        c.Unlock()
×
UNCOV
1133

×
UNCOV
1134
        if !ok {
×
1135
                return nil, fmt.Errorf("unable to find watcher for: %v",
×
1136
                        chanPoint)
×
1137
        }
×
1138

1139
        // With the watcher located, we'll request for it to create a new chain
1140
        // event subscription client.
UNCOV
1141
        return watcher.SubscribeChannelEvents(), nil
×
1142
}
1143

1144
// FindOutgoingHTLCDeadline returns the deadline in absolute block height for
1145
// the specified outgoing HTLC. For an outgoing HTLC, its deadline is defined
1146
// by the timeout height of its corresponding incoming HTLC - this is the
1147
// expiry height the that remote peer can spend his/her outgoing HTLC via the
1148
// timeout path.
1149
func (c *ChainArbitrator) FindOutgoingHTLCDeadline(scid lnwire.ShortChannelID,
UNCOV
1150
        outgoingHTLC channeldb.HTLC) fn.Option[int32] {
×
UNCOV
1151

×
UNCOV
1152
        // Find the outgoing HTLC's corresponding incoming HTLC in the circuit
×
UNCOV
1153
        // map.
×
UNCOV
1154
        rHash := outgoingHTLC.RHash
×
UNCOV
1155
        circuit := models.CircuitKey{
×
UNCOV
1156
                ChanID: scid,
×
UNCOV
1157
                HtlcID: outgoingHTLC.HtlcIndex,
×
UNCOV
1158
        }
×
UNCOV
1159
        incomingCircuit := c.cfg.QueryIncomingCircuit(circuit)
×
UNCOV
1160

×
UNCOV
1161
        // If there's no incoming circuit found, we will use the default
×
UNCOV
1162
        // deadline.
×
UNCOV
1163
        if incomingCircuit == nil {
×
UNCOV
1164
                log.Warnf("ChannelArbitrator(%v): incoming circuit key not "+
×
UNCOV
1165
                        "found for rHash=%x, using default deadline instead",
×
UNCOV
1166
                        scid, rHash)
×
UNCOV
1167

×
UNCOV
1168
                return fn.None[int32]()
×
UNCOV
1169
        }
×
1170

1171
        // If this is a locally initiated HTLC, it means we are the first hop.
1172
        // In this case, we can relax the deadline.
UNCOV
1173
        if incomingCircuit.ChanID.IsDefault() {
×
UNCOV
1174
                log.Infof("ChannelArbitrator(%v): using default deadline for "+
×
UNCOV
1175
                        "locally initiated HTLC for rHash=%x", scid, rHash)
×
UNCOV
1176

×
UNCOV
1177
                return fn.None[int32]()
×
UNCOV
1178
        }
×
1179

UNCOV
1180
        log.Debugf("Found incoming circuit %v for rHash=%x using outgoing "+
×
UNCOV
1181
                "circuit %v", incomingCircuit, rHash, circuit)
×
UNCOV
1182

×
UNCOV
1183
        c.Lock()
×
UNCOV
1184
        defer c.Unlock()
×
UNCOV
1185

×
UNCOV
1186
        // Iterate over all active channels to find the incoming HTLC specified
×
UNCOV
1187
        // by its circuit key.
×
UNCOV
1188
        for cp, channelArb := range c.activeChannels {
×
UNCOV
1189
                // Skip if the SCID doesn't match.
×
UNCOV
1190
                if channelArb.cfg.ShortChanID != incomingCircuit.ChanID {
×
UNCOV
1191
                        continue
×
1192
                }
1193

1194
                // Make sure the channel arbitrator has the latest view of its
1195
                // active HTLCs.
UNCOV
1196
                channelArb.updateActiveHTLCs()
×
UNCOV
1197

×
UNCOV
1198
                // Iterate all the known HTLCs to find the targeted incoming
×
UNCOV
1199
                // HTLC.
×
UNCOV
1200
                for _, htlcs := range channelArb.activeHTLCs {
×
UNCOV
1201
                        for _, htlc := range htlcs.incomingHTLCs {
×
UNCOV
1202
                                // Skip if the index doesn't match.
×
UNCOV
1203
                                if htlc.HtlcIndex != incomingCircuit.HtlcID {
×
UNCOV
1204
                                        continue
×
1205
                                }
1206

UNCOV
1207
                                log.Debugf("ChannelArbitrator(%v): found "+
×
UNCOV
1208
                                        "incoming HTLC in channel=%v using "+
×
UNCOV
1209
                                        "rHash=%x, refundTimeout=%v", scid,
×
UNCOV
1210
                                        cp, rHash, htlc.RefundTimeout)
×
UNCOV
1211

×
UNCOV
1212
                                return fn.Some(int32(htlc.RefundTimeout))
×
1213
                        }
1214
                }
1215
        }
1216

1217
        // If there's no incoming HTLC found, yet we have the incoming circuit,
1218
        // something is wrong - in this case, we return the none deadline.
UNCOV
1219
        log.Errorf("ChannelArbitrator(%v): incoming HTLC not found for "+
×
UNCOV
1220
                "rHash=%x, using default deadline instead", scid, rHash)
×
UNCOV
1221

×
UNCOV
1222
        return fn.None[int32]()
×
1223
}
1224

1225
// TODO(roasbeef): arbitration reports
1226
//  * types: contested, waiting for success conf, etc
1227

1228
// NOTE: part of the `chainio.Consumer` interface.
1229
func (c *ChainArbitrator) Name() string {
2✔
1230
        return "ChainArbitrator"
2✔
1231
}
2✔
1232

1233
// loadOpenChannels loads all channels that are currently open in the database
1234
// and registers them with the chainWatcher for future notification.
1235
func (c *ChainArbitrator) loadOpenChannels() error {
2✔
1236
        openChannels, err := c.chanSource.ChannelStateDB().FetchAllChannels()
2✔
1237
        if err != nil {
2✔
1238
                return err
×
1239
        }
×
1240

1241
        if len(openChannels) == 0 {
2✔
UNCOV
1242
                return nil
×
UNCOV
1243
        }
×
1244

1245
        log.Infof("Creating ChannelArbitrators for %v active channels",
2✔
1246
                len(openChannels))
2✔
1247

2✔
1248
        // For each open channel, we'll configure then launch a corresponding
2✔
1249
        // ChannelArbitrator.
2✔
1250
        for _, channel := range openChannels {
13✔
1251
                chanPoint := channel.FundingOutpoint
11✔
1252
                channel := channel
11✔
1253

11✔
1254
                // First, we'll create an active chainWatcher for this channel
11✔
1255
                // to ensure that we detect any relevant on chain events.
11✔
1256
                breachClosure := func(ret *lnwallet.BreachRetribution) error {
11✔
UNCOV
1257
                        return c.cfg.ContractBreach(chanPoint, ret)
×
UNCOV
1258
                }
×
1259

1260
                chainWatcher, err := newChainWatcher(
11✔
1261
                        chainWatcherConfig{
11✔
1262
                                chanState:           channel,
11✔
1263
                                notifier:            c.cfg.Notifier,
11✔
1264
                                signer:              c.cfg.Signer,
11✔
1265
                                isOurAddr:           c.cfg.IsOurAddress,
11✔
1266
                                contractBreach:      breachClosure,
11✔
1267
                                extractStateNumHint: lnwallet.GetStateNumHint,
11✔
1268
                                auxLeafStore:        c.cfg.AuxLeafStore,
11✔
1269
                                auxResolver:         c.cfg.AuxResolver,
11✔
1270
                        },
11✔
1271
                )
11✔
1272
                if err != nil {
11✔
1273
                        return err
×
1274
                }
×
1275

1276
                c.activeWatchers[chanPoint] = chainWatcher
11✔
1277
                channelArb, err := newActiveChannelArbitrator(
11✔
1278
                        channel, c, chainWatcher.SubscribeChannelEvents(),
11✔
1279
                )
11✔
1280
                if err != nil {
11✔
1281
                        return err
×
1282
                }
×
1283

1284
                c.activeChannels[chanPoint] = channelArb
11✔
1285

11✔
1286
                // Republish any closing transactions for this channel.
11✔
1287
                err = c.republishClosingTxs(channel)
11✔
1288
                if err != nil {
11✔
1289
                        log.Errorf("Failed to republish closing txs for "+
×
1290
                                "channel %v", chanPoint)
×
1291
                }
×
1292
        }
1293

1294
        return nil
2✔
1295
}
1296

1297
// loadPendingCloseChannels loads all channels that are currently pending
1298
// closure in the database and registers them with the ChannelArbitrator to
1299
// continue the resolution process.
1300
func (c *ChainArbitrator) loadPendingCloseChannels() error {
2✔
1301
        chanStateDB := c.chanSource.ChannelStateDB()
2✔
1302

2✔
1303
        closingChannels, err := chanStateDB.FetchClosedChannels(true)
2✔
1304
        if err != nil {
2✔
1305
                return err
×
1306
        }
×
1307

1308
        if len(closingChannels) == 0 {
4✔
1309
                return nil
2✔
1310
        }
2✔
1311

UNCOV
1312
        log.Infof("Creating ChannelArbitrators for %v closing channels",
×
UNCOV
1313
                len(closingChannels))
×
UNCOV
1314

×
UNCOV
1315
        // Next, for each channel is the closing state, we'll launch a
×
UNCOV
1316
        // corresponding more restricted resolver, as we don't have to watch
×
UNCOV
1317
        // the chain any longer, only resolve the contracts on the confirmed
×
UNCOV
1318
        // commitment.
×
UNCOV
1319
        //nolint:ll
×
UNCOV
1320
        for _, closeChanInfo := range closingChannels {
×
UNCOV
1321
                // We can leave off the CloseContract and ForceCloseChan
×
UNCOV
1322
                // methods as the channel is already closed at this point.
×
UNCOV
1323
                chanPoint := closeChanInfo.ChanPoint
×
UNCOV
1324
                arbCfg := ChannelArbitratorConfig{
×
UNCOV
1325
                        ChanPoint:             chanPoint,
×
UNCOV
1326
                        ShortChanID:           closeChanInfo.ShortChanID,
×
UNCOV
1327
                        ChainArbitratorConfig: c.cfg,
×
UNCOV
1328
                        ChainEvents:           &ChainEventSubscription{},
×
UNCOV
1329
                        IsPendingClose:        true,
×
UNCOV
1330
                        ClosingHeight:         closeChanInfo.CloseHeight,
×
UNCOV
1331
                        CloseType:             closeChanInfo.CloseType,
×
UNCOV
1332
                        PutResolverReport: func(tx kvdb.RwTx,
×
UNCOV
1333
                                report *channeldb.ResolverReport) error {
×
UNCOV
1334

×
UNCOV
1335
                                return c.chanSource.PutResolverReport(
×
UNCOV
1336
                                        tx, c.cfg.ChainHash, &chanPoint, report,
×
UNCOV
1337
                                )
×
UNCOV
1338
                        },
×
UNCOV
1339
                        FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
×
UNCOV
1340
                                return chanStateDB.FetchHistoricalChannel(&chanPoint)
×
UNCOV
1341
                        },
×
1342
                        FindOutgoingHTLCDeadline: func(
UNCOV
1343
                                htlc channeldb.HTLC) fn.Option[int32] {
×
UNCOV
1344

×
UNCOV
1345
                                return c.FindOutgoingHTLCDeadline(
×
UNCOV
1346
                                        closeChanInfo.ShortChanID, htlc,
×
UNCOV
1347
                                )
×
UNCOV
1348
                        },
×
1349
                }
UNCOV
1350
                chanLog, err := newBoltArbitratorLog(
×
UNCOV
1351
                        c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
×
UNCOV
1352
                )
×
UNCOV
1353
                if err != nil {
×
1354
                        return err
×
1355
                }
×
UNCOV
1356
                arbCfg.MarkChannelResolved = func() error {
×
UNCOV
1357
                        if c.cfg.NotifyFullyResolvedChannel != nil {
×
UNCOV
1358
                                c.cfg.NotifyFullyResolvedChannel(chanPoint)
×
UNCOV
1359
                        }
×
1360

UNCOV
1361
                        return c.ResolveContract(chanPoint)
×
1362
                }
1363

1364
                // We create an empty map of HTLC's here since it's possible
1365
                // that the channel is in StateDefault and updateActiveHTLCs is
1366
                // called. We want to avoid writing to an empty map. Since the
1367
                // channel is already in the process of being resolved, no new
1368
                // HTLCs will be added.
UNCOV
1369
                c.activeChannels[chanPoint] = NewChannelArbitrator(
×
UNCOV
1370
                        arbCfg, make(map[HtlcSetKey]htlcSet), chanLog,
×
UNCOV
1371
                )
×
1372
        }
1373

UNCOV
1374
        return nil
×
1375
}
1376

1377
// RedispatchBlockbeat resends the current blockbeat to the channels specified
1378
// by the chanPoints. It is used when a channel is added to the chain
1379
// arbitrator after it has been started, e.g., during the channel restore
1380
// process.
UNCOV
1381
func (c *ChainArbitrator) RedispatchBlockbeat(chanPoints []wire.OutPoint) {
×
UNCOV
1382
        // Get the current blockbeat.
×
UNCOV
1383
        beat := c.beat
×
UNCOV
1384

×
UNCOV
1385
        // Prepare two sets of consumers.
×
UNCOV
1386
        channels := make([]chainio.Consumer, 0, len(chanPoints))
×
UNCOV
1387
        watchers := make([]chainio.Consumer, 0, len(chanPoints))
×
UNCOV
1388

×
UNCOV
1389
        // Read the active channels in a lock.
×
UNCOV
1390
        c.Lock()
×
UNCOV
1391
        for _, op := range chanPoints {
×
UNCOV
1392
                if channel, ok := c.activeChannels[op]; ok {
×
UNCOV
1393
                        channels = append(channels, channel)
×
UNCOV
1394
                }
×
1395

UNCOV
1396
                if watcher, ok := c.activeWatchers[op]; ok {
×
UNCOV
1397
                        watchers = append(watchers, watcher)
×
UNCOV
1398
                }
×
1399
        }
UNCOV
1400
        c.Unlock()
×
UNCOV
1401

×
UNCOV
1402
        // Iterate all the copied watchers and send the blockbeat to them.
×
UNCOV
1403
        err := chainio.DispatchConcurrent(beat, watchers)
×
UNCOV
1404
        if err != nil {
×
1405
                log.Errorf("Notify blockbeat for chainWatcher failed: %v", err)
×
1406
        }
×
1407

1408
        // Iterate all the copied channels and send the blockbeat to them.
UNCOV
1409
        err = chainio.DispatchConcurrent(beat, channels)
×
UNCOV
1410
        if err != nil {
×
1411
                // Shutdown lnd if there's an error processing the block.
×
1412
                log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v",
×
1413
                        err)
×
1414
        }
×
1415
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc