• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16811814134

07 Aug 2025 05:46PM UTC coverage: 57.463% (-9.5%) from 66.947%
16811814134

Pull #9844

github

web-flow
Merge 4b08ee16d into 2269859d9
Pull Request #9844: Refactor Payment PR 3

434 of 645 new or added lines in 17 files covered. (67.29%)

28260 existing lines in 457 files now uncovered.

99053 of 172378 relevant lines covered (57.46%)

1.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.25
/contractcourt/chain_arbitrator.go
1
package contractcourt
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcutil"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/chainio"
15
        "github.com/lightningnetwork/lnd/chainntnfs"
16
        "github.com/lightningnetwork/lnd/channeldb"
17
        "github.com/lightningnetwork/lnd/clock"
18
        "github.com/lightningnetwork/lnd/fn/v2"
19
        "github.com/lightningnetwork/lnd/graph/db/models"
20
        "github.com/lightningnetwork/lnd/input"
21
        "github.com/lightningnetwork/lnd/kvdb"
22
        "github.com/lightningnetwork/lnd/labels"
23
        "github.com/lightningnetwork/lnd/lnwallet"
24
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
25
        "github.com/lightningnetwork/lnd/lnwire"
26
)
27

28
// ErrChainArbExiting is the sentinel error reported when work is abandoned
// because the chain arbitrator is shutting down.
29
var ErrChainArbExiting = errors.New("ChainArbitrator exiting")
30

31
// ResolutionMsg is a message sent by resolvers to outside sub-systems once an
32
// outgoing contract has been fully resolved. For multi-hop contracts, if we
33
// resolve the outgoing contract, we'll also need to ensure that the incoming
34
// contract is resolved as well. We package the items required to resolve the
35
// incoming contracts within this message.
36
type ResolutionMsg struct {
37
        // SourceChan identifies the channel that this message is being sent
38
        // from. This is the channel's short channel ID.
39
        SourceChan lnwire.ShortChannelID
40

41
        // HtlcIndex is the index of the contract within the original
42
        // commitment trace.
43
        HtlcIndex uint64
44

45
        // Failure will be non-nil if the incoming contract should be canceled
46
        // altogether. This can happen if the outgoing contract was dust, or
47
        // if the outgoing HTLC timed out.
48
        Failure lnwire.FailureMessage
49

50
        // PreImage will be non-nil if the incoming contract can successfully
51
        // be redeemed. This can happen if we learn of the preimage from the
52
        // outgoing HTLC on-chain.
53
        PreImage *[32]byte
54
}
55

56
// ChainArbitratorConfig is a configuration struct that contains all the
57
// function closures and interfaces that are required to arbitrate on-chain
58
// contracts for a particular chain.
59
type ChainArbitratorConfig struct {
60
        // ChainHash is the chain that this arbitrator is to operate within.
61
        ChainHash chainhash.Hash
62

63
        // IncomingBroadcastDelta is the delta that we'll use to decide when to
64
        // broadcast our commitment transaction if we have incoming htlcs. This
65
        // value should be set based on our current fee estimation of the
66
        // commitment transaction. We use this to determine when we should
67
        // broadcast instead of just the HTLC timeout, as we want to ensure
68
        // that the commitment transaction is already confirmed, by the time the
69
        // HTLC expires. Otherwise we may end up not settling the htlc on-chain
70
        // because the other party managed to time it out.
71
        IncomingBroadcastDelta uint32
72

73
        // OutgoingBroadcastDelta is the delta that we'll use to decide when to
74
        // broadcast our commitment transaction if there are active outgoing
75
        // htlcs. This value can be lower than the incoming broadcast delta.
76
        OutgoingBroadcastDelta uint32
77

78
        // NewSweepAddr is a function that returns a new address under control
79
        // by the wallet. We'll use this to sweep any no-delay outputs as a
80
        // result of unilateral channel closes.
81
        //
82
        // NOTE: This SHOULD return a p2wkh script.
83
        NewSweepAddr func() ([]byte, error)
84

85
        // PublishTx reliably broadcasts a transaction to the network. Once
86
        // this function exits without an error, then the transaction MUST
87
        // continually be rebroadcast if needed.
88
        PublishTx func(*wire.MsgTx, string) error
89

90
        // DeliverResolutionMsg is a function that will append an outgoing
91
        // message to the "out box" for a ChannelLink. This is used to cancel
92
        // backwards any HTLC's that are either dust, we're timing out, or
93
        // settling on-chain to the incoming link.
94
        DeliverResolutionMsg func(...ResolutionMsg) error
95

96
        // MarkLinkInactive is a function closure that the ChainArbitrator will
97
        // use to mark that active HTLC's shouldn't be attempted to be routed
98
        // over a particular channel. This function will be called in the event
99
        // that a ChannelArbitrator decides that it needs to go to chain in
100
        // order to resolve contracts.
101
        //
102
        // TODO(roasbeef): rename, routing based
103
        MarkLinkInactive func(wire.OutPoint) error
104

105
        // ContractBreach is a function closure that the ChainArbitrator will
106
        // use to notify the BreachArbitrator about a contract breach. It should
107
        // only return a non-nil error when the BreachArbitrator has preserved
108
        // the necessary breach info for this channel point. Once the breach
109
        // resolution is persisted in the ChannelArbitrator, it will be safe
110
        // to mark the channel closed.
        //
        // NOTE(review): "non-nil" above reads inverted -- presumably a nil
        // error is what signals that the breach info was preserved; confirm
        // against the BreachArbitrator hand-off before relying on this.
111
        ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error
112

113
        // IsOurAddress is a function that returns true if the passed address
114
        // is known to the underlying wallet. Otherwise, false should be
115
        // returned.
116
        IsOurAddress func(btcutil.Address) bool
117

118
        // IncubateOutputs sends either an incoming HTLC, an outgoing HTLC, or
119
        // both to the utxo nursery. Once this function returns, the nursery
120
        // should have safely persisted the outputs to disk, and should start
121
        // the process of incubation. This is used when a resolver wishes to
122
        // pass off the output to the nursery as we're only waiting on an
123
        // absolute/relative item block.
124
        IncubateOutputs func(wire.OutPoint,
125
                fn.Option[lnwallet.OutgoingHtlcResolution],
126
                fn.Option[lnwallet.IncomingHtlcResolution],
127
                uint32, fn.Option[int32]) error
128

129
        // PreimageDB is a global store of all known pre-images. We'll use this
130
        // to decide if we should broadcast a commitment transaction to claim
131
        // an HTLC on-chain.
132
        PreimageDB WitnessBeacon
133

134
        // Notifier is an instance of a chain notifier we'll use to watch for
135
        // certain on-chain events.
136
        Notifier chainntnfs.ChainNotifier
137

138
        // Mempool is a mempool watcher that allows us to watch for events
139
        // that happen in the mempool.
140
        Mempool chainntnfs.MempoolWatcher
141

142
        // Signer is a signer backed by the active lnd node. This should be
143
        // capable of producing a signature as specified by a valid
144
        // SignDescriptor.
145
        Signer input.Signer
146

147
        // FeeEstimator will be used to return fee estimates.
148
        FeeEstimator chainfee.Estimator
149

150
        // ChainIO allows us to query the state of the current main chain.
151
        ChainIO lnwallet.BlockChainIO
152

153
        // DisableChannel disables a channel, resulting in it not being able to
154
        // forward payments.
155
        DisableChannel func(wire.OutPoint) error
156

157
        // Sweeper allows resolvers to sweep their final outputs.
158
        Sweeper UtxoSweeper
159

160
        // Registry is the invoice database that is used by resolvers to lookup
161
        // preimages and settle invoices.
162
        Registry Registry
163

164
        // NotifyClosedChannel is a function closure that the ChainArbitrator
165
        // will use to notify the ChannelNotifier about a newly closed channel.
166
        NotifyClosedChannel func(wire.OutPoint)
167

168
        // NotifyFullyResolvedChannel is a function closure that the
169
        // ChainArbitrator will use to notify the ChannelNotifier about a newly
170
        // resolved channel. The main difference to NotifyClosedChannel is that
171
        // in case of a local force close the NotifyClosedChannel is called when
172
        // the published commitment transaction confirms while
173
        // NotifyFullyResolvedChannel is only called when the channel is fully
174
        // resolved (which includes sweeping any time locked funds).
175
        NotifyFullyResolvedChannel func(point wire.OutPoint)
176

177
        // OnionProcessor is used to decode onion payloads for on-chain
178
        // resolution.
179
        OnionProcessor OnionProcessor
180

181
        // PaymentsExpirationGracePeriod indicates a time window in which we
182
        // let the other node cancel an outgoing htlc that our node has
183
        // initiated and that has timed out.
184
        PaymentsExpirationGracePeriod time.Duration
185

186
        // IsForwardedHTLC checks for a given htlc, identified by channel id and
187
        // htlcIndex, if it is a forwarded one.
188
        IsForwardedHTLC func(chanID lnwire.ShortChannelID, htlcIndex uint64) bool
189

190
        // Clock is the clock implementation that ChannelArbitrator uses.
191
        // It is useful for testing.
192
        Clock clock.Clock
193

194
        // SubscribeBreachComplete is used by the breachResolver to register a
195
        // subscription that notifies when the breach resolution process is
196
        // complete.
197
        SubscribeBreachComplete func(op *wire.OutPoint, c chan struct{}) (
198
                bool, error)
199

200
        // PutFinalHtlcOutcome stores the final outcome of an htlc in the
201
        // database.
202
        PutFinalHtlcOutcome func(chanId lnwire.ShortChannelID,
203
                htlcId uint64, settled bool) error
204

205
        // HtlcNotifier is an interface that htlc events are sent to.
206
        HtlcNotifier HtlcNotifier
207

208
        // Budget is the configured budget for the arbitrator.
209
        Budget BudgetConfig
210

211
        // QueryIncomingCircuit is used to find the outgoing HTLC's
212
        // corresponding incoming HTLC circuit. It queries the circuit map for
213
        // a given outgoing circuit key and returns the incoming circuit key.
214
        //
215
        // TODO(yy): this is a hacky way to get around the cyclic import issue
216
        // as we cannot import `htlcswitch` here. A proper way is to define an
217
        // interface here that asks for method `LookupOpenCircuit`,
218
        // meanwhile, turn `PaymentCircuit` into an interface or bring it to a
219
        // lower package.
220
        QueryIncomingCircuit func(circuit models.CircuitKey) *models.CircuitKey
221

222
        // AuxLeafStore is an optional store that can be used to store auxiliary
223
        // leaves for certain custom channel types.
224
        AuxLeafStore fn.Option[lnwallet.AuxLeafStore]
225

226
        // AuxSigner is an optional signer that can be used to sign auxiliary
227
        // leaves for certain custom channel types.
228
        AuxSigner fn.Option[lnwallet.AuxSigner]
229

230
        // AuxResolver is an optional interface that can be used to modify the
231
        // way contracts are resolved.
232
        AuxResolver fn.Option[lnwallet.AuxContractResolver]
233
}
234

235
// ChainArbitrator is a sub-system that oversees the on-chain resolution of all
236
// active channels, and channels that are in the "pending close" state. Within
237
// the contractcourt package, the ChainArbitrator manages a set of active
238
// ChannelArbitrators. Each ChannelArbitrator is responsible for watching
239
// the chain for any activity that affects the state of the channel, and also
240
// for monitoring each contract in order to determine if any on-chain activity is
241
// required. Outside sub-systems interact with the ChainArbitrator in order to
242
// forcibly exit a contract, update the set of live signals for each contract,
243
// and to receive reports on the state of contract resolution.
244
type ChainArbitrator struct {
245
        started int32 // To be used atomically.
246
        stopped int32 // To be used atomically.
247

248
        // Embed the blockbeat consumer struct to get access to the method
249
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
250
        chainio.BeatConsumer
251

252
        // The embedded mutex is held by ResolveContract while it reads and
        // removes entries from activeChannels and activeWatchers.
        sync.Mutex
253

254
        // activeChannels is a map of all the active contracts that are still
255
        // open, and not fully resolved.
256
        activeChannels map[wire.OutPoint]*ChannelArbitrator
257

258
        // activeWatchers is a map of all the active chainWatchers for channels
259
        // that are still considered open.
260
        activeWatchers map[wire.OutPoint]*chainWatcher
261

262
        // cfg is the config struct for the arbitrator that contains all
263
        // methods and interfaces it needs to operate.
264
        cfg ChainArbitratorConfig
265

266
        // chanSource will be used by the ChainArbitrator to fetch all the
267
        // active channels that it must still watch over.
268
        chanSource *channeldb.DB
269

270
        // beat is the current best known blockbeat.
271
        beat chainio.Blockbeat
272

273
        // resolvedChan is used to signal that the given channel outpoint has
274
        // been resolved onchain. Once received, chain arbitrator will perform
275
        // cleanups.
276
        resolvedChan chan wire.OutPoint
277

278
        // quit is selected on by the arbitrator's goroutines to detect that
        // the arbitrator is shutting down.
        quit chan struct{}
279

280
        // wg tracks the long-running goroutines launched by Start.
        wg sync.WaitGroup
281
}
282

283
// NewChainArbitrator returns a new instance of the ChainArbitrator using the
284
// passed config struct, and backing persistent database.
285
func NewChainArbitrator(cfg ChainArbitratorConfig,
286
        db *channeldb.DB) *ChainArbitrator {
3✔
287

3✔
288
        c := &ChainArbitrator{
3✔
289
                cfg:            cfg,
3✔
290
                activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
3✔
291
                activeWatchers: make(map[wire.OutPoint]*chainWatcher),
3✔
292
                chanSource:     db,
3✔
293
                quit:           make(chan struct{}),
3✔
294
                resolvedChan:   make(chan wire.OutPoint),
3✔
295
        }
3✔
296

3✔
297
        // Mount the block consumer.
3✔
298
        c.BeatConsumer = chainio.NewBeatConsumer(c.quit, c.Name())
3✔
299

3✔
300
        return c
3✔
301
}
3✔
302

303
// Compile-time assertion that *ChainArbitrator satisfies the chainio.Consumer
// interface.
304
var _ chainio.Consumer = (*ChainArbitrator)(nil)
305

306
// arbChannel is a wrapper around an open channel that channel arbitrators
307
// interact with. It pairs the channel with the owning ChainArbitrator.
308
type arbChannel struct {
309
        // channel is the in-memory channel state.
310
        channel *channeldb.OpenChannel
311

312
        // c references the chain arbitrator and is used by arbChannel
313
        // internally to reach its config and channel source.
314
        c *ChainArbitrator
315
}
316

317
// NewAnchorResolutions returns the anchor resolutions for currently valid
318
// commitment transactions.
319
//
320
// NOTE: Part of the ArbChannel interface.
321
func (a *arbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions,
322
        error) {
3✔
323

3✔
324
        // Get a fresh copy of the database state to base the anchor resolutions
3✔
325
        // on. Unfortunately the channel instance that we have here isn't the
3✔
326
        // same instance that is used by the link.
3✔
327
        chanPoint := a.channel.FundingOutpoint
3✔
328

3✔
329
        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(chanPoint)
3✔
330
        if err != nil {
3✔
331
                return nil, err
×
332
        }
×
333

334
        var chanOpts []lnwallet.ChannelOpt
3✔
335
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
3✔
336
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
×
337
        })
×
338
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
3✔
339
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
×
340
        })
×
341
        a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
3✔
342
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
×
343
        })
×
344

345
        chanMachine, err := lnwallet.NewLightningChannel(
3✔
346
                a.c.cfg.Signer, channel, nil, chanOpts...,
3✔
347
        )
3✔
348
        if err != nil {
3✔
349
                return nil, err
×
350
        }
×
351

352
        return chanMachine.NewAnchorResolutions()
3✔
353
}
354

355
// ForceCloseChan should force close the contract that this attendant is
356
// watching over. We'll use this when we decide that we need to go to chain. It
357
// should in addition tell the switch to remove the corresponding link, such
358
// that we won't accept any new updates.
359
//
360
// NOTE: Part of the ArbChannel interface.
361
func (a *arbChannel) ForceCloseChan() (*wire.MsgTx, error) {
3✔
362
        // First, we mark the channel as borked, this ensure
3✔
363
        // that no new state transitions can happen, and also
3✔
364
        // that the link won't be loaded into the switch.
3✔
365
        if err := a.channel.MarkBorked(); err != nil {
3✔
366
                return nil, err
×
367
        }
×
368

369
        // With the channel marked as borked, we'll now remove
370
        // the link from the switch if its there. If the link
371
        // is active, then this method will block until it
372
        // exits.
373
        chanPoint := a.channel.FundingOutpoint
3✔
374

3✔
375
        if err := a.c.cfg.MarkLinkInactive(chanPoint); err != nil {
3✔
376
                log.Errorf("unable to mark link inactive: %v", err)
×
377
        }
×
378

379
        // Now that we know the link can't mutate the channel
380
        // state, we'll read the channel from disk the target
381
        // channel according to its channel point.
382
        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(chanPoint)
3✔
383
        if err != nil {
3✔
384
                return nil, err
×
385
        }
×
386

387
        var chanOpts []lnwallet.ChannelOpt
3✔
388
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
3✔
389
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
×
390
        })
×
391
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
3✔
392
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
×
393
        })
×
394
        a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
3✔
395
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
×
396
        })
×
397

398
        // Finally, we'll force close the channel completing
399
        // the force close workflow.
400
        chanMachine, err := lnwallet.NewLightningChannel(
3✔
401
                a.c.cfg.Signer, channel, nil, chanOpts...,
3✔
402
        )
3✔
403
        if err != nil {
3✔
404
                return nil, err
×
405
        }
×
406

407
        closeSummary, err := chanMachine.ForceClose(
3✔
408
                lnwallet.WithSkipContractResolutions(),
3✔
409
        )
3✔
410
        if err != nil {
3✔
411
                return nil, err
×
412
        }
×
413

414
        return closeSummary.CloseTx, nil
3✔
415
}
416

417
// newActiveChannelArbitrator creates a new instance of an active channel
418
// arbitrator given the state of the target channel.
419
func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
420
        c *ChainArbitrator, chanEvents *ChainEventSubscription) (*ChannelArbitrator, error) {
3✔
421

3✔
422
        // TODO(roasbeef): fetch best height (or pass in) so can ensure block
3✔
423
        // epoch delivers all the notifications to
3✔
424

3✔
425
        chanPoint := channel.FundingOutpoint
3✔
426

3✔
427
        log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", chanPoint)
3✔
428

3✔
429
        // Next we'll create the matching configuration struct that contains
3✔
430
        // all interfaces and methods the arbitrator needs to do its job.
3✔
431
        arbCfg := ChannelArbitratorConfig{
3✔
432
                ChanPoint:   chanPoint,
3✔
433
                Channel:     c.getArbChannel(channel),
3✔
434
                ShortChanID: channel.ShortChanID(),
3✔
435

3✔
436
                MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
3✔
437
                MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
3✔
438
                        statuses ...channeldb.ChannelStatus) error {
6✔
439

3✔
440
                        err := channel.CloseChannel(summary, statuses...)
3✔
441
                        if err != nil {
3✔
442
                                return err
×
443
                        }
×
444
                        c.cfg.NotifyClosedChannel(summary.ChanPoint)
3✔
445
                        return nil
3✔
446
                },
447
                IsPendingClose:        false,
448
                ChainArbitratorConfig: c.cfg,
449
                ChainEvents:           chanEvents,
450
                PutResolverReport: func(tx kvdb.RwTx,
451
                        report *channeldb.ResolverReport) error {
3✔
452

3✔
453
                        return c.chanSource.PutResolverReport(
3✔
454
                                tx, c.cfg.ChainHash, &chanPoint, report,
3✔
455
                        )
3✔
456
                },
3✔
457
                FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
3✔
458
                        chanStateDB := c.chanSource.ChannelStateDB()
3✔
459
                        return chanStateDB.FetchHistoricalChannel(&chanPoint)
3✔
460
                },
3✔
461
                FindOutgoingHTLCDeadline: func(
462
                        htlc channeldb.HTLC) fn.Option[int32] {
3✔
463

3✔
464
                        return c.FindOutgoingHTLCDeadline(
3✔
465
                                channel.ShortChanID(), htlc,
3✔
466
                        )
3✔
467
                },
3✔
468
                NotifyChannelResolved: func() {
3✔
469
                        c.notifyChannelResolved(chanPoint)
3✔
470
                },
3✔
471
        }
472

473
        // The final component needed is an arbitrator log that the arbitrator
474
        // will use to keep track of its internal state using a backed
475
        // persistent log.
476
        //
477
        // TODO(roasbeef); abstraction leak...
478
        //  * rework: adaptor method to set log scope w/ factory func
479
        chanLog, err := newBoltArbitratorLog(
3✔
480
                c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
3✔
481
        )
3✔
482
        if err != nil {
3✔
483
                return nil, err
×
484
        }
×
485

486
        // Finally, we'll need to construct a series of htlc Sets based on all
487
        // currently known valid commitments.
488
        htlcSets := make(map[HtlcSetKey]htlcSet)
3✔
489
        htlcSets[LocalHtlcSet] = newHtlcSet(channel.LocalCommitment.Htlcs)
3✔
490
        htlcSets[RemoteHtlcSet] = newHtlcSet(channel.RemoteCommitment.Htlcs)
3✔
491

3✔
492
        pendingRemoteCommitment, err := channel.RemoteCommitChainTip()
3✔
493
        if err != nil && err != channeldb.ErrNoPendingCommit {
3✔
494
                return nil, err
×
495
        }
×
496
        if pendingRemoteCommitment != nil {
3✔
497
                htlcSets[RemotePendingHtlcSet] = newHtlcSet(
×
498
                        pendingRemoteCommitment.Commitment.Htlcs,
×
499
                )
×
500
        }
×
501

502
        return NewChannelArbitrator(
3✔
503
                arbCfg, htlcSets, chanLog,
3✔
504
        ), nil
3✔
505
}
506

507
// getArbChannel returns an open channel wrapper for use by channel arbitrators.
508
func (c *ChainArbitrator) getArbChannel(
509
        channel *channeldb.OpenChannel) *arbChannel {
3✔
510

3✔
511
        return &arbChannel{
3✔
512
                channel: channel,
3✔
513
                c:       c,
3✔
514
        }
3✔
515
}
3✔
516

517
// ResolveContract marks a contract as fully resolved within the database.
518
// This is only to be done once all contracts which were live on the channel
519
// before hitting the chain have been resolved.
520
func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error {
3✔
521
        log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)
3✔
522

3✔
523
        // First, we'll we'll mark the channel as fully closed from the PoV of
3✔
524
        // the channel source.
3✔
525
        err := c.chanSource.ChannelStateDB().MarkChanFullyClosed(&chanPoint)
3✔
526
        if err != nil {
3✔
527
                log.Errorf("ChainArbitrator: unable to mark ChannelPoint(%v) "+
×
528
                        "fully closed: %v", chanPoint, err)
×
529
                return err
×
530
        }
×
531

532
        // Now that the channel has been marked as fully closed, we'll stop
533
        // both the channel arbitrator and chain watcher for this channel if
534
        // they're still active.
535
        var arbLog ArbitratorLog
3✔
536
        c.Lock()
3✔
537
        chainArb := c.activeChannels[chanPoint]
3✔
538
        delete(c.activeChannels, chanPoint)
3✔
539

3✔
540
        chainWatcher := c.activeWatchers[chanPoint]
3✔
541
        delete(c.activeWatchers, chanPoint)
3✔
542
        c.Unlock()
3✔
543

3✔
544
        if chainArb != nil {
6✔
545
                arbLog = chainArb.log
3✔
546

3✔
547
                if err := chainArb.Stop(); err != nil {
3✔
548
                        log.Warnf("unable to stop ChannelArbitrator(%v): %v",
×
549
                                chanPoint, err)
×
550
                }
×
551
        }
552
        if chainWatcher != nil {
6✔
553
                if err := chainWatcher.Stop(); err != nil {
3✔
554
                        log.Warnf("unable to stop ChainWatcher(%v): %v",
×
555
                                chanPoint, err)
×
556
                }
×
557
        }
558

559
        // Once this has been marked as resolved, we'll wipe the log that the
560
        // channel arbitrator was using to store its persistent state. We do
561
        // this after marking the channel resolved, as otherwise, the
562
        // arbitrator would be re-created, and think it was starting from the
563
        // default state.
564
        if arbLog != nil {
6✔
565
                if err := arbLog.WipeHistory(); err != nil {
3✔
566
                        return err
×
567
                }
×
568
        }
569

570
        return nil
3✔
571
}
572

573
// Start launches all goroutines that the ChainArbitrator needs to operate.
574
func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
3✔
575
        if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
3✔
576
                return nil
×
577
        }
×
578

579
        // Set the current beat.
580
        c.beat = beat
3✔
581

3✔
582
        // Start the goroutine which listens for signals to mark the channel as
3✔
583
        // resolved.
3✔
584
        //
3✔
585
        // NOTE: We must start this goroutine here we won't block the following
3✔
586
        // channel loading.
3✔
587
        c.wg.Add(1)
3✔
588
        go func() {
6✔
589
                defer c.wg.Done()
3✔
590
                c.resolveContracts()
3✔
591
        }()
3✔
592

593
        // First, we'll fetch all the channels that are still open, in order to
594
        // collect them within our set of active contracts.
595
        if err := c.loadOpenChannels(); err != nil {
3✔
596
                return err
×
597
        }
×
598

599
        // In addition to the channels that we know to be open, we'll also
600
        // launch arbitrators to finishing resolving any channels that are in
601
        // the pending close state.
602
        if err := c.loadPendingCloseChannels(); err != nil {
3✔
603
                return err
×
604
        }
×
605

606
        // Now, we'll start all chain watchers in parallel to shorten start up
607
        // duration. In neutrino mode, this allows spend registrations to take
608
        // advantage of batch spend reporting, instead of doing a single rescan
609
        // per chain watcher.
610
        //
611
        // NOTE: After this point, we Stop the chain arb to ensure that any
612
        // lingering goroutines are cleaned up before exiting.
613
        watcherErrs := make(chan error, len(c.activeWatchers))
3✔
614
        var wg sync.WaitGroup
3✔
615
        for _, watcher := range c.activeWatchers {
6✔
616
                wg.Add(1)
3✔
617
                go func(w *chainWatcher) {
6✔
618
                        defer wg.Done()
3✔
619
                        select {
3✔
620
                        case watcherErrs <- w.Start():
3✔
621
                        case <-c.quit:
×
622
                                watcherErrs <- ErrChainArbExiting
×
623
                        }
624
                }(watcher)
625
        }
626

627
        // Once all chain watchers have been started, seal the err chan to
628
        // signal the end of the err stream.
629
        go func() {
6✔
630
                wg.Wait()
3✔
631
                close(watcherErrs)
3✔
632
        }()
3✔
633

634
        // stopAndLog is a helper function which shuts down the chain arb and
635
        // logs errors if they occur.
636
        stopAndLog := func() {
3✔
637
                if err := c.Stop(); err != nil {
×
638
                        log.Errorf("ChainArbitrator could not shutdown: %v", err)
×
639
                }
×
640
        }
641

642
        // Handle all errors returned from spawning our chain watchers. If any
643
        // of them failed, we will stop the chain arb to shutdown any active
644
        // goroutines.
645
        for err := range watcherErrs {
6✔
646
                if err != nil {
3✔
647
                        stopAndLog()
×
648
                        return err
×
649
                }
×
650
        }
651

652
        // Before we start all of our arbitrators, we do a preliminary state
653
        // lookup so that we can combine all of these lookups in a single db
654
        // transaction.
655
        var startStates map[wire.OutPoint]*chanArbStartState
3✔
656

3✔
657
        err := kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error {
6✔
658
                for _, arbitrator := range c.activeChannels {
6✔
659
                        startState, err := arbitrator.getStartState(tx)
3✔
660
                        if err != nil {
3✔
661
                                return err
×
662
                        }
×
663

664
                        startStates[arbitrator.cfg.ChanPoint] = startState
3✔
665
                }
666

667
                return nil
3✔
668
        }, func() {
3✔
669
                startStates = make(
3✔
670
                        map[wire.OutPoint]*chanArbStartState,
3✔
671
                        len(c.activeChannels),
3✔
672
                )
3✔
673
        })
3✔
674
        if err != nil {
3✔
675
                stopAndLog()
×
676
                return err
×
677
        }
×
678

679
        // Launch all the goroutines for each arbitrator so they can carry out
680
        // their duties.
681
        for _, arbitrator := range c.activeChannels {
6✔
682
                startState, ok := startStates[arbitrator.cfg.ChanPoint]
3✔
683
                if !ok {
3✔
684
                        stopAndLog()
×
685
                        return fmt.Errorf("arbitrator: %v has no start state",
×
686
                                arbitrator.cfg.ChanPoint)
×
687
                }
×
688

689
                if err := arbitrator.Start(startState, c.beat); err != nil {
3✔
690
                        stopAndLog()
×
691
                        return err
×
692
                }
×
693
        }
694

695
        // Start our goroutine which will dispatch blocks to each arbitrator.
696
        c.wg.Add(1)
3✔
697
        go func() {
6✔
698
                defer c.wg.Done()
3✔
699
                c.dispatchBlocks()
3✔
700
        }()
3✔
701

702
        log.Infof("ChainArbitrator starting at height %d with %d chain "+
3✔
703
                "watchers, %d channel arbitrators, and budget config=[%v]",
3✔
704
                c.beat.Height(), len(c.activeWatchers), len(c.activeChannels),
3✔
705
                &c.cfg.Budget)
3✔
706

3✔
707
        // TODO(roasbeef): eventually move all breach watching here
3✔
708

3✔
709
        return nil
3✔
710
}
711

712
// resolveContracts listens to the `resolvedChan` to mark a given channel as
713
// fully resolved.
714
func (c *ChainArbitrator) resolveContracts() {
3✔
715
        for {
6✔
716
                select {
3✔
717
                // The channel arbitrator signals that a given channel has been
718
                // resolved, we now update chain arbitrator's internal state for
719
                // this channel.
720
                case cp := <-c.resolvedChan:
3✔
721
                        if c.cfg.NotifyFullyResolvedChannel != nil {
6✔
722
                                c.cfg.NotifyFullyResolvedChannel(cp)
3✔
723
                        }
3✔
724

725
                        err := c.ResolveContract(cp)
3✔
726
                        if err != nil {
3✔
727
                                log.Errorf("Failed to resolve contract for "+
×
728
                                        "channel %v", cp)
×
729
                        }
×
730

731
                // Exit if the chain arbitrator is shutting down.
732
                case <-c.quit:
3✔
733
                        return
3✔
734
                }
735
        }
736
}
737

738
// dispatchBlocks consumes a block epoch notification stream and dispatches
739
// blocks to each of the chain arb's active channel arbitrators. This function
740
// must be run in a goroutine.
741
func (c *ChainArbitrator) dispatchBlocks() {
3✔
742
        // Consume block epochs until we receive the instruction to shutdown.
3✔
743
        for {
6✔
744
                select {
3✔
745
                // Consume block epochs, exiting if our subscription is
746
                // terminated.
747
                case beat := <-c.BlockbeatChan:
3✔
748
                        // Set the current blockbeat.
3✔
749
                        c.beat = beat
3✔
750

3✔
751
                        // Send this blockbeat to all the active channels and
3✔
752
                        // wait for them to finish processing it.
3✔
753
                        c.handleBlockbeat(beat)
3✔
754

755
                // Exit if the chain arbitrator is shutting down.
756
                case <-c.quit:
3✔
757
                        return
3✔
758
                }
759
        }
760
}
761

762
// handleBlockbeat sends the blockbeat to all active channel arbitrator in
763
// parallel and wait for them to finish processing it.
764
func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) {
3✔
765
        // Read the active channels in a lock.
3✔
766
        c.Lock()
3✔
767

3✔
768
        // Create a slice to record active channel arbitrator.
3✔
769
        channels := make([]chainio.Consumer, 0, len(c.activeChannels))
3✔
770
        watchers := make([]chainio.Consumer, 0, len(c.activeWatchers))
3✔
771

3✔
772
        // Copy the active channels to the slice.
3✔
773
        for _, channel := range c.activeChannels {
6✔
774
                channels = append(channels, channel)
3✔
775
        }
3✔
776

777
        for _, watcher := range c.activeWatchers {
6✔
778
                watchers = append(watchers, watcher)
3✔
779
        }
3✔
780

781
        c.Unlock()
3✔
782

3✔
783
        // Iterate all the copied watchers and send the blockbeat to them.
3✔
784
        err := chainio.DispatchConcurrent(beat, watchers)
3✔
785
        if err != nil {
3✔
786
                log.Errorf("Notify blockbeat for chainWatcher failed: %v", err)
×
787
        }
×
788

789
        // Iterate all the copied channels and send the blockbeat to them.
790
        //
791
        // NOTE: This method will timeout if the processing of blocks of the
792
        // subsystems is too long (60s).
793
        err = chainio.DispatchConcurrent(beat, channels)
3✔
794
        if err != nil {
3✔
795
                log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v",
×
796
                        err)
×
797
        }
×
798

799
        // Notify the chain arbitrator has processed the block.
800
        c.NotifyBlockProcessed(beat, err)
3✔
801
}
802

803
// notifyChannelResolved is used by the channel arbitrator to signal that a
804
// given channel has been resolved.
805
func (c *ChainArbitrator) notifyChannelResolved(cp wire.OutPoint) {
3✔
806
        select {
3✔
807
        case c.resolvedChan <- cp:
3✔
808
        case <-c.quit:
×
809
                return
×
810
        }
811
}
812

813
// republishClosingTxs will load any stored cooperative or unilateral closing
814
// transactions and republish them. This helps ensure propagation of the
815
// transactions in the event that prior publications failed.
816
func (c *ChainArbitrator) republishClosingTxs(
817
        channel *channeldb.OpenChannel) error {
3✔
818

3✔
819
        // If the channel has had its unilateral close broadcasted already,
3✔
820
        // republish it in case it didn't propagate.
3✔
821
        if channel.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) {
6✔
822
                err := c.rebroadcast(
3✔
823
                        channel, channeldb.ChanStatusCommitBroadcasted,
3✔
824
                )
3✔
825
                if err != nil {
3✔
826
                        return err
×
827
                }
×
828
        }
829

830
        // If the channel has had its cooperative close broadcasted
831
        // already, republish it in case it didn't propagate.
832
        if channel.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) {
3✔
UNCOV
833
                err := c.rebroadcast(
×
UNCOV
834
                        channel, channeldb.ChanStatusCoopBroadcasted,
×
UNCOV
835
                )
×
UNCOV
836
                if err != nil {
×
837
                        return err
×
838
                }
×
839
        }
840

841
        return nil
3✔
842
}
843

844
// rebroadcast is a helper method which will republish the unilateral or
845
// cooperative close transaction or a channel in a particular state.
846
//
847
// NOTE: There is no risk to calling this method if the channel isn't in either
848
// CommitmentBroadcasted or CoopBroadcasted, but the logs will be misleading.
849
func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel,
850
        state channeldb.ChannelStatus) error {
3✔
851

3✔
852
        chanPoint := channel.FundingOutpoint
3✔
853

3✔
854
        var (
3✔
855
                closeTx *wire.MsgTx
3✔
856
                kind    string
3✔
857
                err     error
3✔
858
        )
3✔
859
        switch state {
3✔
860
        case channeldb.ChanStatusCommitBroadcasted:
3✔
861
                kind = "force"
3✔
862
                closeTx, err = channel.BroadcastedCommitment()
3✔
863

UNCOV
864
        case channeldb.ChanStatusCoopBroadcasted:
×
UNCOV
865
                kind = "coop"
×
UNCOV
866
                closeTx, err = channel.BroadcastedCooperative()
×
867

868
        default:
×
869
                return fmt.Errorf("unknown closing state: %v", state)
×
870
        }
871

872
        switch {
3✔
873
        // This can happen for channels that had their closing tx published
874
        // before we started storing it to disk.
875
        case err == channeldb.ErrNoCloseTx:
×
876
                log.Warnf("Channel %v is in state %v, but no %s closing tx "+
×
877
                        "to re-publish...", chanPoint, state, kind)
×
878
                return nil
×
879

880
        case err != nil:
×
881
                return err
×
882
        }
883

884
        log.Infof("Re-publishing %s close tx(%v) for channel %v",
3✔
885
                kind, closeTx.TxHash(), chanPoint)
3✔
886

3✔
887
        label := labels.MakeLabel(
3✔
888
                labels.LabelTypeChannelClose, &channel.ShortChannelID,
3✔
889
        )
3✔
890
        err = c.cfg.PublishTx(closeTx, label)
3✔
891
        if err != nil && err != lnwallet.ErrDoubleSpend {
3✔
892
                log.Warnf("Unable to broadcast %s close tx(%v): %v",
×
893
                        kind, closeTx.TxHash(), err)
×
894
        }
×
895

896
        return nil
3✔
897
}
898

899
// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active
900
// channel arbitrators will be signalled to exit, and this method will block
901
// until they've all exited.
902
func (c *ChainArbitrator) Stop() error {
3✔
903
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
3✔
904
                return nil
×
905
        }
×
906

907
        log.Info("ChainArbitrator shutting down...")
3✔
908
        defer log.Debug("ChainArbitrator shutdown complete")
3✔
909

3✔
910
        close(c.quit)
3✔
911

3✔
912
        var (
3✔
913
                activeWatchers = make(map[wire.OutPoint]*chainWatcher)
3✔
914
                activeChannels = make(map[wire.OutPoint]*ChannelArbitrator)
3✔
915
        )
3✔
916

3✔
917
        // Copy the current set of active watchers and arbitrators to shutdown.
3✔
918
        // We don't want to hold the lock when shutting down each watcher or
3✔
919
        // arbitrator individually, as they may need to acquire this mutex.
3✔
920
        c.Lock()
3✔
921
        for chanPoint, watcher := range c.activeWatchers {
6✔
922
                activeWatchers[chanPoint] = watcher
3✔
923
        }
3✔
924
        for chanPoint, arbitrator := range c.activeChannels {
6✔
925
                activeChannels[chanPoint] = arbitrator
3✔
926
        }
3✔
927
        c.Unlock()
3✔
928

3✔
929
        for chanPoint, watcher := range activeWatchers {
6✔
930
                log.Tracef("Attempting to stop ChainWatcher(%v)",
3✔
931
                        chanPoint)
3✔
932

3✔
933
                if err := watcher.Stop(); err != nil {
3✔
934
                        log.Errorf("unable to stop watcher for "+
×
935
                                "ChannelPoint(%v): %v", chanPoint, err)
×
936
                }
×
937
        }
938
        for chanPoint, arbitrator := range activeChannels {
6✔
939
                log.Tracef("Attempting to stop ChannelArbitrator(%v)",
3✔
940
                        chanPoint)
3✔
941

3✔
942
                if err := arbitrator.Stop(); err != nil {
3✔
943
                        log.Errorf("unable to stop arbitrator for "+
×
944
                                "ChannelPoint(%v): %v", chanPoint, err)
×
945
                }
×
946
        }
947

948
        c.wg.Wait()
3✔
949

3✔
950
        return nil
3✔
951
}
952

953
// ContractUpdate is a message packages the latest set of active HTLCs on a
954
// commitment, and also identifies which commitment received a new set of
955
// HTLCs.
956
type ContractUpdate struct {
957
        // HtlcKey identifies which commitment the HTLCs below are present on.
958
        HtlcKey HtlcSetKey
959

960
        // Htlcs are the of active HTLCs on the commitment identified by the
961
        // above HtlcKey.
962
        Htlcs []channeldb.HTLC
963
}
964

965
// ContractSignals is used by outside subsystems to notify a channel arbitrator
966
// of its ShortChannelID.
967
type ContractSignals struct {
968
        // ShortChanID is the up to date short channel ID for a contract. This
969
        // can change either if when the contract was added it didn't yet have
970
        // a stable identifier, or in the case of a reorg.
971
        ShortChanID lnwire.ShortChannelID
972
}
973

974
// UpdateContractSignals sends a set of active, up to date contract signals to
975
// the ChannelArbitrator which is has been assigned to the channel infield by
976
// the passed channel point.
977
func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
978
        signals *ContractSignals) error {
3✔
979

3✔
980
        log.Infof("Attempting to update ContractSignals for ChannelPoint(%v)",
3✔
981
                chanPoint)
3✔
982

3✔
983
        c.Lock()
3✔
984
        arbitrator, ok := c.activeChannels[chanPoint]
3✔
985
        c.Unlock()
3✔
986
        if !ok {
3✔
987
                return fmt.Errorf("unable to find arbitrator")
×
988
        }
×
989

990
        arbitrator.UpdateContractSignals(signals)
3✔
991

3✔
992
        return nil
3✔
993
}
994

995
// NotifyContractUpdate lets a channel arbitrator know that a new
996
// ContractUpdate is available. This calls the ChannelArbitrator's internal
997
// method NotifyContractUpdate which waits for a response on a done chan before
998
// returning. This method will return an error if the ChannelArbitrator is not
999
// in the activeChannels map. However, this only happens if the arbitrator is
1000
// resolved and the related link would already be shut down.
1001
func (c *ChainArbitrator) NotifyContractUpdate(chanPoint wire.OutPoint,
1002
        update *ContractUpdate) error {
3✔
1003

3✔
1004
        c.Lock()
3✔
1005
        arbitrator, ok := c.activeChannels[chanPoint]
3✔
1006
        c.Unlock()
3✔
1007
        if !ok {
3✔
1008
                return fmt.Errorf("can't find arbitrator for %v", chanPoint)
×
1009
        }
×
1010

1011
        arbitrator.notifyContractUpdate(update)
3✔
1012
        return nil
3✔
1013
}
1014

1015
// GetChannelArbitrator safely returns the channel arbitrator for a given
1016
// channel outpoint.
1017
func (c *ChainArbitrator) GetChannelArbitrator(chanPoint wire.OutPoint) (
1018
        *ChannelArbitrator, error) {
3✔
1019

3✔
1020
        c.Lock()
3✔
1021
        arbitrator, ok := c.activeChannels[chanPoint]
3✔
1022
        c.Unlock()
3✔
1023
        if !ok {
3✔
1024
                return nil, fmt.Errorf("unable to find arbitrator")
×
1025
        }
×
1026

1027
        return arbitrator, nil
3✔
1028
}
1029

1030
// forceCloseReq is a request sent from an outside sub-system to the arbitrator
1031
// that watches a particular channel to broadcast the commitment transaction,
1032
// and enter the resolution phase of the channel.
1033
type forceCloseReq struct {
1034
        // errResp is a channel that will be sent upon either in the case of
1035
        // force close success (nil error), or in the case on an error.
1036
        //
1037
        // NOTE; This channel MUST be buffered.
1038
        errResp chan error
1039

1040
        // closeTx is a channel that carries the transaction which ultimately
1041
        // closed out the channel.
1042
        closeTx chan *wire.MsgTx
1043
}
1044

1045
// ForceCloseContract attempts to force close the channel infield by the passed
1046
// channel point. A force close will immediately terminate the contract,
1047
// causing it to enter the resolution phase. If the force close was successful,
1048
// then the force close transaction itself will be returned.
1049
//
1050
// TODO(roasbeef): just return the summary itself?
1051
func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.MsgTx, error) {
3✔
1052
        c.Lock()
3✔
1053
        arbitrator, ok := c.activeChannels[chanPoint]
3✔
1054
        c.Unlock()
3✔
1055
        if !ok {
3✔
1056
                return nil, fmt.Errorf("unable to find arbitrator")
×
1057
        }
×
1058

1059
        log.Infof("Attempting to force close ChannelPoint(%v)", chanPoint)
3✔
1060

3✔
1061
        // Before closing, we'll attempt to send a disable update for the
3✔
1062
        // channel. We do so before closing the channel as otherwise the current
3✔
1063
        // edge policy won't be retrievable from the graph.
3✔
1064
        if err := c.cfg.DisableChannel(chanPoint); err != nil {
3✔
1065
                log.Warnf("Unable to disable channel %v on "+
×
1066
                        "close: %v", chanPoint, err)
×
1067
        }
×
1068

1069
        errChan := make(chan error, 1)
3✔
1070
        respChan := make(chan *wire.MsgTx, 1)
3✔
1071

3✔
1072
        // With the channel found, and the request crafted, we'll send over a
3✔
1073
        // force close request to the arbitrator that watches this channel.
3✔
1074
        select {
3✔
1075
        case arbitrator.forceCloseReqs <- &forceCloseReq{
1076
                errResp: errChan,
1077
                closeTx: respChan,
1078
        }:
3✔
1079
        case <-c.quit:
×
1080
                return nil, ErrChainArbExiting
×
1081
        }
1082

1083
        // We'll await two responses: the error response, and the transaction
1084
        // that closed out the channel.
1085
        select {
3✔
1086
        case err := <-errChan:
3✔
1087
                if err != nil {
6✔
1088
                        return nil, err
3✔
1089
                }
3✔
1090
        case <-c.quit:
×
1091
                return nil, ErrChainArbExiting
×
1092
        }
1093

1094
        var closeTx *wire.MsgTx
3✔
1095
        select {
3✔
1096
        case closeTx = <-respChan:
3✔
1097
        case <-c.quit:
×
1098
                return nil, ErrChainArbExiting
×
1099
        }
1100

1101
        return closeTx, nil
3✔
1102
}
1103

1104
// WatchNewChannel sends the ChainArbitrator a message to create a
1105
// ChannelArbitrator tasked with watching over a new channel. Once a new
1106
// channel has finished its final funding flow, it should be registered with
1107
// the ChainArbitrator so we can properly react to any on-chain events.
1108
func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error {
3✔
1109
        c.Lock()
3✔
1110
        defer c.Unlock()
3✔
1111

3✔
1112
        chanPoint := newChan.FundingOutpoint
3✔
1113

3✔
1114
        log.Infof("Creating new chainWatcher and ChannelArbitrator for "+
3✔
1115
                "ChannelPoint(%v)", chanPoint)
3✔
1116

3✔
1117
        // If we're already watching this channel, then we'll ignore this
3✔
1118
        // request.
3✔
1119
        if _, ok := c.activeChannels[chanPoint]; ok {
3✔
1120
                return nil
×
1121
        }
×
1122

1123
        // First, also create an active chainWatcher for this channel to ensure
1124
        // that we detect any relevant on chain events.
1125
        chainWatcher, err := newChainWatcher(
3✔
1126
                chainWatcherConfig{
3✔
1127
                        chanState: newChan,
3✔
1128
                        notifier:  c.cfg.Notifier,
3✔
1129
                        signer:    c.cfg.Signer,
3✔
1130
                        isOurAddr: c.cfg.IsOurAddress,
3✔
1131
                        contractBreach: func(
3✔
1132
                                retInfo *lnwallet.BreachRetribution) error {
6✔
1133

3✔
1134
                                return c.cfg.ContractBreach(
3✔
1135
                                        chanPoint, retInfo,
3✔
1136
                                )
3✔
1137
                        },
3✔
1138
                        extractStateNumHint: lnwallet.GetStateNumHint,
1139
                        auxLeafStore:        c.cfg.AuxLeafStore,
1140
                        auxResolver:         c.cfg.AuxResolver,
1141
                },
1142
        )
1143
        if err != nil {
3✔
1144
                return err
×
1145
        }
×
1146

1147
        c.activeWatchers[chanPoint] = chainWatcher
3✔
1148

3✔
1149
        // We'll also create a new channel arbitrator instance using this new
3✔
1150
        // channel, and our internal state.
3✔
1151
        channelArb, err := newActiveChannelArbitrator(
3✔
1152
                newChan, c, chainWatcher.SubscribeChannelEvents(),
3✔
1153
        )
3✔
1154
        if err != nil {
3✔
1155
                return err
×
1156
        }
×
1157

1158
        // With the arbitrator created, we'll add it to our set of active
1159
        // arbitrators, then launch it.
1160
        c.activeChannels[chanPoint] = channelArb
3✔
1161

3✔
1162
        if err := channelArb.Start(nil, c.beat); err != nil {
3✔
1163
                return err
×
1164
        }
×
1165

1166
        return chainWatcher.Start()
3✔
1167
}
1168

1169
// SubscribeChannelEvents returns a new active subscription for the set of
1170
// possible on-chain events for a particular channel. The struct can be used by
1171
// callers to be notified whenever an event that changes the state of the
1172
// channel on-chain occurs.
1173
func (c *ChainArbitrator) SubscribeChannelEvents(
1174
        chanPoint wire.OutPoint) (*ChainEventSubscription, error) {
3✔
1175

3✔
1176
        // First, we'll attempt to look up the active watcher for this channel.
3✔
1177
        // If we can't find it, then we'll return an error back to the caller.
3✔
1178
        c.Lock()
3✔
1179
        watcher, ok := c.activeWatchers[chanPoint]
3✔
1180
        c.Unlock()
3✔
1181

3✔
1182
        if !ok {
3✔
1183
                return nil, fmt.Errorf("unable to find watcher for: %v",
×
1184
                        chanPoint)
×
1185
        }
×
1186

1187
        // With the watcher located, we'll request for it to create a new chain
1188
        // event subscription client.
1189
        return watcher.SubscribeChannelEvents(), nil
3✔
1190
}
1191

1192
// FindOutgoingHTLCDeadline returns the deadline in absolute block height for
1193
// the specified outgoing HTLC. For an outgoing HTLC, its deadline is defined
1194
// by the timeout height of its corresponding incoming HTLC - this is the
1195
// expiry height the that remote peer can spend his/her outgoing HTLC via the
1196
// timeout path.
1197
func (c *ChainArbitrator) FindOutgoingHTLCDeadline(scid lnwire.ShortChannelID,
1198
        outgoingHTLC channeldb.HTLC) fn.Option[int32] {
3✔
1199

3✔
1200
        // Find the outgoing HTLC's corresponding incoming HTLC in the circuit
3✔
1201
        // map.
3✔
1202
        rHash := outgoingHTLC.RHash
3✔
1203
        circuit := models.CircuitKey{
3✔
1204
                ChanID: scid,
3✔
1205
                HtlcID: outgoingHTLC.HtlcIndex,
3✔
1206
        }
3✔
1207
        incomingCircuit := c.cfg.QueryIncomingCircuit(circuit)
3✔
1208

3✔
1209
        // If there's no incoming circuit found, we will use the default
3✔
1210
        // deadline.
3✔
1211
        if incomingCircuit == nil {
5✔
1212
                log.Warnf("ChannelArbitrator(%v): incoming circuit key not "+
2✔
1213
                        "found for rHash=%x, using default deadline instead",
2✔
1214
                        scid, rHash)
2✔
1215

2✔
1216
                return fn.None[int32]()
2✔
1217
        }
2✔
1218

1219
        // If this is a locally initiated HTLC, it means we are the first hop.
1220
        // In this case, we can relax the deadline.
1221
        if incomingCircuit.ChanID.IsDefault() {
6✔
1222
                log.Infof("ChannelArbitrator(%v): using default deadline for "+
3✔
1223
                        "locally initiated HTLC for rHash=%x", scid, rHash)
3✔
1224

3✔
1225
                return fn.None[int32]()
3✔
1226
        }
3✔
1227

1228
        log.Debugf("Found incoming circuit %v for rHash=%x using outgoing "+
3✔
1229
                "circuit %v", incomingCircuit, rHash, circuit)
3✔
1230

3✔
1231
        c.Lock()
3✔
1232
        defer c.Unlock()
3✔
1233

3✔
1234
        // Iterate over all active channels to find the incoming HTLC specified
3✔
1235
        // by its circuit key.
3✔
1236
        for cp, channelArb := range c.activeChannels {
6✔
1237
                // Skip if the SCID doesn't match.
3✔
1238
                if channelArb.cfg.ShortChanID != incomingCircuit.ChanID {
6✔
1239
                        continue
3✔
1240
                }
1241

1242
                // Make sure the channel arbitrator has the latest view of its
1243
                // active HTLCs.
1244
                channelArb.updateActiveHTLCs()
3✔
1245

3✔
1246
                // Iterate all the known HTLCs to find the targeted incoming
3✔
1247
                // HTLC.
3✔
1248
                for _, htlcs := range channelArb.activeHTLCs {
6✔
1249
                        for _, htlc := range htlcs.incomingHTLCs {
6✔
1250
                                // Skip if the index doesn't match.
3✔
1251
                                if htlc.HtlcIndex != incomingCircuit.HtlcID {
6✔
1252
                                        continue
3✔
1253
                                }
1254

1255
                                log.Debugf("ChannelArbitrator(%v): found "+
3✔
1256
                                        "incoming HTLC in channel=%v using "+
3✔
1257
                                        "rHash=%x, refundTimeout=%v", scid,
3✔
1258
                                        cp, rHash, htlc.RefundTimeout)
3✔
1259

3✔
1260
                                return fn.Some(int32(htlc.RefundTimeout))
3✔
1261
                        }
1262
                }
1263
        }
1264

1265
        // If there's no incoming HTLC found, yet we have the incoming circuit,
1266
        // something is wrong - in this case, we return the none deadline.
1267
        log.Errorf("ChannelArbitrator(%v): incoming HTLC not found for "+
3✔
1268
                "rHash=%x, using default deadline instead", scid, rHash)
3✔
1269

3✔
1270
        return fn.None[int32]()
3✔
1271
}
1272

1273
// TODO(roasbeef): arbitration reports
1274
//  * types: contested, waiting for success conf, etc
1275

1276
// NOTE: part of the `chainio.Consumer` interface.
1277
func (c *ChainArbitrator) Name() string {
3✔
1278
        return "ChainArbitrator"
3✔
1279
}
3✔
1280

1281
// loadOpenChannels loads all channels that are currently open in the database
1282
// and registers them with the chainWatcher for future notification.
1283
func (c *ChainArbitrator) loadOpenChannels() error {
3✔
1284
        openChannels, err := c.chanSource.ChannelStateDB().FetchAllChannels()
3✔
1285
        if err != nil {
3✔
1286
                return err
×
1287
        }
×
1288

1289
        if len(openChannels) == 0 {
6✔
1290
                return nil
3✔
1291
        }
3✔
1292

1293
        log.Infof("Creating ChannelArbitrators for %v active channels",
3✔
1294
                len(openChannels))
3✔
1295

3✔
1296
        // For each open channel, we'll configure then launch a corresponding
3✔
1297
        // ChannelArbitrator.
3✔
1298
        for _, channel := range openChannels {
6✔
1299
                chanPoint := channel.FundingOutpoint
3✔
1300
                channel := channel
3✔
1301

3✔
1302
                // First, we'll create an active chainWatcher for this channel
3✔
1303
                // to ensure that we detect any relevant on chain events.
3✔
1304
                breachClosure := func(ret *lnwallet.BreachRetribution) error {
6✔
1305
                        return c.cfg.ContractBreach(chanPoint, ret)
3✔
1306
                }
3✔
1307

1308
                chainWatcher, err := newChainWatcher(
3✔
1309
                        chainWatcherConfig{
3✔
1310
                                chanState:           channel,
3✔
1311
                                notifier:            c.cfg.Notifier,
3✔
1312
                                signer:              c.cfg.Signer,
3✔
1313
                                isOurAddr:           c.cfg.IsOurAddress,
3✔
1314
                                contractBreach:      breachClosure,
3✔
1315
                                extractStateNumHint: lnwallet.GetStateNumHint,
3✔
1316
                                auxLeafStore:        c.cfg.AuxLeafStore,
3✔
1317
                                auxResolver:         c.cfg.AuxResolver,
3✔
1318
                        },
3✔
1319
                )
3✔
1320
                if err != nil {
3✔
1321
                        return err
×
1322
                }
×
1323

1324
                c.activeWatchers[chanPoint] = chainWatcher
3✔
1325
                channelArb, err := newActiveChannelArbitrator(
3✔
1326
                        channel, c, chainWatcher.SubscribeChannelEvents(),
3✔
1327
                )
3✔
1328
                if err != nil {
3✔
1329
                        return err
×
1330
                }
×
1331

1332
                c.activeChannels[chanPoint] = channelArb
3✔
1333

3✔
1334
                // Republish any closing transactions for this channel.
3✔
1335
                err = c.republishClosingTxs(channel)
3✔
1336
                if err != nil {
3✔
1337
                        log.Errorf("Failed to republish closing txs for "+
×
1338
                                "channel %v", chanPoint)
×
1339
                }
×
1340
        }
1341

1342
        return nil
3✔
1343
}
1344

1345
// loadPendingCloseChannels loads all channels that are currently pending
1346
// closure in the database and registers them with the ChannelArbitrator to
1347
// continue the resolution process.
1348
func (c *ChainArbitrator) loadPendingCloseChannels() error {
3✔
1349
        chanStateDB := c.chanSource.ChannelStateDB()
3✔
1350

3✔
1351
        closingChannels, err := chanStateDB.FetchClosedChannels(true)
3✔
1352
        if err != nil {
3✔
1353
                return err
×
1354
        }
×
1355

1356
        if len(closingChannels) == 0 {
6✔
1357
                return nil
3✔
1358
        }
3✔
1359

1360
        log.Infof("Creating ChannelArbitrators for %v closing channels",
3✔
1361
                len(closingChannels))
3✔
1362

3✔
1363
        // Next, for each channel is the closing state, we'll launch a
3✔
1364
        // corresponding more restricted resolver, as we don't have to watch
3✔
1365
        // the chain any longer, only resolve the contracts on the confirmed
3✔
1366
        // commitment.
3✔
1367
        //nolint:ll
3✔
1368
        for _, closeChanInfo := range closingChannels {
6✔
1369
                // We can leave off the CloseContract and ForceCloseChan
3✔
1370
                // methods as the channel is already closed at this point.
3✔
1371
                chanPoint := closeChanInfo.ChanPoint
3✔
1372
                arbCfg := ChannelArbitratorConfig{
3✔
1373
                        ChanPoint:             chanPoint,
3✔
1374
                        ShortChanID:           closeChanInfo.ShortChanID,
3✔
1375
                        ChainArbitratorConfig: c.cfg,
3✔
1376
                        ChainEvents:           &ChainEventSubscription{},
3✔
1377
                        IsPendingClose:        true,
3✔
1378
                        ClosingHeight:         closeChanInfo.CloseHeight,
3✔
1379
                        CloseType:             closeChanInfo.CloseType,
3✔
1380
                        PutResolverReport: func(tx kvdb.RwTx,
3✔
1381
                                report *channeldb.ResolverReport) error {
6✔
1382

3✔
1383
                                return c.chanSource.PutResolverReport(
3✔
1384
                                        tx, c.cfg.ChainHash, &chanPoint, report,
3✔
1385
                                )
3✔
1386
                        },
3✔
1387
                        FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
3✔
1388
                                return chanStateDB.FetchHistoricalChannel(&chanPoint)
3✔
1389
                        },
3✔
1390
                        FindOutgoingHTLCDeadline: func(
1391
                                htlc channeldb.HTLC) fn.Option[int32] {
3✔
1392

3✔
1393
                                return c.FindOutgoingHTLCDeadline(
3✔
1394
                                        closeChanInfo.ShortChanID, htlc,
3✔
1395
                                )
3✔
1396
                        },
3✔
1397
                        NotifyChannelResolved: func() {
3✔
1398
                                c.notifyChannelResolved(chanPoint)
3✔
1399
                        },
3✔
1400
                }
1401
                chanLog, err := newBoltArbitratorLog(
3✔
1402
                        c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
3✔
1403
                )
3✔
1404
                if err != nil {
3✔
1405
                        return err
×
1406
                }
×
1407

1408
                // We create an empty map of HTLC's here since it's possible
1409
                // that the channel is in StateDefault and updateActiveHTLCs is
1410
                // called. We want to avoid writing to an empty map. Since the
1411
                // channel is already in the process of being resolved, no new
1412
                // HTLCs will be added.
1413
                c.activeChannels[chanPoint] = NewChannelArbitrator(
3✔
1414
                        arbCfg, make(map[HtlcSetKey]htlcSet), chanLog,
3✔
1415
                )
3✔
1416
        }
1417

1418
        return nil
3✔
1419
}
1420

1421
// RedispatchBlockbeat resends the current blockbeat to the channels specified
1422
// by the chanPoints. It is used when a channel is added to the chain
1423
// arbitrator after it has been started, e.g., during the channel restore
1424
// process.
1425
func (c *ChainArbitrator) RedispatchBlockbeat(chanPoints []wire.OutPoint) {
3✔
1426
        // Get the current blockbeat.
3✔
1427
        beat := c.beat
3✔
1428

3✔
1429
        // Prepare two sets of consumers.
3✔
1430
        channels := make([]chainio.Consumer, 0, len(chanPoints))
3✔
1431
        watchers := make([]chainio.Consumer, 0, len(chanPoints))
3✔
1432

3✔
1433
        // Read the active channels in a lock.
3✔
1434
        c.Lock()
3✔
1435
        for _, op := range chanPoints {
6✔
1436
                if channel, ok := c.activeChannels[op]; ok {
6✔
1437
                        channels = append(channels, channel)
3✔
1438
                }
3✔
1439

1440
                if watcher, ok := c.activeWatchers[op]; ok {
6✔
1441
                        watchers = append(watchers, watcher)
3✔
1442
                }
3✔
1443
        }
1444
        c.Unlock()
3✔
1445

3✔
1446
        // Iterate all the copied watchers and send the blockbeat to them.
3✔
1447
        err := chainio.DispatchConcurrent(beat, watchers)
3✔
1448
        if err != nil {
3✔
1449
                log.Errorf("Notify blockbeat for chainWatcher failed: %v", err)
×
1450
        }
×
1451

1452
        // Iterate all the copied channels and send the blockbeat to them.
1453
        err = chainio.DispatchConcurrent(beat, channels)
3✔
1454
        if err != nil {
3✔
1455
                // Shutdown lnd if there's an error processing the block.
×
1456
                log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v",
×
1457
                        err)
×
1458
        }
×
1459
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc