lightningnetwork / lnd / build 12343072627

15 Dec 2024 11:09PM UTC coverage: 57.504% (-1.1%) from 58.636%

Pull Request #9315: Implement `blockbeat`

yyforyongyu
contractcourt: offer outgoing htlc one block earlier before its expiry

We need to offer the outgoing htlc one block earlier to make sure that when
the expiry height hits, the sweeper will not miss sweeping it in the same
block. This also means the outgoing contest resolver now only does one thing:
watch for a preimage spend until height expiry-1, which can easily be moved
into the timeout resolver in the future.
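
As a rough illustration of the height arithmetic described above (an
illustrative sketch only, using hypothetical names; this is not the resolver
code from the PR):

        // Offer the outgoing HTLC to the sweeper at expiry-1 so that when the
        // expiry height is reached, the sweeper can sweep it in that same
        // block instead of one block late. Hypothetical helper, for
        // illustration only.
        func maybeOfferOutgoingHTLC(currentHeight, expiry uint32, offer func()) {
                if currentHeight >= expiry-1 {
                        offer()
                }
        }

For example, with expiry = 800000 the HTLC is handed to the sweeper at height
799999, so the sweep can be broadcast as soon as block 800000 arrives.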

1445 of 2007 new or added lines in 26 files covered. (72.0%)

19246 existing lines in 249 files now uncovered.

102342 of 177975 relevant lines covered (57.5%)

24772.24 hits per line

Source File: /contractcourt/chain_arbitrator.go (37.55% of file lines covered)

package contractcourt

import (
        "errors"
        "fmt"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/btcsuite/btcd/wire"
        "github.com/btcsuite/btcwallet/walletdb"
        "github.com/lightningnetwork/lnd/chainio"
        "github.com/lightningnetwork/lnd/chainntnfs"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/clock"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/labels"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
        "github.com/lightningnetwork/lnd/lnwire"
)

// ErrChainArbExiting signals that the chain arbitrator is shutting down.
var ErrChainArbExiting = errors.New("ChainArbitrator exiting")

// ResolutionMsg is a message sent by resolvers to outside sub-systems once an
// outgoing contract has been fully resolved. For multi-hop contracts, if we
// resolve the outgoing contract, we'll also need to ensure that the incoming
// contract is resolved as well. We package the items required to resolve the
// incoming contracts within this message.
type ResolutionMsg struct {
        // SourceChan identifies the channel that this message is being sent
        // from. This is the channel's short channel ID.
        SourceChan lnwire.ShortChannelID

        // HtlcIndex is the index of the contract within the original
        // commitment trace.
        HtlcIndex uint64

        // Failure will be non-nil if the incoming contract should be canceled
        // altogether. This can happen if the outgoing contract was dust, or if
        // the outgoing HTLC timed out.
        Failure lnwire.FailureMessage

        // PreImage will be non-nil if the incoming contract can successfully
        // be redeemed. This can happen if we learn of the preimage from the
        // outgoing HTLC on-chain.
        PreImage *[32]byte
}
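
// Illustration only (not part of the upstream file): a resolver that has
// timed out an outgoing HTLC on-chain could fail the incoming contract
// backwards roughly as follows, using the DeliverResolutionMsg closure from
// the ChainArbitratorConfig defined below (shortChanID and htlc are assumed
// to be in scope):
//
//      msg := ResolutionMsg{
//              SourceChan: shortChanID,
//              HtlcIndex:  htlc.HtlcIndex,
//              Failure:    &lnwire.FailPermanentChannelFailure{},
//      }
//      if err := cfg.DeliverResolutionMsg(msg); err != nil {
//              // Handle the delivery failure.
//      }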

// ChainArbitratorConfig is a configuration struct that contains all the
// function closures and interfaces that are required to arbitrate on-chain
// contracts for a particular chain.
type ChainArbitratorConfig struct {
        // ChainHash is the chain that this arbitrator is to operate within.
        ChainHash chainhash.Hash

        // IncomingBroadcastDelta is the delta that we'll use to decide when to
        // broadcast our commitment transaction if we have incoming htlcs. This
        // value should be set based on our current fee estimation of the
        // commitment transaction. We use this to determine when we should
        // broadcast instead of just the HTLC timeout, as we want to ensure
        // that the commitment transaction is already confirmed by the time the
        // HTLC expires. Otherwise we may end up not settling the htlc on-chain
        // because the other party managed to time it out.
        IncomingBroadcastDelta uint32

        // OutgoingBroadcastDelta is the delta that we'll use to decide when to
        // broadcast our commitment transaction if there are active outgoing
        // htlcs. This value can be lower than the incoming broadcast delta.
        OutgoingBroadcastDelta uint32

        // NewSweepAddr is a function that returns a new address under control
        // by the wallet. We'll use this to sweep any no-delay outputs as a
        // result of unilateral channel closes.
        //
        // NOTE: This SHOULD return a p2wkh script.
        NewSweepAddr func() ([]byte, error)

        // PublishTx reliably broadcasts a transaction to the network. Once
        // this function exits without an error, then the transaction MUST
        // continually be rebroadcast if needed.
        PublishTx func(*wire.MsgTx, string) error

        // DeliverResolutionMsg is a function that will append an outgoing
        // message to the "out box" for a ChannelLink. This is used to cancel
        // backwards, to the incoming link, any HTLC's that are either dust,
        // that we're timing out, or that we're settling on-chain.
        DeliverResolutionMsg func(...ResolutionMsg) error

        // MarkLinkInactive is a function closure that the ChainArbitrator will
        // use to mark that active HTLC's shouldn't be attempted to be routed
        // over a particular channel. This function will be called when a
        // ChannelArbitrator decides that it needs to go to chain in order to
        // resolve contracts.
        //
        // TODO(roasbeef): rename, routing based
        MarkLinkInactive func(wire.OutPoint) error

        // ContractBreach is a function closure that the ChainArbitrator will
        // use to notify the BreachArbitrator about a contract breach. It should
        // only return a non-nil error when the BreachArbitrator has preserved
        // the necessary breach info for this channel point. Once the breach
        // resolution is persisted in the ChannelArbitrator, it will be safe
        // to mark the channel closed.
        ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error

        // IsOurAddress is a function that returns true if the passed address
        // is known to the underlying wallet. Otherwise, false should be
        // returned.
        IsOurAddress func(btcutil.Address) bool

        // IncubateOutputs sends either an incoming HTLC, an outgoing HTLC, or
        // both to the utxo nursery. Once this function returns, the nursery
        // should have safely persisted the outputs to disk, and should start
        // the process of incubation. This is used when a resolver wishes to
        // pass off the output to the nursery as we're only waiting on an
        // absolute/relative timelock.
        IncubateOutputs func(wire.OutPoint,
                fn.Option[lnwallet.OutgoingHtlcResolution],
                fn.Option[lnwallet.IncomingHtlcResolution],
                uint32, fn.Option[int32]) error

        // PreimageDB is a global store of all known pre-images. We'll use this
        // to decide if we should broadcast a commitment transaction to claim
        // an HTLC on-chain.
        PreimageDB WitnessBeacon

        // Notifier is an instance of a chain notifier we'll use to watch for
        // certain on-chain events.
        Notifier chainntnfs.ChainNotifier

        // Mempool is a mempool watcher that allows us to watch for events
        // happening in the mempool.
        Mempool chainntnfs.MempoolWatcher

        // Signer is a signer backed by the active lnd node. This should be
        // capable of producing a signature as specified by a valid
        // SignDescriptor.
        Signer input.Signer

        // FeeEstimator will be used to return fee estimates.
        FeeEstimator chainfee.Estimator

        // ChainIO allows us to query the state of the current main chain.
        ChainIO lnwallet.BlockChainIO

        // DisableChannel disables a channel, resulting in it not being able to
        // forward payments.
        DisableChannel func(wire.OutPoint) error

        // Sweeper allows resolvers to sweep their final outputs.
        Sweeper UtxoSweeper

        // Registry is the invoice database that is used by resolvers to lookup
        // preimages and settle invoices.
        Registry Registry

        // NotifyClosedChannel is a function closure that the ChainArbitrator
        // will use to notify the ChannelNotifier about a newly closed channel.
        NotifyClosedChannel func(wire.OutPoint)

        // NotifyFullyResolvedChannel is a function closure that the
        // ChainArbitrator will use to notify the ChannelNotifier about a newly
        // resolved channel. The main difference to NotifyClosedChannel is that
        // in case of a local force close the NotifyClosedChannel is called when
        // the published commitment transaction confirms, while
        // NotifyFullyResolvedChannel is only called when the channel is fully
        // resolved (which includes sweeping any time locked funds).
        NotifyFullyResolvedChannel func(point wire.OutPoint)

        // OnionProcessor is used to decode onion payloads for on-chain
        // resolution.
        OnionProcessor OnionProcessor

        // PaymentsExpirationGracePeriod indicates a time window during which
        // we let the other node cancel an outgoing htlc that our node has
        // initiated and that has timed out.
        PaymentsExpirationGracePeriod time.Duration

        // IsForwardedHTLC checks for a given htlc, identified by channel id and
        // htlcIndex, if it is a forwarded one.
        IsForwardedHTLC func(chanID lnwire.ShortChannelID, htlcIndex uint64) bool

        // Clock is the clock implementation that ChannelArbitrator uses.
        // It is useful for testing.
        Clock clock.Clock

        // SubscribeBreachComplete is used by the breachResolver to register a
        // subscription that notifies when the breach resolution process is
        // complete.
        SubscribeBreachComplete func(op *wire.OutPoint, c chan struct{}) (
                bool, error)

        // PutFinalHtlcOutcome stores the final outcome of an htlc in the
        // database.
        PutFinalHtlcOutcome func(chanId lnwire.ShortChannelID,
                htlcId uint64, settled bool) error

        // HtlcNotifier is an interface that htlc events are sent to.
        HtlcNotifier HtlcNotifier

        // Budget is the configured budget for the arbitrator.
        Budget BudgetConfig

        // QueryIncomingCircuit is used to find the outgoing HTLC's
        // corresponding incoming HTLC circuit. It queries the circuit map for
        // a given outgoing circuit key and returns the incoming circuit key.
        //
        // TODO(yy): this is a hacky way to get around the cycling import issue
        // as we cannot import `htlcswitch` here. A proper way is to define an
        // interface here that asks for method `LookupOpenCircuit`,
        // meanwhile, turn `PaymentCircuit` into an interface or bring it to a
        // lower package.
        QueryIncomingCircuit func(circuit models.CircuitKey) *models.CircuitKey

        // AuxLeafStore is an optional store that can be used to store auxiliary
        // leaves for certain custom channel types.
        AuxLeafStore fn.Option[lnwallet.AuxLeafStore]

        // AuxSigner is an optional signer that can be used to sign auxiliary
        // leaves for certain custom channel types.
        AuxSigner fn.Option[lnwallet.AuxSigner]

        // AuxResolver is an optional interface that can be used to modify the
        // way contracts are resolved.
        AuxResolver fn.Option[lnwallet.AuxContractResolver]
}
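
// Illustration only (not part of the upstream file): a caller wiring up a
// few of the closures above might do something like the following, where
// chainParams and wallet are placeholders for the caller's own chain
// parameters and wallet backend:
//
//      cfg := ChainArbitratorConfig{
//              ChainHash: *chainParams.GenesisHash,
//              PublishTx: func(tx *wire.MsgTx, label string) error {
//                      return wallet.PublishTransaction(tx, label)
//              },
//              IsOurAddress: wallet.IsOurAddress,
//              Clock:        clock.NewDefaultClock(),
//      }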

// ChainArbitrator is a sub-system that oversees the on-chain resolution of
// all active channels, and of channels that are in the "pending close" state.
// Within the contractcourt package, the ChainArbitrator manages a set of
// active ContractArbitrators. Each ContractArbitrator is responsible for
// watching the chain for any activity that affects the state of the channel,
// and also for monitoring each contract in order to determine if any on-chain
// activity is required. Outside sub-systems interact with the ChainArbitrator
// in order to forcibly exit a contract, update the set of live signals for
// each contract, and to receive reports on the state of contract resolution.
type ChainArbitrator struct {
        started int32 // To be used atomically.
        stopped int32 // To be used atomically.

        // Embed the blockbeat consumer struct to get access to the method
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
        chainio.BeatConsumer

        sync.Mutex

        // activeChannels is a map of all the active contracts that are still
        // open, and not fully resolved.
        activeChannels map[wire.OutPoint]*ChannelArbitrator

        // activeWatchers is a map of all the active chainWatchers for channels
        // that are still considered open.
        activeWatchers map[wire.OutPoint]*chainWatcher

        // cfg is the config struct for the arbitrator that contains all
        // methods and interfaces it needs to operate.
        cfg ChainArbitratorConfig

        // chanSource will be used by the ChainArbitrator to fetch all the
        // active channels that it must still watch over.
        chanSource *channeldb.DB

        // beat is the current best known blockbeat.
        beat chainio.Blockbeat

        quit chan struct{}

        wg sync.WaitGroup
}

// NewChainArbitrator returns a new instance of the ChainArbitrator using the
// passed config struct, and backing persistent database.
func NewChainArbitrator(cfg ChainArbitratorConfig,
        db *channeldb.DB) *ChainArbitrator {

        c := &ChainArbitrator{
                cfg:            cfg,
                activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
                activeWatchers: make(map[wire.OutPoint]*chainWatcher),
                chanSource:     db,
                quit:           make(chan struct{}),
        }

        // Mount the block consumer.
        c.BeatConsumer = chainio.NewBeatConsumer(c.quit, c.Name())

        return c
}
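
// Illustration only (not part of the upstream file): the typical lifecycle
// from a caller's perspective is roughly the following, where beat is the
// chainio.Blockbeat for the current best block:
//
//      arb := NewChainArbitrator(cfg, db)
//      if err := arb.Start(beat); err != nil {
//              // Handle the startup failure.
//      }
//      defer func() { _ = arb.Stop() }()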

// Compile-time check for the chainio.Consumer interface.
var _ chainio.Consumer = (*ChainArbitrator)(nil)

// arbChannel is a wrapper around an open channel that channel arbitrators
// interact with.
type arbChannel struct {
        // channel is the in-memory channel state.
        channel *channeldb.OpenChannel

        // c references the chain arbitrator and is used by arbChannel
        // internally.
        c *ChainArbitrator
}

// NewAnchorResolutions returns the anchor resolutions for currently valid
// commitment transactions.
//
// NOTE: Part of the ArbChannel interface.
func (a *arbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions,
        error) {

        // Get a fresh copy of the database state to base the anchor
        // resolutions on. Unfortunately the channel instance that we have here
        // isn't the same instance that is used by the link.
        chanPoint := a.channel.FundingOutpoint

        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(chanPoint)
        if err != nil {
                return nil, err
        }

        var chanOpts []lnwallet.ChannelOpt
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
        })
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
        })
        a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
        })

        chanMachine, err := lnwallet.NewLightningChannel(
                a.c.cfg.Signer, channel, nil, chanOpts...,
        )
        if err != nil {
                return nil, err
        }

        return chanMachine.NewAnchorResolutions()
}

// ForceCloseChan should force close the contract that this attendant is
// watching over. We'll use this when we decide that we need to go to chain.
// It should in addition tell the switch to remove the corresponding link,
// such that we won't accept any new updates.
//
// NOTE: Part of the ArbChannel interface.
func (a *arbChannel) ForceCloseChan() (*wire.MsgTx, error) {
        // First, we mark the channel as borked. This ensures that no new
        // state transitions can happen, and also that the link won't be
        // loaded into the switch.
        if err := a.channel.MarkBorked(); err != nil {
                return nil, err
        }

        // With the channel marked as borked, we'll now remove the link from
        // the switch if it's there. If the link is active, then this method
        // will block until it exits.
        chanPoint := a.channel.FundingOutpoint

        if err := a.c.cfg.MarkLinkInactive(chanPoint); err != nil {
                log.Errorf("unable to mark link inactive: %v", err)
        }

        // Now that we know the link can't mutate the channel state, we'll
        // read the target channel from disk according to its channel point.
        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(chanPoint)
        if err != nil {
                return nil, err
        }

        var chanOpts []lnwallet.ChannelOpt
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
        })
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
        })
        a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
        })

        // Finally, we'll force close the channel, completing the force close
        // workflow.
        chanMachine, err := lnwallet.NewLightningChannel(
                a.c.cfg.Signer, channel, nil, chanOpts...,
        )
        if err != nil {
                return nil, err
        }

        closeSummary, err := chanMachine.ForceClose(
                lnwallet.WithSkipContractResolutions(),
        )
        if err != nil {
                return nil, err
        }

        return closeSummary.CloseTx, nil
}

// newActiveChannelArbitrator creates a new instance of an active channel
// arbitrator given the state of the target channel.
func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
        c *ChainArbitrator, chanEvents *ChainEventSubscription) (*ChannelArbitrator, error) {

        // TODO(roasbeef): fetch best height (or pass in) so can ensure block
        // epoch delivers all the notifications to

        chanPoint := channel.FundingOutpoint

        log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", chanPoint)

        // Next we'll create the matching configuration struct that contains
        // all interfaces and methods the arbitrator needs to do its job.
        arbCfg := ChannelArbitratorConfig{
                ChanPoint:   chanPoint,
                Channel:     c.getArbChannel(channel),
                ShortChanID: channel.ShortChanID(),

                MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
                MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
                        statuses ...channeldb.ChannelStatus) error {

                        err := channel.CloseChannel(summary, statuses...)
                        if err != nil {
                                return err
                        }
                        c.cfg.NotifyClosedChannel(summary.ChanPoint)
                        return nil
                },
                IsPendingClose:        false,
                ChainArbitratorConfig: c.cfg,
                ChainEvents:           chanEvents,
                PutResolverReport: func(tx kvdb.RwTx,
                        report *channeldb.ResolverReport) error {

                        return c.chanSource.PutResolverReport(
                                tx, c.cfg.ChainHash, &chanPoint, report,
                        )
                },
                FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
                        chanStateDB := c.chanSource.ChannelStateDB()
                        return chanStateDB.FetchHistoricalChannel(&chanPoint)
                },
                FindOutgoingHTLCDeadline: func(
                        htlc channeldb.HTLC) fn.Option[int32] {

                        return c.FindOutgoingHTLCDeadline(
                                channel.ShortChanID(), htlc,
                        )
                },
        }

        // The final component needed is an arbitrator log that the arbitrator
        // will use to keep track of its internal state using a backed
        // persistent log.
        //
        // TODO(roasbeef): abstraction leak...
        //  * rework: adaptor method to set log scope w/ factory func
        chanLog, err := newBoltArbitratorLog(
                c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
        )
        if err != nil {
                return nil, err
        }

        arbCfg.MarkChannelResolved = func() error {
                if c.cfg.NotifyFullyResolvedChannel != nil {
                        c.cfg.NotifyFullyResolvedChannel(chanPoint)
                }

                return c.ResolveContract(chanPoint)
        }

        // Finally, we'll need to construct a series of htlc Sets based on all
        // currently known valid commitments.
        htlcSets := make(map[HtlcSetKey]htlcSet)
        htlcSets[LocalHtlcSet] = newHtlcSet(channel.LocalCommitment.Htlcs)
        htlcSets[RemoteHtlcSet] = newHtlcSet(channel.RemoteCommitment.Htlcs)

        pendingRemoteCommitment, err := channel.RemoteCommitChainTip()
        if err != nil && err != channeldb.ErrNoPendingCommit {
                return nil, err
        }
        if pendingRemoteCommitment != nil {
                htlcSets[RemotePendingHtlcSet] = newHtlcSet(
                        pendingRemoteCommitment.Commitment.Htlcs,
                )
        }

        return NewChannelArbitrator(
                arbCfg, htlcSets, chanLog,
        ), nil
}

// getArbChannel returns an open channel wrapper for use by channel arbitrators.
func (c *ChainArbitrator) getArbChannel(
        channel *channeldb.OpenChannel) *arbChannel {

        return &arbChannel{
                channel: channel,
                c:       c,
        }
}

// ResolveContract marks a contract as fully resolved within the database.
// This is only to be done once all contracts which were live on the channel
// before hitting the chain have been resolved.
func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error {
        log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)

        // First, we'll mark the channel as fully closed from the PoV of the
        // channel source.
        err := c.chanSource.ChannelStateDB().MarkChanFullyClosed(&chanPoint)
        if err != nil {
                log.Errorf("ChainArbitrator: unable to mark ChannelPoint(%v) "+
                        "fully closed: %v", chanPoint, err)
                return err
        }

        // Now that the channel has been marked as fully closed, we'll stop
        // both the channel arbitrator and chain watcher for this channel if
        // they're still active.
        var arbLog ArbitratorLog
        c.Lock()
        chainArb := c.activeChannels[chanPoint]
        delete(c.activeChannels, chanPoint)

        chainWatcher := c.activeWatchers[chanPoint]
        delete(c.activeWatchers, chanPoint)
        c.Unlock()

        if chainArb != nil {
                arbLog = chainArb.log

                if err := chainArb.Stop(); err != nil {
                        log.Warnf("unable to stop ChannelArbitrator(%v): %v",
                                chanPoint, err)
                }
        }
        if chainWatcher != nil {
                if err := chainWatcher.Stop(); err != nil {
                        log.Warnf("unable to stop ChainWatcher(%v): %v",
                                chanPoint, err)
                }
        }

        // Once this has been marked as resolved, we'll wipe the log that the
        // channel arbitrator was using to store its persistent state. We do
        // this after marking the channel resolved, as otherwise, the
        // arbitrator would be re-created, and think it was starting from the
        // default state.
        if arbLog != nil {
                if err := arbLog.WipeHistory(); err != nil {
                        return err
                }
        }

        return nil
}

// Start launches all goroutines that the ChainArbitrator needs to operate.
func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
        if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
                return nil
        }

        // Set the current beat.
        c.beat = beat

        log.Infof("ChainArbitrator starting at height %d with budget=[%v]",
                c.beat.Height(), &c.cfg.Budget)

        // First, we'll fetch all the channels that are still open, in order to
        // collect them within our set of active contracts.
        if err := c.loadOpenChannels(); err != nil {
                return err
        }

        // In addition to the channels that we know to be open, we'll also
        // launch arbitrators to finish resolving any channels that are in the
        // pending close state.
        if err := c.loadPendingCloseChannels(); err != nil {
                return err
        }

        // Now, we'll start all chain watchers in parallel to shorten start up
        // duration. In neutrino mode, this allows spend registrations to take
        // advantage of batch spend reporting, instead of doing a single rescan
        // per chain watcher.
        //
        // NOTE: After this point, we Stop the chain arb to ensure that any
        // lingering goroutines are cleaned up before exiting.
        watcherErrs := make(chan error, len(c.activeWatchers))
        var wg sync.WaitGroup
        for _, watcher := range c.activeWatchers {
                wg.Add(1)
                go func(w *chainWatcher) {
                        defer wg.Done()
                        select {
                        case watcherErrs <- w.Start():
                        case <-c.quit:
                                watcherErrs <- ErrChainArbExiting
                        }
                }(watcher)
        }

        // Once all chain watchers have been started, seal the err chan to
        // signal the end of the err stream.
        go func() {
                wg.Wait()
                close(watcherErrs)
        }()

        // stopAndLog is a helper function which shuts down the chain arb and
        // logs errors if they occur.
        stopAndLog := func() {
                if err := c.Stop(); err != nil {
                        log.Errorf("ChainArbitrator could not shutdown: %v", err)
                }
        }

        // Handle all errors returned from spawning our chain watchers. If any
        // of them failed, we will stop the chain arb to shutdown any active
        // goroutines.
        for err := range watcherErrs {
                if err != nil {
                        stopAndLog()
                        return err
                }
        }

        // Before we start all of our arbitrators, we do a preliminary state
        // lookup so that we can combine all of these lookups in a single db
        // transaction.
        var startStates map[wire.OutPoint]*chanArbStartState

        err := kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error {
                for _, arbitrator := range c.activeChannels {
                        startState, err := arbitrator.getStartState(tx)
                        if err != nil {
                                return err
                        }

                        startStates[arbitrator.cfg.ChanPoint] = startState
                }

                return nil
        }, func() {
                startStates = make(
                        map[wire.OutPoint]*chanArbStartState,
                        len(c.activeChannels),
                )
        })
        if err != nil {
                stopAndLog()
                return err
        }

        // Launch all the goroutines for each arbitrator so they can carry out
        // their duties.
        for _, arbitrator := range c.activeChannels {
                startState, ok := startStates[arbitrator.cfg.ChanPoint]
                if !ok {
                        stopAndLog()
                        return fmt.Errorf("arbitrator: %v has no start state",
                                arbitrator.cfg.ChanPoint)
                }

                if err := arbitrator.Start(startState, c.beat); err != nil {
                        stopAndLog()
                        return err
                }
        }

        // Start our goroutine which will dispatch blocks to each arbitrator.
        c.wg.Add(1)
        go func() {
                defer c.wg.Done()
                c.dispatchBlocks()
        }()

        // TODO(roasbeef): eventually move all breach watching here

        return nil
}
// dispatchBlocks consumes a block epoch notification stream and dispatches
// blocks to each of the chain arb's active channel arbitrators. This function
// must be run in a goroutine.
func (c *ChainArbitrator) dispatchBlocks() {
        // Consume block epochs until we receive the instruction to shutdown.
        for {
                select {
                // Consume block epochs, exiting if our subscription is
                // terminated.
                case beat := <-c.BlockbeatChan:
                        // Set the current blockbeat.
                        c.beat = beat

                        // Send this blockbeat to all the active channels and
                        // wait for them to finish processing it.
                        c.handleBlockbeat(beat)

                // Exit if the chain arbitrator is shutting down.
                case <-c.quit:
                        return
                }
        }
}

// handleBlockbeat sends the blockbeat to all active channel arbitrators in
// parallel and waits for them to finish processing it.
func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) {
        // Read the active channels in a lock.
        c.Lock()

        // Create a slice to record the active channel arbitrators.
        channels := make([]chainio.Consumer, 0, len(c.activeChannels))

        // Copy the active channels to the slice.
        for _, channel := range c.activeChannels {
                channels = append(channels, channel)
        }

        c.Unlock()

        // Iterate all the copied channels and send the blockbeat to them.
        //
        // NOTE: This method will timeout if the processing of blocks of the
        // subsystems is too long (60s).
        err := chainio.DispatchConcurrent(beat, channels)

        // Notify that the chain arbitrator has processed the block.
        c.NotifyBlockProcessed(beat, err)
}

// republishClosingTxs will load any stored cooperative or unilateral closing
// transactions and republish them. This helps ensure propagation of the
// transactions in the event that prior publications failed.
func (c *ChainArbitrator) republishClosingTxs(
        channel *channeldb.OpenChannel) error {

        // If the channel has had its unilateral close broadcasted already,
        // republish it in case it didn't propagate.
        if channel.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) {
                err := c.rebroadcast(
                        channel, channeldb.ChanStatusCommitBroadcasted,
                )
                if err != nil {
                        return err
                }
        }

        // If the channel has had its cooperative close broadcasted already,
        // republish it in case it didn't propagate.
        if channel.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) {
                err := c.rebroadcast(
                        channel, channeldb.ChanStatusCoopBroadcasted,
                )
                if err != nil {
                        return err
                }
        }

        return nil
}

// rebroadcast is a helper method which will republish the unilateral or
// cooperative close transaction of a channel in a particular state.
//
// NOTE: There is no risk to calling this method if the channel isn't in either
// CommitmentBroadcasted or CoopBroadcasted, but the logs will be misleading.
func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel,
        state channeldb.ChannelStatus) error {

        chanPoint := channel.FundingOutpoint

        var (
                closeTx *wire.MsgTx
                kind    string
                err     error
        )
        switch state {
        case channeldb.ChanStatusCommitBroadcasted:
                kind = "force"
                closeTx, err = channel.BroadcastedCommitment()

        case channeldb.ChanStatusCoopBroadcasted:
                kind = "coop"
                closeTx, err = channel.BroadcastedCooperative()

        default:
                return fmt.Errorf("unknown closing state: %v", state)
        }

        switch {
        // This can happen for channels that had their closing tx published
        // before we started storing it to disk.
        case err == channeldb.ErrNoCloseTx:
                log.Warnf("Channel %v is in state %v, but no %s closing tx "+
                        "to re-publish...", chanPoint, state, kind)
                return nil

        case err != nil:
                return err
        }

        log.Infof("Re-publishing %s close tx(%v) for channel %v",
                kind, closeTx.TxHash(), chanPoint)

        label := labels.MakeLabel(
                labels.LabelTypeChannelClose, &channel.ShortChannelID,
        )
        err = c.cfg.PublishTx(closeTx, label)
        if err != nil && err != lnwallet.ErrDoubleSpend {
                log.Warnf("Unable to broadcast %s close tx(%v): %v",
                        kind, closeTx.TxHash(), err)
        }

        return nil
}
// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active
// channel arbitrators will be signalled to exit, and this method will block
// until they've all exited.
func (c *ChainArbitrator) Stop() error {
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
                return nil
        }

        log.Info("ChainArbitrator shutting down...")
        defer log.Debug("ChainArbitrator shutdown complete")

        close(c.quit)

        var (
                activeWatchers = make(map[wire.OutPoint]*chainWatcher)
                activeChannels = make(map[wire.OutPoint]*ChannelArbitrator)
        )

        // Copy the current set of active watchers and arbitrators to shutdown.
        // We don't want to hold the lock when shutting down each watcher or
        // arbitrator individually, as they may need to acquire this mutex.
        c.Lock()
        for chanPoint, watcher := range c.activeWatchers {
                activeWatchers[chanPoint] = watcher
        }
        for chanPoint, arbitrator := range c.activeChannels {
                activeChannels[chanPoint] = arbitrator
        }
        c.Unlock()

        for chanPoint, watcher := range activeWatchers {
                log.Tracef("Attempting to stop ChainWatcher(%v)",
                        chanPoint)

                if err := watcher.Stop(); err != nil {
                        log.Errorf("unable to stop watcher for "+
                                "ChannelPoint(%v): %v", chanPoint, err)
                }
        }
        for chanPoint, arbitrator := range activeChannels {
                log.Tracef("Attempting to stop ChannelArbitrator(%v)",
                        chanPoint)

                if err := arbitrator.Stop(); err != nil {
                        log.Errorf("unable to stop arbitrator for "+
                                "ChannelPoint(%v): %v", chanPoint, err)
                }
        }

        c.wg.Wait()

        return nil
}

// ContractUpdate is a message that packages the latest set of active HTLCs on
// a commitment, and also identifies which commitment received a new set of
// HTLCs.
type ContractUpdate struct {
        // HtlcKey identifies which commitment the HTLCs below are present on.
        HtlcKey HtlcSetKey

        // Htlcs are the set of active HTLCs on the commitment identified by
        // the above HtlcKey.
        Htlcs []channeldb.HTLC
}

// ContractSignals is used by outside subsystems to notify a channel arbitrator
// of its ShortChannelID.
type ContractSignals struct {
        // ShortChanID is the up to date short channel ID for a contract. This
        // can change either if the contract didn't yet have a stable
        // identifier when it was added, or in the case of a reorg.
        ShortChanID lnwire.ShortChannelID
}

// UpdateContractSignals sends a set of active, up to date contract signals to
// the ChannelArbitrator which has been assigned to the channel identified by
// the passed channel point.
func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
        signals *ContractSignals) error {

        log.Infof("Attempting to update ContractSignals for ChannelPoint(%v)",
                chanPoint)

        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return fmt.Errorf("unable to find arbitrator")
        }

        arbitrator.UpdateContractSignals(signals)

        return nil
}
// NotifyContractUpdate lets a channel arbitrator know that a new
// ContractUpdate is available. This calls the ChannelArbitrator's internal
// method NotifyContractUpdate which waits for a response on a done chan before
// returning. This method will return an error if the ChannelArbitrator is not
// in the activeChannels map. However, this only happens if the arbitrator is
// resolved and the related link would already be shut down.
func (c *ChainArbitrator) NotifyContractUpdate(chanPoint wire.OutPoint,
        update *ContractUpdate) error {

        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return fmt.Errorf("can't find arbitrator for %v", chanPoint)
        }

        arbitrator.notifyContractUpdate(update)
        return nil
}

// GetChannelArbitrator safely returns the channel arbitrator for a given
// channel outpoint.
func (c *ChainArbitrator) GetChannelArbitrator(chanPoint wire.OutPoint) (
        *ChannelArbitrator, error) {

        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return nil, fmt.Errorf("unable to find arbitrator")
        }

        return arbitrator, nil
}

// forceCloseReq is a request sent from an outside sub-system to the arbitrator
// that watches a particular channel to broadcast the commitment transaction,
// and enter the resolution phase of the channel.
type forceCloseReq struct {
        // errResp is a channel that will be sent upon either in the case of
        // force close success (nil error), or in the case of an error.
        //
        // NOTE: This channel MUST be buffered.
        errResp chan error

        // closeTx is a channel that carries the transaction which ultimately
        // closed out the channel.
        closeTx chan *wire.MsgTx
}

// ForceCloseContract attempts to force close the channel identified by the
// passed channel point. A force close will immediately terminate the contract,
// causing it to enter the resolution phase. If the force close was successful,
// then the force close transaction itself will be returned.
//
// TODO(roasbeef): just return the summary itself?
func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.MsgTx, error) {
        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return nil, fmt.Errorf("unable to find arbitrator")
        }

        log.Infof("Attempting to force close ChannelPoint(%v)", chanPoint)

        // Before closing, we'll attempt to send a disable update for the
        // channel. We do so before closing the channel as otherwise the
        // current edge policy won't be retrievable from the graph.
        if err := c.cfg.DisableChannel(chanPoint); err != nil {
                log.Warnf("Unable to disable channel %v on "+
                        "close: %v", chanPoint, err)
        }

        errChan := make(chan error, 1)
        respChan := make(chan *wire.MsgTx, 1)

        // With the channel found, and the request crafted, we'll send over a
        // force close request to the arbitrator that watches this channel.
        select {
        case arbitrator.forceCloseReqs <- &forceCloseReq{
                errResp: errChan,
                closeTx: respChan,
        }:
        case <-c.quit:
                return nil, ErrChainArbExiting
        }

        // We'll await two responses: the error response, and the transaction
        // that closed out the channel.
        select {
        case err := <-errChan:
                if err != nil {
                        return nil, err
                }
        case <-c.quit:
                return nil, ErrChainArbExiting
        }

        var closeTx *wire.MsgTx
        select {
        case closeTx = <-respChan:
        case <-c.quit:
                return nil, ErrChainArbExiting
        }

        return closeTx, nil
}
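
// Illustration only (not part of the upstream file): a caller such as an RPC
// handler could force close a channel like this:
//
//      closeTx, err := chainArb.ForceCloseContract(chanPoint)
//      if err != nil {
//              return err
//      }
//      log.Infof("Force close tx broadcast: %v", closeTx.TxHash())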
1038

1039
// WatchNewChannel sends the ChainArbitrator a message to create a
1040
// ChannelArbitrator tasked with watching over a new channel. Once a new
1041
// channel has finished its final funding flow, it should be registered with
1042
// the ChainArbitrator so we can properly react to any on-chain events.
UNCOV
1043
func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error {
×
UNCOV
1044
        c.Lock()
×
UNCOV
1045
        defer c.Unlock()
×
UNCOV
1046

×
UNCOV
1047
        chanPoint := newChan.FundingOutpoint
×
UNCOV
1048

×
UNCOV
1049
        log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)",
×
UNCOV
1050
                chanPoint)
×
UNCOV
1051

×
UNCOV
1052
        // If we're already watching this channel, then we'll ignore this
×
UNCOV
1053
        // request.
×
UNCOV
1054
        if _, ok := c.activeChannels[chanPoint]; ok {
×
1055
                return nil
×
1056
        }
×
1057

1058
        // First, also create an active chainWatcher for this channel to ensure
1059
        // that we detect any relevant on chain events.
UNCOV
1060
        chainWatcher, err := newChainWatcher(
×
UNCOV
1061
                chainWatcherConfig{
×
UNCOV
1062
                        chanState: newChan,
×
UNCOV
1063
                        notifier:  c.cfg.Notifier,
×
UNCOV
1064
                        signer:    c.cfg.Signer,
×
UNCOV
1065
                        isOurAddr: c.cfg.IsOurAddress,
×
UNCOV
1066
                        contractBreach: func(
×
UNCOV
1067
                                retInfo *lnwallet.BreachRetribution) error {
×
UNCOV
1068

×
UNCOV
1069
                                return c.cfg.ContractBreach(
×
UNCOV
1070
                                        chanPoint, retInfo,
×
UNCOV
1071
                                )
×
UNCOV
1072
                        },
×
1073
                        extractStateNumHint: lnwallet.GetStateNumHint,
1074
                        auxLeafStore:        c.cfg.AuxLeafStore,
1075
                        auxResolver:         c.cfg.AuxResolver,
1076
                },
1077
        )
UNCOV
1078
        if err != nil {
×
1079
                return err
×
1080
        }
×
1081

UNCOV
1082
        c.activeWatchers[chanPoint] = chainWatcher
×
UNCOV
1083

×
UNCOV
1084
        // We'll also create a new channel arbitrator instance using this new
×
UNCOV
1085
        // channel, and our internal state.
×
UNCOV
1086
        channelArb, err := newActiveChannelArbitrator(
×
UNCOV
1087
                newChan, c, chainWatcher.SubscribeChannelEvents(),
×
UNCOV
1088
        )
×
UNCOV
1089
        if err != nil {
×
1090
                return err
×
1091
        }
×
1092

1093
        // With the arbitrator created, we'll add it to our set of active
1094
        // arbitrators, then launch it.
UNCOV
1095
        c.activeChannels[chanPoint] = channelArb
×
UNCOV
1096

×
NEW
1097
        if err := channelArb.Start(nil, c.beat); err != nil {
×
1098
                return err
×
1099
        }
×
1100

UNCOV
1101
        return chainWatcher.Start()
×
1102
}
1103

1104
// SubscribeChannelEvents returns a new active subscription for the set of
1105
// possible on-chain events for a particular channel. The struct can be used by
1106
// callers to be notified whenever an event that changes the state of the
1107
// channel on-chain occurs.
1108
func (c *ChainArbitrator) SubscribeChannelEvents(
UNCOV
1109
        chanPoint wire.OutPoint) (*ChainEventSubscription, error) {
×
UNCOV
1110

×
UNCOV
1111
        // First, we'll attempt to look up the active watcher for this channel.
×
UNCOV
1112
        // If we can't find it, then we'll return an error back to the caller.
×
UNCOV
1113
        c.Lock()
×
UNCOV
1114
        watcher, ok := c.activeWatchers[chanPoint]
×
UNCOV
1115
        c.Unlock()
×
UNCOV
1116

×
UNCOV
1117
        if !ok {
×
1118
                return nil, fmt.Errorf("unable to find watcher for: %v",
×
1119
                        chanPoint)
×
1120
        }
×
1121

1122
        // With the watcher located, we'll request for it to create a new chain
1123
        // event subscription client.
UNCOV
1124
        return watcher.SubscribeChannelEvents(), nil
×
1125
}
1126

1127
// FindOutgoingHTLCDeadline returns the deadline in absolute block height for
1128
// the specified outgoing HTLC. For an outgoing HTLC, its deadline is defined
1129
// by the timeout height of its corresponding incoming HTLC - this is the
1130
// expiry height the that remote peer can spend his/her outgoing HTLC via the
1131
// timeout path.
1132
func (c *ChainArbitrator) FindOutgoingHTLCDeadline(scid lnwire.ShortChannelID,
UNCOV
1133
        outgoingHTLC channeldb.HTLC) fn.Option[int32] {
×
UNCOV
1134

×
UNCOV
1135
        // Find the outgoing HTLC's corresponding incoming HTLC in the circuit
×
UNCOV
1136
        // map.
×
UNCOV
1137
        rHash := outgoingHTLC.RHash
×
UNCOV
1138
        circuit := models.CircuitKey{
×
UNCOV
1139
                ChanID: scid,
×
UNCOV
1140
                HtlcID: outgoingHTLC.HtlcIndex,
×
UNCOV
1141
        }
×
UNCOV
1142
        incomingCircuit := c.cfg.QueryIncomingCircuit(circuit)
×
UNCOV
1143

×
UNCOV
1144
        // If there's no incoming circuit found, we will use the default
×
UNCOV
1145
        // deadline.
×
UNCOV
1146
        if incomingCircuit == nil {
×
UNCOV
1147
                log.Warnf("ChannelArbitrator(%v): incoming circuit key not "+
×
UNCOV
1148
                        "found for rHash=%x, using default deadline instead",
×
UNCOV
1149
                        scid, rHash)
×
UNCOV
1150

×
UNCOV
1151
                return fn.None[int32]()
×
UNCOV
1152
        }
×
1153

1154
        // If this is a locally initiated HTLC, it means we are the first hop.
1155
        // In this case, we can relax the deadline.
UNCOV
1156
        if incomingCircuit.ChanID.IsDefault() {
×
UNCOV
1157
                log.Infof("ChannelArbitrator(%v): using default deadline for "+
×
UNCOV
1158
                        "locally initiated HTLC for rHash=%x", scid, rHash)
×
UNCOV
1159

×
UNCOV
1160
                return fn.None[int32]()
×
UNCOV
1161
        }
×
1162

UNCOV
1163
        log.Debugf("Found incoming circuit %v for rHash=%x using outgoing "+
×
UNCOV
1164
                "circuit %v", incomingCircuit, rHash, circuit)
×
UNCOV
1165

×
UNCOV
1166
        c.Lock()
×
UNCOV
1167
        defer c.Unlock()
×
UNCOV
1168

×
UNCOV
1169
        // Iterate over all active channels to find the incoming HTLC specified
×
UNCOV
1170
        // by its circuit key.
×
UNCOV
1171
        for cp, channelArb := range c.activeChannels {
×
UNCOV
1172
                // Skip if the SCID doesn't match.
×
UNCOV
1173
                if channelArb.cfg.ShortChanID != incomingCircuit.ChanID {
×
UNCOV
1174
                        continue
×
1175
                }
1176

1177
                // Make sure the channel arbitrator has the latest view of its
1178
                // active HTLCs.
UNCOV
1179
                channelArb.updateActiveHTLCs()
×
UNCOV
1180

×
UNCOV
1181
                // Iterate all the known HTLCs to find the targeted incoming
×
UNCOV
1182
                // HTLC.
×
UNCOV
1183
                for _, htlcs := range channelArb.activeHTLCs {
×
UNCOV
1184
                        for _, htlc := range htlcs.incomingHTLCs {
×
UNCOV
1185
                                // Skip if the index doesn't match.
×
UNCOV
1186
                                if htlc.HtlcIndex != incomingCircuit.HtlcID {
×
UNCOV
1187
                                        continue
×
1188
                                }
1189

UNCOV
1190
                                log.Debugf("ChannelArbitrator(%v): found "+
×
UNCOV
1191
                                        "incoming HTLC in channel=%v using "+
×
UNCOV
1192
                                        "rHash=%x, refundTimeout=%v", scid,
×
UNCOV
1193
                                        cp, rHash, htlc.RefundTimeout)
×
UNCOV
1194

×
UNCOV
1195
                                return fn.Some(int32(htlc.RefundTimeout))
×
1196
                        }
1197
                }
1198
        }
1199

1200
        // If there's no incoming HTLC found, yet we have the incoming circuit,
1201
        // something is wrong - in this case, we return the none deadline.
UNCOV
1202
        log.Errorf("ChannelArbitrator(%v): incoming HTLC not found for "+
×
UNCOV
1203
                "rHash=%x, using default deadline instead", scid, rHash)
×
UNCOV
1204

×
UNCOV
1205
        return fn.None[int32]()
×
1206
}

// TODO(roasbeef): arbitration reports
//  * types: contested, waiting for success conf, etc

// NOTE: part of the `chainio.Consumer` interface.
func (c *ChainArbitrator) Name() string {
        return "ChainArbitrator"
}

// loadOpenChannels loads all channels that are currently open in the database
// and registers them with the chainWatcher for future notification.
func (c *ChainArbitrator) loadOpenChannels() error {
        openChannels, err := c.chanSource.ChannelStateDB().FetchAllChannels()
        if err != nil {
                return err
        }

        if len(openChannels) == 0 {
                return nil
        }

        log.Infof("Creating ChannelArbitrators for %v active channels",
                len(openChannels))

        // For each open channel, we'll configure then launch a corresponding
        // ChannelArbitrator.
        for _, channel := range openChannels {
                chanPoint := channel.FundingOutpoint
                channel := channel

                // First, we'll create an active chainWatcher for this channel
                // to ensure that we detect any relevant on chain events.
                breachClosure := func(ret *lnwallet.BreachRetribution) error {
                        return c.cfg.ContractBreach(chanPoint, ret)
                }

                chainWatcher, err := newChainWatcher(
                        chainWatcherConfig{
                                chanState:           channel,
                                notifier:            c.cfg.Notifier,
                                signer:              c.cfg.Signer,
                                isOurAddr:           c.cfg.IsOurAddress,
                                contractBreach:      breachClosure,
                                extractStateNumHint: lnwallet.GetStateNumHint,
                                auxLeafStore:        c.cfg.AuxLeafStore,
                                auxResolver:         c.cfg.AuxResolver,
                        },
                )
                if err != nil {
                        return err
                }

                c.activeWatchers[chanPoint] = chainWatcher
                channelArb, err := newActiveChannelArbitrator(
                        channel, c, chainWatcher.SubscribeChannelEvents(),
                )
                if err != nil {
                        return err
                }

                c.activeChannels[chanPoint] = channelArb

                // Republish any closing transactions for this channel.
                err = c.republishClosingTxs(channel)
                if err != nil {
                        log.Errorf("Failed to republish closing txs for "+
                                "channel %v", chanPoint)
                }
        }

        return nil
}
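
// Illustrative note, not part of the original file: the `channel := channel`
// re-declaration above is the classic pre-Go-1.22 guard against loop
// variable capture. Before Go 1.22 a range loop reused one variable across
// iterations, so any closure or goroutine that captured `channel` would see
// the value from the last iteration by the time it ran; shadowing it inside
// the loop body gives each iteration its own copy. A minimal sketch of the
// pitfall:
//
//	var getters []func() *channeldb.OpenChannel
//	for _, channel := range openChannels {
//	        // Without `channel := channel`, every getter below would
//	        // return the same (last) channel on Go versions before 1.22.
//	        channel := channel
//	        getters = append(getters, func() *channeldb.OpenChannel {
//	                return channel
//	        })
//	}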

// loadPendingCloseChannels loads all channels that are currently pending
// closure in the database and registers them with the ChannelArbitrator to
// continue the resolution process.
func (c *ChainArbitrator) loadPendingCloseChannels() error {
        chanStateDB := c.chanSource.ChannelStateDB()

        closingChannels, err := chanStateDB.FetchClosedChannels(true)
        if err != nil {
                return err
        }

        if len(closingChannels) == 0 {
                return nil
        }

        log.Infof("Creating ChannelArbitrators for %v closing channels",
                len(closingChannels))

        // Next, for each channel in the closing state, we'll launch a
        // corresponding more restricted resolver, as we don't have to watch
        // the chain any longer, only resolve the contracts on the confirmed
        // commitment.
        //nolint:ll
        for _, closeChanInfo := range closingChannels {
                // We can leave off the CloseContract and ForceCloseChan
                // methods as the channel is already closed at this point.
                chanPoint := closeChanInfo.ChanPoint
                arbCfg := ChannelArbitratorConfig{
                        ChanPoint:             chanPoint,
                        ShortChanID:           closeChanInfo.ShortChanID,
                        ChainArbitratorConfig: c.cfg,
                        ChainEvents:           &ChainEventSubscription{},
                        IsPendingClose:        true,
                        ClosingHeight:         closeChanInfo.CloseHeight,
                        CloseType:             closeChanInfo.CloseType,
                        PutResolverReport: func(tx kvdb.RwTx,
                                report *channeldb.ResolverReport) error {

                                return c.chanSource.PutResolverReport(
                                        tx, c.cfg.ChainHash, &chanPoint, report,
                                )
                        },
                        FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
                                return chanStateDB.FetchHistoricalChannel(&chanPoint)
                        },
                        FindOutgoingHTLCDeadline: func(
                                htlc channeldb.HTLC) fn.Option[int32] {

                                return c.FindOutgoingHTLCDeadline(
                                        closeChanInfo.ShortChanID, htlc,
                                )
                        },
                }
                chanLog, err := newBoltArbitratorLog(
                        c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
                )
                if err != nil {
                        return err
                }
                arbCfg.MarkChannelResolved = func() error {
                        if c.cfg.NotifyFullyResolvedChannel != nil {
                                c.cfg.NotifyFullyResolvedChannel(chanPoint)
                        }

                        return c.ResolveContract(chanPoint)
                }

                // We create an empty map of HTLCs here since it's possible
                // that the channel is in StateDefault and updateActiveHTLCs is
                // called. We want to avoid writing to a nil map. Since the
                // channel is already in the process of being resolved, no new
                // HTLCs will be added.
                c.activeChannels[chanPoint] = NewChannelArbitrator(
                        arbCfg, make(map[HtlcSetKey]htlcSet), chanLog,
                )
        }

        return nil
}
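
// exampleStartSequence is an illustrative sketch, not part of the original
// file: it shows one way a caller could combine the two loaders above during
// startup, registering arbitrators for open channels before handing the
// channels pending close their restricted arbitrators. The actual
// ChainArbitrator start-up flow may differ.
func exampleStartSequence(c *ChainArbitrator) error {
        if err := c.loadOpenChannels(); err != nil {
                return fmt.Errorf("unable to load open channels: %w", err)
        }

        if err := c.loadPendingCloseChannels(); err != nil {
                return fmt.Errorf("unable to load pending close channels: %w",
                        err)
        }

        return nil
}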