
lightningnetwork / lnd / build 12583319996

02 Jan 2025 01:38PM UTC coverage: 57.522% (-1.1%) from 58.598%

Pull #9361 (github), author: starius
fn/ContextGuard: use context.AfterFunc to wait

Simplifies context cancellation handling by using context.AfterFunc instead of a
goroutine to wait for context cancellation. This approach avoids the overhead of
a goroutine during the waiting period.

For ctxQuitUnsafe, since g.quit is closed only in the Quit method (which also
cancels all associated contexts), waiting on context cancellation ensures the
same behavior without unnecessary dependency on g.quit.

Added a test to ensure that the Create method does not launch any goroutines.
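
As a minimal illustration of the pattern the description refers to (assuming Go 1.21+, where context.AfterFunc was introduced; this is a standalone sketch, not the actual fn/ContextGuard code):

package main

import (
        "context"
        "fmt"
        "time"
)

func main() {
        ctx, cancel := context.WithCancel(context.Background())

        // Instead of launching a goroutine that blocks on <-ctx.Done() just
        // to run cleanup, context.AfterFunc registers the callback without
        // tying up a goroutine while waiting.
        stop := context.AfterFunc(ctx, func() {
                fmt.Println("context canceled, running cleanup")
        })
        defer stop() // releases the registration if the callback is no longer needed

        cancel()
        time.Sleep(10 * time.Millisecond) // give the callback goroutine time to run
}
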
Pull Request #9361: fn: optimize context guard

102587 of 178344 relevant lines covered (57.52%)

24734.33 hits per line

Source File: /contractcourt/chain_arbitrator.go (36.04% covered)

package contractcourt

import (
        "errors"
        "fmt"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/chaincfg/chainhash"
        "github.com/btcsuite/btcd/wire"
        "github.com/btcsuite/btcwallet/walletdb"
        "github.com/lightningnetwork/lnd/chainio"
        "github.com/lightningnetwork/lnd/chainntnfs"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/clock"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/labels"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
        "github.com/lightningnetwork/lnd/lnwire"
)

// ErrChainArbExiting signals that the chain arbitrator is shutting down.
var ErrChainArbExiting = errors.New("ChainArbitrator exiting")

// ResolutionMsg is a message sent by resolvers to outside sub-systems once an
// outgoing contract has been fully resolved. For multi-hop contracts, if we
// resolve the outgoing contract, we'll also need to ensure that the incoming
// contract is resolved as well. We package the items required to resolve the
// incoming contracts within this message.
type ResolutionMsg struct {
        // SourceChan identifies the channel that this message is being sent
        // from. This is the channel's short channel ID.
        SourceChan lnwire.ShortChannelID

        // HtlcIndex is the index of the contract within the original
        // commitment trace.
        HtlcIndex uint64

        // Failure will be non-nil if the incoming contract should be canceled
        // altogether. This can happen if the outgoing contract was dust, or
        // if the outgoing HTLC timed out.
        Failure lnwire.FailureMessage

        // PreImage will be non-nil if the incoming contract can successfully
        // be redeemed. This can happen if we learn of the preimage from the
        // outgoing HTLC on-chain.
        PreImage *[32]byte
}
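
// Illustrative sketch (not part of chain_arbitrator.go): how a resolver
// might populate ResolutionMsg for the two outcomes described above, with
// scid, htlcIndex, preimage and cfg assumed to be in scope:
//
//      // Cancel the incoming HTLC, e.g. because the outgoing HTLC timed out.
//      cancelMsg := ResolutionMsg{
//              SourceChan: scid,
//              HtlcIndex:  htlcIndex,
//              Failure:    &lnwire.FailPermanentChannelFailure{},
//      }
//
//      // Settle the incoming HTLC with a preimage learned on-chain.
//      settleMsg := ResolutionMsg{
//              SourceChan: scid,
//              HtlcIndex:  htlcIndex,
//              PreImage:   &preimage,
//      }
//
//      err := cfg.DeliverResolutionMsg(cancelMsg, settleMsg)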

// ChainArbitratorConfig is a configuration struct that contains all the
// function closures and interfaces that are required to arbitrate on-chain
// contracts for a particular chain.
type ChainArbitratorConfig struct {
        // ChainHash is the chain that this arbitrator is to operate within.
        ChainHash chainhash.Hash

        // IncomingBroadcastDelta is the delta that we'll use to decide when to
        // broadcast our commitment transaction if we have incoming htlcs. This
        // value should be set based on our current fee estimation of the
        // commitment transaction. We use this to determine when we should
        // broadcast instead of just the HTLC timeout, as we want to ensure
        // that the commitment transaction is already confirmed by the time the
        // HTLC expires. Otherwise we may end up not settling the htlc on-chain
        // because the other party managed to time it out.
        IncomingBroadcastDelta uint32

        // OutgoingBroadcastDelta is the delta that we'll use to decide when to
        // broadcast our commitment transaction if there are active outgoing
        // htlcs. This value can be lower than the incoming broadcast delta.
        OutgoingBroadcastDelta uint32

        // NewSweepAddr is a function that returns a new address under control
        // by the wallet. We'll use this to sweep any no-delay outputs as a
        // result of unilateral channel closes.
        //
        // NOTE: This SHOULD return a p2wkh script.
        NewSweepAddr func() ([]byte, error)

        // PublishTx reliably broadcasts a transaction to the network. Once
        // this function exits without an error, then the transaction MUST
        // continually be rebroadcast if needed.
        PublishTx func(*wire.MsgTx, string) error

        // DeliverResolutionMsg is a function that will append an outgoing
        // message to the "out box" for a ChannelLink. This is used to cancel
        // backwards to the incoming link any HTLC's that are either dust,
        // being timed out, or being settled on-chain.
        DeliverResolutionMsg func(...ResolutionMsg) error

        // MarkLinkInactive is a function closure that the ChainArbitrator will
        // use to mark that active HTLC's shouldn't be attempted to be routed
        // over a particular channel. This function will be called in the event
        // that a ChannelArbitrator decides that it needs to go to chain in
        // order to resolve contracts.
        //
        // TODO(roasbeef): rename, routing based
        MarkLinkInactive func(wire.OutPoint) error

        // ContractBreach is a function closure that the ChainArbitrator will
        // use to notify the BreachArbitrator about a contract breach. It should
        // only return a non-nil error when the BreachArbitrator has preserved
        // the necessary breach info for this channel point. Once the breach
        // resolution is persisted in the ChannelArbitrator, it will be safe
        // to mark the channel closed.
        ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error

        // IsOurAddress is a function that returns true if the passed address
        // is known to the underlying wallet. Otherwise, false should be
        // returned.
        IsOurAddress func(btcutil.Address) bool

        // IncubateOutputs sends either an incoming HTLC, an outgoing HTLC, or
        // both to the utxo nursery. Once this function returns, the nursery
        // should have safely persisted the outputs to disk, and should start
        // the process of incubation. This is used when a resolver wishes to
        // pass off the output to the nursery as we're only waiting on an
        // absolute/relative item block.
        IncubateOutputs func(wire.OutPoint,
                fn.Option[lnwallet.OutgoingHtlcResolution],
                fn.Option[lnwallet.IncomingHtlcResolution],
                uint32, fn.Option[int32]) error

        // PreimageDB is a global store of all known pre-images. We'll use this
        // to decide if we should broadcast a commitment transaction to claim
        // an HTLC on-chain.
        PreimageDB WitnessBeacon

        // Notifier is an instance of a chain notifier we'll use to watch for
        // certain on-chain events.
        Notifier chainntnfs.ChainNotifier

        // Mempool is a mempool watcher that allows us to watch for events
        // happening in the mempool.
        Mempool chainntnfs.MempoolWatcher

        // Signer is a signer backed by the active lnd node. This should be
        // capable of producing a signature as specified by a valid
        // SignDescriptor.
        Signer input.Signer

        // FeeEstimator will be used to return fee estimates.
        FeeEstimator chainfee.Estimator

        // ChainIO allows us to query the state of the current main chain.
        ChainIO lnwallet.BlockChainIO

        // DisableChannel disables a channel, resulting in it not being able to
        // forward payments.
        DisableChannel func(wire.OutPoint) error

        // Sweeper allows resolvers to sweep their final outputs.
        Sweeper UtxoSweeper

        // Registry is the invoice database that is used by resolvers to lookup
        // preimages and settle invoices.
        Registry Registry

        // NotifyClosedChannel is a function closure that the ChainArbitrator
        // will use to notify the ChannelNotifier about a newly closed channel.
        NotifyClosedChannel func(wire.OutPoint)

        // NotifyFullyResolvedChannel is a function closure that the
        // ChainArbitrator will use to notify the ChannelNotifier about a newly
        // resolved channel. The main difference to NotifyClosedChannel is that
        // in case of a local force close the NotifyClosedChannel is called when
        // the published commitment transaction confirms, while
        // NotifyFullyResolvedChannel is only called when the channel is fully
        // resolved (which includes sweeping any time locked funds).
        NotifyFullyResolvedChannel func(point wire.OutPoint)

        // OnionProcessor is used to decode onion payloads for on-chain
        // resolution.
        OnionProcessor OnionProcessor

        // PaymentsExpirationGracePeriod indicates a time window during which
        // we let the other node cancel an outgoing htlc that our node has
        // initiated and that has timed out.
        PaymentsExpirationGracePeriod time.Duration

        // IsForwardedHTLC checks for a given htlc, identified by channel id and
        // htlcIndex, if it is a forwarded one.
        IsForwardedHTLC func(chanID lnwire.ShortChannelID, htlcIndex uint64) bool

        // Clock is the clock implementation that ChannelArbitrator uses.
        // It is useful for testing.
        Clock clock.Clock

        // SubscribeBreachComplete is used by the breachResolver to register a
        // subscription that notifies when the breach resolution process is
        // complete.
        SubscribeBreachComplete func(op *wire.OutPoint, c chan struct{}) (
                bool, error)

        // PutFinalHtlcOutcome stores the final outcome of an htlc in the
        // database.
        PutFinalHtlcOutcome func(chanId lnwire.ShortChannelID,
                htlcId uint64, settled bool) error

        // HtlcNotifier is an interface that htlc events are sent to.
        HtlcNotifier HtlcNotifier

        // Budget is the configured budget for the arbitrator.
        Budget BudgetConfig

        // QueryIncomingCircuit is used to find the outgoing HTLC's
        // corresponding incoming HTLC circuit. It queries the circuit map for
        // a given outgoing circuit key and returns the incoming circuit key.
        //
        // TODO(yy): this is a hacky way to get around the cycling import issue
        // as we cannot import `htlcswitch` here. A proper way is to define an
        // interface here that asks for the method `LookupOpenCircuit`;
        // meanwhile, turn `PaymentCircuit` into an interface or bring it to a
        // lower package.
        QueryIncomingCircuit func(circuit models.CircuitKey) *models.CircuitKey

        // AuxLeafStore is an optional store that can be used to store auxiliary
        // leaves for certain custom channel types.
        AuxLeafStore fn.Option[lnwallet.AuxLeafStore]

        // AuxSigner is an optional signer that can be used to sign auxiliary
        // leaves for certain custom channel types.
        AuxSigner fn.Option[lnwallet.AuxSigner]

        // AuxResolver is an optional interface that can be used to modify the
        // way contracts are resolved.
        AuxResolver fn.Option[lnwallet.AuxContractResolver]
}
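
// Illustrative sketch (not part of chain_arbitrator.go): wiring a few of the
// closures above when building the config. The wallet value and disableChan
// helper are hypothetical stand-ins:
//
//      cfg := ChainArbitratorConfig{
//              ChainHash: chainHash,
//              PublishTx: func(tx *wire.MsgTx, label string) error {
//                      return wallet.PublishTransaction(tx, label)
//              },
//              IsOurAddress:   wallet.IsOurAddress,
//              DisableChannel: disableChan,
//      }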

// ChainArbitrator is a sub-system that oversees the on-chain resolution of
// all active channels, and also those that are in the "pending close" state.
// Within the contractcourt package, the ChainArbitrator manages a set of
// active ContractArbitrators. Each ContractArbitrator is responsible for
// watching the chain for any activity that affects the state of the channel,
// and also for monitoring each contract in order to determine if any on-chain
// activity is required. Outside sub-systems interact with the ChainArbitrator
// in order to forcibly exit a contract, update the set of live signals for
// each contract, and to receive reports on the state of contract resolution.
type ChainArbitrator struct {
        started int32 // To be used atomically.
        stopped int32 // To be used atomically.

        // Embed the blockbeat consumer struct to get access to the method
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
        chainio.BeatConsumer

        sync.Mutex

        // activeChannels is a map of all the active contracts that are still
        // open, and not fully resolved.
        activeChannels map[wire.OutPoint]*ChannelArbitrator

        // activeWatchers is a map of all the active chainWatchers for channels
        // that are still considered open.
        activeWatchers map[wire.OutPoint]*chainWatcher

        // cfg is the config struct for the arbitrator that contains all
        // methods and interfaces it needs to operate.
        cfg ChainArbitratorConfig

        // chanSource will be used by the ChainArbitrator to fetch all the
        // active channels that it must still watch over.
        chanSource *channeldb.DB

        // beat is the current best known blockbeat.
        beat chainio.Blockbeat

        quit chan struct{}

        wg sync.WaitGroup
}

// NewChainArbitrator returns a new instance of the ChainArbitrator using the
// passed config struct, and backing persistent database.
func NewChainArbitrator(cfg ChainArbitratorConfig,
        db *channeldb.DB) *ChainArbitrator {

        c := &ChainArbitrator{
                cfg:            cfg,
                activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
                activeWatchers: make(map[wire.OutPoint]*chainWatcher),
                chanSource:     db,
                quit:           make(chan struct{}),
        }

        // Mount the block consumer.
        c.BeatConsumer = chainio.NewBeatConsumer(c.quit, c.Name())

        return c
}

// Compile-time check for the chainio.Consumer interface.
var _ chainio.Consumer = (*ChainArbitrator)(nil)

// arbChannel is a wrapper around an open channel that channel arbitrators
// interact with.
type arbChannel struct {
        // channel is the in-memory channel state.
        channel *channeldb.OpenChannel

        // c references the chain arbitrator and is used by arbChannel
        // internally.
        c *ChainArbitrator
}

// NewAnchorResolutions returns the anchor resolutions for currently valid
// commitment transactions.
//
// NOTE: Part of the ArbChannel interface.
func (a *arbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions,
        error) {

        // Get a fresh copy of the database state to base the anchor resolutions
        // on. Unfortunately the channel instance that we have here isn't the
        // same instance that is used by the link.
        chanPoint := a.channel.FundingOutpoint

        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(chanPoint)
        if err != nil {
                return nil, err
        }

        var chanOpts []lnwallet.ChannelOpt
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
        })
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
        })
        a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
        })

        chanMachine, err := lnwallet.NewLightningChannel(
                a.c.cfg.Signer, channel, nil, chanOpts...,
        )
        if err != nil {
                return nil, err
        }

        return chanMachine.NewAnchorResolutions()
}

// ForceCloseChan should force close the contract that this attendant is
// watching over. We'll use this when we decide that we need to go to chain. It
// should in addition tell the switch to remove the corresponding link, such
// that we won't accept any new updates.
//
// NOTE: Part of the ArbChannel interface.
func (a *arbChannel) ForceCloseChan() (*wire.MsgTx, error) {
        // First, we mark the channel as borked. This ensures that no new
        // state transitions can happen, and also that the link won't be
        // loaded into the switch.
        if err := a.channel.MarkBorked(); err != nil {
                return nil, err
        }

        // With the channel marked as borked, we'll now remove the link from
        // the switch if it's there. If the link is active, then this method
        // will block until it exits.
        chanPoint := a.channel.FundingOutpoint

        if err := a.c.cfg.MarkLinkInactive(chanPoint); err != nil {
                log.Errorf("unable to mark link inactive: %v", err)
        }

        // Now that we know the link can't mutate the channel state, we'll
        // read the target channel from disk according to its channel point.
        channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(chanPoint)
        if err != nil {
                return nil, err
        }

        var chanOpts []lnwallet.ChannelOpt
        a.c.cfg.AuxLeafStore.WhenSome(func(s lnwallet.AuxLeafStore) {
                chanOpts = append(chanOpts, lnwallet.WithLeafStore(s))
        })
        a.c.cfg.AuxSigner.WhenSome(func(s lnwallet.AuxSigner) {
                chanOpts = append(chanOpts, lnwallet.WithAuxSigner(s))
        })
        a.c.cfg.AuxResolver.WhenSome(func(s lnwallet.AuxContractResolver) {
                chanOpts = append(chanOpts, lnwallet.WithAuxResolver(s))
        })

        // Finally, we'll force close the channel, completing the force close
        // workflow.
        chanMachine, err := lnwallet.NewLightningChannel(
                a.c.cfg.Signer, channel, nil, chanOpts...,
        )
        if err != nil {
                return nil, err
        }

        closeSummary, err := chanMachine.ForceClose(
                lnwallet.WithSkipContractResolutions(),
        )
        if err != nil {
                return nil, err
        }

        return closeSummary.CloseTx, nil
}

// newActiveChannelArbitrator creates a new instance of an active channel
// arbitrator given the state of the target channel.
func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
        c *ChainArbitrator, chanEvents *ChainEventSubscription) (*ChannelArbitrator, error) {

        // TODO(roasbeef): fetch best height (or pass in) so can ensure block
        // epoch delivers all the notifications to

        chanPoint := channel.FundingOutpoint

        log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", chanPoint)

        // Next we'll create the matching configuration struct that contains
        // all interfaces and methods the arbitrator needs to do its job.
        arbCfg := ChannelArbitratorConfig{
                ChanPoint:   chanPoint,
                Channel:     c.getArbChannel(channel),
                ShortChanID: channel.ShortChanID(),

                MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
                MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
                        statuses ...channeldb.ChannelStatus) error {

                        err := channel.CloseChannel(summary, statuses...)
                        if err != nil {
                                return err
                        }
                        c.cfg.NotifyClosedChannel(summary.ChanPoint)
                        return nil
                },
                IsPendingClose:        false,
                ChainArbitratorConfig: c.cfg,
                ChainEvents:           chanEvents,
                PutResolverReport: func(tx kvdb.RwTx,
                        report *channeldb.ResolverReport) error {

                        return c.chanSource.PutResolverReport(
                                tx, c.cfg.ChainHash, &chanPoint, report,
                        )
                },
                FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
                        chanStateDB := c.chanSource.ChannelStateDB()
                        return chanStateDB.FetchHistoricalChannel(&chanPoint)
                },
                FindOutgoingHTLCDeadline: func(
                        htlc channeldb.HTLC) fn.Option[int32] {

                        return c.FindOutgoingHTLCDeadline(
                                channel.ShortChanID(), htlc,
                        )
                },
        }

        // The final component needed is an arbitrator log that the arbitrator
        // will use to keep track of its internal state using a backed
        // persistent log.
        //
        // TODO(roasbeef): abstraction leak...
        //  * rework: adaptor method to set log scope w/ factory func
        chanLog, err := newBoltArbitratorLog(
                c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
        )
        if err != nil {
                return nil, err
        }

        arbCfg.MarkChannelResolved = func() error {
                if c.cfg.NotifyFullyResolvedChannel != nil {
                        c.cfg.NotifyFullyResolvedChannel(chanPoint)
                }

                return c.ResolveContract(chanPoint)
        }

        // Finally, we'll need to construct a series of htlc Sets based on all
        // currently known valid commitments.
        htlcSets := make(map[HtlcSetKey]htlcSet)
        htlcSets[LocalHtlcSet] = newHtlcSet(channel.LocalCommitment.Htlcs)
        htlcSets[RemoteHtlcSet] = newHtlcSet(channel.RemoteCommitment.Htlcs)

        pendingRemoteCommitment, err := channel.RemoteCommitChainTip()
        if err != nil && err != channeldb.ErrNoPendingCommit {
                return nil, err
        }
        if pendingRemoteCommitment != nil {
                htlcSets[RemotePendingHtlcSet] = newHtlcSet(
                        pendingRemoteCommitment.Commitment.Htlcs,
                )
        }

        return NewChannelArbitrator(
                arbCfg, htlcSets, chanLog,
        ), nil
}

// getArbChannel returns an open channel wrapper for use by channel arbitrators.
func (c *ChainArbitrator) getArbChannel(
        channel *channeldb.OpenChannel) *arbChannel {

        return &arbChannel{
                channel: channel,
                c:       c,
        }
}

// ResolveContract marks a contract as fully resolved within the database.
// This is only to be done once all contracts which were live on the channel
// before hitting the chain have been resolved.
func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error {
        log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)

        // First, we'll mark the channel as fully closed from the PoV of the
        // channel source.
        err := c.chanSource.ChannelStateDB().MarkChanFullyClosed(&chanPoint)
        if err != nil {
                log.Errorf("ChainArbitrator: unable to mark ChannelPoint(%v) "+
                        "fully closed: %v", chanPoint, err)
                return err
        }

        // Now that the channel has been marked as fully closed, we'll stop
        // both the channel arbitrator and chain watcher for this channel if
        // they're still active.
        var arbLog ArbitratorLog
        c.Lock()
        chainArb := c.activeChannels[chanPoint]
        delete(c.activeChannels, chanPoint)

        chainWatcher := c.activeWatchers[chanPoint]
        delete(c.activeWatchers, chanPoint)
        c.Unlock()

        if chainArb != nil {
                arbLog = chainArb.log

                if err := chainArb.Stop(); err != nil {
                        log.Warnf("unable to stop ChannelArbitrator(%v): %v",
                                chanPoint, err)
                }
        }
        if chainWatcher != nil {
                if err := chainWatcher.Stop(); err != nil {
                        log.Warnf("unable to stop ChainWatcher(%v): %v",
                                chanPoint, err)
                }
        }

        // Once this has been marked as resolved, we'll wipe the log that the
        // channel arbitrator was using to store its persistent state. We do
        // this after marking the channel resolved, as otherwise, the
        // arbitrator would be re-created, and think it was starting from the
        // default state.
        if arbLog != nil {
                if err := arbLog.WipeHistory(); err != nil {
                        return err
                }
        }

        return nil
}

// Start launches all goroutines that the ChainArbitrator needs to operate.
func (c *ChainArbitrator) Start(beat chainio.Blockbeat) error {
        if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
                return nil
        }

        // Set the current beat.
        c.beat = beat

        // First, we'll fetch all the channels that are still open, in order to
        // collect them within our set of active contracts.
        if err := c.loadOpenChannels(); err != nil {
                return err
        }

        // In addition to the channels that we know to be open, we'll also
        // launch arbitrators to finish resolving any channels that are in
        // the pending close state.
        if err := c.loadPendingCloseChannels(); err != nil {
                return err
        }

        // Now, we'll start all chain watchers in parallel to shorten start up
        // duration. In neutrino mode, this allows spend registrations to take
        // advantage of batch spend reporting, instead of doing a single rescan
        // per chain watcher.
        //
        // NOTE: After this point, we Stop the chain arb to ensure that any
        // lingering goroutines are cleaned up before exiting.
        watcherErrs := make(chan error, len(c.activeWatchers))
        var wg sync.WaitGroup
        for _, watcher := range c.activeWatchers {
                wg.Add(1)
                go func(w *chainWatcher) {
                        defer wg.Done()
                        select {
                        case watcherErrs <- w.Start():
                        case <-c.quit:
                                watcherErrs <- ErrChainArbExiting
                        }
                }(watcher)
        }

        // Once all chain watchers have been started, seal the err chan to
        // signal the end of the err stream.
        go func() {
                wg.Wait()
                close(watcherErrs)
        }()

        // stopAndLog is a helper function which shuts down the chain arb and
        // logs errors if they occur.
        stopAndLog := func() {
                if err := c.Stop(); err != nil {
                        log.Errorf("ChainArbitrator could not shutdown: %v", err)
                }
        }

        // Handle all errors returned from spawning our chain watchers. If any
        // of them failed, we will stop the chain arb to shutdown any active
        // goroutines.
        for err := range watcherErrs {
                if err != nil {
                        stopAndLog()
                        return err
                }
        }

        // Before we start all of our arbitrators, we do a preliminary state
        // lookup so that we can combine all of these lookups in a single db
        // transaction.
        var startStates map[wire.OutPoint]*chanArbStartState

        err := kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error {
                for _, arbitrator := range c.activeChannels {
                        startState, err := arbitrator.getStartState(tx)
                        if err != nil {
                                return err
                        }

                        startStates[arbitrator.cfg.ChanPoint] = startState
                }

                return nil
        }, func() {
                startStates = make(
                        map[wire.OutPoint]*chanArbStartState,
                        len(c.activeChannels),
                )
        })
        if err != nil {
                stopAndLog()
                return err
        }

        // Launch all the goroutines for each arbitrator so they can carry out
        // their duties.
        for _, arbitrator := range c.activeChannels {
                startState, ok := startStates[arbitrator.cfg.ChanPoint]
                if !ok {
                        stopAndLog()
                        return fmt.Errorf("arbitrator: %v has no start state",
                                arbitrator.cfg.ChanPoint)
                }

                if err := arbitrator.Start(startState, c.beat); err != nil {
                        stopAndLog()
                        return err
                }
        }

        // Start our goroutine which will dispatch blocks to each arbitrator.
        c.wg.Add(1)
        go func() {
                defer c.wg.Done()
                c.dispatchBlocks()
        }()

        log.Infof("ChainArbitrator starting at height %d with %d chain "+
                "watchers, %d channel arbitrators, and budget config=[%v]",
                c.beat.Height(), len(c.activeWatchers), len(c.activeChannels),
                &c.cfg.Budget)

        // TODO(roasbeef): eventually move all breach watching here

        return nil
}

// dispatchBlocks consumes a block epoch notification stream and dispatches
// blocks to each of the chain arb's active channel arbitrators. This function
// must be run in a goroutine.
func (c *ChainArbitrator) dispatchBlocks() {
        // Consume block epochs until we receive the instruction to shutdown.
        for {
                select {
                // Consume block epochs, exiting if our subscription is
                // terminated.
                case beat := <-c.BlockbeatChan:
                        // Set the current blockbeat.
                        c.beat = beat

                        // Send this blockbeat to all the active channels and
                        // wait for them to finish processing it.
                        c.handleBlockbeat(beat)

                // Exit if the chain arbitrator is shutting down.
                case <-c.quit:
                        return
                }
        }
}

// handleBlockbeat sends the blockbeat to all active channel arbitrators in
// parallel and waits for them to finish processing it.
func (c *ChainArbitrator) handleBlockbeat(beat chainio.Blockbeat) {
        // Read the active channels in a lock.
        c.Lock()

        // Create slices to record the active channel arbitrators and
        // watchers.
        channels := make([]chainio.Consumer, 0, len(c.activeChannels))
        watchers := make([]chainio.Consumer, 0, len(c.activeWatchers))

        // Copy the active channels to the slice.
        for _, channel := range c.activeChannels {
                channels = append(channels, channel)
        }

        for _, watcher := range c.activeWatchers {
                watchers = append(watchers, watcher)
        }

        c.Unlock()

        // Iterate all the copied watchers and send the blockbeat to them.
        err := chainio.DispatchConcurrent(beat, watchers)
        if err != nil {
                log.Errorf("Notify blockbeat for chainWatcher failed: %v", err)
        }

        // Iterate all the copied channels and send the blockbeat to them.
        //
        // NOTE: This method will time out if the processing of the block by
        // the subsystems takes too long (60s).
        err = chainio.DispatchConcurrent(beat, channels)
        if err != nil {
                log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v",
                        err)
        }

        // Notify that the chain arbitrator has processed the block.
        c.NotifyBlockProcessed(beat, err)
}
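
// Illustrative sketch (not part of chain_arbitrator.go): the blockbeat
// plumbing exercised above, in isolation. A chainio consumer embeds
// chainio.BeatConsumer, which supplies BlockbeatChan and
// NotifyBlockProcessed; the consumer reads beats from the channel, does its
// work, and reports back, which is exactly the shape of
// dispatchBlocks/handleBlockbeat:
//
//      for {
//              select {
//              case beat := <-c.BlockbeatChan:
//                      err := process(beat) // process is a hypothetical handler
//                      c.NotifyBlockProcessed(beat, err)
//
//              case <-c.quit:
//                      return
//              }
//      }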

// republishClosingTxs will load any stored cooperative or unilateral closing
// transactions and republish them. This helps ensure propagation of the
// transactions in the event that prior publications failed.
func (c *ChainArbitrator) republishClosingTxs(
        channel *channeldb.OpenChannel) error {

        // If the channel has had its unilateral close broadcasted already,
        // republish it in case it didn't propagate.
        if channel.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) {
                err := c.rebroadcast(
                        channel, channeldb.ChanStatusCommitBroadcasted,
                )
                if err != nil {
                        return err
                }
        }

        // If the channel has had its cooperative close broadcasted
        // already, republish it in case it didn't propagate.
        if channel.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) {
                err := c.rebroadcast(
                        channel, channeldb.ChanStatusCoopBroadcasted,
                )
                if err != nil {
                        return err
                }
        }

        return nil
}

// rebroadcast is a helper method which will republish the unilateral or
// cooperative close transaction of a channel in a particular state.
//
// NOTE: There is no risk to calling this method if the channel isn't in either
// CommitmentBroadcasted or CoopBroadcasted, but the logs will be misleading.
func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel,
        state channeldb.ChannelStatus) error {

        chanPoint := channel.FundingOutpoint

        var (
                closeTx *wire.MsgTx
                kind    string
                err     error
        )
        switch state {
        case channeldb.ChanStatusCommitBroadcasted:
                kind = "force"
                closeTx, err = channel.BroadcastedCommitment()

        case channeldb.ChanStatusCoopBroadcasted:
                kind = "coop"
                closeTx, err = channel.BroadcastedCooperative()

        default:
                return fmt.Errorf("unknown closing state: %v", state)
        }

        switch {
        // This can happen for channels that had their closing tx published
        // before we started storing it to disk.
        case err == channeldb.ErrNoCloseTx:
                log.Warnf("Channel %v is in state %v, but no %s closing tx "+
                        "to re-publish...", chanPoint, state, kind)
                return nil

        case err != nil:
                return err
        }

        log.Infof("Re-publishing %s close tx(%v) for channel %v",
                kind, closeTx.TxHash(), chanPoint)

        label := labels.MakeLabel(
                labels.LabelTypeChannelClose, &channel.ShortChannelID,
        )
        err = c.cfg.PublishTx(closeTx, label)
        if err != nil && err != lnwallet.ErrDoubleSpend {
                log.Warnf("Unable to broadcast %s close tx(%v): %v",
                        kind, closeTx.TxHash(), err)
        }

        return nil
}

// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active
// channel arbitrators will be signalled to exit, and this method will block
// until they've all exited.
func (c *ChainArbitrator) Stop() error {
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
                return nil
        }

        log.Info("ChainArbitrator shutting down...")
        defer log.Debug("ChainArbitrator shutdown complete")

        close(c.quit)

        var (
                activeWatchers = make(map[wire.OutPoint]*chainWatcher)
                activeChannels = make(map[wire.OutPoint]*ChannelArbitrator)
        )

        // Copy the current set of active watchers and arbitrators to shutdown.
        // We don't want to hold the lock when shutting down each watcher or
        // arbitrator individually, as they may need to acquire this mutex.
        c.Lock()
        for chanPoint, watcher := range c.activeWatchers {
                activeWatchers[chanPoint] = watcher
        }
        for chanPoint, arbitrator := range c.activeChannels {
                activeChannels[chanPoint] = arbitrator
        }
        c.Unlock()

        for chanPoint, watcher := range activeWatchers {
                log.Tracef("Attempting to stop ChainWatcher(%v)",
                        chanPoint)

                if err := watcher.Stop(); err != nil {
                        log.Errorf("unable to stop watcher for "+
                                "ChannelPoint(%v): %v", chanPoint, err)
                }
        }
        for chanPoint, arbitrator := range activeChannels {
                log.Tracef("Attempting to stop ChannelArbitrator(%v)",
                        chanPoint)

                if err := arbitrator.Stop(); err != nil {
                        log.Errorf("unable to stop arbitrator for "+
                                "ChannelPoint(%v): %v", chanPoint, err)
                }
        }

        c.wg.Wait()

        return nil
}

// ContractUpdate is a message that packages the latest set of active HTLCs on
// a commitment, and also identifies which commitment received a new set of
// HTLCs.
type ContractUpdate struct {
        // HtlcKey identifies which commitment the HTLCs below are present on.
        HtlcKey HtlcSetKey

        // Htlcs are the set of active HTLCs on the commitment identified by
        // the above HtlcKey.
        Htlcs []channeldb.HTLC
}

// ContractSignals is used by outside subsystems to notify a channel arbitrator
// of its ShortChannelID.
type ContractSignals struct {
        // ShortChanID is the up to date short channel ID for a contract. This
        // can change either if, when the contract was added, it didn't yet
        // have a stable identifier, or in the case of a reorg.
        ShortChanID lnwire.ShortChannelID
}

// UpdateContractSignals sends a set of active, up to date contract signals to
// the ChannelArbitrator that has been assigned to the channel identified by
// the passed channel point.
func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
        signals *ContractSignals) error {

        log.Infof("Attempting to update ContractSignals for ChannelPoint(%v)",
                chanPoint)

        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return fmt.Errorf("unable to find arbitrator")
        }

        arbitrator.UpdateContractSignals(signals)

        return nil
}

// NotifyContractUpdate lets a channel arbitrator know that a new
// ContractUpdate is available. This calls the ChannelArbitrator's internal
// method NotifyContractUpdate which waits for a response on a done chan before
// returning. This method will return an error if the ChannelArbitrator is not
// in the activeChannels map. However, this only happens if the arbitrator is
// resolved and the related link would already be shut down.
func (c *ChainArbitrator) NotifyContractUpdate(chanPoint wire.OutPoint,
        update *ContractUpdate) error {

        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return fmt.Errorf("can't find arbitrator for %v", chanPoint)
        }

        arbitrator.notifyContractUpdate(update)
        return nil
}

// GetChannelArbitrator safely returns the channel arbitrator for a given
// channel outpoint.
func (c *ChainArbitrator) GetChannelArbitrator(chanPoint wire.OutPoint) (
        *ChannelArbitrator, error) {

        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return nil, fmt.Errorf("unable to find arbitrator")
        }

        return arbitrator, nil
}

// forceCloseReq is a request sent from an outside sub-system to the arbitrator
// that watches a particular channel to broadcast the commitment transaction,
// and enter the resolution phase of the channel.
type forceCloseReq struct {
        // errResp is a channel that will be sent upon either in the case of
        // force close success (nil error), or in the case of an error.
        //
        // NOTE: This channel MUST be buffered.
        errResp chan error

        // closeTx is a channel that carries the transaction which ultimately
        // closed out the channel.
        closeTx chan *wire.MsgTx
}

// ForceCloseContract attempts to force close the channel identified by the
// passed channel point. A force close will immediately terminate the
// contract, causing it to enter the resolution phase. If the force close was
// successful, then the force close transaction itself will be returned.
//
// TODO(roasbeef): just return the summary itself?
func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.MsgTx, error) {
        c.Lock()
        arbitrator, ok := c.activeChannels[chanPoint]
        c.Unlock()
        if !ok {
                return nil, fmt.Errorf("unable to find arbitrator")
        }

        log.Infof("Attempting to force close ChannelPoint(%v)", chanPoint)

        // Before closing, we'll attempt to send a disable update for the
        // channel. We do so before closing the channel as otherwise the current
        // edge policy won't be retrievable from the graph.
        if err := c.cfg.DisableChannel(chanPoint); err != nil {
                log.Warnf("Unable to disable channel %v on "+
                        "close: %v", chanPoint, err)
        }

        errChan := make(chan error, 1)
        respChan := make(chan *wire.MsgTx, 1)

        // With the channel found, and the request crafted, we'll send over a
        // force close request to the arbitrator that watches this channel.
        select {
        case arbitrator.forceCloseReqs <- &forceCloseReq{
                errResp: errChan,
                closeTx: respChan,
        }:
        case <-c.quit:
                return nil, ErrChainArbExiting
        }

        // We'll await two responses: the error response, and the transaction
        // that closed out the channel.
        select {
        case err := <-errChan:
                if err != nil {
                        return nil, err
                }
        case <-c.quit:
                return nil, ErrChainArbExiting
        }

        var closeTx *wire.MsgTx
        select {
        case closeTx = <-respChan:
        case <-c.quit:
                return nil, ErrChainArbExiting
        }

        return closeTx, nil
}
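
// Illustrative sketch (not part of chain_arbitrator.go): calling
// ForceCloseContract from an outside sub-system, with arb and chanPoint
// assumed to be in scope:
//
//      closeTx, err := arb.ForceCloseContract(chanPoint)
//      if err != nil {
//              // The channel is unknown, or the arbitrator is shutting
//              // down (ErrChainArbExiting).
//              return err
//      }
//      log.Infof("Force close tx published: %v", closeTx.TxHash())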

// WatchNewChannel sends the ChainArbitrator a message to create a
// ChannelArbitrator tasked with watching over a new channel. Once a new
// channel has finished its final funding flow, it should be registered with
// the ChainArbitrator so we can properly react to any on-chain events.
func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error {
        c.Lock()
        defer c.Unlock()

        chanPoint := newChan.FundingOutpoint

        log.Infof("Creating new chainWatcher and ChannelArbitrator for "+
                "ChannelPoint(%v)", chanPoint)

        // If we're already watching this channel, then we'll ignore this
        // request.
        if _, ok := c.activeChannels[chanPoint]; ok {
                return nil
        }

        // First, also create an active chainWatcher for this channel to ensure
        // that we detect any relevant on chain events.
        chainWatcher, err := newChainWatcher(
                chainWatcherConfig{
                        chanState: newChan,
                        notifier:  c.cfg.Notifier,
                        signer:    c.cfg.Signer,
                        isOurAddr: c.cfg.IsOurAddress,
                        contractBreach: func(
                                retInfo *lnwallet.BreachRetribution) error {

                                return c.cfg.ContractBreach(
                                        chanPoint, retInfo,
                                )
                        },
                        extractStateNumHint: lnwallet.GetStateNumHint,
                        auxLeafStore:        c.cfg.AuxLeafStore,
                        auxResolver:         c.cfg.AuxResolver,
                },
        )
        if err != nil {
                return err
        }

        c.activeWatchers[chanPoint] = chainWatcher

        // We'll also create a new channel arbitrator instance using this new
        // channel, and our internal state.
        channelArb, err := newActiveChannelArbitrator(
                newChan, c, chainWatcher.SubscribeChannelEvents(),
        )
        if err != nil {
                return err
        }

        // With the arbitrator created, we'll add it to our set of active
        // arbitrators, then launch it.
        c.activeChannels[chanPoint] = channelArb

        if err := channelArb.Start(nil, c.beat); err != nil {
                return err
        }

        return chainWatcher.Start()
}

// SubscribeChannelEvents returns a new active subscription for the set of
// possible on-chain events for a particular channel. The struct can be used by
// callers to be notified whenever an event that changes the state of the
// channel on-chain occurs.
func (c *ChainArbitrator) SubscribeChannelEvents(
        chanPoint wire.OutPoint) (*ChainEventSubscription, error) {

        // First, we'll attempt to look up the active watcher for this channel.
        // If we can't find it, then we'll return an error back to the caller.
        c.Lock()
        watcher, ok := c.activeWatchers[chanPoint]
        c.Unlock()

        if !ok {
                return nil, fmt.Errorf("unable to find watcher for: %v",
                        chanPoint)
        }

        // With the watcher located, we'll request for it to create a new chain
        // event subscription client.
        return watcher.SubscribeChannelEvents(), nil
}

// FindOutgoingHTLCDeadline returns the deadline in absolute block height for
// the specified outgoing HTLC. For an outgoing HTLC, its deadline is defined
// by the timeout height of its corresponding incoming HTLC - this is the
// expiry height at which the remote peer can spend their outgoing HTLC via
// the timeout path.
func (c *ChainArbitrator) FindOutgoingHTLCDeadline(scid lnwire.ShortChannelID,
        outgoingHTLC channeldb.HTLC) fn.Option[int32] {

        // Find the outgoing HTLC's corresponding incoming HTLC in the circuit
        // map.
        rHash := outgoingHTLC.RHash
        circuit := models.CircuitKey{
                ChanID: scid,
                HtlcID: outgoingHTLC.HtlcIndex,
        }
        incomingCircuit := c.cfg.QueryIncomingCircuit(circuit)

        // If there's no incoming circuit found, we will use the default
        // deadline.
        if incomingCircuit == nil {
                log.Warnf("ChannelArbitrator(%v): incoming circuit key not "+
                        "found for rHash=%x, using default deadline instead",
                        scid, rHash)

                return fn.None[int32]()
        }

        // If this is a locally initiated HTLC, it means we are the first hop.
        // In this case, we can relax the deadline.
        if incomingCircuit.ChanID.IsDefault() {
                log.Infof("ChannelArbitrator(%v): using default deadline for "+
                        "locally initiated HTLC for rHash=%x", scid, rHash)

                return fn.None[int32]()
        }

        log.Debugf("Found incoming circuit %v for rHash=%x using outgoing "+
                "circuit %v", incomingCircuit, rHash, circuit)

        c.Lock()
        defer c.Unlock()

        // Iterate over all active channels to find the incoming HTLC specified
        // by its circuit key.
        for cp, channelArb := range c.activeChannels {
                // Skip if the SCID doesn't match.
                if channelArb.cfg.ShortChanID != incomingCircuit.ChanID {
                        continue
                }

                // Make sure the channel arbitrator has the latest view of its
                // active HTLCs.
                channelArb.updateActiveHTLCs()

                // Iterate all the known HTLCs to find the targeted incoming
                // HTLC.
                for _, htlcs := range channelArb.activeHTLCs {
                        for _, htlc := range htlcs.incomingHTLCs {
                                // Skip if the index doesn't match.
                                if htlc.HtlcIndex != incomingCircuit.HtlcID {
                                        continue
                                }

                                log.Debugf("ChannelArbitrator(%v): found "+
                                        "incoming HTLC in channel=%v using "+
                                        "rHash=%x, refundTimeout=%v", scid,
                                        cp, rHash, htlc.RefundTimeout)

                                return fn.Some(int32(htlc.RefundTimeout))
                        }
                }
        }

        // If there's no incoming HTLC found, yet we have the incoming circuit,
        // something is wrong - in this case, we return the none deadline.
        log.Errorf("ChannelArbitrator(%v): incoming HTLC not found for "+
                "rHash=%x, using default deadline instead", scid, rHash)

        return fn.None[int32]()
}

// TODO(roasbeef): arbitration reports
//  * types: contested, waiting for success conf, etc

// NOTE: part of the `chainio.Consumer` interface.
func (c *ChainArbitrator) Name() string {
        return "ChainArbitrator"
}

// loadOpenChannels loads all channels that are currently open in the database
// and registers them with the chainWatcher for future notification.
func (c *ChainArbitrator) loadOpenChannels() error {
        openChannels, err := c.chanSource.ChannelStateDB().FetchAllChannels()
        if err != nil {
                return err
        }

        if len(openChannels) == 0 {
                return nil
        }

        log.Infof("Creating ChannelArbitrators for %v active channels",
                len(openChannels))

        // For each open channel, we'll configure then launch a corresponding
        // ChannelArbitrator.
        for _, channel := range openChannels {
                chanPoint := channel.FundingOutpoint
                channel := channel

                // First, we'll create an active chainWatcher for this channel
                // to ensure that we detect any relevant on chain events.
                breachClosure := func(ret *lnwallet.BreachRetribution) error {
                        return c.cfg.ContractBreach(chanPoint, ret)
                }

                chainWatcher, err := newChainWatcher(
                        chainWatcherConfig{
                                chanState:           channel,
                                notifier:            c.cfg.Notifier,
                                signer:              c.cfg.Signer,
                                isOurAddr:           c.cfg.IsOurAddress,
                                contractBreach:      breachClosure,
                                extractStateNumHint: lnwallet.GetStateNumHint,
                                auxLeafStore:        c.cfg.AuxLeafStore,
                                auxResolver:         c.cfg.AuxResolver,
                        },
                )
                if err != nil {
                        return err
                }

                c.activeWatchers[chanPoint] = chainWatcher
                channelArb, err := newActiveChannelArbitrator(
                        channel, c, chainWatcher.SubscribeChannelEvents(),
                )
                if err != nil {
                        return err
                }

                c.activeChannels[chanPoint] = channelArb

                // Republish any closing transactions for this channel.
                err = c.republishClosingTxs(channel)
                if err != nil {
                        log.Errorf("Failed to republish closing txs for "+
                                "channel %v", chanPoint)
                }
        }

        return nil
}
1296

1297
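
// Editor's note, not part of the original file: the `channel := channel`
// statement in loadOpenChannels re-declares the loop variable so that each
// per-channel closure captures its own copy. Before Go 1.22, every iteration
// of a range loop shared a single variable, so omitting the copy would make
// late-running closures all observe the last channel. A minimal sketch of the
// pitfall, with hypothetical names:
//
//      for _, ch := range chans {
//              ch := ch // per-iteration copy, required pre-Go 1.22
//              callbacks = append(callbacks, func() { use(ch) })
//      }
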
// loadPendingCloseChannels loads all channels that are currently pending
// closure in the database and registers them with the ChannelArbitrator to
// continue the resolution process.
func (c *ChainArbitrator) loadPendingCloseChannels() error {
        chanStateDB := c.chanSource.ChannelStateDB()

        closingChannels, err := chanStateDB.FetchClosedChannels(true)
        if err != nil {
                return err
        }

        if len(closingChannels) == 0 {
                return nil
        }

        log.Infof("Creating ChannelArbitrators for %v closing channels",
                len(closingChannels))

        // Next, for each channel in the closing state, we'll launch a
        // corresponding more restricted resolver, as we don't have to watch
        // the chain any longer, only resolve the contracts on the confirmed
        // commitment.
        //nolint:ll
        for _, closeChanInfo := range closingChannels {
                // We can leave off the CloseContract and ForceCloseChan
                // methods as the channel is already closed at this point.
                chanPoint := closeChanInfo.ChanPoint
                arbCfg := ChannelArbitratorConfig{
                        ChanPoint:             chanPoint,
                        ShortChanID:           closeChanInfo.ShortChanID,
                        ChainArbitratorConfig: c.cfg,
                        ChainEvents:           &ChainEventSubscription{},
                        IsPendingClose:        true,
                        ClosingHeight:         closeChanInfo.CloseHeight,
                        CloseType:             closeChanInfo.CloseType,
                        PutResolverReport: func(tx kvdb.RwTx,
                                report *channeldb.ResolverReport) error {

                                return c.chanSource.PutResolverReport(
                                        tx, c.cfg.ChainHash, &chanPoint, report,
                                )
                        },
                        FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
                                return chanStateDB.FetchHistoricalChannel(&chanPoint)
                        },
                        FindOutgoingHTLCDeadline: func(
                                htlc channeldb.HTLC) fn.Option[int32] {

                                return c.FindOutgoingHTLCDeadline(
                                        closeChanInfo.ShortChanID, htlc,
                                )
                        },
                }
                chanLog, err := newBoltArbitratorLog(
                        c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
                )
                if err != nil {
                        return err
                }
                arbCfg.MarkChannelResolved = func() error {
                        if c.cfg.NotifyFullyResolvedChannel != nil {
                                c.cfg.NotifyFullyResolvedChannel(chanPoint)
                        }

                        return c.ResolveContract(chanPoint)
                }

                // We create an empty map of HTLCs here since it's possible
                // that the channel is in StateDefault and updateActiveHTLCs is
                // called. We want to avoid writing to an empty map. Since the
                // channel is already in the process of being resolved, no new
                // HTLCs will be added.
                c.activeChannels[chanPoint] = NewChannelArbitrator(
                        arbCfg, make(map[HtlcSetKey]htlcSet), chanLog,
                )
        }

        return nil
}
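
// Editor's note, not part of the original file: the `true` passed to
// FetchClosedChannels above is assumed to be a pendingOnly flag, so the query
// returns only channels whose closure has not yet been fully resolved:
//
//      // pendingOnly=true restricts the result to channels still pending
//      // close; fully resolved channels are skipped.
//      closing, err := chanStateDB.FetchClosedChannels(true)
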
// RedispatchBlockbeat resends the current blockbeat to the channels specified
// by the chanPoints. It is used when a channel is added to the chain
// arbitrator after it has been started, e.g., during the channel restore
// process.
func (c *ChainArbitrator) RedispatchBlockbeat(chanPoints []wire.OutPoint) {
        // Get the current blockbeat.
        beat := c.beat

        // Prepare two sets of consumers.
        channels := make([]chainio.Consumer, 0, len(chanPoints))
        watchers := make([]chainio.Consumer, 0, len(chanPoints))

        // Read the active channels in a lock.
        c.Lock()
        for _, op := range chanPoints {
                if channel, ok := c.activeChannels[op]; ok {
                        channels = append(channels, channel)
                }

                if watcher, ok := c.activeWatchers[op]; ok {
                        watchers = append(watchers, watcher)
                }
        }
        c.Unlock()

        // Iterate all the copied watchers and send the blockbeat to them.
        err := chainio.DispatchConcurrent(beat, watchers)
        if err != nil {
                log.Errorf("Notify blockbeat for chainWatcher failed: %v", err)
        }

        // Iterate all the copied channels and send the blockbeat to them.
        err = chainio.DispatchConcurrent(beat, channels)
        if err != nil {
                // Log the error if the blockbeat fails to be processed; the
                // dispatch itself does not shut lnd down.
                log.Errorf("Notify blockbeat for ChannelArbitrator failed: %v",
                        err)
        }
}
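
// exampleRestoreRedispatch is an editor-added sketch, not part of the
// original file, showing how RedispatchBlockbeat might be invoked after
// channels are registered with an already-running ChainArbitrator, e.g. at
// the end of a hypothetical restore flow.
func exampleRestoreRedispatch(c *ChainArbitrator, restored []wire.OutPoint) {
        // Resend the cached blockbeat so the newly tracked channel
        // arbitrators and chain watchers catch up to the current tip.
        c.RedispatchBlockbeat(restored)
}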