• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 10418626636

16 Aug 2024 10:36AM UTC coverage: 58.722% (+0.2%) from 58.558%
10418626636

Pull #9011

github

ziggie1984
docs: update release-notes.
Pull Request #9011: Fix TimeStamp issue in the Gossip Syncer

58 of 70 new or added lines in 3 files covered. (82.86%)

117 existing lines in 15 files now uncovered.

126269 of 215030 relevant lines covered (58.72%)

28151.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.21
/contractcourt/chain_arbitrator.go
1
package contractcourt
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcutil"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/wire"
13
        "github.com/btcsuite/btcwallet/walletdb"
14
        "github.com/lightningnetwork/lnd/chainntnfs"
15
        "github.com/lightningnetwork/lnd/channeldb"
16
        "github.com/lightningnetwork/lnd/channeldb/models"
17
        "github.com/lightningnetwork/lnd/clock"
18
        "github.com/lightningnetwork/lnd/fn"
19
        "github.com/lightningnetwork/lnd/input"
20
        "github.com/lightningnetwork/lnd/kvdb"
21
        "github.com/lightningnetwork/lnd/labels"
22
        "github.com/lightningnetwork/lnd/lnwallet"
23
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
24
        "github.com/lightningnetwork/lnd/lnwire"
25
)
26

27
// ErrChainArbExiting signals that the chain arbitrator is shutting down.
var ErrChainArbExiting = errors.New("ChainArbitrator exiting")
29

30
// ResolutionMsg is a message sent by resolvers to outside sub-systems once an
// outgoing contract has been fully resolved. For multi-hop contracts, if we
// resolve the outgoing contract, we'll also need to ensure that the incoming
// contract is resolved as well. We package the items required to resolve the
// incoming contracts within this message.
type ResolutionMsg struct {
	// SourceChan identifies the channel that this message is being sent
	// from. This is the channel's short channel ID.
	SourceChan lnwire.ShortChannelID

	// HtlcIndex is the index of the contract within the original
	// commitment trace.
	HtlcIndex uint64

	// Failure will be non-nil if the incoming contract should be canceled
	// all together. This can happen if the outgoing contract was dust, or
	// if the outgoing HTLC timed out.
	Failure lnwire.FailureMessage

	// PreImage will be non-nil if the incoming contract can successfully
	// be redeemed. This can happen if we learn of the preimage from the
	// outgoing HTLC on-chain.
	PreImage *[32]byte
}
54

55
// ChainArbitratorConfig is a configuration struct that contains all the
// function closures and interface that required to arbitrate on-chain
// contracts for a particular chain.
type ChainArbitratorConfig struct {
	// ChainHash is the chain that this arbitrator is to operate within.
	ChainHash chainhash.Hash

	// IncomingBroadcastDelta is the delta that we'll use to decide when to
	// broadcast our commitment transaction if we have incoming htlcs. This
	// value should be set based on our current fee estimation of the
	// commitment transaction. We use this to determine when we should
	// broadcast instead of just the HTLC timeout, as we want to ensure
	// that the commitment transaction is already confirmed, by the time the
	// HTLC expires. Otherwise we may end up not settling the htlc on-chain
	// because the other party managed to time it out.
	IncomingBroadcastDelta uint32

	// OutgoingBroadcastDelta is the delta that we'll use to decide when to
	// broadcast our commitment transaction if there are active outgoing
	// htlcs. This value can be lower than the incoming broadcast delta.
	OutgoingBroadcastDelta uint32

	// NewSweepAddr is a function that returns a new address under control
	// by the wallet. We'll use this to sweep any no-delay outputs as a
	// result of unilateral channel closes.
	//
	// NOTE: This SHOULD return a p2wkh script.
	NewSweepAddr func() ([]byte, error)

	// PublishTx reliably broadcasts a transaction to the network. Once
	// this function exits without an error, then they transaction MUST
	// continually be rebroadcast if needed.
	PublishTx func(*wire.MsgTx, string) error

	// DeliverResolutionMsg is a function that will append an outgoing
	// message to the "out box" for a ChannelLink. This is used to cancel
	// backwards any HTLC's that are either dust, we're timing out, or
	// settling on-chain to the incoming link.
	DeliverResolutionMsg func(...ResolutionMsg) error

	// MarkLinkInactive is a function closure that the ChainArbitrator will
	// use to mark that active HTLC's shouldn't be attempted to be routed
	// over a particular channel. This function will be called in that a
	// ChannelArbitrator decides that it needs to go to chain in order to
	// resolve contracts.
	//
	// TODO(roasbeef): rename, routing based
	MarkLinkInactive func(wire.OutPoint) error

	// ContractBreach is a function closure that the ChainArbitrator will
	// use to notify the BreachArbitrator about a contract breach. It should
	// only return a non-nil error when the BreachArbitrator has preserved
	// the necessary breach info for this channel point. Once the breach
	// resolution is persisted in the ChannelArbitrator, it will be safe
	// to mark the channel closed.
	ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error

	// IsOurAddress is a function that returns true if the passed address
	// is known to the underlying wallet. Otherwise, false should be
	// returned.
	IsOurAddress func(btcutil.Address) bool

	// IncubateOutputs sends either an incoming HTLC, an outgoing HTLC, or
	// both to the utxo nursery. Once this function returns, the nursery
	// should have safely persisted the outputs to disk, and should start
	// the process of incubation. This is used when a resolver wishes to
	// pass off the output to the nursery as we're only waiting on an
	// absolute/relative item block.
	IncubateOutputs func(wire.OutPoint,
		fn.Option[lnwallet.OutgoingHtlcResolution],
		fn.Option[lnwallet.IncomingHtlcResolution],
		uint32, fn.Option[int32]) error

	// PreimageDB is a global store of all known pre-images. We'll use this
	// to decide if we should broadcast a commitment transaction to claim
	// an HTLC on-chain.
	PreimageDB WitnessBeacon

	// Notifier is an instance of a chain notifier we'll use to watch for
	// certain on-chain events.
	Notifier chainntnfs.ChainNotifier

	// Mempool is a mempool watcher that allows us to watch for events
	// happening in the mempool.
	Mempool chainntnfs.MempoolWatcher

	// Signer is a signer backed by the active lnd node. This should be
	// capable of producing a signature as specified by a valid
	// SignDescriptor.
	Signer input.Signer

	// FeeEstimator will be used to return fee estimates.
	FeeEstimator chainfee.Estimator

	// ChainIO allows us to query the state of the current main chain.
	ChainIO lnwallet.BlockChainIO

	// DisableChannel disables a channel, resulting in it not being able to
	// forward payments.
	DisableChannel func(wire.OutPoint) error

	// Sweeper allows resolvers to sweep their final outputs.
	Sweeper UtxoSweeper

	// Registry is the invoice database that is used by resolvers to lookup
	// preimages and settle invoices.
	Registry Registry

	// NotifyClosedChannel is a function closure that the ChainArbitrator
	// will use to notify the ChannelNotifier about a newly closed channel.
	NotifyClosedChannel func(wire.OutPoint)

	// NotifyFullyResolvedChannel is a function closure that the
	// ChainArbitrator will use to notify the ChannelNotifier about a newly
	// resolved channel. The main difference to NotifyClosedChannel is that
	// in case of a local force close the NotifyClosedChannel is called when
	// the published commitment transaction confirms while
	// NotifyFullyResolvedChannel is only called when the channel is fully
	// resolved (which includes sweeping any time locked funds).
	NotifyFullyResolvedChannel func(point wire.OutPoint)

	// OnionProcessor is used to decode onion payloads for on-chain
	// resolution.
	OnionProcessor OnionProcessor

	// PaymentsExpirationGracePeriod indicates a time window we let the
	// other node to cancel an outgoing htlc that our node has initiated and
	// has timed out.
	PaymentsExpirationGracePeriod time.Duration

	// IsForwardedHTLC checks for a given htlc, identified by channel id and
	// htlcIndex, if it is a forwarded one.
	IsForwardedHTLC func(chanID lnwire.ShortChannelID, htlcIndex uint64) bool

	// Clock is the clock implementation that ChannelArbitrator uses.
	// It is useful for testing.
	Clock clock.Clock

	// SubscribeBreachComplete is used by the breachResolver to register a
	// subscription that notifies when the breach resolution process is
	// complete.
	SubscribeBreachComplete func(op *wire.OutPoint, c chan struct{}) (
		bool, error)

	// PutFinalHtlcOutcome stores the final outcome of an htlc in the
	// database.
	PutFinalHtlcOutcome func(chanId lnwire.ShortChannelID,
		htlcId uint64, settled bool) error

	// HtlcNotifier is an interface that htlc events are sent to.
	HtlcNotifier HtlcNotifier

	// Budget is the configured budget for the arbitrator.
	Budget BudgetConfig

	// QueryIncomingCircuit is used to find the outgoing HTLC's
	// corresponding incoming HTLC circuit. It queries the circuit map for
	// a given outgoing circuit key and returns the incoming circuit key.
	//
	// TODO(yy): this is a hacky way to get around the cycling import issue
	// as we cannot import `htlcswitch` here. A proper way is to define an
	// interface here that asks for method `LookupOpenCircuit`,
	// meanwhile, turn `PaymentCircuit` into an interface or bring it to a
	// lower package.
	QueryIncomingCircuit func(circuit models.CircuitKey) *models.CircuitKey
}
221

222
// ChainArbitrator is a sub-system that oversees the on-chain resolution of all
// active channels, and also those that are in the "pending close" state.
// Within the contractcourt package, the ChainArbitrator manages a set of
// active ContractArbitrators. Each ContractArbitrators is responsible for
// watching the chain for any activity that affects the state of the channel,
// and also for monitoring each contract in order to determine if any on-chain
// activity is required. Outside sub-systems interact with the ChainArbitrator
// in order to forcibly exit a contract, update the set of live signals for
// each contract, and to receive reports on the state of contract resolution.
type ChainArbitrator struct {
	started int32 // To be used atomically.
	stopped int32 // To be used atomically.

	// Embedded mutex guarding the activeChannels and activeWatchers maps
	// below.
	sync.Mutex

	// activeChannels is a map of all the active contracts that are still
	// open, and not fully resolved.
	activeChannels map[wire.OutPoint]*ChannelArbitrator

	// activeWatchers is a map of all the active chainWatchers for channels
	// that are still considered open.
	activeWatchers map[wire.OutPoint]*chainWatcher

	// cfg is the config struct for the arbitrator that contains all
	// methods and interface it needs to operate.
	cfg ChainArbitratorConfig

	// chanSource will be used by the ChainArbitrator to fetch all the
	// active channels that it must still watch over.
	chanSource *channeldb.DB

	// quit is closed on shutdown to signal all spawned goroutines to exit.
	quit chan struct{}

	// wg tracks all goroutines launched by the ChainArbitrator so that
	// shutdown can wait for them to finish.
	wg sync.WaitGroup
}
257

258
// NewChainArbitrator returns a new instance of the ChainArbitrator using the
259
// passed config struct, and backing persistent database.
260
func NewChainArbitrator(cfg ChainArbitratorConfig,
261
        db *channeldb.DB) *ChainArbitrator {
5✔
262

5✔
263
        return &ChainArbitrator{
5✔
264
                cfg:            cfg,
5✔
265
                activeChannels: make(map[wire.OutPoint]*ChannelArbitrator),
5✔
266
                activeWatchers: make(map[wire.OutPoint]*chainWatcher),
5✔
267
                chanSource:     db,
5✔
268
                quit:           make(chan struct{}),
5✔
269
        }
5✔
270
}
5✔
271

272
// arbChannel is a wrapper around an open channel that channel arbitrators
// interact with.
type arbChannel struct {
	// channel is the in-memory channel state.
	channel *channeldb.OpenChannel

	// c references the chain arbitrator and is used by arbChannel
	// internally.
	c *ChainArbitrator
}
282

283
// NewAnchorResolutions returns the anchor resolutions for currently valid
// commitment transactions.
//
// NOTE: Part of the ArbChannel interface.
func (a *arbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions,
	error) {

	// Get a fresh copy of the database state to base the anchor resolutions
	// on. Unfortunately the channel instance that we have here isn't the
	// same instance that is used by the link.
	chanPoint := a.channel.FundingOutpoint

	channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(
		nil, chanPoint,
	)
	if err != nil {
		return nil, err
	}

	// Spin up a throwaway channel state machine from the freshly fetched
	// state so we can derive the anchor resolutions from it.
	chanMachine, err := lnwallet.NewLightningChannel(
		a.c.cfg.Signer, channel, nil,
	)
	if err != nil {
		return nil, err
	}

	return chanMachine.NewAnchorResolutions()
}
311

312
// ForceCloseChan should force close the contract that this attendant is
// watching over. We'll use this when we decide that we need to go to chain. It
// should in addition tell the switch to remove the corresponding link, such
// that we won't accept any new updates. The returned summary contains all items
// needed to eventually resolve all outputs on chain.
//
// NOTE: Part of the ArbChannel interface.
func (a *arbChannel) ForceCloseChan() (*lnwallet.LocalForceCloseSummary, error) {
	// First, we mark the channel as borked, this ensures
	// that no new state transitions can happen, and also
	// that the link won't be loaded into the switch.
	if err := a.channel.MarkBorked(); err != nil {
		return nil, err
	}

	// With the channel marked as borked, we'll now remove
	// the link from the switch if its there. If the link
	// is active, then this method will block until it
	// exits.
	chanPoint := a.channel.FundingOutpoint

	// NOTE(review): a failure to remove the link is only logged, not
	// returned — the force close proceeds regardless.
	if err := a.c.cfg.MarkLinkInactive(chanPoint); err != nil {
		log.Errorf("unable to mark link inactive: %v", err)
	}

	// Now that we know the link can't mutate the channel
	// state, we'll read the target channel from disk
	// according to its channel point.
	channel, err := a.c.chanSource.ChannelStateDB().FetchChannel(
		nil, chanPoint,
	)
	if err != nil {
		return nil, err
	}

	// Finally, we'll force close the channel completing
	// the force close workflow.
	chanMachine, err := lnwallet.NewLightningChannel(
		a.c.cfg.Signer, channel, nil,
	)
	if err != nil {
		return nil, err
	}
	return chanMachine.ForceClose()
}
357

358
// newActiveChannelArbitrator creates a new instance of an active channel
// arbitrator given the state of the target channel.
func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
	c *ChainArbitrator, chanEvents *ChainEventSubscription) (*ChannelArbitrator, error) {

	// TODO(roasbeef): fetch best height (or pass in) so can ensure block
	// epoch delivers all the notifications to

	chanPoint := channel.FundingOutpoint

	log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", chanPoint)

	// Next we'll create the matching configuration struct that contains
	// all interfaces and methods the arbitrator needs to do its job.
	arbCfg := ChannelArbitratorConfig{
		ChanPoint:   chanPoint,
		Channel:     c.getArbChannel(channel),
		ShortChanID: channel.ShortChanID(),

		MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
		// MarkChannelClosed persists the close summary, then notifies
		// the ChannelNotifier of the closure.
		MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
			statuses ...channeldb.ChannelStatus) error {

			err := channel.CloseChannel(summary, statuses...)
			if err != nil {
				return err
			}
			c.cfg.NotifyClosedChannel(summary.ChanPoint)
			return nil
		},
		IsPendingClose:        false,
		ChainArbitratorConfig: c.cfg,
		ChainEvents:           chanEvents,
		PutResolverReport: func(tx kvdb.RwTx,
			report *channeldb.ResolverReport) error {

			return c.chanSource.PutResolverReport(
				tx, c.cfg.ChainHash, &chanPoint, report,
			)
		},
		FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
			chanStateDB := c.chanSource.ChannelStateDB()
			return chanStateDB.FetchHistoricalChannel(&chanPoint)
		},
		FindOutgoingHTLCDeadline: func(
			htlc channeldb.HTLC) fn.Option[int32] {

			return c.FindOutgoingHTLCDeadline(
				channel.ShortChanID(), htlc,
			)
		},
	}

	// The final component needed is an arbitrator log that the arbitrator
	// will use to keep track of its internal state using a backed
	// persistent log.
	//
	// TODO(roasbeef); abstraction leak...
	//  * rework: adaptor method to set log scope w/ factory func
	chanLog, err := newBoltArbitratorLog(
		c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
	)
	if err != nil {
		return nil, err
	}

	// MarkChannelResolved is set after the log exists since resolving the
	// contract tears the arbitrator (and its log) down.
	arbCfg.MarkChannelResolved = func() error {
		if c.cfg.NotifyFullyResolvedChannel != nil {
			c.cfg.NotifyFullyResolvedChannel(chanPoint)
		}

		return c.ResolveContract(chanPoint)
	}

	// Finally, we'll need to construct a series of htlc Sets based on all
	// currently known valid commitments.
	htlcSets := make(map[HtlcSetKey]htlcSet)
	htlcSets[LocalHtlcSet] = newHtlcSet(channel.LocalCommitment.Htlcs)
	htlcSets[RemoteHtlcSet] = newHtlcSet(channel.RemoteCommitment.Htlcs)

	// A missing pending remote commitment is expected, so only a genuine
	// error aborts construction here.
	pendingRemoteCommitment, err := channel.RemoteCommitChainTip()
	if err != nil && err != channeldb.ErrNoPendingCommit {
		return nil, err
	}
	if pendingRemoteCommitment != nil {
		htlcSets[RemotePendingHtlcSet] = newHtlcSet(
			pendingRemoteCommitment.Commitment.Htlcs,
		)
	}

	return NewChannelArbitrator(
		arbCfg, htlcSets, chanLog,
	), nil
}
452

453
// getArbChannel returns an open channel wrapper for use by channel arbitrators.
454
func (c *ChainArbitrator) getArbChannel(
455
        channel *channeldb.OpenChannel) *arbChannel {
14✔
456

14✔
457
        return &arbChannel{
14✔
458
                channel: channel,
14✔
459
                c:       c,
14✔
460
        }
14✔
461
}
14✔
462

463
// ResolveContract marks a contract as fully resolved within the database.
// This is only to be done once all contracts which were live on the channel
// before hitting the chain have been resolved.
func (c *ChainArbitrator) ResolveContract(chanPoint wire.OutPoint) error {
	log.Infof("Marking ChannelPoint(%v) fully resolved", chanPoint)

	// First, we'll mark the channel as fully closed from the PoV of
	// the channel source.
	err := c.chanSource.ChannelStateDB().MarkChanFullyClosed(&chanPoint)
	if err != nil {
		log.Errorf("ChainArbitrator: unable to mark ChannelPoint(%v) "+
			"fully closed: %v", chanPoint, err)
		return err
	}

	// Now that the channel has been marked as fully closed, we'll stop
	// both the channel arbitrator and chain watcher for this channel if
	// they're still active. Remove both from their maps under the lock,
	// but perform the (potentially blocking) Stop calls outside of it.
	var arbLog ArbitratorLog
	c.Lock()
	chainArb := c.activeChannels[chanPoint]
	delete(c.activeChannels, chanPoint)

	chainWatcher := c.activeWatchers[chanPoint]
	delete(c.activeWatchers, chanPoint)
	c.Unlock()

	if chainArb != nil {
		arbLog = chainArb.log

		if err := chainArb.Stop(); err != nil {
			log.Warnf("unable to stop ChannelArbitrator(%v): %v",
				chanPoint, err)
		}
	}
	if chainWatcher != nil {
		if err := chainWatcher.Stop(); err != nil {
			log.Warnf("unable to stop ChainWatcher(%v): %v",
				chanPoint, err)
		}
	}

	// Once this has been marked as resolved, we'll wipe the log that the
	// channel arbitrator was using to store its persistent state. We do
	// this after marking the channel resolved, as otherwise, the
	// arbitrator would be re-created, and think it was starting from the
	// default state.
	if arbLog != nil {
		if err := arbLog.WipeHistory(); err != nil {
			return err
		}
	}

	return nil
}
518

519
// Start launches all goroutines that the ChainArbitrator needs to operate.
520
func (c *ChainArbitrator) Start() error {
5✔
521
        if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
5✔
522
                return nil
×
523
        }
×
524

525
        log.Infof("ChainArbitrator starting with config: budget=[%v]",
5✔
526
                &c.cfg.Budget)
5✔
527

5✔
528
        // First, we'll fetch all the channels that are still open, in order to
5✔
529
        // collect them within our set of active contracts.
5✔
530
        openChannels, err := c.chanSource.ChannelStateDB().FetchAllChannels()
5✔
531
        if err != nil {
5✔
532
                return err
×
533
        }
×
534

535
        if len(openChannels) > 0 {
10✔
536
                log.Infof("Creating ChannelArbitrators for %v active channels",
5✔
537
                        len(openChannels))
5✔
538
        }
5✔
539

540
        // For each open channel, we'll configure then launch a corresponding
541
        // ChannelArbitrator.
542
        for _, channel := range openChannels {
19✔
543
                chanPoint := channel.FundingOutpoint
14✔
544
                channel := channel
14✔
545

14✔
546
                // First, we'll create an active chainWatcher for this channel
14✔
547
                // to ensure that we detect any relevant on chain events.
14✔
548
                breachClosure := func(ret *lnwallet.BreachRetribution) error {
17✔
549
                        return c.cfg.ContractBreach(chanPoint, ret)
3✔
550
                }
3✔
551

552
                chainWatcher, err := newChainWatcher(
14✔
553
                        chainWatcherConfig{
14✔
554
                                chanState:           channel,
14✔
555
                                notifier:            c.cfg.Notifier,
14✔
556
                                signer:              c.cfg.Signer,
14✔
557
                                isOurAddr:           c.cfg.IsOurAddress,
14✔
558
                                contractBreach:      breachClosure,
14✔
559
                                extractStateNumHint: lnwallet.GetStateNumHint,
14✔
560
                        },
14✔
561
                )
14✔
562
                if err != nil {
14✔
563
                        return err
×
564
                }
×
565

566
                c.activeWatchers[chanPoint] = chainWatcher
14✔
567
                channelArb, err := newActiveChannelArbitrator(
14✔
568
                        channel, c, chainWatcher.SubscribeChannelEvents(),
14✔
569
                )
14✔
570
                if err != nil {
14✔
571
                        return err
×
572
                }
×
573

574
                c.activeChannels[chanPoint] = channelArb
14✔
575

14✔
576
                // Republish any closing transactions for this channel.
14✔
577
                err = c.republishClosingTxs(channel)
14✔
578
                if err != nil {
14✔
579
                        log.Errorf("Failed to republish closing txs for "+
×
580
                                "channel %v", chanPoint)
×
581
                }
×
582
        }
583

584
        // In addition to the channels that we know to be open, we'll also
585
        // launch arbitrators to finishing resolving any channels that are in
586
        // the pending close state.
587
        closingChannels, err := c.chanSource.ChannelStateDB().FetchClosedChannels(
5✔
588
                true,
5✔
589
        )
5✔
590
        if err != nil {
5✔
591
                return err
×
592
        }
×
593

594
        if len(closingChannels) > 0 {
8✔
595
                log.Infof("Creating ChannelArbitrators for %v closing channels",
3✔
596
                        len(closingChannels))
3✔
597
        }
3✔
598

599
        // Next, for each channel is the closing state, we'll launch a
600
        // corresponding more restricted resolver, as we don't have to watch
601
        // the chain any longer, only resolve the contracts on the confirmed
602
        // commitment.
603
        //nolint:lll
604
        for _, closeChanInfo := range closingChannels {
8✔
605
                // We can leave off the CloseContract and ForceCloseChan
3✔
606
                // methods as the channel is already closed at this point.
3✔
607
                chanPoint := closeChanInfo.ChanPoint
3✔
608
                arbCfg := ChannelArbitratorConfig{
3✔
609
                        ChanPoint:             chanPoint,
3✔
610
                        ShortChanID:           closeChanInfo.ShortChanID,
3✔
611
                        ChainArbitratorConfig: c.cfg,
3✔
612
                        ChainEvents:           &ChainEventSubscription{},
3✔
613
                        IsPendingClose:        true,
3✔
614
                        ClosingHeight:         closeChanInfo.CloseHeight,
3✔
615
                        CloseType:             closeChanInfo.CloseType,
3✔
616
                        PutResolverReport: func(tx kvdb.RwTx,
3✔
617
                                report *channeldb.ResolverReport) error {
6✔
618

3✔
619
                                return c.chanSource.PutResolverReport(
3✔
620
                                        tx, c.cfg.ChainHash, &chanPoint, report,
3✔
621
                                )
3✔
622
                        },
3✔
623
                        FetchHistoricalChannel: func() (*channeldb.OpenChannel, error) {
3✔
624
                                chanStateDB := c.chanSource.ChannelStateDB()
3✔
625
                                return chanStateDB.FetchHistoricalChannel(&chanPoint)
3✔
626
                        },
3✔
627
                        FindOutgoingHTLCDeadline: func(
628
                                htlc channeldb.HTLC) fn.Option[int32] {
3✔
629

3✔
630
                                return c.FindOutgoingHTLCDeadline(
3✔
631
                                        closeChanInfo.ShortChanID, htlc,
3✔
632
                                )
3✔
633
                        },
3✔
634
                }
635
                chanLog, err := newBoltArbitratorLog(
3✔
636
                        c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
3✔
637
                )
3✔
638
                if err != nil {
3✔
639
                        return err
×
640
                }
×
641
                arbCfg.MarkChannelResolved = func() error {
6✔
642
                        if c.cfg.NotifyFullyResolvedChannel != nil {
6✔
643
                                c.cfg.NotifyFullyResolvedChannel(chanPoint)
3✔
644
                        }
3✔
645

646
                        return c.ResolveContract(chanPoint)
3✔
647
                }
648

649
                // We create an empty map of HTLC's here since it's possible
650
                // that the channel is in StateDefault and updateActiveHTLCs is
651
                // called. We want to avoid writing to an empty map. Since the
652
                // channel is already in the process of being resolved, no new
653
                // HTLCs will be added.
654
                c.activeChannels[chanPoint] = NewChannelArbitrator(
3✔
655
                        arbCfg, make(map[HtlcSetKey]htlcSet), chanLog,
3✔
656
                )
3✔
657
        }
658

659
        // Now, we'll start all chain watchers in parallel to shorten start up
660
        // duration. In neutrino mode, this allows spend registrations to take
661
        // advantage of batch spend reporting, instead of doing a single rescan
662
        // per chain watcher.
663
        //
664
        // NOTE: After this point, we Stop the chain arb to ensure that any
665
        // lingering goroutines are cleaned up before exiting.
666
        watcherErrs := make(chan error, len(c.activeWatchers))
5✔
667
        var wg sync.WaitGroup
5✔
668
        for _, watcher := range c.activeWatchers {
19✔
669
                wg.Add(1)
14✔
670
                go func(w *chainWatcher) {
28✔
671
                        defer wg.Done()
14✔
672
                        select {
14✔
673
                        case watcherErrs <- w.Start():
14✔
674
                        case <-c.quit:
×
675
                                watcherErrs <- ErrChainArbExiting
×
676
                        }
677
                }(watcher)
678
        }
679

680
        // Once all chain watchers have been started, seal the err chan to
681
        // signal the end of the err stream.
682
        go func() {
10✔
683
                wg.Wait()
5✔
684
                close(watcherErrs)
5✔
685
        }()
5✔
686

687
        // stopAndLog is a helper function which shuts down the chain arb and
688
        // logs errors if they occur.
689
        stopAndLog := func() {
5✔
690
                if err := c.Stop(); err != nil {
×
691
                        log.Errorf("ChainArbitrator could not shutdown: %v", err)
×
692
                }
×
693
        }
694

695
        // Handle all errors returned from spawning our chain watchers. If any
696
        // of them failed, we will stop the chain arb to shutdown any active
697
        // goroutines.
698
        for err := range watcherErrs {
19✔
699
                if err != nil {
14✔
700
                        stopAndLog()
×
701
                        return err
×
702
                }
×
703
        }
704

705
        // Before we start all of our arbitrators, we do a preliminary state
706
        // lookup so that we can combine all of these lookups in a single db
707
        // transaction.
708
        var startStates map[wire.OutPoint]*chanArbStartState
5✔
709

5✔
710
        err = kvdb.View(c.chanSource, func(tx walletdb.ReadTx) error {
10✔
711
                for _, arbitrator := range c.activeChannels {
19✔
712
                        startState, err := arbitrator.getStartState(tx)
14✔
713
                        if err != nil {
14✔
714
                                return err
×
715
                        }
×
716

717
                        startStates[arbitrator.cfg.ChanPoint] = startState
14✔
718
                }
719

720
                return nil
5✔
721
        }, func() {
5✔
722
                startStates = make(
5✔
723
                        map[wire.OutPoint]*chanArbStartState,
5✔
724
                        len(c.activeChannels),
5✔
725
                )
5✔
726
        })
5✔
727
        if err != nil {
5✔
728
                stopAndLog()
×
729
                return err
×
730
        }
×
731

732
        // Launch all the goroutines for each arbitrator so they can carry out
733
        // their duties.
734
        for _, arbitrator := range c.activeChannels {
19✔
735
                startState, ok := startStates[arbitrator.cfg.ChanPoint]
14✔
736
                if !ok {
14✔
737
                        stopAndLog()
×
738
                        return fmt.Errorf("arbitrator: %v has no start state",
×
739
                                arbitrator.cfg.ChanPoint)
×
740
                }
×
741

742
                if err := arbitrator.Start(startState); err != nil {
14✔
743
                        stopAndLog()
×
744
                        return err
×
745
                }
×
746
        }
747

748
        // Subscribe to a single stream of block epoch notifications that we
749
        // will dispatch to all active arbitrators.
750
        blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
5✔
751
        if err != nil {
5✔
752
                return err
×
753
        }
×
754

755
        // Start our goroutine which will dispatch blocks to each arbitrator.
756
        c.wg.Add(1)
5✔
757
        go func() {
10✔
758
                defer c.wg.Done()
5✔
759
                c.dispatchBlocks(blockEpoch)
5✔
760
        }()
5✔
761

762
        // TODO(roasbeef): eventually move all breach watching here
763

764
        return nil
5✔
765
}
766

767
// blockRecipient contains the information we need to dispatch a block to a
// channel arbitrator.
type blockRecipient struct {
	// chanPoint is the funding outpoint of the channel.
	chanPoint wire.OutPoint

	// blocks is the channel that new block heights are sent into. This
	// channel should be sufficiently buffered as to not block the sender.
	blocks chan<- int32

	// quit is closed if the receiving entity is shutting down. Senders
	// select on this so a block delivery never blocks on a dead
	// arbitrator.
	quit chan struct{}
}
780

781
// dispatchBlocks consumes a block epoch notification stream and dispatches
// blocks to each of the chain arb's active channel arbitrators. This function
// must be run in a goroutine.
func (c *ChainArbitrator) dispatchBlocks(
	blockEpoch *chainntnfs.BlockEpochEvent) {

	// getRecipients is a helper function which acquires the chain arb
	// lock and returns a set of block recipients which can be used to
	// dispatch blocks. It snapshots the current set each time so that
	// channels added or removed while we run are picked up.
	getRecipients := func() []blockRecipient {
		c.Lock()
		blocks := make([]blockRecipient, 0, len(c.activeChannels))
		for _, channel := range c.activeChannels {
			blocks = append(blocks, blockRecipient{
				chanPoint: channel.cfg.ChanPoint,
				blocks:    channel.blocks,
				quit:      channel.quit,
			})
		}
		c.Unlock()

		return blocks
	}

	// On exit, cancel our blocks subscription and close each block channel
	// so that the arbitrators know they will no longer be receiving blocks.
	defer func() {
		blockEpoch.Cancel()

		recipients := getRecipients()
		for _, recipient := range recipients {
			close(recipient.blocks)
		}
	}()

	// Consume block epochs until we receive the instruction to shutdown.
	for {
		select {
		// Consume block epochs, exiting if our subscription is
		// terminated.
		case block, ok := <-blockEpoch.Epochs:
			if !ok {
				log.Trace("dispatchBlocks block epoch " +
					"cancelled")
				return
			}

			// Get the set of currently active channels block
			// subscription channels and dispatch the block to
			// each.
			for _, recipient := range getRecipients() {
				select {
				// Deliver the block to the arbitrator.
				case recipient.blocks <- block.Height:

				// If the recipient is shutting down, exit
				// without delivering the block. This may be
				// the case when two blocks are mined in quick
				// succession, and the arbitrator resolves
				// after the first block, and does not need to
				// consume the second block.
				case <-recipient.quit:
					log.Debugf("channel: %v exit without "+
						"receiving block: %v",
						recipient.chanPoint,
						block.Height)

				// If the chain arb is shutting down, we don't
				// need to deliver any more blocks (everything
				// will be shutting down).
				case <-c.quit:
					return
				}
			}

		// Exit if the chain arbitrator is shutting down.
		case <-c.quit:
			return
		}
	}
}
862

863
// republishClosingTxs will load any stored cooperative or unilateral closing
864
// transactions and republish them. This helps ensure propagation of the
865
// transactions in the event that prior publications failed.
866
func (c *ChainArbitrator) republishClosingTxs(
867
        channel *channeldb.OpenChannel) error {
14✔
868

14✔
869
        // If the channel has had its unilateral close broadcasted already,
14✔
870
        // republish it in case it didn't propagate.
14✔
871
        if channel.HasChanStatus(channeldb.ChanStatusCommitBroadcasted) {
22✔
872
                err := c.rebroadcast(
8✔
873
                        channel, channeldb.ChanStatusCommitBroadcasted,
8✔
874
                )
8✔
875
                if err != nil {
8✔
876
                        return err
×
877
                }
×
878
        }
879

880
        // If the channel has had its cooperative close broadcasted
881
        // already, republish it in case it didn't propagate.
882
        if channel.HasChanStatus(channeldb.ChanStatusCoopBroadcasted) {
19✔
883
                err := c.rebroadcast(
5✔
884
                        channel, channeldb.ChanStatusCoopBroadcasted,
5✔
885
                )
5✔
886
                if err != nil {
5✔
887
                        return err
×
888
                }
×
889
        }
890

891
        return nil
14✔
892
}
893

894
// rebroadcast is a helper method which will republish the unilateral or
895
// cooperative close transaction or a channel in a particular state.
896
//
897
// NOTE: There is no risk to calling this method if the channel isn't in either
898
// CommitmentBroadcasted or CoopBroadcasted, but the logs will be misleading.
899
func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel,
900
        state channeldb.ChannelStatus) error {
13✔
901

13✔
902
        chanPoint := channel.FundingOutpoint
13✔
903

13✔
904
        var (
13✔
905
                closeTx *wire.MsgTx
13✔
906
                kind    string
13✔
907
                err     error
13✔
908
        )
13✔
909
        switch state {
13✔
910
        case channeldb.ChanStatusCommitBroadcasted:
8✔
911
                kind = "force"
8✔
912
                closeTx, err = channel.BroadcastedCommitment()
8✔
913

914
        case channeldb.ChanStatusCoopBroadcasted:
5✔
915
                kind = "coop"
5✔
916
                closeTx, err = channel.BroadcastedCooperative()
5✔
917

918
        default:
×
919
                return fmt.Errorf("unknown closing state: %v", state)
×
920
        }
921

922
        switch {
13✔
923
        // This can happen for channels that had their closing tx published
924
        // before we started storing it to disk.
925
        case err == channeldb.ErrNoCloseTx:
×
926
                log.Warnf("Channel %v is in state %v, but no %s closing tx "+
×
927
                        "to re-publish...", chanPoint, state, kind)
×
928
                return nil
×
929

930
        case err != nil:
×
931
                return err
×
932
        }
933

934
        log.Infof("Re-publishing %s close tx(%v) for channel %v",
13✔
935
                kind, closeTx.TxHash(), chanPoint)
13✔
936

13✔
937
        label := labels.MakeLabel(
13✔
938
                labels.LabelTypeChannelClose, &channel.ShortChannelID,
13✔
939
        )
13✔
940
        err = c.cfg.PublishTx(closeTx, label)
13✔
941
        if err != nil && err != lnwallet.ErrDoubleSpend {
13✔
942
                log.Warnf("Unable to broadcast %s close tx(%v): %v",
×
943
                        kind, closeTx.TxHash(), err)
×
944
        }
×
945

946
        return nil
13✔
947
}
948

949
// Stop signals the ChainArbitrator to trigger a graceful shutdown. Any active
950
// channel arbitrators will be signalled to exit, and this method will block
951
// until they've all exited.
952
func (c *ChainArbitrator) Stop() error {
5✔
953
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
5✔
954
                return nil
×
955
        }
×
956

957
        log.Info("ChainArbitrator shutting down...")
5✔
958
        defer log.Debug("ChainArbitrator shutdown complete")
5✔
959

5✔
960
        close(c.quit)
5✔
961

5✔
962
        var (
5✔
963
                activeWatchers = make(map[wire.OutPoint]*chainWatcher)
5✔
964
                activeChannels = make(map[wire.OutPoint]*ChannelArbitrator)
5✔
965
        )
5✔
966

5✔
967
        // Copy the current set of active watchers and arbitrators to shutdown.
5✔
968
        // We don't want to hold the lock when shutting down each watcher or
5✔
969
        // arbitrator individually, as they may need to acquire this mutex.
5✔
970
        c.Lock()
5✔
971
        for chanPoint, watcher := range c.activeWatchers {
18✔
972
                activeWatchers[chanPoint] = watcher
13✔
973
        }
13✔
974
        for chanPoint, arbitrator := range c.activeChannels {
18✔
975
                activeChannels[chanPoint] = arbitrator
13✔
976
        }
13✔
977
        c.Unlock()
5✔
978

5✔
979
        for chanPoint, watcher := range activeWatchers {
18✔
980
                log.Tracef("Attempting to stop ChainWatcher(%v)",
13✔
981
                        chanPoint)
13✔
982

13✔
983
                if err := watcher.Stop(); err != nil {
13✔
984
                        log.Errorf("unable to stop watcher for "+
×
985
                                "ChannelPoint(%v): %v", chanPoint, err)
×
986
                }
×
987
        }
988
        for chanPoint, arbitrator := range activeChannels {
18✔
989
                log.Tracef("Attempting to stop ChannelArbitrator(%v)",
13✔
990
                        chanPoint)
13✔
991

13✔
992
                if err := arbitrator.Stop(); err != nil {
13✔
993
                        log.Errorf("unable to stop arbitrator for "+
×
994
                                "ChannelPoint(%v): %v", chanPoint, err)
×
995
                }
×
996
        }
997

998
        c.wg.Wait()
5✔
999

5✔
1000
        return nil
5✔
1001
}
1002

1003
// ContractUpdate is a message packages the latest set of active HTLCs on a
// commitment, and also identifies which commitment received a new set of
// HTLCs.
type ContractUpdate struct {
	// HtlcKey identifies which commitment the HTLCs below are present on.
	HtlcKey HtlcSetKey

	// Htlcs are the set of active HTLCs on the commitment identified by
	// the above HtlcKey.
	Htlcs []channeldb.HTLC
}
1014

1015
// ContractSignals is used by outside subsystems to notify a channel arbitrator
// of its ShortChannelID.
type ContractSignals struct {
	// ShortChanID is the up to date short channel ID for a contract. This
	// can change either if, when the contract was added, it didn't yet
	// have a stable identifier, or in the case of a reorg.
	ShortChanID lnwire.ShortChannelID
}
1023

1024
// UpdateContractSignals sends a set of active, up to date contract signals to
1025
// the ChannelArbitrator which is has been assigned to the channel infield by
1026
// the passed channel point.
1027
func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint,
1028
        signals *ContractSignals) error {
3✔
1029

3✔
1030
        log.Infof("Attempting to update ContractSignals for ChannelPoint(%v)",
3✔
1031
                chanPoint)
3✔
1032

3✔
1033
        c.Lock()
3✔
1034
        arbitrator, ok := c.activeChannels[chanPoint]
3✔
1035
        c.Unlock()
3✔
1036
        if !ok {
3✔
1037
                return fmt.Errorf("unable to find arbitrator")
×
1038
        }
×
1039

1040
        arbitrator.UpdateContractSignals(signals)
3✔
1041

3✔
1042
        return nil
3✔
1043
}
1044

1045
// NotifyContractUpdate lets a channel arbitrator know that a new
1046
// ContractUpdate is available. This calls the ChannelArbitrator's internal
1047
// method NotifyContractUpdate which waits for a response on a done chan before
1048
// returning. This method will return an error if the ChannelArbitrator is not
1049
// in the activeChannels map. However, this only happens if the arbitrator is
1050
// resolved and the related link would already be shut down.
1051
func (c *ChainArbitrator) NotifyContractUpdate(chanPoint wire.OutPoint,
1052
        update *ContractUpdate) error {
3✔
1053

3✔
1054
        c.Lock()
3✔
1055
        arbitrator, ok := c.activeChannels[chanPoint]
3✔
1056
        c.Unlock()
3✔
1057
        if !ok {
3✔
1058
                return fmt.Errorf("can't find arbitrator for %v", chanPoint)
×
1059
        }
×
1060

1061
        arbitrator.notifyContractUpdate(update)
3✔
1062
        return nil
3✔
1063
}
1064

1065
// GetChannelArbitrator safely returns the channel arbitrator for a given
1066
// channel outpoint.
1067
func (c *ChainArbitrator) GetChannelArbitrator(chanPoint wire.OutPoint) (
1068
        *ChannelArbitrator, error) {
3✔
1069

3✔
1070
        c.Lock()
3✔
1071
        arbitrator, ok := c.activeChannels[chanPoint]
3✔
1072
        c.Unlock()
3✔
1073
        if !ok {
3✔
1074
                return nil, fmt.Errorf("unable to find arbitrator")
×
1075
        }
×
1076

1077
        return arbitrator, nil
3✔
1078
}
1079

1080
// forceCloseReq is a request sent from an outside sub-system to the arbitrator
// that watches a particular channel to broadcast the commitment transaction,
// and enter the resolution phase of the channel.
type forceCloseReq struct {
	// errResp is a channel that will be sent upon either in the case of
	// force close success (nil error), or in the case on an error.
	//
	// NOTE: This channel MUST be buffered.
	errResp chan error

	// closeTx is a channel that carries the transaction which ultimately
	// closed out the channel.
	closeTx chan *wire.MsgTx
}
1094

1095
// ForceCloseContract attempts to force close the channel identified by the
// passed channel point. A force close will immediately terminate the
// contract, causing it to enter the resolution phase. If the force close was
// successful, then the force close transaction itself will be returned.
//
// TODO(roasbeef): just return the summary itself?
func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.MsgTx, error) {
	c.Lock()
	arbitrator, ok := c.activeChannels[chanPoint]
	c.Unlock()
	if !ok {
		return nil, fmt.Errorf("unable to find arbitrator")
	}

	log.Infof("Attempting to force close ChannelPoint(%v)", chanPoint)

	// Before closing, we'll attempt to send a disable update for the
	// channel. We do so before closing the channel as otherwise the current
	// edge policy won't be retrievable from the graph.
	if err := c.cfg.DisableChannel(chanPoint); err != nil {
		log.Warnf("Unable to disable channel %v on "+
			"close: %v", chanPoint, err)
	}

	// Both channels are buffered so the arbitrator's responses never
	// block even if we bail out early below.
	errChan := make(chan error, 1)
	respChan := make(chan *wire.MsgTx, 1)

	// With the channel found, and the request crafted, we'll send over a
	// force close request to the arbitrator that watches this channel.
	select {
	case arbitrator.forceCloseReqs <- &forceCloseReq{
		errResp: errChan,
		closeTx: respChan,
	}:
	case <-c.quit:
		return nil, ErrChainArbExiting
	}

	// We'll await two responses: the error response, and the transaction
	// that closed out the channel.
	select {
	case err := <-errChan:
		if err != nil {
			return nil, err
		}
	case <-c.quit:
		return nil, ErrChainArbExiting
	}

	var closeTx *wire.MsgTx
	select {
	case closeTx = <-respChan:
	case <-c.quit:
		return nil, ErrChainArbExiting
	}

	return closeTx, nil
}
1153

1154
// WatchNewChannel sends the ChainArbitrator a message to create a
// ChannelArbitrator tasked with watching over a new channel. Once a new
// channel has finished its final funding flow, it should be registered with
// the ChainArbitrator so we can properly react to any on-chain events.
//
// NOTE: The chain arb's mutex is held for the duration of this call,
// including the Start calls below.
func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error {
	c.Lock()
	defer c.Unlock()

	chanPoint := newChan.FundingOutpoint

	log.Infof("Creating new ChannelArbitrator for ChannelPoint(%v)",
		chanPoint)

	// If we're already watching this channel, then we'll ignore this
	// request.
	if _, ok := c.activeChannels[chanPoint]; ok {
		return nil
	}

	// First, also create an active chainWatcher for this channel to ensure
	// that we detect any relevant on chain events.
	chainWatcher, err := newChainWatcher(
		chainWatcherConfig{
			chanState: newChan,
			notifier:  c.cfg.Notifier,
			signer:    c.cfg.Signer,
			isOurAddr: c.cfg.IsOurAddress,
			// Route any breach notifications through the chain
			// arb's configured handler for this channel point.
			contractBreach: func(
				retInfo *lnwallet.BreachRetribution) error {

				return c.cfg.ContractBreach(
					chanPoint, retInfo,
				)
			},
			extractStateNumHint: lnwallet.GetStateNumHint,
		},
	)
	if err != nil {
		return err
	}

	c.activeWatchers[chanPoint] = chainWatcher

	// We'll also create a new channel arbitrator instance using this new
	// channel, and our internal state.
	channelArb, err := newActiveChannelArbitrator(
		newChan, c, chainWatcher.SubscribeChannelEvents(),
	)
	if err != nil {
		return err
	}

	// With the arbitrator created, we'll add it to our set of active
	// arbitrators, then launch it.
	c.activeChannels[chanPoint] = channelArb

	if err := channelArb.Start(nil); err != nil {
		return err
	}

	return chainWatcher.Start()
}
1216

1217
// SubscribeChannelEvents returns a new active subscription for the set of
1218
// possible on-chain events for a particular channel. The struct can be used by
1219
// callers to be notified whenever an event that changes the state of the
1220
// channel on-chain occurs.
1221
func (c *ChainArbitrator) SubscribeChannelEvents(
1222
        chanPoint wire.OutPoint) (*ChainEventSubscription, error) {
3✔
1223

3✔
1224
        // First, we'll attempt to look up the active watcher for this channel.
3✔
1225
        // If we can't find it, then we'll return an error back to the caller.
3✔
1226
        c.Lock()
3✔
1227
        watcher, ok := c.activeWatchers[chanPoint]
3✔
1228
        c.Unlock()
3✔
1229

3✔
1230
        if !ok {
3✔
1231
                return nil, fmt.Errorf("unable to find watcher for: %v",
×
1232
                        chanPoint)
×
1233
        }
×
1234

1235
        // With the watcher located, we'll request for it to create a new chain
1236
        // event subscription client.
1237
        return watcher.SubscribeChannelEvents(), nil
3✔
1238
}
1239

1240
// FindOutgoingHTLCDeadline returns the deadline in absolute block height for
// the specified outgoing HTLC. For an outgoing HTLC, its deadline is defined
// by the timeout height of its corresponding incoming HTLC - this is the
// expiry height at which the remote peer can spend his/her outgoing HTLC via
// the timeout path.
func (c *ChainArbitrator) FindOutgoingHTLCDeadline(scid lnwire.ShortChannelID,
	outgoingHTLC channeldb.HTLC) fn.Option[int32] {

	// Find the outgoing HTLC's corresponding incoming HTLC in the circuit
	// map.
	rHash := outgoingHTLC.RHash
	circuit := models.CircuitKey{
		ChanID: scid,
		HtlcID: outgoingHTLC.HtlcIndex,
	}
	incomingCircuit := c.cfg.QueryIncomingCircuit(circuit)

	// If there's no incoming circuit found, we will use the default
	// deadline.
	if incomingCircuit == nil {
		log.Warnf("ChannelArbitrator(%v): incoming circuit key not "+
			"found for rHash=%x, using default deadline instead",
			scid, rHash)

		return fn.None[int32]()
	}

	// If this is a locally initiated HTLC, it means we are the first hop.
	// In this case, we can relax the deadline.
	if incomingCircuit.ChanID.IsDefault() {
		log.Infof("ChannelArbitrator(%v): using default deadline for "+
			"locally initiated HTLC for rHash=%x", scid, rHash)

		return fn.None[int32]()
	}

	log.Debugf("Found incoming circuit %v for rHash=%x using outgoing "+
		"circuit %v", incomingCircuit, rHash, circuit)

	// Hold the lock while we scan activeChannels below; no other state
	// may be mutated concurrently during the search.
	c.Lock()
	defer c.Unlock()

	// Iterate over all active channels to find the incoming HTLC specified
	// by its circuit key.
	for cp, channelArb := range c.activeChannels {
		// Skip if the SCID doesn't match.
		if channelArb.cfg.ShortChanID != incomingCircuit.ChanID {
			continue
		}

		// Make sure the channel arbitrator has the latest view of its
		// active HTLCs.
		channelArb.updateActiveHTLCs()

		// Iterate all the known HTLCs to find the targeted incoming
		// HTLC.
		for _, htlcs := range channelArb.activeHTLCs {
			for _, htlc := range htlcs.incomingHTLCs {
				// Skip if the index doesn't match.
				if htlc.HtlcIndex != incomingCircuit.HtlcID {
					continue
				}

				log.Debugf("ChannelArbitrator(%v): found "+
					"incoming HTLC in channel=%v using "+
					"rHash=%x, refundTimeout=%v", scid,
					cp, rHash, htlc.RefundTimeout)

				// The incoming HTLC's refund timeout is the
				// deadline for resolving the outgoing HTLC.
				return fn.Some(int32(htlc.RefundTimeout))
			}
		}
	}

	// If there's no incoming HTLC found, yet we have the incoming circuit,
	// something is wrong - in this case, we return the none deadline.
	log.Errorf("ChannelArbitrator(%v): incoming HTLC not found for "+
		"rHash=%x, using default deadline instead", scid, rHash)

	return fn.None[int32]()
}
1320

1321
// TODO(roasbeef): arbitration reports
1322
//  * types: contested, waiting for success conf, etc
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc