• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13157733617

05 Feb 2025 12:49PM UTC coverage: 57.712% (-1.1%) from 58.82%
13157733617

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19472 existing lines in 252 files now uncovered.

103634 of 179570 relevant lines covered (57.71%)

24840.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.57
/chainntnfs/mempool.go
1
package chainntnfs
2

3
import (
4
        "sync"
5
        "sync/atomic"
6

7
        "github.com/btcsuite/btcd/btcutil"
8
        "github.com/btcsuite/btcd/wire"
9
        "github.com/lightningnetwork/lnd/lnutils"
10
)
11

12
// inputsWithTx maps an outpoint to the SpendDetail of the transaction that
// spends it.
13
type inputsWithTx map[wire.OutPoint]*SpendDetail
14

15
// MempoolNotifier defines an internal mempool notifier that's used to notify
16
// the spending of given inputs. This is mounted to either `BitcoindNotifier`
17
// or `BtcdNotifier` depending on the chain backend.
18
type MempoolNotifier struct {
19
        // wg tracks the notification goroutines spawned by notifySpent so
        // that TearDown can wait for them to exit.
        wg sync.WaitGroup
20

21
        // subscribedInputs maps each watched outpoint to its subscribers,
22
        // which are keyed by their subscription ID.
23
        subscribedInputs *lnutils.SyncMap[wire.OutPoint,
24
                *lnutils.SyncMap[uint64, *MempoolSpendEvent]]
25

26
        // sCounter is used to generate unique subscription IDs.
27
        sCounter atomic.Uint64
28

29
        // quit is closed when the notifier is torn down.
30
        quit chan struct{}
31
}
32

33
// MempoolSpendEvent is returned to the subscriber so it can watch for the
34
// spending event of the requested input.
35
type MempoolSpendEvent struct {
36
        // Spend is a receive-only channel on which the spend details are
37
        // delivered once the target outpoint has been spent.
38
        //
39
        // NOTE: This channel must be buffered.
40
        Spend <-chan *SpendDetail
41

42
        // id is the unique identifier of this subscription.
43
        id uint64
44

45
        // outpoint is the subscribed outpoint.
46
        outpoint wire.OutPoint
47

48
        // event is the sending side of Spend; the spend details are written
49
        // to it once the target outpoint is spent.
50
        event chan *SpendDetail
51

52
        // cancel is closed when the subscription is canceled.
53
        cancel chan struct{}
54
}
55

56
// newMempoolSpendEvent returns a new instance of MempoolSpendEvent.
57
func newMempoolSpendEvent(id uint64, op wire.OutPoint) *MempoolSpendEvent {
16✔
58
        sub := &MempoolSpendEvent{
16✔
59
                id:       id,
16✔
60
                outpoint: op,
16✔
61
                event:    make(chan *SpendDetail, 1),
16✔
62
                cancel:   make(chan struct{}),
16✔
63
        }
16✔
64

16✔
65
        // Mount the receive only channel to the event channel.
16✔
66
        sub.Spend = (<-chan *SpendDetail)(sub.event)
16✔
67

16✔
68
        return sub
16✔
69
}
16✔
70

71
// NewMempoolNotifier takes a chain connection and returns a new mempool
72
// notifier.
73
func NewMempoolNotifier() *MempoolNotifier {
31✔
74
        return &MempoolNotifier{
31✔
75
                subscribedInputs: &lnutils.SyncMap[
31✔
76
                        wire.OutPoint, *lnutils.SyncMap[
31✔
77
                                uint64, *MempoolSpendEvent,
31✔
78
                        ]]{},
31✔
79
                quit: make(chan struct{}),
31✔
80
        }
31✔
81
}
31✔
82

83
// SubscribeInput takes an outpoint of the input and returns a channel that the
84
// subscriber can listen to for the spending event.
85
func (m *MempoolNotifier) SubscribeInput(
86
        outpoint wire.OutPoint) *MempoolSpendEvent {
15✔
87

15✔
88
        // Get the current subscribers for this input or create a new one.
15✔
89
        clients := &lnutils.SyncMap[uint64, *MempoolSpendEvent]{}
15✔
90
        clients, _ = m.subscribedInputs.LoadOrStore(outpoint, clients)
15✔
91

15✔
92
        // Increment the subscription counter and return the new value.
15✔
93
        subscriptionID := m.sCounter.Add(1)
15✔
94

15✔
95
        // Create a new subscription.
15✔
96
        sub := newMempoolSpendEvent(subscriptionID, outpoint)
15✔
97

15✔
98
        // Add the subscriber with a unique id.
15✔
99
        clients.Store(subscriptionID, sub)
15✔
100

15✔
101
        // Update the subscribed inputs.
15✔
102
        m.subscribedInputs.Store(outpoint, clients)
15✔
103

15✔
104
        Log.Debugf("Subscribed(id=%v) mempool event for input=%s",
15✔
105
                subscriptionID, outpoint)
15✔
106

15✔
107
        return sub
15✔
108
}
15✔
109

110
// UnsubscribeInput removes all the subscriptions for the given outpoint.
111
func (m *MempoolNotifier) UnsubscribeInput(outpoint wire.OutPoint) {
2✔
112
        Log.Debugf("Unsubscribing MempoolSpendEvent for input %s", outpoint)
2✔
113
        m.subscribedInputs.Delete(outpoint)
2✔
114
}
2✔
115

116
// UnsubscribeEvent removes a given subscriber for the given MempoolSpendEvent.
117
func (m *MempoolNotifier) UnsubscribeEvent(sub *MempoolSpendEvent) {
2✔
118
        Log.Debugf("Unsubscribing(id=%v) MempoolSpendEvent for input=%s",
2✔
119
                sub.id, sub.outpoint)
2✔
120

2✔
121
        // Load all the subscribers for this input.
2✔
122
        clients, loaded := m.subscribedInputs.Load(sub.outpoint)
2✔
123
        if !loaded {
2✔
UNCOV
124
                Log.Debugf("No subscribers for input %s", sub.outpoint)
×
UNCOV
125
                return
×
UNCOV
126
        }
×
127

128
        // Load the subscriber.
129
        subscriber, loaded := clients.Load(sub.id)
2✔
130
        if !loaded {
2✔
131
                Log.Debugf("No subscribers for input %s with id %v",
×
132
                        sub.outpoint, sub.id)
×
133
                return
×
134
        }
×
135

136
        // Close the cancel channel in case it's been used in a goroutine.
137
        close(subscriber.cancel)
2✔
138

2✔
139
        // Remove the subscriber.
2✔
140
        clients.Delete(sub.id)
2✔
141
}
142

143
// UnsubsribeConfirmedSpentTx takes a transaction and removes the subscriptions
144
// identified using its inputs.
145
func (m *MempoolNotifier) UnsubsribeConfirmedSpentTx(tx *btcutil.Tx) {
32✔
146
        Log.Tracef("Unsubscribe confirmed tx %s", tx.Hash())
32✔
147

32✔
148
        // Get the spent inputs of interest.
32✔
149
        spentInputs, err := m.findRelevantInputs(tx)
32✔
150
        if err != nil {
32✔
151
                Log.Errorf("Unable to find relevant inputs for tx %s: %v",
×
152
                        tx.Hash(), err)
×
153

×
154
                return
×
155
        }
×
156

157
        // Unsubscribe the subscribers.
158
        for outpoint := range spentInputs {
33✔
159
                m.UnsubscribeInput(outpoint)
1✔
160
        }
1✔
161

162
        Log.Tracef("Finished unsubscribing confirmed tx %s, found %d inputs",
32✔
163
                tx.Hash(), len(spentInputs))
32✔
164
}
165

166
// ProcessRelevantSpendTx takes a transaction and checks whether it spends any
167
// of the subscribed inputs. If so, spend notifications are sent to the
168
// relevant subscribers.
169
func (m *MempoolNotifier) ProcessRelevantSpendTx(tx *btcutil.Tx) error {
38✔
170
        Log.Tracef("Processing mempool tx %s", tx.Hash())
38✔
171
        defer Log.Tracef("Finished processing mempool tx %s", tx.Hash())
38✔
172

38✔
173
        // Get the spent inputs of interest.
38✔
174
        spentInputs, err := m.findRelevantInputs(tx)
38✔
175
        if err != nil {
38✔
176
                return err
×
177
        }
×
178

179
        // Notify the subscribers.
180
        m.notifySpent(spentInputs)
38✔
181

38✔
182
        return nil
38✔
183
}
184

185
// TearDown stops the notifier and cleans up resources.
186
func (m *MempoolNotifier) TearDown() {
18✔
187
        Log.Infof("Stopping mempool notifier")
18✔
188
        defer Log.Debug("mempool notifier stopped")
18✔
189

18✔
190
        close(m.quit)
18✔
191
        m.wg.Wait()
18✔
192
}
18✔
193

194
// findRelevantInputs takes a transaction to find the subscribed inputs and
195
// returns them.
196
func (m *MempoolNotifier) findRelevantInputs(tx *btcutil.Tx) (inputsWithTx,
197
        error) {
72✔
198

72✔
199
        txid := tx.Hash()
72✔
200
        watchedInputs := make(inputsWithTx)
72✔
201

72✔
202
        // NOTE: we may have found multiple targeted inputs in the same tx.
72✔
203
        for i, input := range tx.MsgTx().TxIn {
146✔
204
                op := &input.PreviousOutPoint
74✔
205

74✔
206
                // Check whether this input is subscribed.
74✔
207
                _, loaded := m.subscribedInputs.Load(*op)
74✔
208
                if !loaded {
145✔
209
                        continue
71✔
210
                }
211

212
                // If found, save it to watchedInputs to notify the
213
                // subscriber later.
214
                Log.Debugf("Found input %s, spent in %s", op, txid)
3✔
215

3✔
216
                // Construct the spend details.
3✔
217
                details := &SpendDetail{
3✔
218
                        SpentOutPoint:     op,
3✔
219
                        SpenderTxHash:     txid,
3✔
220
                        SpendingTx:        tx.MsgTx().Copy(),
3✔
221
                        SpenderInputIndex: uint32(i),
3✔
222
                        SpendingHeight:    0,
3✔
223
                }
3✔
224
                watchedInputs[*op] = details
3✔
225

3✔
226
                // Sanity check the witness stack. If it's not empty, continue
3✔
227
                // to next iteration.
3✔
228
                if details.HasSpenderWitness() {
5✔
229
                        continue
2✔
230
                }
231

232
                // Return an error if the witness data is not present in the
233
                // spending transaction.
234
                Log.Criticalf("Found spending tx for outpoint=%v in mempool, "+
1✔
235
                        "but the transaction %v does not have witness",
1✔
236
                        op, details.SpendingTx.TxHash())
1✔
237

1✔
238
                return nil, ErrEmptyWitnessStack
1✔
239
        }
240

241
        return watchedInputs, nil
71✔
242
}
243

244
// notifySpent iterates all the spentInputs and notifies the subscribers about
245
// the spent details.
246
func (m *MempoolNotifier) notifySpent(spentInputs inputsWithTx) {
42✔
247
        // notifySingle sends a notification to a single subscriber about the
42✔
248
        // spending event.
42✔
249
        //
42✔
250
        // NOTE: must be used inside a goroutine.
42✔
251
        notifySingle := func(id uint64, sub *MempoolSpendEvent,
42✔
252
                op wire.OutPoint, detail *SpendDetail) {
48✔
253

6✔
254
                defer m.wg.Done()
6✔
255

6✔
256
                // Send the spend details to the subscriber.
6✔
257
                select {
6✔
258
                case sub.event <- detail:
6✔
259
                        Log.Debugf("Notified(id=%v) mempool spent for input %s",
6✔
260
                                sub.id, op)
6✔
261

262
                case <-sub.cancel:
×
263
                        Log.Debugf("Subscription(id=%v) canceled, skipped "+
×
264
                                "notifying spent for input %s", sub.id, op)
×
265

266
                case <-m.quit:
×
267
                        Log.Debugf("Mempool notifier quit, skipped notifying "+
×
268
                                "mempool spent for input %s", op)
×
269
                }
270
        }
271

272
        // notifyAll is a helper closure that constructs a spend detail and
273
        // sends it to all the subscribers of that particular input.
274
        //
275
        // NOTE: must be used inside a goroutine.
276
        notifyAll := func(detail *SpendDetail, op wire.OutPoint) {
47✔
277
                defer m.wg.Done()
5✔
278

5✔
279
                txid := detail.SpendingTx.TxHash()
5✔
280
                Log.Debugf("Notifying all clients for the spend of %s in tx %s",
5✔
281
                        op, txid)
5✔
282

5✔
283
                // Load the subscriber.
5✔
284
                subs, loaded := m.subscribedInputs.Load(op)
5✔
285
                if !loaded {
5✔
UNCOV
286
                        Log.Errorf("Sub not found for %s", op)
×
UNCOV
287
                        return
×
UNCOV
288
                }
×
289

290
                // Iterate all the subscribers for this input and notify them.
291
                subs.ForEach(func(id uint64, sub *MempoolSpendEvent) error {
11✔
292
                        m.wg.Add(1)
6✔
293
                        go notifySingle(id, sub, op, detail)
6✔
294

6✔
295
                        return nil
6✔
296
                })
6✔
297
        }
298

299
        // Iterate the spent inputs to notify the subscribers concurrently.
300
        for op, tx := range spentInputs {
47✔
301
                op, tx := op, tx
5✔
302

5✔
303
                m.wg.Add(1)
5✔
304
                go notifyAll(tx, op)
5✔
305
        }
5✔
306
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc