• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17685638204

12 Sep 2025 08:45PM UTC coverage: 66.624% (-0.03%) from 66.651%
17685638204

Pull #10015

github

web-flow
Merge 0f62b1831 into 5082566ed
Pull Request #10015: graph/db: add zombie channels cleanup routine

59 of 67 new or added lines in 2 files covered. (88.06%)

126 existing lines in 24 files now uncovered.

136275 of 204543 relevant lines covered (66.62%)

21395.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.31
/chainntnfs/txnotifier.go
1
package chainntnfs
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/btcsuite/btcd/btcutil"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/txscript"
13
        "github.com/btcsuite/btcd/wire"
14
)
15

16
const (
	// ReorgSafetyLimit is the depth, in blocks, past which a block is
	// assumed to be permanent, i.e. no longer at risk of being reorganized
	// out of the chain. It governs when old confirmation requests may be
	// pruned while still handling reorgs correctly. The value corresponds
	// to the average number of blocks produced in a day.
	ReorgSafetyLimit = 144

	// MaxNumConfs is the upper bound on the number of confirmations that a
	// client may request for a transaction.
	MaxNumConfs = ReorgSafetyLimit
)
28

29
var (
30
        // ZeroHash is the value that should be used as the txid when
31
        // registering for the confirmation of a script on-chain. This allows
32
        // the notifier to match _and_ dispatch upon the inclusion of the script
33
        // on-chain, rather than the txid.
34
        ZeroHash chainhash.Hash
35

36
        // ZeroOutPoint is the value that should be used as the outpoint when
37
        // registering for the spend of a script on-chain. This allows the
38
        // notifier to match _and_ dispatch upon detecting the spend of the
39
        // script on-chain, rather than the outpoint.
40
        ZeroOutPoint wire.OutPoint
41

42
        // zeroV1KeyPush is a pkScript that pushes an all-zero 32-byte Taproot
43
        // SegWit v1 key to the stack.
44
        zeroV1KeyPush = [34]byte{
45
                txscript.OP_1, txscript.OP_DATA_32, // 32 byte of zeroes here
46
        }
47

48
        // ZeroTaprootPkScript is the parsed txscript.PkScript of an empty
49
        // Taproot SegWit v1 key being pushed to the stack. This allows the
50
        // notifier to match _and_ dispatch upon detecting the spend of the
51
        // outpoint on-chain, rather than the pkScript (which cannot be derived
52
        // from the witness alone in the SegWit v1 case).
53
        ZeroTaprootPkScript, _ = txscript.ParsePkScript(zeroV1KeyPush[:])
54
)
55

56
var (
57
        // ErrTxNotifierExiting is an error returned when attempting to interact
58
        // with the TxNotifier but it been shut down.
59
        ErrTxNotifierExiting = errors.New("TxNotifier is exiting")
60

61
        // ErrNoScript is an error returned when a confirmation/spend
62
        // registration is attempted without providing an accompanying output
63
        // script.
64
        ErrNoScript = errors.New("an output script must be provided")
65

66
        // ErrNoHeightHint is an error returned when a confirmation/spend
67
        // registration is attempted without providing an accompanying height
68
        // hint.
69
        ErrNoHeightHint = errors.New("a height hint greater than 0 must be " +
70
                "provided")
71

72
        // ErrNumConfsOutOfRange is an error returned when a confirmation/spend
73
        // registration is attempted and the number of confirmations provided is
74
        // out of range.
75
        ErrNumConfsOutOfRange = fmt.Errorf("number of confirmations must be "+
76
                "between %d and %d", 1, MaxNumConfs)
77

78
        // ErrEmptyWitnessStack is returned when a spending transaction has an
79
        // empty witness stack. More details in,
80
        // - https://github.com/bitcoin/bitcoin/issues/28730
81
        ErrEmptyWitnessStack = errors.New("witness stack is empty")
82
)
83

84
// rescanState tracks how far a registration has progressed before the
// notifier is allowed to start dispatching confirmations at tip.
type rescanState byte

const (
	// rescanNotStarted is the initial state. It denotes that a historical
	// dispatch may still be required.
	rescanNotStarted rescanState = iota

	// rescanPending means a dispatch has already been made and its
	// completion is being awaited. No further rescans should be dispatched
	// while in this state.
	rescanPending

	// rescanComplete means that either a dispatched rescan has finished, or
	// watching began at tip right away. Only in this state may the notifier
	// dispatch notifications from tip.
	rescanComplete
)
104

105
// confNtfnSet holds all known, registered confirmation notifications for a
106
// txid/output script. If duplicates notifications are requested, only one
107
// historical dispatch will be spawned to ensure redundant scans are not
108
// permitted. A single conf detail will be constructed and dispatched to all
109
// interested
110
// clients.
111
type confNtfnSet struct {
112
        // ntfns keeps tracks of all the active client notification requests for
113
        // a transaction/output script
114
        ntfns map[uint64]*ConfNtfn
115

116
        // rescanStatus represents the current rescan state for the
117
        // transaction/output script.
118
        rescanStatus rescanState
119

120
        // details serves as a cache of the confirmation details of a
121
        // transaction that we'll use to determine if a transaction/output
122
        // script has already confirmed at the time of registration.
123
        // details is also used to make sure that in case of an address reuse
124
        // (funds sent to a previously confirmed script) no additional
125
        // notification is registered which would lead to an inconsistent state.
126
        details *TxConfirmation
127
}
128

129
// newConfNtfnSet constructs a fresh confNtfnSet for a group of clients
130
// interested in a notification for a particular txid.
131
func newConfNtfnSet() *confNtfnSet {
158✔
132
        return &confNtfnSet{
158✔
133
                ntfns:        make(map[uint64]*ConfNtfn),
158✔
134
                rescanStatus: rescanNotStarted,
158✔
135
        }
158✔
136
}
158✔
137

138
// spendNtfnSet holds all known, registered spend notifications for a spend
139
// request (outpoint/output script). If duplicate notifications are requested,
140
// only one historical dispatch will be spawned to ensure redundant scans are
141
// not permitted.
142
type spendNtfnSet struct {
143
        // ntfns keeps tracks of all the active client notification requests for
144
        // an outpoint/output script.
145
        ntfns map[uint64]*SpendNtfn
146

147
        // rescanStatus represents the current rescan state for the spend
148
        // request (outpoint/output script).
149
        rescanStatus rescanState
150

151
        // details serves as a cache of the spend details for an outpoint/output
152
        // script that we'll use to determine if it has already been spent at
153
        // the time of registration.
154
        details *SpendDetail
155
}
156

157
// newSpendNtfnSet constructs a new spend notification set.
158
func newSpendNtfnSet() *spendNtfnSet {
49✔
159
        return &spendNtfnSet{
49✔
160
                ntfns:        make(map[uint64]*SpendNtfn),
49✔
161
                rescanStatus: rescanNotStarted,
49✔
162
        }
49✔
163
}
49✔
164

165
// ConfRequest encapsulates a request for a confirmation notification of either
166
// a txid or output script.
167
type ConfRequest struct {
168
        // TxID is the hash of the transaction for which confirmation
169
        // notifications are requested. If set to a zero hash, then a
170
        // confirmation notification will be dispatched upon inclusion of the
171
        // _script_, rather than the txid.
172
        TxID chainhash.Hash
173

174
        // PkScript is the public key script of an outpoint created in this
175
        // transaction.
176
        PkScript txscript.PkScript
177
}
178

179
// NewConfRequest creates a request for a confirmation notification of either a
180
// txid or output script. A nil txid or an allocated ZeroHash can be used to
181
// dispatch the confirmation notification on the script.
182
func NewConfRequest(txid *chainhash.Hash, pkScript []byte) (ConfRequest, error) {
236✔
183
        var r ConfRequest
236✔
184
        outputScript, err := txscript.ParsePkScript(pkScript)
236✔
185
        if err != nil {
236✔
186
                return r, err
×
187
        }
×
188

189
        // We'll only set a txid for which we'll dispatch a confirmation
190
        // notification on this request if one was provided. Otherwise, we'll
191
        // default to dispatching on the confirmation of the script instead.
192
        if txid != nil {
378✔
193
                r.TxID = *txid
142✔
194
        }
142✔
195
        r.PkScript = outputScript
236✔
196

236✔
197
        return r, nil
236✔
198
}
199

200
// String returns the string representation of the ConfRequest.
201
func (r ConfRequest) String() string {
3✔
202
        if r.TxID != ZeroHash {
6✔
203
                return fmt.Sprintf("txid=%v", r.TxID)
3✔
204
        }
3✔
205
        return fmt.Sprintf("script=%v", r.PkScript)
×
206
}
207

208
// MatchesTx determines whether the given transaction satisfies the confirmation
209
// request. If the confirmation request is for a script, then we'll check all of
210
// the outputs of the transaction to determine if it matches. Otherwise, we'll
211
// match on the txid.
212
func (r ConfRequest) MatchesTx(tx *wire.MsgTx) bool {
217✔
213
        scriptMatches := func() bool {
359✔
214
                pkScript := r.PkScript.Script()
142✔
215
                for _, txOut := range tx.TxOut {
315✔
216
                        if bytes.Equal(txOut.PkScript, pkScript) {
227✔
217
                                return true
54✔
218
                        }
54✔
219
                }
220

221
                return false
88✔
222
        }
223

224
        if r.TxID != ZeroHash {
348✔
225
                return r.TxID == tx.TxHash() && scriptMatches()
131✔
226
        }
131✔
227

228
        return scriptMatches()
86✔
229
}
230

231
// ConfNtfn represents a notifier client's request to receive a notification
232
// once the target transaction/output script gets sufficient confirmations. The
233
// client is asynchronously notified via the ConfirmationEvent channels.
234
type ConfNtfn struct {
235
        // ConfID uniquely identifies the confirmation notification request for
236
        // the specified transaction/output script.
237
        ConfID uint64
238

239
        // ConfRequest represents either the txid or script we should detect
240
        // inclusion of within the chain.
241
        ConfRequest
242

243
        // NumConfirmations is the number of confirmations after which the
244
        // notification is to be sent.
245
        NumConfirmations uint32
246

247
        // Event contains references to the channels that the notifications are
248
        // to be sent over.
249
        Event *ConfirmationEvent
250

251
        // HeightHint is the minimum height in the chain that we expect to find
252
        // this txid.
253
        HeightHint uint32
254

255
        // dispatched is false if the confirmed notification has not been sent
256
        // yet.
257
        dispatched bool
258

259
        // includeBlock is true if the dispatched notification should also have
260
        // the block included with it.
261
        includeBlock bool
262

263
        // numConfsLeft is the number of confirmations left to be sent to the
264
        // subscriber.
265
        numConfsLeft uint32
266
}
267

268
// HistoricalConfDispatch parametrizes a manual rescan for a particular
269
// transaction/output script. The parameters include the start and end block
270
// heights specifying the range of blocks to scan.
271
type HistoricalConfDispatch struct {
272
        // ConfRequest represents either the txid or script we should detect
273
        // inclusion of within the chain.
274
        ConfRequest
275

276
        // StartHeight specifies the block height at which to begin the
277
        // historical rescan.
278
        StartHeight uint32
279

280
        // EndHeight specifies the last block height (inclusive) that the
281
        // historical scan should consider.
282
        EndHeight uint32
283
}
284

285
// ConfRegistration encompasses all of the information required for callers to
286
// retrieve details about a confirmation event.
287
type ConfRegistration struct {
288
        // Event contains references to the channels that the notifications are
289
        // to be sent over.
290
        Event *ConfirmationEvent
291

292
        // HistoricalDispatch, if non-nil, signals to the client who registered
293
        // the notification that they are responsible for attempting to manually
294
        // rescan blocks for the txid/output script between the start and end
295
        // heights.
296
        HistoricalDispatch *HistoricalConfDispatch
297

298
        // Height is the height of the TxNotifier at the time the confirmation
299
        // notification was registered. This can be used so that backends can
300
        // request to be notified of confirmations from this point forwards.
301
        Height uint32
302
}
303

304
// SpendRequest encapsulates a request for a spend notification of either an
305
// outpoint or output script.
306
type SpendRequest struct {
307
        // OutPoint is the outpoint for which a client has requested a spend
308
        // notification for. If set to a zero outpoint, then a spend
309
        // notification will be dispatched upon detecting the spend of the
310
        // _script_, rather than the outpoint.
311
        OutPoint wire.OutPoint
312

313
        // PkScript is the script of the outpoint. If a zero outpoint is set,
314
        // then this can be an arbitrary script.
315
        PkScript txscript.PkScript
316
}
317

318
// NewSpendRequest creates a request for a spend notification of either an
319
// outpoint or output script. A nil outpoint or an allocated ZeroOutPoint can be
320
// used to dispatch the confirmation notification on the script.
321
func NewSpendRequest(op *wire.OutPoint, pkScript []byte) (SpendRequest, error) {
128✔
322
        var r SpendRequest
128✔
323
        outputScript, err := txscript.ParsePkScript(pkScript)
128✔
324
        if err != nil {
128✔
325
                return r, err
×
326
        }
×
327

328
        // We'll only set an outpoint for which we'll dispatch a spend
329
        // notification on this request if one was provided. Otherwise, we'll
330
        // default to dispatching on the spend of the script instead.
331
        if op != nil {
204✔
332
                r.OutPoint = *op
76✔
333
        }
76✔
334
        r.PkScript = outputScript
128✔
335

128✔
336
        // For Taproot spends we have the main problem that for the key spend
128✔
337
        // path we cannot derive the pkScript from only looking at the input's
128✔
338
        // witness. So we need to rely on the outpoint information alone.
128✔
339
        //
128✔
340
        // TODO(guggero): For script path spends we can derive the pkScript from
128✔
341
        // the witness, since we have the full control block and the spent
128✔
342
        // script available.
128✔
343
        if outputScript.Class() == txscript.WitnessV1TaprootTy {
131✔
344
                if op == nil {
6✔
345
                        return r, fmt.Errorf("cannot register witness v1 " +
3✔
346
                                "spend request without outpoint")
3✔
347
                }
3✔
348

349
                // We have an outpoint, so we can set the pkScript to an all
350
                // zero Taproot key that we'll compare this spend request to.
351
                r.PkScript = ZeroTaprootPkScript
3✔
352
        }
353

354
        return r, nil
128✔
355
}
356

357
// String returns the string representation of the SpendRequest.
358
func (r SpendRequest) String() string {
4✔
359
        if r.OutPoint != ZeroOutPoint {
8✔
360
                return fmt.Sprintf("outpoint=%v, script=%v", r.OutPoint,
4✔
361
                        r.PkScript)
4✔
362
        }
4✔
363
        return fmt.Sprintf("outpoint=<zero>, script=%v", r.PkScript)
×
364
}
365

366
// MatchesTx determines whether the given transaction satisfies the spend
367
// request. If the spend request is for an outpoint, then we'll check all of
368
// the outputs being spent by the inputs of the transaction to determine if it
369
// matches. Otherwise, we'll need to match on the output script being spent, so
370
// we'll recompute it for each input of the transaction to determine if it
371
// matches.
372
func (r SpendRequest) MatchesTx(tx *wire.MsgTx) (bool, uint32, error) {
9✔
373
        if r.OutPoint != ZeroOutPoint {
14✔
374
                for i, txIn := range tx.TxIn {
10✔
375
                        if txIn.PreviousOutPoint == r.OutPoint {
6✔
376
                                return true, uint32(i), nil
1✔
377
                        }
1✔
378
                }
379

380
                return false, 0, nil
5✔
381
        }
382

383
        for i, txIn := range tx.TxIn {
8✔
384
                pkScript, err := txscript.ComputePkScript(
4✔
385
                        txIn.SignatureScript, txIn.Witness,
4✔
386
                )
4✔
387
                if err == txscript.ErrUnsupportedScriptType {
4✔
388
                        continue
×
389
                }
390
                if err != nil {
4✔
391
                        return false, 0, err
×
392
                }
×
393

394
                if bytes.Equal(pkScript.Script(), r.PkScript.Script()) {
4✔
395
                        return true, uint32(i), nil
×
396
                }
×
397
        }
398

399
        return false, 0, nil
4✔
400
}
401

402
// SpendNtfn represents a client's request to receive a notification once an
403
// outpoint/output script has been spent on-chain. The client is asynchronously
404
// notified via the SpendEvent channels.
405
type SpendNtfn struct {
406
        // SpendID uniquely identies the spend notification request for the
407
        // specified outpoint/output script.
408
        SpendID uint64
409

410
        // SpendRequest represents either the outpoint or script we should
411
        // detect the spend of.
412
        SpendRequest
413

414
        // Event contains references to the channels that the notifications are
415
        // to be sent over.
416
        Event *SpendEvent
417

418
        // HeightHint is the earliest height in the chain that we expect to find
419
        // the spending transaction of the specified outpoint/output script.
420
        // This value will be overridden by the spend hint cache if it contains
421
        // an entry for it.
422
        HeightHint uint32
423

424
        // dispatched signals whether a spend notification has been dispatched
425
        // to the client.
426
        dispatched bool
427
}
428

429
// HistoricalSpendDispatch parametrizes a manual rescan to determine the
430
// spending details (if any) of an outpoint/output script. The parameters
431
// include the start and end block heights specifying the range of blocks to
432
// scan.
433
type HistoricalSpendDispatch struct {
434
        // SpendRequest represents either the outpoint or script we should
435
        // detect the spend of.
436
        SpendRequest
437

438
        // StartHeight specified the block height at which to begin the
439
        // historical rescan.
440
        StartHeight uint32
441

442
        // EndHeight specifies the last block height (inclusive) that the
443
        // historical rescan should consider.
444
        EndHeight uint32
445
}
446

447
// SpendRegistration encompasses all of the information required for callers to
448
// retrieve details about a spend event.
449
type SpendRegistration struct {
450
        // Event contains references to the channels that the notifications are
451
        // to be sent over.
452
        Event *SpendEvent
453

454
        // HistoricalDispatch, if non-nil, signals to the client who registered
455
        // the notification that they are responsible for attempting to manually
456
        // rescan blocks for the txid/output script between the start and end
457
        // heights.
458
        HistoricalDispatch *HistoricalSpendDispatch
459

460
        // Height is the height of the TxNotifier at the time the spend
461
        // notification was registered. This can be used so that backends can
462
        // request to be notified of spends from this point forwards.
463
        Height uint32
464
}
465

466
// TxNotifier is a struct responsible for delivering transaction notifications
467
// to subscribers. These notifications can be of two different types:
468
// transaction/output script confirmations and/or outpoint/output script spends.
469
// The TxNotifier will watch the blockchain as new blocks come in, in order to
470
// satisfy its client requests.
471
type TxNotifier struct {
472
        confClientCounter  uint64 // To be used atomically.
473
        spendClientCounter uint64 // To be used atomically.
474

475
        // currentHeight is the height of the tracked blockchain. It is used to
476
        // determine the number of confirmations a tx has and ensure blocks are
477
        // connected and disconnected in order.
478
        currentHeight uint32
479

480
        // reorgSafetyLimit is the chain depth beyond which it is assumed a
481
        // block will not be reorganized out of the chain. This is used to
482
        // determine when to prune old notification requests so that reorgs are
483
        // handled correctly. The coinbase maturity period is a reasonable value
484
        // to use.
485
        reorgSafetyLimit uint32
486

487
        // reorgDepth is the depth of a chain organization that this system is
488
        // being informed of. This is incremented as long as a sequence of
489
        // blocks are disconnected without being interrupted by a new block.
490
        reorgDepth uint32
491

492
        // confNotifications is an index of confirmation notification requests
493
        // by transaction hash/output script.
494
        confNotifications map[ConfRequest]*confNtfnSet
495

496
        // confsByInitialHeight is an index of watched transactions/output
497
        // scripts by the height that they are included at in the chain. This
498
        // is tracked so that incorrect notifications are not sent if a
499
        // transaction/output script is reorged out of the chain and so that
500
        // negative confirmations can be recognized.
501
        confsByInitialHeight map[uint32]map[ConfRequest]struct{}
502

503
        // ntfnsByConfirmHeight is an index of notification requests by the
504
        // height at which the transaction/output script will have sufficient
505
        // confirmations.
506
        ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{}
507

508
        // spendNotifications is an index of all active notification requests
509
        // per outpoint/output script.
510
        spendNotifications map[SpendRequest]*spendNtfnSet
511

512
        // spendsByHeight is an index that keeps tracks of the spending height
513
        // of outpoints/output scripts we are currently tracking notifications
514
        // for. This is used in order to recover from spending transactions
515
        // being reorged out of the chain.
516
        spendsByHeight map[uint32]map[SpendRequest]struct{}
517

518
        // confirmHintCache is a cache used to maintain the latest height hints
519
        // for transactions/output scripts. Each height hint represents the
520
        // earliest height at which they scripts could have been confirmed
521
        // within the chain.
522
        confirmHintCache ConfirmHintCache
523

524
        // spendHintCache is a cache used to maintain the latest height hints
525
        // for outpoints/output scripts. Each height hint represents the
526
        // earliest height at which they could have been spent within the chain.
527
        spendHintCache SpendHintCache
528

529
        // quit is closed in order to signal that the notifier is gracefully
530
        // exiting.
531
        quit chan struct{}
532

533
        sync.Mutex
534
}
535

536
// NewTxNotifier creates a TxNotifier. The current height of the blockchain is
537
// accepted as a parameter. The different hint caches (confirm and spend) are
538
// used as an optimization in order to retrieve a better starting point when
539
// dispatching a rescan for a historical event in the chain.
540
func NewTxNotifier(startHeight uint32, reorgSafetyLimit uint32,
541
        confirmHintCache ConfirmHintCache,
542
        spendHintCache SpendHintCache) *TxNotifier {
51✔
543

51✔
544
        return &TxNotifier{
51✔
545
                currentHeight:        startHeight,
51✔
546
                reorgSafetyLimit:     reorgSafetyLimit,
51✔
547
                confNotifications:    make(map[ConfRequest]*confNtfnSet),
51✔
548
                confsByInitialHeight: make(map[uint32]map[ConfRequest]struct{}),
51✔
549
                ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}),
51✔
550
                spendNotifications:   make(map[SpendRequest]*spendNtfnSet),
51✔
551
                spendsByHeight:       make(map[uint32]map[SpendRequest]struct{}),
51✔
552
                confirmHintCache:     confirmHintCache,
51✔
553
                spendHintCache:       spendHintCache,
51✔
554
                quit:                 make(chan struct{}),
51✔
555
        }
51✔
556
}
51✔
557

558
// newConfNtfn validates all of the parameters required to successfully create
559
// and register a confirmation notification.
560
func (n *TxNotifier) newConfNtfn(txid *chainhash.Hash,
561
        pkScript []byte, numConfs, heightHint uint32,
562
        opts *NotifierOptions) (*ConfNtfn, error) {
228✔
563

228✔
564
        // An accompanying output script must always be provided.
228✔
565
        if len(pkScript) == 0 {
229✔
566
                return nil, ErrNoScript
1✔
567
        }
1✔
568

569
        // Enforce that we will not dispatch confirmations beyond the reorg
570
        // safety limit.
571
        if numConfs == 0 || numConfs > n.reorgSafetyLimit {
229✔
572
                return nil, ErrNumConfsOutOfRange
2✔
573
        }
2✔
574

575
        // A height hint must be provided to prevent scanning from the genesis
576
        // block.
577
        if heightHint == 0 {
226✔
578
                return nil, ErrNoHeightHint
1✔
579
        }
1✔
580

581
        // Ensure the output script is of a supported type.
582
        confRequest, err := NewConfRequest(txid, pkScript)
224✔
583
        if err != nil {
224✔
584
                return nil, err
×
585
        }
×
586

587
        confID := atomic.AddUint64(&n.confClientCounter, 1)
224✔
588
        return &ConfNtfn{
224✔
589
                ConfID:           confID,
224✔
590
                ConfRequest:      confRequest,
224✔
591
                NumConfirmations: numConfs,
224✔
592
                Event: NewConfirmationEvent(numConfs, func() {
230✔
593
                        n.CancelConf(confRequest, confID)
6✔
594
                }),
6✔
595
                HeightHint:   heightHint,
596
                includeBlock: opts.IncludeBlock,
597
                numConfsLeft: numConfs,
598
        }, nil
599
}
600

601
// RegisterConf handles a new confirmation notification request. The client will
602
// be notified when the transaction/output script gets a sufficient number of
603
// confirmations in the blockchain.
604
//
605
// NOTE: If the transaction/output script has already been included in a block
606
// on the chain, the confirmation details must be provided with the
607
// UpdateConfDetails method, otherwise we will wait for the transaction/output
608
// script to confirm even though it already has.
609
func (n *TxNotifier) RegisterConf(txid *chainhash.Hash, pkScript []byte,
610
        numConfs, heightHint uint32,
611
        optFuncs ...NotifierOption) (*ConfRegistration, error) {
229✔
612

229✔
613
        select {
229✔
614
        case <-n.quit:
1✔
615
                return nil, ErrTxNotifierExiting
1✔
616
        default:
228✔
617
        }
618

619
        opts := DefaultNotifierOptions()
228✔
620
        for _, optFunc := range optFuncs {
283✔
621
                optFunc(opts)
55✔
622
        }
55✔
623

624
        // We'll start by performing a series of validation checks.
625
        ntfn, err := n.newConfNtfn(txid, pkScript, numConfs, heightHint, opts)
228✔
626
        if err != nil {
232✔
627
                return nil, err
4✔
628
        }
4✔
629

630
        // Before proceeding to register the notification, we'll query our
631
        // height hint cache to determine whether a better one exists.
632
        //
633
        // TODO(conner): verify that all submitted height hints are identical.
634
        startHeight := ntfn.HeightHint
224✔
635
        hint, err := n.confirmHintCache.QueryConfirmHint(ntfn.ConfRequest)
224✔
636
        if err == nil {
248✔
637
                if hint > startHeight {
46✔
638
                        Log.Debugf("Using height hint %d retrieved from cache "+
22✔
639
                                "for %v instead of %d for conf subscription",
22✔
640
                                hint, ntfn.ConfRequest, startHeight)
22✔
641
                        startHeight = hint
22✔
642
                }
22✔
643
        } else if err != ErrConfirmHintNotFound {
203✔
644
                Log.Errorf("Unable to query confirm hint for %v: %v",
×
645
                        ntfn.ConfRequest, err)
×
646
        }
×
647

648
        Log.Infof("New confirmation subscription: conf_id=%d, %v, "+
224✔
649
                "num_confs=%v height_hint=%d", ntfn.ConfID, ntfn.ConfRequest,
224✔
650
                numConfs, startHeight)
224✔
651

224✔
652
        n.Lock()
224✔
653
        defer n.Unlock()
224✔
654

224✔
655
        confSet, ok := n.confNotifications[ntfn.ConfRequest]
224✔
656
        if !ok {
382✔
657
                // If this is the first registration for this request, construct
158✔
658
                // a confSet to coalesce all notifications for the same request.
158✔
659
                confSet = newConfNtfnSet()
158✔
660
                n.confNotifications[ntfn.ConfRequest] = confSet
158✔
661
        }
158✔
662
        confSet.ntfns[ntfn.ConfID] = ntfn
224✔
663

224✔
664
        switch confSet.rescanStatus {
224✔
665

666
        // A prior rescan has already completed and we are actively watching at
667
        // tip for this request.
668
        case rescanComplete:
26✔
669
                // If the confirmation details for this set of notifications has
26✔
670
                // already been found, we'll attempt to deliver them immediately
26✔
671
                // to this client.
26✔
672
                Log.Debugf("Attempting to dispatch confirmation for %v on "+
26✔
673
                        "registration since rescan has finished, conf_id=%v",
26✔
674
                        ntfn.ConfRequest, ntfn.ConfID)
26✔
675

26✔
676
                // The default notification we assigned above includes the
26✔
677
                // block along with the rest of the details. However not all
26✔
678
                // clients want the block, so we make a copy here w/o the block
26✔
679
                // if needed so we can give clients only what they ask for.
26✔
680
                confDetails := confSet.details
26✔
681
                if !ntfn.includeBlock && confDetails != nil {
40✔
682
                        confDetailsCopy := *confDetails
14✔
683
                        confDetailsCopy.Block = nil
14✔
684

14✔
685
                        confDetails = &confDetailsCopy
14✔
686
                }
14✔
687

688
                // Deliver the details to the whole conf set where this ntfn
689
                // lives in.
690
                for _, subscriber := range confSet.ntfns {
113✔
691
                        err := n.dispatchConfDetails(subscriber, confDetails)
87✔
692
                        if err != nil {
87✔
693
                                return nil, err
×
694
                        }
×
695
                }
696

697
                return &ConfRegistration{
26✔
698
                        Event:              ntfn.Event,
26✔
699
                        HistoricalDispatch: nil,
26✔
700
                        Height:             n.currentHeight,
26✔
701
                }, nil
26✔
702

703
        // A rescan is already in progress, return here to prevent dispatching
704
        // another. When the rescan returns, this notification's details will be
705
        // updated as well.
706
        case rescanPending:
46✔
707
                Log.Debugf("Waiting for pending rescan to finish before "+
46✔
708
                        "notifying %v at tip", ntfn.ConfRequest)
46✔
709

46✔
710
                return &ConfRegistration{
46✔
711
                        Event:              ntfn.Event,
46✔
712
                        HistoricalDispatch: nil,
46✔
713
                        Height:             n.currentHeight,
46✔
714
                }, nil
46✔
715

716
        // If no rescan has been dispatched, attempt to do so now.
717
        case rescanNotStarted:
158✔
718
        }
719

720
        // If the provided or cached height hint indicates that the
721
        // transaction with the given txid/output script is to be confirmed at a
722
        // height greater than the notifier's current height, we'll refrain from
723
        // spawning a historical dispatch.
724
        if startHeight > n.currentHeight {
164✔
725
                Log.Debugf("Height hint is above current height, not "+
6✔
726
                        "dispatching historical confirmation rescan for %v",
6✔
727
                        ntfn.ConfRequest)
6✔
728

6✔
729
                // Set the rescan status to complete, which will allow the
6✔
730
                // notifier to start delivering messages for this set
6✔
731
                // immediately.
6✔
732
                confSet.rescanStatus = rescanComplete
6✔
733
                return &ConfRegistration{
6✔
734
                        Event:              ntfn.Event,
6✔
735
                        HistoricalDispatch: nil,
6✔
736
                        Height:             n.currentHeight,
6✔
737
                }, nil
6✔
738
        }
6✔
739

740
        Log.Debugf("Dispatching historical confirmation rescan for %v",
154✔
741
                ntfn.ConfRequest)
154✔
742

154✔
743
        // Construct the parameters for historical dispatch, scanning the range
154✔
744
        // of blocks between our best known height hint and the notifier's
154✔
745
        // current height. The notifier will begin also watching for
154✔
746
        // confirmations at tip starting with the next block.
154✔
747
        dispatch := &HistoricalConfDispatch{
154✔
748
                ConfRequest: ntfn.ConfRequest,
154✔
749
                StartHeight: startHeight,
154✔
750
                EndHeight:   n.currentHeight,
154✔
751
        }
154✔
752

154✔
753
        // Set this confSet's status to pending, ensuring subsequent
154✔
754
        // registrations don't also attempt a dispatch.
154✔
755
        confSet.rescanStatus = rescanPending
154✔
756

154✔
757
        return &ConfRegistration{
154✔
758
                Event:              ntfn.Event,
154✔
759
                HistoricalDispatch: dispatch,
154✔
760
                Height:             n.currentHeight,
154✔
761
        }, nil
154✔
762
}
763

764
// CancelConf cancels an existing request for a spend notification of an
765
// outpoint/output script. The request is identified by its spend ID.
766
func (n *TxNotifier) CancelConf(confRequest ConfRequest, confID uint64) {
6✔
767
        select {
6✔
768
        case <-n.quit:
×
769
                return
×
770
        default:
6✔
771
        }
772

773
        n.Lock()
6✔
774
        defer n.Unlock()
6✔
775

6✔
776
        confSet, ok := n.confNotifications[confRequest]
6✔
777
        if !ok {
6✔
778
                return
×
779
        }
×
780
        ntfn, ok := confSet.ntfns[confID]
6✔
781
        if !ok {
6✔
782
                return
×
783
        }
×
784

785
        Log.Debugf("Canceling confirmation notification: conf_id=%d, %v",
6✔
786
                confID, confRequest)
6✔
787

6✔
788
        // We'll close all the notification channels to let the client know
6✔
789
        // their cancel request has been fulfilled.
6✔
790
        close(ntfn.Event.Confirmed)
6✔
791
        close(ntfn.Event.Updates)
6✔
792
        close(ntfn.Event.NegativeConf)
6✔
793

6✔
794
        // Finally, we'll clean up any lingering references to this
6✔
795
        // notification.
6✔
796
        delete(confSet.ntfns, confID)
6✔
797

6✔
798
        // Remove the queued confirmation notification if the transaction has
6✔
799
        // already confirmed, but hasn't met its required number of
6✔
800
        // confirmations.
6✔
801
        if confSet.details != nil {
11✔
802
                confHeight := confSet.details.BlockHeight +
5✔
803
                        ntfn.NumConfirmations - 1
5✔
804
                delete(n.ntfnsByConfirmHeight[confHeight], ntfn)
5✔
805
        }
5✔
806
}
807

808
// UpdateConfDetails attempts to update the confirmation details for an active
809
// notification within the notifier. This should only be used in the case of a
810
// transaction/output script that has confirmed before the notifier's current
811
// height.
812
//
813
// NOTE: The notification should be registered first to ensure notifications are
814
// dispatched correctly.
815
func (n *TxNotifier) UpdateConfDetails(confRequest ConfRequest,
816
        details *TxConfirmation) error {
145✔
817

145✔
818
        select {
145✔
819
        case <-n.quit:
×
820
                return ErrTxNotifierExiting
×
821
        default:
145✔
822
        }
823

824
        // Ensure we hold the lock throughout handling the notification to
825
        // prevent the notifier from advancing its height underneath us.
826
        n.Lock()
145✔
827
        defer n.Unlock()
145✔
828

145✔
829
        // First, we'll determine whether we have an active confirmation
145✔
830
        // notification for the given txid/script.
145✔
831
        confSet, ok := n.confNotifications[confRequest]
145✔
832
        if !ok {
145✔
833
                return fmt.Errorf("confirmation notification for %v not found",
×
834
                        confRequest)
×
835
        }
×
836

837
        // If the confirmation details were already found at tip, all existing
838
        // notifications will have been dispatched or queued for dispatch. We
839
        // can exit early to avoid sending too many notifications on the
840
        // buffered channels.
841
        if confSet.details != nil {
150✔
842
                return nil
5✔
843
        }
5✔
844

845
        // The historical dispatch has been completed for this confSet. We'll
846
        // update the rescan status and cache any details that were found. If
847
        // the details are nil, that implies we did not find them and will
848
        // continue to watch for them at tip.
849
        confSet.rescanStatus = rescanComplete
141✔
850

141✔
851
        // The notifier has yet to reach the height at which the
141✔
852
        // transaction/output script was included in a block, so we should defer
141✔
853
        // until handling it then within ConnectTip.
141✔
854
        if details == nil {
261✔
855
                Log.Debugf("Confirmation details for %v not found during "+
120✔
856
                        "historical dispatch, waiting to dispatch at tip",
120✔
857
                        confRequest)
120✔
858

120✔
859
                // We'll commit the current height as the confirm hint to
120✔
860
                // prevent another potentially long rescan if we restart before
120✔
861
                // a new block comes in.
120✔
862
                err := n.confirmHintCache.CommitConfirmHint(
120✔
863
                        n.currentHeight, confRequest,
120✔
864
                )
120✔
865
                if err != nil {
120✔
866
                        // The error is not fatal as this is an optimistic
×
867
                        // optimization, so we'll avoid returning an error.
×
868
                        Log.Debugf("Unable to update confirm hint to %d for "+
×
869
                                "%v: %v", n.currentHeight, confRequest, err)
×
870
                }
×
871

872
                return nil
120✔
873
        }
874

875
        if details.BlockHeight > n.currentHeight {
25✔
876
                Log.Debugf("Confirmation details for %v found above current "+
1✔
877
                        "height, waiting to dispatch at tip", confRequest)
1✔
878

1✔
879
                return nil
1✔
880
        }
1✔
881

882
        Log.Debugf("Updating confirmation details for %v", confRequest)
23✔
883

23✔
884
        err := n.confirmHintCache.CommitConfirmHint(
23✔
885
                details.BlockHeight, confRequest,
23✔
886
        )
23✔
887
        if err != nil {
23✔
888
                // The error is not fatal, so we should not return an error to
×
889
                // the caller.
×
890
                Log.Errorf("Unable to update confirm hint to %d for %v: %v",
×
891
                        details.BlockHeight, confRequest, err)
×
892
        }
×
893

894
        // Cache the details found in the rescan and attempt to dispatch any
895
        // notifications that have not yet been delivered.
896
        confSet.details = details
23✔
897
        for _, ntfn := range confSet.ntfns {
51✔
898
                // The default notification we assigned above includes the
28✔
899
                // block along with the rest of the details. However not all
28✔
900
                // clients want the block, so we make a copy here w/o the block
28✔
901
                // if needed so we can give clients only what they ask for.
28✔
902
                confDetails := *details
28✔
903
                if !ntfn.includeBlock {
48✔
904
                        confDetails.Block = nil
20✔
905
                }
20✔
906

907
                err = n.dispatchConfDetails(ntfn, &confDetails)
28✔
908
                if err != nil {
28✔
909
                        return err
×
910
                }
×
911
        }
912

913
        return nil
23✔
914
}
915

916
// dispatchConfDetails attempts to cache and dispatch details to a particular
917
// client if the transaction/output script has sufficiently confirmed. If the
918
// provided details are nil, this method will be a no-op.
919
func (n *TxNotifier) dispatchConfDetails(
920
        ntfn *ConfNtfn, details *TxConfirmation) error {
112✔
921

112✔
922
        // If there are no conf details to dispatch or if the notification has
112✔
923
        // already been dispatched, then we can skip dispatching to this
112✔
924
        // client.
112✔
925
        if details == nil {
132✔
926
                Log.Debugf("Skipped dispatching nil conf details for request "+
20✔
927
                        "%v, conf_id=%v", ntfn.ConfRequest, ntfn.ConfID)
20✔
928

20✔
929
                return nil
20✔
930
        }
20✔
931

932
        if ntfn.dispatched {
146✔
933
                Log.Debugf("Skipped dispatched conf details for request %v "+
51✔
934
                        "conf_id=%v", ntfn.ConfRequest, ntfn.ConfID)
51✔
935

51✔
936
                return nil
51✔
937
        }
51✔
938

939
        // Now, we'll examine whether the transaction/output script of this
940
        // request has reached its required number of confirmations. If it has,
941
        // we'll dispatch a confirmation notification to the caller.
942
        confHeight := details.BlockHeight + ntfn.NumConfirmations - 1
47✔
943
        if confHeight <= n.currentHeight {
85✔
944
                Log.Debugf("Dispatching %v confirmation notification for "+
38✔
945
                        "conf_id=%v, %v", ntfn.NumConfirmations, ntfn.ConfID,
38✔
946
                        ntfn.ConfRequest)
38✔
947

38✔
948
                // We'll send a 0 value to the Updates channel,
38✔
949
                // indicating that the transaction/output script has already
38✔
950
                // been confirmed, and include the block height at which the
38✔
951
                // transaction was included.
38✔
952
                err := n.notifyNumConfsLeft(ntfn, TxUpdateInfo{
38✔
953
                        NumConfsLeft: 0,
38✔
954
                        BlockHeight:  details.BlockHeight,
38✔
955
                })
38✔
956
                if err != nil {
38✔
957
                        return err
×
958
                }
×
959

960
                select {
38✔
961
                case ntfn.Event.Confirmed <- details:
38✔
962
                        ntfn.dispatched = true
38✔
963
                case <-n.quit:
×
964
                        return ErrTxNotifierExiting
×
965
                }
966
        } else {
12✔
967
                Log.Debugf("Queueing %v confirmation notification for %v at "+
12✔
968
                        "tip", ntfn.NumConfirmations, ntfn.ConfRequest)
12✔
969

12✔
970
                // Otherwise, we'll keep track of the notification
12✔
971
                // request by the height at which we should dispatch the
12✔
972
                // confirmation notification.
12✔
973
                ntfnSet, exists := n.ntfnsByConfirmHeight[confHeight]
12✔
974
                if !exists {
24✔
975
                        ntfnSet = make(map[*ConfNtfn]struct{})
12✔
976
                        n.ntfnsByConfirmHeight[confHeight] = ntfnSet
12✔
977
                }
12✔
978
                ntfnSet[ntfn] = struct{}{}
12✔
979

12✔
980
                // We'll also send an update to the client of how many
12✔
981
                // confirmations are left for the transaction/output script to
12✔
982
                // be confirmed.
12✔
983
                numConfsLeft := confHeight - n.currentHeight
12✔
984
                err := n.notifyNumConfsLeft(ntfn, TxUpdateInfo{
12✔
985
                        NumConfsLeft: numConfsLeft,
12✔
986
                        BlockHeight:  details.BlockHeight,
12✔
987
                })
12✔
988
                if err != nil {
12✔
989
                        return err
×
990
                }
×
991
        }
992

993
        // As a final check, we'll also watch the transaction/output script if
994
        // it's still possible for it to get reorged out of the chain.
995
        reorgSafeHeight := details.BlockHeight + n.reorgSafetyLimit
47✔
996
        if reorgSafeHeight > n.currentHeight {
94✔
997
                txSet, exists := n.confsByInitialHeight[details.BlockHeight]
47✔
998
                if !exists {
61✔
999
                        txSet = make(map[ConfRequest]struct{})
14✔
1000
                        n.confsByInitialHeight[details.BlockHeight] = txSet
14✔
1001
                }
14✔
1002
                txSet[ntfn.ConfRequest] = struct{}{}
47✔
1003
        }
1004

1005
        return nil
47✔
1006
}
1007

1008
// newSpendNtfn validates all of the parameters required to successfully create
1009
// and register a spend notification.
1010
func (n *TxNotifier) newSpendNtfn(outpoint *wire.OutPoint,
1011
        pkScript []byte, heightHint uint32) (*SpendNtfn, error) {
130✔
1012

130✔
1013
        // An accompanying output script must always be provided.
130✔
1014
        if len(pkScript) == 0 {
131✔
1015
                return nil, ErrNoScript
1✔
1016
        }
1✔
1017

1018
        // A height hint must be provided to prevent scanning from the genesis
1019
        // block.
1020
        if heightHint == 0 {
130✔
1021
                return nil, ErrNoHeightHint
1✔
1022
        }
1✔
1023

1024
        // Ensure the output script is of a supported type.
1025
        spendRequest, err := NewSpendRequest(outpoint, pkScript)
128✔
1026
        if err != nil {
131✔
1027
                return nil, err
3✔
1028
        }
3✔
1029

1030
        spendID := atomic.AddUint64(&n.spendClientCounter, 1)
128✔
1031
        return &SpendNtfn{
128✔
1032
                SpendID:      spendID,
128✔
1033
                SpendRequest: spendRequest,
128✔
1034
                Event: NewSpendEvent(func() {
139✔
1035
                        n.CancelSpend(spendRequest, spendID)
11✔
1036
                }),
11✔
1037
                HeightHint: heightHint,
1038
        }, nil
1039
}
1040

1041
// RegisterSpend handles a new spend notification request. The client will be
1042
// notified once the outpoint/output script is detected as spent within the
1043
// chain.
1044
//
1045
// NOTE: If the outpoint/output script has already been spent within the chain
1046
// before the notifier's current tip, the spend details must be provided with
1047
// the UpdateSpendDetails method, otherwise we will wait for the outpoint/output
1048
// script to be spent at tip, even though it already has.
1049
func (n *TxNotifier) RegisterSpend(outpoint *wire.OutPoint, pkScript []byte,
1050
        heightHint uint32) (*SpendRegistration, error) {
131✔
1051

131✔
1052
        select {
131✔
1053
        case <-n.quit:
1✔
1054
                return nil, ErrTxNotifierExiting
1✔
1055
        default:
130✔
1056
        }
1057

1058
        // We'll start by performing a series of validation checks.
1059
        ntfn, err := n.newSpendNtfn(outpoint, pkScript, heightHint)
130✔
1060
        if err != nil {
135✔
1061
                return nil, err
5✔
1062
        }
5✔
1063

1064
        // Before proceeding to register the notification, we'll query our spend
1065
        // hint cache to determine whether a better one exists.
1066
        startHeight := ntfn.HeightHint
128✔
1067
        hint, err := n.spendHintCache.QuerySpendHint(ntfn.SpendRequest)
128✔
1068
        if err == nil {
160✔
1069
                if hint > startHeight {
54✔
1070
                        Log.Debugf("Using height hint %d retrieved from cache "+
22✔
1071
                                "for %v instead of %d for spend subscription",
22✔
1072
                                hint, ntfn.SpendRequest, startHeight)
22✔
1073
                        startHeight = hint
22✔
1074
                }
22✔
1075
        } else if err != ErrSpendHintNotFound {
99✔
1076
                Log.Errorf("Unable to query spend hint for %v: %v",
×
1077
                        ntfn.SpendRequest, err)
×
1078
        }
×
1079

1080
        n.Lock()
128✔
1081
        defer n.Unlock()
128✔
1082

128✔
1083
        Log.Debugf("New spend subscription: spend_id=%d, %v, height_hint=%d",
128✔
1084
                ntfn.SpendID, ntfn.SpendRequest, startHeight)
128✔
1085

128✔
1086
        // Keep track of the notification request so that we can properly
128✔
1087
        // dispatch a spend notification later on.
128✔
1088
        spendSet, ok := n.spendNotifications[ntfn.SpendRequest]
128✔
1089
        if !ok {
177✔
1090
                // If this is the first registration for the request, we'll
49✔
1091
                // construct a spendNtfnSet to coalesce all notifications.
49✔
1092
                spendSet = newSpendNtfnSet()
49✔
1093
                n.spendNotifications[ntfn.SpendRequest] = spendSet
49✔
1094
        }
49✔
1095
        spendSet.ntfns[ntfn.SpendID] = ntfn
128✔
1096

128✔
1097
        // We'll now let the caller know whether a historical rescan is needed
128✔
1098
        // depending on the current rescan status.
128✔
1099
        switch spendSet.rescanStatus {
128✔
1100

1101
        // If the spending details for this request have already been determined
1102
        // and cached, then we can use them to immediately dispatch the spend
1103
        // notification to the client.
1104
        case rescanComplete:
65✔
1105
                Log.Debugf("Attempting to dispatch spend for %v on "+
65✔
1106
                        "registration since rescan has finished",
65✔
1107
                        ntfn.SpendRequest)
65✔
1108

65✔
1109
                err := n.dispatchSpendDetails(ntfn, spendSet.details)
65✔
1110
                if err != nil {
65✔
1111
                        return nil, err
×
1112
                }
×
1113

1114
                return &SpendRegistration{
65✔
1115
                        Event:              ntfn.Event,
65✔
1116
                        HistoricalDispatch: nil,
65✔
1117
                        Height:             n.currentHeight,
65✔
1118
                }, nil
65✔
1119

1120
        // If there is an active rescan to determine whether the request has
1121
        // been spent, then we won't trigger another one.
1122
        case rescanPending:
20✔
1123
                Log.Debugf("Waiting for pending rescan to finish before "+
20✔
1124
                        "notifying %v at tip", ntfn.SpendRequest)
20✔
1125

20✔
1126
                return &SpendRegistration{
20✔
1127
                        Event:              ntfn.Event,
20✔
1128
                        HistoricalDispatch: nil,
20✔
1129
                        Height:             n.currentHeight,
20✔
1130
                }, nil
20✔
1131

1132
        // Otherwise, we'll fall through and let the caller know that a rescan
1133
        // should be dispatched to determine whether the request has already
1134
        // been spent.
1135
        case rescanNotStarted:
49✔
1136
        }
1137

1138
        // However, if the spend hint, either provided by the caller or
1139
        // retrieved from the cache, is found to be at a later height than the
1140
        // TxNotifier is aware of, then we'll refrain from dispatching a
1141
        // historical rescan and wait for the spend to come in at tip.
1142
        if startHeight > n.currentHeight {
71✔
1143
                Log.Debugf("Spend hint of %d for %v is above current height %d",
22✔
1144
                        startHeight, ntfn.SpendRequest, n.currentHeight)
22✔
1145

22✔
1146
                // We'll also set the rescan status as complete to ensure that
22✔
1147
                // spend hints for this request get updated upon
22✔
1148
                // connected/disconnected blocks.
22✔
1149
                spendSet.rescanStatus = rescanComplete
22✔
1150
                return &SpendRegistration{
22✔
1151
                        Event:              ntfn.Event,
22✔
1152
                        HistoricalDispatch: nil,
22✔
1153
                        Height:             n.currentHeight,
22✔
1154
                }, nil
22✔
1155
        }
22✔
1156

1157
        // We'll set the rescan status to pending to ensure subsequent
1158
        // notifications don't also attempt a historical dispatch.
1159
        spendSet.rescanStatus = rescanPending
27✔
1160

27✔
1161
        Log.Debugf("Dispatching historical spend rescan for %v, start=%d, "+
27✔
1162
                "end=%d", ntfn.SpendRequest, startHeight, n.currentHeight)
27✔
1163

27✔
1164
        return &SpendRegistration{
27✔
1165
                Event: ntfn.Event,
27✔
1166
                HistoricalDispatch: &HistoricalSpendDispatch{
27✔
1167
                        SpendRequest: ntfn.SpendRequest,
27✔
1168
                        StartHeight:  startHeight,
27✔
1169
                        EndHeight:    n.currentHeight,
27✔
1170
                },
27✔
1171
                Height: n.currentHeight,
27✔
1172
        }, nil
27✔
1173
}
1174

1175
// CancelSpend cancels an existing request for a spend notification of an
1176
// outpoint/output script. The request is identified by its spend ID.
1177
func (n *TxNotifier) CancelSpend(spendRequest SpendRequest, spendID uint64) {
12✔
1178
        select {
12✔
1179
        case <-n.quit:
×
1180
                return
×
1181
        default:
12✔
1182
        }
1183

1184
        n.Lock()
12✔
1185
        defer n.Unlock()
12✔
1186

12✔
1187
        spendSet, ok := n.spendNotifications[spendRequest]
12✔
1188
        if !ok {
12✔
1189
                return
×
1190
        }
×
1191
        ntfn, ok := spendSet.ntfns[spendID]
12✔
1192
        if !ok {
12✔
1193
                return
×
1194
        }
×
1195

1196
        Log.Debugf("Canceling spend notification: spend_id=%d, %v", spendID,
12✔
1197
                spendRequest)
12✔
1198

12✔
1199
        // We'll close all the notification channels to let the client know
12✔
1200
        // their cancel request has been fulfilled.
12✔
1201
        close(ntfn.Event.Spend)
12✔
1202
        close(ntfn.Event.Reorg)
12✔
1203
        close(ntfn.Event.Done)
12✔
1204
        delete(spendSet.ntfns, spendID)
12✔
1205
}
1206

1207
// ProcessRelevantSpendTx processes a transaction provided externally. This will
1208
// check whether the transaction is relevant to the notifier if it spends any
1209
// outpoints/output scripts for which we currently have registered notifications
1210
// for. If it is relevant, spend notifications will be dispatched to the caller.
1211
func (n *TxNotifier) ProcessRelevantSpendTx(tx *btcutil.Tx,
1212
        blockHeight uint32) error {
45✔
1213

45✔
1214
        select {
45✔
1215
        case <-n.quit:
×
1216
                return ErrTxNotifierExiting
×
1217
        default:
45✔
1218
        }
1219

1220
        // Ensure we hold the lock throughout handling the notification to
1221
        // prevent the notifier from advancing its height underneath us.
1222
        n.Lock()
45✔
1223
        defer n.Unlock()
45✔
1224

45✔
1225
        // We'll use a channel to coalesce all the spend requests that this
45✔
1226
        // transaction fulfills.
45✔
1227
        type spend struct {
45✔
1228
                request *SpendRequest
45✔
1229
                details *SpendDetail
45✔
1230
        }
45✔
1231

45✔
1232
        // We'll set up the onSpend filter callback to gather all the fulfilled
45✔
1233
        // spends requests within this transaction.
45✔
1234
        var spends []spend
45✔
1235
        onSpend := func(request SpendRequest, details *SpendDetail) {
84✔
1236
                spends = append(spends, spend{&request, details})
39✔
1237
        }
39✔
1238
        n.filterTx(nil, tx, blockHeight, nil, onSpend)
45✔
1239

45✔
1240
        // After the transaction has been filtered, we can finally dispatch
45✔
1241
        // notifications for each request.
45✔
1242
        for _, spend := range spends {
84✔
1243
                err := n.updateSpendDetails(*spend.request, spend.details)
39✔
1244
                if err != nil {
39✔
1245
                        return err
×
1246
                }
×
1247
        }
1248

1249
        return nil
45✔
1250
}
1251

1252
// UpdateSpendDetails attempts to update the spend details for all active spend
1253
// notification requests for an outpoint/output script. This method should be
1254
// used once a historical scan of the chain has finished. If the historical scan
1255
// did not find a spending transaction for it, the spend details may be nil.
1256
//
1257
// NOTE: A notification request for the outpoint/output script must be
1258
// registered first to ensure notifications are delivered.
1259
func (n *TxNotifier) UpdateSpendDetails(spendRequest SpendRequest,
1260
        details *SpendDetail) error {
17✔
1261

17✔
1262
        select {
17✔
1263
        case <-n.quit:
×
1264
                return ErrTxNotifierExiting
×
1265
        default:
17✔
1266
        }
1267

1268
        // Ensure we hold the lock throughout handling the notification to
1269
        // prevent the notifier from advancing its height underneath us.
1270
        n.Lock()
17✔
1271
        defer n.Unlock()
17✔
1272

17✔
1273
        return n.updateSpendDetails(spendRequest, details)
17✔
1274
}
1275

1276
// updateSpendDetails attempts to update the spend details for all active spend
1277
// notification requests for an outpoint/output script. This method should be
1278
// used once a historical scan of the chain has finished. If the historical scan
1279
// did not find a spending transaction for it, the spend details may be nil.
1280
//
1281
// NOTE: This method must be called with the TxNotifier's lock held.
1282
func (n *TxNotifier) updateSpendDetails(spendRequest SpendRequest,
1283
        details *SpendDetail) error {
53✔
1284

53✔
1285
        // Mark the ongoing historical rescan for this request as finished. This
53✔
1286
        // will allow us to update the spend hints for it at tip.
53✔
1287
        spendSet, ok := n.spendNotifications[spendRequest]
53✔
1288
        if !ok {
54✔
1289
                return fmt.Errorf("spend notification for %v not found",
1✔
1290
                        spendRequest)
1✔
1291
        }
1✔
1292

1293
        // If the spend details have already been found either at tip, then the
1294
        // notifications should have already been dispatched, so we can exit
1295
        // early to prevent sending duplicate notifications.
1296
        if spendSet.details != nil {
70✔
1297
                return nil
18✔
1298
        }
18✔
1299

1300
        // Since the historical rescan has completed for this request, we'll
1301
        // mark its rescan status as complete in order to ensure that the
1302
        // TxNotifier can properly update its spend hints upon
1303
        // connected/disconnected blocks.
1304
        spendSet.rescanStatus = rescanComplete
37✔
1305

37✔
1306
        // If the historical rescan was not able to find a spending transaction
37✔
1307
        // for this request, then we can track the spend at tip.
37✔
1308
        if details == nil {
46✔
1309
                // We'll commit the current height as the spend hint to prevent
9✔
1310
                // another potentially long rescan if we restart before a new
9✔
1311
                // block comes in.
9✔
1312
                err := n.spendHintCache.CommitSpendHint(
9✔
1313
                        n.currentHeight, spendRequest,
9✔
1314
                )
9✔
1315
                if err != nil {
9✔
1316
                        // The error is not fatal as this is an optimistic
×
1317
                        // optimization, so we'll avoid returning an error.
×
1318
                        Log.Debugf("Unable to update spend hint to %d for %v: %v",
×
1319
                                n.currentHeight, spendRequest, err)
×
1320
                }
×
1321

1322
                Log.Debugf("Updated spend hint to height=%v for unconfirmed "+
9✔
1323
                        "spend request %v", n.currentHeight, spendRequest)
9✔
1324
                return nil
9✔
1325
        }
1326

1327
        // Return an error if the witness data is not present in the spending
1328
        // transaction.
1329
        //
1330
        // NOTE: if the witness stack is empty, we will do a critical log which
1331
        // shuts down the node.
1332
        if !details.HasSpenderWitness() {
31✔
1333
                Log.Criticalf("Found spending tx for outpoint=%v, but the "+
×
1334
                        "transaction %v does not have witness",
×
1335
                        spendRequest.OutPoint, details.SpendingTx.TxHash())
×
1336

×
1337
                return ErrEmptyWitnessStack
×
1338
        }
×
1339

1340
        // If the historical rescan found the spending transaction for this
1341
        // request, but it's at a later height than the notifier (this can
1342
        // happen due to latency with the backend during a reorg), then we'll
1343
        // defer handling the notification until the notifier has caught up to
1344
        // such height.
1345
        if uint32(details.SpendingHeight) > n.currentHeight {
56✔
1346
                return nil
25✔
1347
        }
25✔
1348

1349
        // Now that we've determined the request has been spent, we'll commit
1350
        // its spending height as its hint in the cache and dispatch
1351
        // notifications to all of its respective clients.
1352
        err := n.spendHintCache.CommitSpendHint(
9✔
1353
                uint32(details.SpendingHeight), spendRequest,
9✔
1354
        )
9✔
1355
        if err != nil {
9✔
1356
                // The error is not fatal as this is an optimistic optimization,
×
1357
                // so we'll avoid returning an error.
×
1358
                Log.Debugf("Unable to update spend hint to %d for %v: %v",
×
1359
                        details.SpendingHeight, spendRequest, err)
×
1360
        }
×
1361

1362
        Log.Debugf("Updated spend hint to height=%v for confirmed spend "+
9✔
1363
                "request %v", details.SpendingHeight, spendRequest)
9✔
1364

9✔
1365
        spendSet.details = details
9✔
1366
        for _, ntfn := range spendSet.ntfns {
25✔
1367
                err := n.dispatchSpendDetails(ntfn, spendSet.details)
16✔
1368
                if err != nil {
16✔
1369
                        return err
×
1370
                }
×
1371
        }
1372

1373
        return nil
9✔
1374
}
1375

1376
// dispatchSpendDetails dispatches a spend notification to the client.
1377
//
1378
// NOTE: This must be called with the TxNotifier's lock held.
1379
func (n *TxNotifier) dispatchSpendDetails(ntfn *SpendNtfn, details *SpendDetail) error {
171✔
1380
        // If there are no spend details to dispatch or if the notification has
171✔
1381
        // already been dispatched, then we can skip dispatching to this client.
171✔
1382
        if details == nil || ntfn.dispatched {
217✔
1383
                Log.Debugf("Skipping dispatch of spend details(%v) for "+
46✔
1384
                        "request %v, dispatched=%v", details, ntfn.SpendRequest,
46✔
1385
                        ntfn.dispatched)
46✔
1386
                return nil
46✔
1387
        }
46✔
1388

1389
        Log.Debugf("Dispatching confirmed spend notification for %v at "+
128✔
1390
                "current height=%d: %v", ntfn.SpendRequest, n.currentHeight,
128✔
1391
                details)
128✔
1392

128✔
1393
        select {
128✔
1394
        case ntfn.Event.Spend <- details:
128✔
1395
                ntfn.dispatched = true
128✔
1396
        case <-n.quit:
×
1397
                return ErrTxNotifierExiting
×
1398
        }
1399

1400
        spendHeight := uint32(details.SpendingHeight)
128✔
1401

128✔
1402
        // We also add to spendsByHeight to notify on chain reorgs.
128✔
1403
        reorgSafeHeight := spendHeight + n.reorgSafetyLimit
128✔
1404
        if reorgSafeHeight > n.currentHeight {
256✔
1405
                txSet, exists := n.spendsByHeight[spendHeight]
128✔
1406
                if !exists {
137✔
1407
                        txSet = make(map[SpendRequest]struct{})
9✔
1408
                        n.spendsByHeight[spendHeight] = txSet
9✔
1409
                }
9✔
1410
                txSet[ntfn.SpendRequest] = struct{}{}
128✔
1411
        }
1412

1413
        return nil
128✔
1414
}
1415

1416
// ConnectTip handles a new block extending the current chain. It will go
1417
// through every transaction and determine if it is relevant to any of its
1418
// clients. A transaction can be relevant in either of the following two ways:
1419
//
1420
//  1. One of the inputs in the transaction spends an outpoint/output script
1421
//     for which we currently have an active spend registration for.
1422
//
1423
//  2. The transaction has a txid or output script for which we currently have
1424
//     an active confirmation registration for.
1425
//
1426
// In the event that the transaction is relevant, a confirmation/spend
1427
// notification will be queued for dispatch to the relevant clients.
1428
// Confirmation notifications will only be dispatched for transactions/output
1429
// scripts that have met the required number of confirmations required by the
1430
// client.
1431
//
1432
// NOTE: In order to actually dispatch the relevant transaction notifications to
1433
// clients, NotifyHeight must be called with the same block height in order to
1434
// maintain correctness.
1435
func (n *TxNotifier) ConnectTip(block *btcutil.Block,
1436
        blockHeight uint32) error {
1,589✔
1437

1,589✔
1438
        select {
1,589✔
1439
        case <-n.quit:
×
1440
                return ErrTxNotifierExiting
×
1441
        default:
1,589✔
1442
        }
1443

1444
        n.Lock()
1,589✔
1445
        defer n.Unlock()
1,589✔
1446

1,589✔
1447
        if blockHeight != n.currentHeight+1 {
1,589✔
1448
                return fmt.Errorf("received blocks out of order: "+
×
1449
                        "current height=%d, new height=%d",
×
1450
                        n.currentHeight, blockHeight)
×
1451
        }
×
1452
        n.currentHeight++
1,589✔
1453
        n.reorgDepth = 0
1,589✔
1454

1,589✔
1455
        // First, we'll iterate over all the transactions found in this block to
1,589✔
1456
        // determine if it includes any relevant transactions to the TxNotifier.
1,589✔
1457
        if block != nil {
3,175✔
1458
                Log.Debugf("Filtering %d txns for %d spend requests at "+
1,586✔
1459
                        "height %d", len(block.Transactions()),
1,586✔
1460
                        len(n.spendNotifications), blockHeight)
1,586✔
1461

1,586✔
1462
                for _, tx := range block.Transactions() {
3,847✔
1463
                        n.filterTx(
2,261✔
1464
                                block, tx, blockHeight,
2,261✔
1465
                                n.handleConfDetailsAtTip,
2,261✔
1466
                                n.handleSpendDetailsAtTip,
2,261✔
1467
                        )
2,261✔
1468
                }
2,261✔
1469
        }
1470

1471
        // Now that we've determined which requests were confirmed and spent
1472
        // within the new block, we can update their entries in their respective
1473
        // caches, along with all of our unconfirmed and unspent requests.
1474
        n.updateHints(blockHeight)
1,589✔
1475

1,589✔
1476
        // Finally, we'll clear the entries from our set of notifications for
1,589✔
1477
        // requests that are no longer under the risk of being reorged out of
1,589✔
1478
        // the chain.
1,589✔
1479
        if blockHeight >= n.reorgSafetyLimit {
3,065✔
1480
                matureBlockHeight := blockHeight - n.reorgSafetyLimit
1,476✔
1481
                for confRequest := range n.confsByInitialHeight[matureBlockHeight] {
1,492✔
1482
                        confSet := n.confNotifications[confRequest]
16✔
1483
                        for _, ntfn := range confSet.ntfns {
33✔
1484
                                select {
17✔
1485
                                case ntfn.Event.Done <- struct{}{}:
17✔
1486
                                case <-n.quit:
×
1487
                                        return ErrTxNotifierExiting
×
1488
                                }
1489
                        }
1490

1491
                        delete(n.confNotifications, confRequest)
16✔
1492
                }
1493
                delete(n.confsByInitialHeight, matureBlockHeight)
1,476✔
1494

1,476✔
1495
                for spendRequest := range n.spendsByHeight[matureBlockHeight] {
1,478✔
1496
                        spendSet := n.spendNotifications[spendRequest]
2✔
1497
                        for _, ntfn := range spendSet.ntfns {
4✔
1498
                                select {
2✔
1499
                                case ntfn.Event.Done <- struct{}{}:
2✔
1500
                                case <-n.quit:
×
1501
                                        return ErrTxNotifierExiting
×
1502
                                }
1503
                        }
1504

1505
                        Log.Debugf("Deleting mature spend request %v at "+
2✔
1506
                                "height=%d", spendRequest, blockHeight)
2✔
1507
                        delete(n.spendNotifications, spendRequest)
2✔
1508
                }
1509
                delete(n.spendsByHeight, matureBlockHeight)
1,476✔
1510
        }
1511

1512
        return nil
1,589✔
1513
}
1514

1515
// filterTx determines whether the transaction spends or confirms any
1516
// outstanding pending requests. The onConf and onSpend callbacks can be used to
1517
// retrieve all the requests fulfilled by this transaction as they occur.
1518
func (n *TxNotifier) filterTx(block *btcutil.Block, tx *btcutil.Tx,
1519
        blockHeight uint32, onConf func(ConfRequest, *TxConfirmation),
1520
        onSpend func(SpendRequest, *SpendDetail)) {
2,303✔
1521

2,303✔
1522
        // In order to determine if this transaction is relevant to the
2,303✔
1523
        // notifier, we'll check its inputs for any outstanding spend
2,303✔
1524
        // requests.
2,303✔
1525
        txHash := tx.Hash()
2,303✔
1526
        if onSpend != nil {
4,606✔
1527
                // notifyDetails is a helper closure that will construct the
2,303✔
1528
                // spend details of a request and hand them off to the onSpend
2,303✔
1529
                // callback.
2,303✔
1530
                notifyDetails := func(spendRequest SpendRequest,
2,303✔
1531
                        prevOut wire.OutPoint, inputIdx uint32) {
2,390✔
1532

87✔
1533
                        Log.Debugf("Found spend of %v: spend_tx=%v, "+
87✔
1534
                                "block_height=%d", spendRequest, txHash,
87✔
1535
                                blockHeight)
87✔
1536

87✔
1537
                        onSpend(spendRequest, &SpendDetail{
87✔
1538
                                SpentOutPoint:     &prevOut,
87✔
1539
                                SpenderTxHash:     txHash,
87✔
1540
                                SpendingTx:        tx.MsgTx(),
87✔
1541
                                SpenderInputIndex: inputIdx,
87✔
1542
                                SpendingHeight:    int32(blockHeight),
87✔
1543
                        })
87✔
1544
                }
87✔
1545

1546
                for i, txIn := range tx.MsgTx().TxIn {
4,844✔
1547
                        // We'll re-derive the script of the output being spent
2,541✔
1548
                        // to determine if the inputs spends any registered
2,541✔
1549
                        // requests.
2,541✔
1550
                        prevOut := txIn.PreviousOutPoint
2,541✔
1551
                        pkScript, err := txscript.ComputePkScript(
2,541✔
1552
                                txIn.SignatureScript, txIn.Witness,
2,541✔
1553
                        )
2,541✔
1554
                        if err != nil {
2,541✔
1555
                                continue
×
1556
                        }
1557
                        spendRequest := SpendRequest{
2,541✔
1558
                                OutPoint: prevOut,
2,541✔
1559
                                PkScript: pkScript,
2,541✔
1560
                        }
2,541✔
1561

2,541✔
1562
                        // If we have any, we'll record their spend height so
2,541✔
1563
                        // that notifications get dispatched to the respective
2,541✔
1564
                        // clients.
2,541✔
1565
                        if _, ok := n.spendNotifications[spendRequest]; ok {
2,590✔
1566
                                notifyDetails(spendRequest, prevOut, uint32(i))
49✔
1567
                        }
49✔
1568

1569
                        // Now try with an empty taproot key pkScript, since we
1570
                        // cannot derive the spent pkScript directly from the
1571
                        // witness. But we have the outpoint, which should be
1572
                        // enough.
1573
                        spendRequest.PkScript = ZeroTaprootPkScript
2,541✔
1574
                        if _, ok := n.spendNotifications[spendRequest]; ok {
2,544✔
1575
                                notifyDetails(spendRequest, prevOut, uint32(i))
3✔
1576
                        }
3✔
1577

1578
                        // Restore the pkScript but try with a zero outpoint
1579
                        // instead (won't be possible for Taproot).
1580
                        spendRequest.PkScript = pkScript
2,541✔
1581
                        spendRequest.OutPoint = ZeroOutPoint
2,541✔
1582
                        if _, ok := n.spendNotifications[spendRequest]; ok {
2,579✔
1583
                                notifyDetails(spendRequest, prevOut, uint32(i))
38✔
1584
                        }
38✔
1585
                }
1586
        }
1587

1588
        // We'll also check its outputs to determine if there are any
1589
        // outstanding confirmation requests.
1590
        if onConf != nil {
4,564✔
1591
                // notifyDetails is a helper closure that will construct the
2,261✔
1592
                // confirmation details of a request and hand them off to the
2,261✔
1593
                // onConf callback.
2,261✔
1594
                notifyDetails := func(confRequest ConfRequest) {
2,404✔
1595
                        Log.Debugf("Found initial confirmation of %v: "+
143✔
1596
                                "height=%d, hash=%v", confRequest,
143✔
1597
                                blockHeight, block.Hash())
143✔
1598

143✔
1599
                        details := &TxConfirmation{
143✔
1600
                                Tx:          tx.MsgTx(),
143✔
1601
                                BlockHash:   block.Hash(),
143✔
1602
                                BlockHeight: blockHeight,
143✔
1603
                                TxIndex:     uint32(tx.Index()),
143✔
1604
                                Block:       block.MsgBlock(),
143✔
1605
                        }
143✔
1606

143✔
1607
                        onConf(confRequest, details)
143✔
1608
                }
143✔
1609

1610
                for _, txOut := range tx.MsgTx().TxOut {
5,531✔
1611
                        // We'll parse the script of the output to determine if
3,270✔
1612
                        // we have any registered requests for it or the
3,270✔
1613
                        // transaction itself.
3,270✔
1614
                        pkScript, err := txscript.ParsePkScript(txOut.PkScript)
3,270✔
1615
                        if err != nil {
3,415✔
1616
                                continue
145✔
1617
                        }
1618
                        confRequest := ConfRequest{
3,128✔
1619
                                TxID:     *txHash,
3,128✔
1620
                                PkScript: pkScript,
3,128✔
1621
                        }
3,128✔
1622

3,128✔
1623
                        // If we have any, we'll record their confirmed height
3,128✔
1624
                        // so that notifications get dispatched when they
3,128✔
1625
                        // reaches the clients' desired number of confirmations.
3,128✔
1626
                        if _, ok := n.confNotifications[confRequest]; ok {
3,206✔
1627
                                notifyDetails(confRequest)
78✔
1628
                        }
78✔
1629
                        confRequest.TxID = ZeroHash
3,128✔
1630
                        if _, ok := n.confNotifications[confRequest]; ok {
3,193✔
1631
                                notifyDetails(confRequest)
65✔
1632
                        }
65✔
1633
                }
1634
        }
1635
}
1636

1637
// handleConfDetailsAtTip tracks the confirmation height of the txid/output
1638
// script in order to properly dispatch a confirmation notification after
1639
// meeting each request's desired number of confirmations for all current and
1640
// future registered clients.
1641
func (n *TxNotifier) handleConfDetailsAtTip(confRequest ConfRequest,
1642
        details *TxConfirmation) {
143✔
1643

143✔
1644
        // TODO(wilmer): cancel pending historical rescans if any?
143✔
1645
        confSet := n.confNotifications[confRequest]
143✔
1646

143✔
1647
        // If we already have details for this request, we don't want to add it
143✔
1648
        // again since we have already dispatched notifications for it.
143✔
1649
        if confSet.details != nil {
148✔
1650
                Log.Warnf("Ignoring address reuse for %s at height %d.",
5✔
1651
                        confRequest, details.BlockHeight)
5✔
1652
                return
5✔
1653
        }
5✔
1654

1655
        confSet.rescanStatus = rescanComplete
140✔
1656
        confSet.details = details
140✔
1657

140✔
1658
        for _, ntfn := range confSet.ntfns {
321✔
1659
                // In the event that this notification was aware that the
181✔
1660
                // transaction/output script was reorged out of the chain, we'll
181✔
1661
                // consume the reorg notification if it hasn't been done yet
181✔
1662
                // already.
181✔
1663
                select {
181✔
1664
                case <-ntfn.Event.NegativeConf:
11✔
1665
                default:
172✔
1666
                }
1667

1668
                // We'll note this client's required number of confirmations so
1669
                // that we can notify them when expected.
1670
                confHeight := details.BlockHeight + ntfn.NumConfirmations - 1
181✔
1671
                ntfnSet, exists := n.ntfnsByConfirmHeight[confHeight]
181✔
1672
                if !exists {
328✔
1673
                        ntfnSet = make(map[*ConfNtfn]struct{})
147✔
1674
                        n.ntfnsByConfirmHeight[confHeight] = ntfnSet
147✔
1675
                }
147✔
1676
                ntfnSet[ntfn] = struct{}{}
181✔
1677
        }
1678

1679
        // We'll also note the initial confirmation height in order to correctly
1680
        // handle dispatching notifications when the transaction/output script
1681
        // gets reorged out of the chain.
1682
        txSet, exists := n.confsByInitialHeight[details.BlockHeight]
140✔
1683
        if !exists {
235✔
1684
                txSet = make(map[ConfRequest]struct{})
95✔
1685
                n.confsByInitialHeight[details.BlockHeight] = txSet
95✔
1686
        }
95✔
1687
        txSet[confRequest] = struct{}{}
140✔
1688
}
1689

1690
// handleSpendDetailsAtTip tracks the spend height of the outpoint/output script
1691
// in order to properly dispatch a spend notification for all current and future
1692
// registered clients.
1693
func (n *TxNotifier) handleSpendDetailsAtTip(spendRequest SpendRequest,
1694
        details *SpendDetail) {
51✔
1695

51✔
1696
        // TODO(wilmer): cancel pending historical rescans if any?
51✔
1697
        spendSet := n.spendNotifications[spendRequest]
51✔
1698
        spendSet.rescanStatus = rescanComplete
51✔
1699
        spendSet.details = details
51✔
1700

51✔
1701
        for _, ntfn := range spendSet.ntfns {
147✔
1702
                // In the event that this notification was aware that the
96✔
1703
                // spending transaction of its outpoint/output script was
96✔
1704
                // reorged out of the chain, we'll consume the reorg
96✔
1705
                // notification if it hasn't been done yet already.
96✔
1706
                select {
96✔
1707
                case <-ntfn.Event.Reorg:
×
1708
                default:
96✔
1709
                }
1710
        }
1711

1712
        // We'll note the spending height of the request in order to correctly
1713
        // handle dispatching notifications when the spending transactions gets
1714
        // reorged out of the chain.
1715
        spendHeight := uint32(details.SpendingHeight)
51✔
1716
        opSet, exists := n.spendsByHeight[spendHeight]
51✔
1717
        if !exists {
102✔
1718
                opSet = make(map[SpendRequest]struct{})
51✔
1719
                n.spendsByHeight[spendHeight] = opSet
51✔
1720
        }
51✔
1721
        opSet[spendRequest] = struct{}{}
51✔
1722

51✔
1723
        Log.Debugf("Spend request %v spent at tip=%d", spendRequest,
51✔
1724
                spendHeight)
51✔
1725
}
1726

1727
// NotifyHeight dispatches confirmation and spend notifications to the clients
1728
// who registered for a notification which has been fulfilled at the passed
1729
// height.
1730
func (n *TxNotifier) NotifyHeight(height uint32) error {
1,489✔
1731
        n.Lock()
1,489✔
1732
        defer n.Unlock()
1,489✔
1733

1,489✔
1734
        // First, we'll dispatch an update to all of the notification clients
1,489✔
1735
        // for our watched requests with the number of confirmations left at
1,489✔
1736
        // this new height.
1,489✔
1737
        for _, confRequests := range n.confsByInitialHeight {
9,062✔
1738
                for confRequest := range confRequests {
20,937✔
1739
                        confSet := n.confNotifications[confRequest]
13,364✔
1740
                        for _, ntfn := range confSet.ntfns {
30,080✔
1741
                                // blockHeight is the height of the block which
16,716✔
1742
                                // contains the transaction.
16,716✔
1743
                                blockHeight := confSet.details.BlockHeight
16,716✔
1744
                                txConfHeight := blockHeight +
16,716✔
1745
                                        ntfn.NumConfirmations - 1
16,716✔
1746
                                numConfsLeft := txConfHeight - height
16,716✔
1747

16,716✔
1748
                                // Since we don't clear notifications until
16,716✔
1749
                                // transactions/output scripts are no longer
16,716✔
1750
                                // under the risk of being reorganized out of
16,716✔
1751
                                // the chain, we'll skip sending updates for
16,716✔
1752
                                // those that have already been confirmed.
16,716✔
1753
                                if int32(numConfsLeft) < 0 {
32,746✔
1754
                                        continue
16,030✔
1755
                                }
1756

1757
                                err := n.notifyNumConfsLeft(ntfn, TxUpdateInfo{
689✔
1758
                                        NumConfsLeft: numConfsLeft,
689✔
1759
                                        BlockHeight:  blockHeight,
689✔
1760
                                })
689✔
1761
                                if err != nil {
689✔
1762
                                        return err
×
1763
                                }
×
1764
                        }
1765
                }
1766
        }
1767

1768
        // Then, we'll dispatch notifications for all the requests that have
1769
        // become confirmed at this new block height.
1770
        for ntfn := range n.ntfnsByConfirmHeight[height] {
1,666✔
1771
                confSet := n.confNotifications[ntfn.ConfRequest]
177✔
1772

177✔
1773
                // The default notification we assigned above includes the
177✔
1774
                // block along with the rest of the details. However not all
177✔
1775
                // clients want the block, so we make a copy here w/o the block
177✔
1776
                // if needed so we can give clients only what they ask for.
177✔
1777
                confDetails := *confSet.details
177✔
1778
                if !ntfn.includeBlock {
314✔
1779
                        confDetails.Block = nil
137✔
1780
                }
137✔
1781

1782
                // If the `confDetails` has already been sent before, we'll
1783
                // skip it and continue processing the next one.
1784
                if ntfn.dispatched {
177✔
UNCOV
1785
                        Log.Debugf("Skipped dispatched conf details for "+
×
UNCOV
1786
                                "request %v conf_id=%v", ntfn.ConfRequest,
×
UNCOV
1787
                                ntfn.ConfID)
×
UNCOV
1788

×
UNCOV
1789
                        continue
×
1790
                }
1791

1792
                Log.Debugf("Dispatching %v confirmation notification for "+
177✔
1793
                        "conf_id=%v, %v", ntfn.NumConfirmations, ntfn.ConfID,
177✔
1794
                        ntfn.ConfRequest)
177✔
1795

177✔
1796
                select {
177✔
1797
                case ntfn.Event.Confirmed <- &confDetails:
177✔
1798
                        ntfn.dispatched = true
177✔
1799
                case <-n.quit:
×
1800
                        return ErrTxNotifierExiting
×
1801
                }
1802
        }
1803
        delete(n.ntfnsByConfirmHeight, height)
1,489✔
1804

1,489✔
1805
        // Finally, we'll dispatch spend notifications for all the requests that
1,489✔
1806
        // were spent at this new block height.
1,489✔
1807
        for spendRequest := range n.spendsByHeight[height] {
1,540✔
1808
                spendSet := n.spendNotifications[spendRequest]
51✔
1809
                for _, ntfn := range spendSet.ntfns {
147✔
1810
                        err := n.dispatchSpendDetails(ntfn, spendSet.details)
96✔
1811
                        if err != nil {
96✔
1812
                                return err
×
1813
                        }
×
1814
                }
1815
        }
1816

1817
        return nil
1,489✔
1818
}
1819

1820
// DisconnectTip handles the tip of the current chain being disconnected during
1821
// a chain reorganization. If any watched requests were included in this block,
1822
// internal structures are updated to ensure confirmation/spend notifications
1823
// are consumed (if not already), and reorg notifications are dispatched
1824
// instead. Confirmation/spend notifications will be dispatched again upon block
1825
// inclusion.
1826
func (n *TxNotifier) DisconnectTip(blockHeight uint32) error {
97✔
1827
        select {
97✔
1828
        case <-n.quit:
×
1829
                return ErrTxNotifierExiting
×
1830
        default:
97✔
1831
        }
1832

1833
        n.Lock()
97✔
1834
        defer n.Unlock()
97✔
1835

97✔
1836
        if blockHeight != n.currentHeight {
97✔
1837
                return fmt.Errorf("received blocks out of order: "+
×
1838
                        "current height=%d, disconnected height=%d",
×
1839
                        n.currentHeight, blockHeight)
×
1840
        }
×
1841
        n.currentHeight--
97✔
1842
        n.reorgDepth++
97✔
1843

97✔
1844
        // With the block disconnected, we'll update the confirm and spend hints
97✔
1845
        // for our notification requests to reflect the new height, except for
97✔
1846
        // those that have confirmed/spent at previous heights.
97✔
1847
        n.updateHints(blockHeight)
97✔
1848

97✔
1849
        // We'll go through all of our watched confirmation requests and attempt
97✔
1850
        // to drain their notification channels to ensure sending notifications
97✔
1851
        // to the clients is always non-blocking.
97✔
1852
        for initialHeight, txHashes := range n.confsByInitialHeight {
389✔
1853
                for txHash := range txHashes {
777✔
1854
                        // If the transaction/output script has been reorged out
485✔
1855
                        // of the chain, we'll make sure to remove the cached
485✔
1856
                        // confirmation details to prevent notifying clients
485✔
1857
                        // with old information.
485✔
1858
                        confSet := n.confNotifications[txHash]
485✔
1859
                        if initialHeight == blockHeight {
500✔
1860
                                confSet.details = nil
15✔
1861
                        }
15✔
1862

1863
                        for _, ntfn := range confSet.ntfns {
1,098✔
1864
                                // First, we'll attempt to drain an update
613✔
1865
                                // from each notification to ensure sends to the
613✔
1866
                                // Updates channel are always non-blocking.
613✔
1867
                                select {
613✔
1868
                                case <-ntfn.Event.Updates:
326✔
1869
                                case <-n.quit:
×
1870
                                        return ErrTxNotifierExiting
×
1871
                                default:
289✔
1872
                                }
1873

1874
                                // We also reset the num of confs update.
1875
                                ntfn.numConfsLeft = ntfn.NumConfirmations
613✔
1876

613✔
1877
                                // Then, we'll check if the current
613✔
1878
                                // transaction/output script was included in the
613✔
1879
                                // block currently being disconnected. If it
613✔
1880
                                // was, we'll need to dispatch a reorg
613✔
1881
                                // notification to the client.
613✔
1882
                                if initialHeight == blockHeight {
628✔
1883
                                        err := n.dispatchConfReorg(
15✔
1884
                                                ntfn, blockHeight,
15✔
1885
                                        )
15✔
1886
                                        if err != nil {
15✔
1887
                                                return err
×
1888
                                        }
×
1889
                                }
1890
                        }
1891
                }
1892
        }
1893

1894
        // We'll also go through our watched spend requests and attempt to drain
1895
        // their dispatched notifications to ensure dispatching notifications to
1896
        // clients later on is always non-blocking. We're only interested in
1897
        // requests whose spending transaction was included at the height being
1898
        // disconnected.
1899
        for op := range n.spendsByHeight[blockHeight] {
112✔
1900
                // Since the spending transaction is being reorged out of the
15✔
1901
                // chain, we'll need to clear out the spending details of the
15✔
1902
                // request.
15✔
1903
                spendSet := n.spendNotifications[op]
15✔
1904
                spendSet.details = nil
15✔
1905

15✔
1906
                // For all requests which have had a spend notification
15✔
1907
                // dispatched, we'll attempt to drain it and send a reorg
15✔
1908
                // notification instead.
15✔
1909
                for _, ntfn := range spendSet.ntfns {
30✔
1910
                        if err := n.dispatchSpendReorg(ntfn); err != nil {
15✔
1911
                                return err
×
1912
                        }
×
1913
                }
1914
        }
1915

1916
        // Finally, we can remove the requests that were confirmed and/or spent
1917
        // at the height being disconnected. We'll still continue to track them
1918
        // until they have been confirmed/spent and are no longer under the risk
1919
        // of being reorged out of the chain again.
1920
        delete(n.confsByInitialHeight, blockHeight)
97✔
1921
        delete(n.spendsByHeight, blockHeight)
97✔
1922

97✔
1923
        return nil
97✔
1924
}
1925

1926
// updateHints attempts to update the confirm and spend hints for all relevant
1927
// requests respectively. The height parameter is used to determine which
1928
// requests we should update based on whether a new block is being
1929
// connected/disconnected.
1930
//
1931
// NOTE: This must be called with the TxNotifier's lock held and after its
1932
// height has already been reflected by a block being connected/disconnected.
1933
func (n *TxNotifier) updateHints(height uint32) {
1,683✔
1934
        // TODO(wilmer): update under one database transaction.
1,683✔
1935
        //
1,683✔
1936
        // To update the height hint for all the required confirmation requests
1,683✔
1937
        // under one database transaction, we'll gather the set of unconfirmed
1,683✔
1938
        // requests along with the ones that confirmed at the height being
1,683✔
1939
        // connected/disconnected.
1,683✔
1940
        confRequests := n.unconfirmedRequests()
1,683✔
1941
        for confRequest := range n.confsByInitialHeight[height] {
1,836✔
1942
                confRequests = append(confRequests, confRequest)
153✔
1943
        }
153✔
1944
        err := n.confirmHintCache.CommitConfirmHint(
1,683✔
1945
                n.currentHeight, confRequests...,
1,683✔
1946
        )
1,683✔
1947
        if err != nil {
1,683✔
1948
                // The error is not fatal as this is an optimistic optimization,
×
1949
                // so we'll avoid returning an error.
×
1950
                Log.Debugf("Unable to update confirm hints to %d for "+
×
1951
                        "%v: %v", n.currentHeight, confRequests, err)
×
1952
        }
×
1953

1954
        // Similarly, to update the height hint for all the required spend
1955
        // requests under one database transaction, we'll gather the set of
1956
        // unspent requests along with the ones that were spent at the height
1957
        // being connected/disconnected.
1958
        spendRequests := n.unspentRequests()
1,683✔
1959
        for spendRequest := range n.spendsByHeight[height] {
1,746✔
1960
                spendRequests = append(spendRequests, spendRequest)
63✔
1961
        }
63✔
1962
        err = n.spendHintCache.CommitSpendHint(n.currentHeight, spendRequests...)
1,683✔
1963
        if err != nil {
1,683✔
1964
                // The error is not fatal as this is an optimistic optimization,
×
1965
                // so we'll avoid returning an error.
×
1966
                Log.Debugf("Unable to update spend hints to %d for "+
×
1967
                        "%v: %v", n.currentHeight, spendRequests, err)
×
1968
        }
×
1969
}
1970

1971
// unconfirmedRequests returns the set of confirmation requests that are
1972
// still seen as unconfirmed by the TxNotifier.
1973
//
1974
// NOTE: This method must be called with the TxNotifier's lock held.
1975
func (n *TxNotifier) unconfirmedRequests() []ConfRequest {
1,683✔
1976
        var unconfirmed []ConfRequest
1,683✔
1977
        for confRequest, confNtfnSet := range n.confNotifications {
16,339✔
1978
                // If the notification is already aware of its confirmation
14,656✔
1979
                // details, or it's in the process of learning them, we'll skip
14,656✔
1980
                // it as we can't yet determine if it's confirmed or not.
14,656✔
1981
                if confNtfnSet.rescanStatus != rescanComplete ||
14,656✔
1982
                        confNtfnSet.details != nil {
28,620✔
1983
                        continue
13,964✔
1984
                }
1985

1986
                unconfirmed = append(unconfirmed, confRequest)
695✔
1987
        }
1988

1989
        return unconfirmed
1,683✔
1990
}
1991

1992
// unspentRequests returns the set of spend requests that are still seen as
1993
// unspent by the TxNotifier.
1994
//
1995
// NOTE: This method must be called with the TxNotifier's lock held.
1996
func (n *TxNotifier) unspentRequests() []SpendRequest {
1,683✔
1997
        var unspent []SpendRequest
1,683✔
1998
        for spendRequest, spendNtfnSet := range n.spendNotifications {
3,266✔
1999
                // If the notification is already aware of its spend details, or
1,583✔
2000
                // it's in the process of learning them, we'll skip it as we
1,583✔
2001
                // can't yet determine if it's unspent or not.
1,583✔
2002
                if spendNtfnSet.rescanStatus != rescanComplete ||
1,583✔
2003
                        spendNtfnSet.details != nil {
3,126✔
2004
                        continue
1,543✔
2005
                }
2006

2007
                unspent = append(unspent, spendRequest)
43✔
2008
        }
2009

2010
        return unspent
1,683✔
2011
}
2012

2013
// dispatchConfReorg dispatches a reorg notification to the client if the
2014
// confirmation notification was already delivered.
2015
//
2016
// NOTE: This must be called with the TxNotifier's lock held.
2017
func (n *TxNotifier) dispatchConfReorg(ntfn *ConfNtfn,
2018
        heightDisconnected uint32) error {
15✔
2019

15✔
2020
        // If the request's confirmation notification has yet to be dispatched,
15✔
2021
        // we'll need to clear its entry within the ntfnsByConfirmHeight index
15✔
2022
        // to prevent from notifying the client once the notifier reaches the
15✔
2023
        // confirmation height.
15✔
2024
        if !ntfn.dispatched {
28✔
2025
                confHeight := heightDisconnected + ntfn.NumConfirmations - 1
13✔
2026
                ntfnSet, exists := n.ntfnsByConfirmHeight[confHeight]
13✔
2027

13✔
2028
                // We also signal the reorg to the notifier in case the
13✔
2029
                // subscriber is also interested in the reorgs before the
13✔
2030
                // transaction received its required confirmation.
13✔
2031
                //
13✔
2032
                // Because as soon as a new block is connected which has the
13✔
2033
                // transaction included again we preemptively read the buffered
13✔
2034
                // channel.
13✔
2035
                select {
13✔
2036
                case ntfn.Event.NegativeConf <- int32(n.reorgDepth):
13✔
2037
                case <-n.quit:
×
2038
                        return ErrTxNotifierExiting
×
2039
                }
2040

2041
                if exists {
26✔
2042
                        delete(ntfnSet, ntfn)
13✔
2043
                }
13✔
2044
                return nil
13✔
2045
        }
2046

2047
        // Otherwise, the entry within the ntfnsByConfirmHeight has already been
2048
        // deleted, so we'll attempt to drain the confirmation notification to
2049
        // ensure sends to the Confirmed channel are always non-blocking.
2050
        select {
4✔
2051
        case <-ntfn.Event.Confirmed:
×
2052
        case <-n.quit:
×
2053
                return ErrTxNotifierExiting
×
2054
        default:
4✔
2055
        }
2056

2057
        ntfn.dispatched = false
4✔
2058

4✔
2059
        // Send a negative confirmation notification to the client indicating
4✔
2060
        // how many blocks have been disconnected successively.
4✔
2061
        select {
4✔
2062
        case ntfn.Event.NegativeConf <- int32(n.reorgDepth):
4✔
2063
        case <-n.quit:
×
2064
                return ErrTxNotifierExiting
×
2065
        }
2066

2067
        return nil
4✔
2068
}
2069

2070
// dispatchSpendReorg dispatches a reorg notification to the client if a spend
2071
// notiification was already delivered.
2072
//
2073
// NOTE: This must be called with the TxNotifier's lock held.
2074
func (n *TxNotifier) dispatchSpendReorg(ntfn *SpendNtfn) error {
15✔
2075
        if !ntfn.dispatched {
15✔
2076
                return nil
×
2077
        }
×
2078

2079
        // Attempt to drain the spend notification to ensure sends to the Spend
2080
        // channel are always non-blocking.
2081
        select {
15✔
2082
        case <-ntfn.Event.Spend:
1✔
2083
        default:
14✔
2084
        }
2085

2086
        // Send a reorg notification to the client in order for them to
2087
        // correctly handle reorgs.
2088
        select {
15✔
2089
        case ntfn.Event.Reorg <- struct{}{}:
15✔
2090
        case <-n.quit:
×
2091
                return ErrTxNotifierExiting
×
2092
        }
2093

2094
        ntfn.dispatched = false
15✔
2095

15✔
2096
        return nil
15✔
2097
}
2098

2099
// TearDown is to be called when the owner of the TxNotifier is exiting. This
2100
// closes the event channels of all registered notifications that have not been
2101
// dispatched yet.
2102
func (n *TxNotifier) TearDown() {
25✔
2103
        close(n.quit)
25✔
2104

25✔
2105
        n.Lock()
25✔
2106
        defer n.Unlock()
25✔
2107

25✔
2108
        for _, confSet := range n.confNotifications {
153✔
2109
                for confID, ntfn := range confSet.ntfns {
312✔
2110
                        close(ntfn.Event.Confirmed)
184✔
2111
                        close(ntfn.Event.Updates)
184✔
2112
                        close(ntfn.Event.NegativeConf)
184✔
2113
                        close(ntfn.Event.Done)
184✔
2114
                        delete(confSet.ntfns, confID)
184✔
2115
                }
184✔
2116
        }
2117

2118
        for _, spendSet := range n.spendNotifications {
61✔
2119
                for spendID, ntfn := range spendSet.ntfns {
136✔
2120
                        close(ntfn.Event.Spend)
100✔
2121
                        close(ntfn.Event.Reorg)
100✔
2122
                        close(ntfn.Event.Done)
100✔
2123
                        delete(spendSet.ntfns, spendID)
100✔
2124
                }
100✔
2125
        }
2126
}
2127

2128
// notifyNumConfsLeft sends the number of confirmations left to the
2129
// notification subscriber through the Event.Updates channel, along with the
2130
// block height in which the transaction was included.
2131
//
2132
// NOTE: must be used with the TxNotifier's lock held.
2133
func (n *TxNotifier) notifyNumConfsLeft(ntfn *ConfNtfn,
2134
        info TxUpdateInfo) error {
733✔
2135

733✔
2136
        // If the number left is no less than the recorded value, we can skip
733✔
2137
        // sending it as it means this same value has already been sent before.
733✔
2138
        if info.NumConfsLeft >= ntfn.numConfsLeft {
736✔
2139
                Log.Debugf("Skipped dispatched update (numConfsLeft=%v) for "+
3✔
2140
                        "request %v conf_id=%v", info.NumConfsLeft,
3✔
2141
                        ntfn.ConfRequest, ntfn.ConfID)
3✔
2142

3✔
2143
                return nil
3✔
2144
        }
3✔
2145

2146
        // Update the number of confirmations left to the notification.
2147
        ntfn.numConfsLeft = info.NumConfsLeft
733✔
2148

733✔
2149
        select {
733✔
2150
        case ntfn.Event.Updates <- info:
733✔
2151
        case <-n.quit:
×
2152
                return ErrTxNotifierExiting
×
2153
        }
2154

2155
        return nil
733✔
2156
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc