• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12312390362

13 Dec 2024 08:44AM UTC coverage: 57.458% (+8.5%) from 48.92%
12312390362

Pull #9343

github

ellemouton
fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is re-worked such that the
context that its new main WithCtx method provides is cancelled in sync
with a parent context being cancelled or with it's quit channel being
cancelled. Tests are added to assert the behaviour. In order for the
close of the quit channel to be consistent with the cancelling of the
derived context, the quit channel _must_ be contained internal to the
ContextGuard so that callers are only able to close the channel via the
exposed Quit method which will then take care to first cancel any
derived context that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests

101853 of 177264 relevant lines covered (57.46%)

24972.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.57
/chainntnfs/mempool.go
1
package chainntnfs
2

3
import (
4
        "sync"
5
        "sync/atomic"
6

7
        "github.com/btcsuite/btcd/btcutil"
8
        "github.com/btcsuite/btcd/wire"
9
        "github.com/lightningnetwork/lnd/lnutils"
10
)
11

12
// inputsWithTx is a map of outpoints to the tx that spends them.
13
type inputsWithTx map[wire.OutPoint]*SpendDetail
14

15
// MempoolNotifier defines an internal mempool notifier that's used to notify
16
// the spending of given inputs. This is mounted to either `BitcoindNotifier`
17
// or `BtcdNotifier` depending on the chain backend.
18
type MempoolNotifier struct {
19
        wg sync.WaitGroup
20

21
        // subscribedInputs stores the inputs that we want to watch their
22
        // spending event for.
23
        subscribedInputs *lnutils.SyncMap[wire.OutPoint,
24
                *lnutils.SyncMap[uint64, *MempoolSpendEvent]]
25

26
        // sCounter is used to generate unique subscription IDs.
27
        sCounter atomic.Uint64
28

29
        // quit is closed when the notifier is torn down.
30
        quit chan struct{}
31
}
32

33
// MempoolSpendEvent is returned to the subscriber to watch for the spending
34
// event for the requested input.
35
type MempoolSpendEvent struct {
36
        // Spend is a receive only channel which will be sent upon once the
37
        // target outpoint has been spent.
38
        //
39
        // NOTE: This channel must be buffered.
40
        Spend <-chan *SpendDetail
41

42
        // id is the unique identifier of this subscription.
43
        id uint64
44

45
        // outpoint is the subscribed outpoint.
46
        outpoint wire.OutPoint
47

48
        // event is the channel that will be sent upon once the target outpoint
49
        // is spent.
50
        event chan *SpendDetail
51

52
        // cancel cancels the subscription.
53
        cancel chan struct{}
54
}
55

56
// newMempoolSpendEvent returns a new instance of MempoolSpendEvent.
57
func newMempoolSpendEvent(id uint64, op wire.OutPoint) *MempoolSpendEvent {
16✔
58
        sub := &MempoolSpendEvent{
16✔
59
                id:       id,
16✔
60
                outpoint: op,
16✔
61
                event:    make(chan *SpendDetail, 1),
16✔
62
                cancel:   make(chan struct{}),
16✔
63
        }
16✔
64

16✔
65
        // Mount the receive only channel to the event channel.
16✔
66
        sub.Spend = (<-chan *SpendDetail)(sub.event)
16✔
67

16✔
68
        return sub
16✔
69
}
16✔
70

71
// NewMempoolNotifier takes a chain connection and returns a new mempool
72
// notifier.
73
func NewMempoolNotifier() *MempoolNotifier {
31✔
74
        return &MempoolNotifier{
31✔
75
                subscribedInputs: &lnutils.SyncMap[
31✔
76
                        wire.OutPoint, *lnutils.SyncMap[
31✔
77
                                uint64, *MempoolSpendEvent,
31✔
78
                        ]]{},
31✔
79
                quit: make(chan struct{}),
31✔
80
        }
31✔
81
}
31✔
82

83
// SubscribeInput takes an outpoint of the input and returns a channel that the
84
// subscriber can listen to for the spending event.
85
func (m *MempoolNotifier) SubscribeInput(
86
        outpoint wire.OutPoint) *MempoolSpendEvent {
15✔
87

15✔
88
        // Get the current subscribers for this input or create a new one.
15✔
89
        clients := &lnutils.SyncMap[uint64, *MempoolSpendEvent]{}
15✔
90
        clients, _ = m.subscribedInputs.LoadOrStore(outpoint, clients)
15✔
91

15✔
92
        // Increment the subscription counter and return the new value.
15✔
93
        subscriptionID := m.sCounter.Add(1)
15✔
94

15✔
95
        // Create a new subscription.
15✔
96
        sub := newMempoolSpendEvent(subscriptionID, outpoint)
15✔
97

15✔
98
        // Add the subscriber with a unique id.
15✔
99
        clients.Store(subscriptionID, sub)
15✔
100

15✔
101
        // Update the subscribed inputs.
15✔
102
        m.subscribedInputs.Store(outpoint, clients)
15✔
103

15✔
104
        Log.Debugf("Subscribed(id=%v) mempool event for input=%s",
15✔
105
                subscriptionID, outpoint)
15✔
106

15✔
107
        return sub
15✔
108
}
15✔
109

110
// UnsubscribeInput removes all the subscriptions for the given outpoint.
111
func (m *MempoolNotifier) UnsubscribeInput(outpoint wire.OutPoint) {
2✔
112
        Log.Debugf("Unsubscribing MempoolSpendEvent for input %s", outpoint)
2✔
113
        m.subscribedInputs.Delete(outpoint)
2✔
114
}
2✔
115

116
// UnsubscribeEvent removes a given subscriber for the given MempoolSpendEvent.
117
func (m *MempoolNotifier) UnsubscribeEvent(sub *MempoolSpendEvent) {
2✔
118
        Log.Debugf("Unsubscribing(id=%v) MempoolSpendEvent for input=%s",
2✔
119
                sub.id, sub.outpoint)
2✔
120

2✔
121
        // Load all the subscribers for this input.
2✔
122
        clients, loaded := m.subscribedInputs.Load(sub.outpoint)
2✔
123
        if !loaded {
2✔
124
                Log.Debugf("No subscribers for input %s", sub.outpoint)
×
125
                return
×
126
        }
×
127

128
        // Load the subscriber.
129
        subscriber, loaded := clients.Load(sub.id)
2✔
130
        if !loaded {
2✔
131
                Log.Debugf("No subscribers for input %s with id %v",
×
132
                        sub.outpoint, sub.id)
×
133
                return
×
134
        }
×
135

136
        // Close the cancel channel in case it's been used in a goroutine.
137
        close(subscriber.cancel)
2✔
138

2✔
139
        // Remove the subscriber.
2✔
140
        clients.Delete(sub.id)
2✔
141
}
142

143
// UnsubsribeConfirmedSpentTx takes a transaction and removes the subscriptions
144
// identified using its inputs.
145
func (m *MempoolNotifier) UnsubsribeConfirmedSpentTx(tx *btcutil.Tx) {
32✔
146
        Log.Tracef("Unsubscribe confirmed tx %s", tx.Hash())
32✔
147

32✔
148
        // Get the spent inputs of interest.
32✔
149
        spentInputs, err := m.findRelevantInputs(tx)
32✔
150
        if err != nil {
32✔
151
                Log.Errorf("Unable to find relevant inputs for tx %s: %v",
×
152
                        tx.Hash(), err)
×
153

×
154
                return
×
155
        }
×
156

157
        // Unsubscribe the subscribers.
158
        for outpoint := range spentInputs {
33✔
159
                m.UnsubscribeInput(outpoint)
1✔
160
        }
1✔
161

162
        Log.Tracef("Finished unsubscribing confirmed tx %s, found %d inputs",
32✔
163
                tx.Hash(), len(spentInputs))
32✔
164
}
165

166
// ProcessRelevantSpendTx takes a transaction and checks whether it spends any
167
// of the subscribed inputs. If so, spend notifications are sent to the
168
// relevant subscribers.
169
func (m *MempoolNotifier) ProcessRelevantSpendTx(tx *btcutil.Tx) error {
38✔
170
        Log.Tracef("Processing mempool tx %s", tx.Hash())
38✔
171
        defer Log.Tracef("Finished processing mempool tx %s", tx.Hash())
38✔
172

38✔
173
        // Get the spent inputs of interest.
38✔
174
        spentInputs, err := m.findRelevantInputs(tx)
38✔
175
        if err != nil {
38✔
176
                return err
×
177
        }
×
178

179
        // Notify the subscribers.
180
        m.notifySpent(spentInputs)
38✔
181

38✔
182
        return nil
38✔
183
}
184

185
// TearDown stops the notifier and cleans up resources.
186
func (m *MempoolNotifier) TearDown() {
18✔
187
        Log.Infof("Stopping mempool notifier")
18✔
188
        defer Log.Debug("mempool notifier stopped")
18✔
189

18✔
190
        close(m.quit)
18✔
191
        m.wg.Wait()
18✔
192
}
18✔
193

194
// findRelevantInputs takes a transaction to find the subscribed inputs and
195
// returns them.
196
func (m *MempoolNotifier) findRelevantInputs(tx *btcutil.Tx) (inputsWithTx,
197
        error) {
72✔
198

72✔
199
        txid := tx.Hash()
72✔
200
        watchedInputs := make(inputsWithTx)
72✔
201

72✔
202
        // NOTE: we may have found multiple targeted inputs in the same tx.
72✔
203
        for i, input := range tx.MsgTx().TxIn {
146✔
204
                op := &input.PreviousOutPoint
74✔
205

74✔
206
                // Check whether this input is subscribed.
74✔
207
                _, loaded := m.subscribedInputs.Load(*op)
74✔
208
                if !loaded {
145✔
209
                        continue
71✔
210
                }
211

212
                // If found, save it to watchedInputs to notify the
213
                // subscriber later.
214
                Log.Infof("Found input %s, spent in %s", op, txid)
3✔
215

3✔
216
                // Construct the spend details.
3✔
217
                details := &SpendDetail{
3✔
218
                        SpentOutPoint:     op,
3✔
219
                        SpenderTxHash:     txid,
3✔
220
                        SpendingTx:        tx.MsgTx().Copy(),
3✔
221
                        SpenderInputIndex: uint32(i),
3✔
222
                        SpendingHeight:    0,
3✔
223
                }
3✔
224
                watchedInputs[*op] = details
3✔
225

3✔
226
                // Sanity check the witness stack. If it's not empty, continue
3✔
227
                // to next iteration.
3✔
228
                if details.HasSpenderWitness() {
5✔
229
                        continue
2✔
230
                }
231

232
                // Return an error if the witness data is not present in the
233
                // spending transaction.
234
                Log.Criticalf("Found spending tx for outpoint=%v in mempool, "+
1✔
235
                        "but the transaction %v does not have witness",
1✔
236
                        op, details.SpendingTx.TxHash())
1✔
237

1✔
238
                return nil, ErrEmptyWitnessStack
1✔
239
        }
240

241
        return watchedInputs, nil
71✔
242
}
243

244
// notifySpent iterates all the spentInputs and notifies the subscribers about
245
// the spent details.
246
func (m *MempoolNotifier) notifySpent(spentInputs inputsWithTx) {
42✔
247
        // notifySingle sends a notification to a single subscriber about the
42✔
248
        // spending event.
42✔
249
        //
42✔
250
        // NOTE: must be used inside a goroutine.
42✔
251
        notifySingle := func(id uint64, sub *MempoolSpendEvent,
42✔
252
                op wire.OutPoint, detail *SpendDetail) {
48✔
253

6✔
254
                defer m.wg.Done()
6✔
255

6✔
256
                // Send the spend details to the subscriber.
6✔
257
                select {
6✔
258
                case sub.event <- detail:
6✔
259
                        Log.Debugf("Notified(id=%v) mempool spent for input %s",
6✔
260
                                sub.id, op)
6✔
261

262
                case <-sub.cancel:
×
263
                        Log.Debugf("Subscription(id=%v) canceled, skipped "+
×
264
                                "notifying spent for input %s", sub.id, op)
×
265

266
                case <-m.quit:
×
267
                        Log.Debugf("Mempool notifier quit, skipped notifying "+
×
268
                                "mempool spent for input %s", op)
×
269
                }
270
        }
271

272
        // notifyAll is a helper closure that constructs a spend detail and
273
        // sends it to all the subscribers of that particular input.
274
        //
275
        // NOTE: must be used inside a goroutine.
276
        notifyAll := func(detail *SpendDetail, op wire.OutPoint) {
47✔
277
                defer m.wg.Done()
5✔
278

5✔
279
                txid := detail.SpendingTx.TxHash()
5✔
280
                Log.Debugf("Notifying all clients for the spend of %s in tx %s",
5✔
281
                        op, txid)
5✔
282

5✔
283
                // Load the subscriber.
5✔
284
                subs, loaded := m.subscribedInputs.Load(op)
5✔
285
                if !loaded {
5✔
286
                        Log.Errorf("Sub not found for %s", op)
×
287
                        return
×
288
                }
×
289

290
                // Iterate all the subscribers for this input and notify them.
291
                subs.ForEach(func(id uint64, sub *MempoolSpendEvent) error {
11✔
292
                        m.wg.Add(1)
6✔
293
                        go notifySingle(id, sub, op, detail)
6✔
294

6✔
295
                        return nil
6✔
296
                })
6✔
297
        }
298

299
        // Iterate the spent inputs to notify the subscribers concurrently.
300
        for op, tx := range spentInputs {
47✔
301
                op, tx := op, tx
5✔
302

5✔
303
                m.wg.Add(1)
5✔
304
                go notifyAll(tx, op)
5✔
305
        }
5✔
306
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc