• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%
11216766535

Pull #9148

github

ProofOfKeags
lnwire: remove kickoff feerate from propose/commit
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

571 of 879 new or added lines in 16 files covered. (64.96%)

23253 existing lines in 251 files now uncovered.

99022 of 171268 relevant lines covered (57.82%)

38420.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.57
/chainntnfs/mempool.go
1
package chainntnfs
2

3
import (
4
        "sync"
5
        "sync/atomic"
6

7
        "github.com/btcsuite/btcd/btcutil"
8
        "github.com/btcsuite/btcd/wire"
9
        "github.com/lightningnetwork/lnd/lnutils"
10
)
11

12
// inputsWithTx maps a spent outpoint to the details of the spend.
13
type inputsWithTx map[wire.OutPoint]*SpendDetail
14

15
// MempoolNotifier defines an internal mempool notifier that's used to notify
16
// the spending of given inputs. This is mounted to either `BitcoindNotifier`
17
// or `BtcdNotifier` depending on the chain backend.
18
type MempoolNotifier struct {
19
        // wg tracks the notification goroutines started by notifySpent so
        // that TearDown can wait for them to exit.
        wg sync.WaitGroup
20

21
        // subscribedInputs stores the inputs whose spending events we want to
22
        // watch. It's keyed by outpoint first and by subscription ID second.
23
        subscribedInputs *lnutils.SyncMap[wire.OutPoint,
24
                *lnutils.SyncMap[uint64, *MempoolSpendEvent]]
25

26
        // sCounter is used to generate unique subscription IDs.
27
        sCounter atomic.Uint64
28

29
        // quit is closed when the notifier is torn down.
30
        quit chan struct{}
31
}
32

33
// MempoolSpendEvent is returned to the subscriber so it can watch for the
34
// spending event of the requested input.
35
type MempoolSpendEvent struct {
36
        // Spend is a receive-only channel on which the spend details are
37
        // delivered once the target outpoint has been spent.
38
        //
39
        // NOTE: This channel must be buffered.
40
        Spend <-chan *SpendDetail
41

42
        // id is the unique identifier of this subscription.
43
        id uint64
44

45
        // outpoint is the subscribed outpoint.
46
        outpoint wire.OutPoint
47

48
        // event is the sending side of Spend; the spend details are sent on
49
        // it once the target outpoint is spent.
50
        event chan *SpendDetail
51

52
        // cancel is closed by UnsubscribeEvent to cancel the subscription.
53
        cancel chan struct{}
54
}
55

56
// newMempoolSpendEvent returns a new instance of MempoolSpendEvent.
57
func newMempoolSpendEvent(id uint64, op wire.OutPoint) *MempoolSpendEvent {
16✔
58
        sub := &MempoolSpendEvent{
16✔
59
                id:       id,
16✔
60
                outpoint: op,
16✔
61
                event:    make(chan *SpendDetail, 1),
16✔
62
                cancel:   make(chan struct{}),
16✔
63
        }
16✔
64

16✔
65
        // Mount the receive only channel to the event channel.
16✔
66
        sub.Spend = (<-chan *SpendDetail)(sub.event)
16✔
67

16✔
68
        return sub
16✔
69
}
16✔
70

71
// NewMempoolNotifier takes a chain connection and returns a new mempool
72
// notifier.
73
func NewMempoolNotifier() *MempoolNotifier {
31✔
74
        return &MempoolNotifier{
31✔
75
                subscribedInputs: &lnutils.SyncMap[
31✔
76
                        wire.OutPoint, *lnutils.SyncMap[
31✔
77
                                uint64, *MempoolSpendEvent,
31✔
78
                        ]]{},
31✔
79
                quit: make(chan struct{}),
31✔
80
        }
31✔
81
}
31✔
82

83
// SubscribeInput takes an outpoint of the input and returns a channel that the
84
// subscriber can listen to for the spending event.
85
func (m *MempoolNotifier) SubscribeInput(
86
        outpoint wire.OutPoint) *MempoolSpendEvent {
15✔
87

15✔
88
        // Get the current subscribers for this input or create a new one.
15✔
89
        clients := &lnutils.SyncMap[uint64, *MempoolSpendEvent]{}
15✔
90
        clients, _ = m.subscribedInputs.LoadOrStore(outpoint, clients)
15✔
91

15✔
92
        // Increment the subscription counter and return the new value.
15✔
93
        subscriptionID := m.sCounter.Add(1)
15✔
94

15✔
95
        // Create a new subscription.
15✔
96
        sub := newMempoolSpendEvent(subscriptionID, outpoint)
15✔
97

15✔
98
        // Add the subscriber with a unique id.
15✔
99
        clients.Store(subscriptionID, sub)
15✔
100

15✔
101
        // Update the subscribed inputs.
15✔
102
        m.subscribedInputs.Store(outpoint, clients)
15✔
103

15✔
104
        Log.Debugf("Subscribed(id=%v) mempool event for input=%s",
15✔
105
                subscriptionID, outpoint)
15✔
106

15✔
107
        return sub
15✔
108
}
15✔
109

110
// UnsubscribeInput removes all the subscriptions for the given outpoint.
111
func (m *MempoolNotifier) UnsubscribeInput(outpoint wire.OutPoint) {
2✔
112
        Log.Debugf("Unsubscribing MempoolSpendEvent for input %s", outpoint)
2✔
113
        m.subscribedInputs.Delete(outpoint)
2✔
114
}
2✔
115

116
// UnsubscribeEvent removes a given subscriber for the given MempoolSpendEvent.
117
func (m *MempoolNotifier) UnsubscribeEvent(sub *MempoolSpendEvent) {
2✔
118
        Log.Debugf("Unsubscribing(id=%v) MempoolSpendEvent for input=%s",
2✔
119
                sub.id, sub.outpoint)
2✔
120

2✔
121
        // Load all the subscribers for this input.
2✔
122
        clients, loaded := m.subscribedInputs.Load(sub.outpoint)
2✔
123
        if !loaded {
2✔
UNCOV
124
                Log.Debugf("No subscribers for input %s", sub.outpoint)
×
UNCOV
125
                return
×
UNCOV
126
        }
×
127

128
        // Load the subscriber.
129
        subscriber, loaded := clients.Load(sub.id)
2✔
130
        if !loaded {
2✔
131
                Log.Debugf("No subscribers for input %s with id %v",
×
132
                        sub.outpoint, sub.id)
×
133
                return
×
134
        }
×
135

136
        // Close the cancel channel in case it's been used in a goroutine.
137
        close(subscriber.cancel)
2✔
138

2✔
139
        // Remove the subscriber.
2✔
140
        clients.Delete(sub.id)
2✔
141
}
142

143
// UnsubsribeConfirmedSpentTx takes a transaction and removes the subscriptions
144
// identified using its inputs.
145
func (m *MempoolNotifier) UnsubsribeConfirmedSpentTx(tx *btcutil.Tx) {
32✔
146
        Log.Tracef("Unsubscribe confirmed tx %s", tx.Hash())
32✔
147

32✔
148
        // Get the spent inputs of interest.
32✔
149
        spentInputs, err := m.findRelevantInputs(tx)
32✔
150
        if err != nil {
32✔
151
                Log.Errorf("Unable to find relevant inputs for tx %s: %v",
×
152
                        tx.Hash(), err)
×
153

×
154
                return
×
155
        }
×
156

157
        // Unsubscribe the subscribers.
158
        for outpoint := range spentInputs {
33✔
159
                m.UnsubscribeInput(outpoint)
1✔
160
        }
1✔
161

162
        Log.Tracef("Finished unsubscribing confirmed tx %s, found %d inputs",
32✔
163
                tx.Hash(), len(spentInputs))
32✔
164
}
165

166
// ProcessRelevantSpendTx takes a transaction and checks whether it spends any
167
// of the subscribed inputs. If so, spend notifications are sent to the
168
// relevant subscribers.
169
func (m *MempoolNotifier) ProcessRelevantSpendTx(tx *btcutil.Tx) error {
39✔
170
        Log.Tracef("Processing mempool tx %s", tx.Hash())
39✔
171
        defer Log.Tracef("Finished processing mempool tx %s", tx.Hash())
39✔
172

39✔
173
        // Get the spent inputs of interest.
39✔
174
        spentInputs, err := m.findRelevantInputs(tx)
39✔
175
        if err != nil {
39✔
176
                return err
×
177
        }
×
178

179
        // Notify the subscribers.
180
        m.notifySpent(spentInputs)
39✔
181

39✔
182
        return nil
39✔
183
}
184

185
// TearDown stops the notifier and cleans up resources.
186
func (m *MempoolNotifier) TearDown() {
18✔
187
        Log.Infof("Stopping mempool notifier")
18✔
188
        defer Log.Debug("mempool notifier stopped")
18✔
189

18✔
190
        close(m.quit)
18✔
191
        m.wg.Wait()
18✔
192
}
18✔
193

194
// findRelevantInputs takes a transaction to find the subscribed inputs and
195
// returns them.
196
func (m *MempoolNotifier) findRelevantInputs(tx *btcutil.Tx) (inputsWithTx,
197
        error) {
73✔
198

73✔
199
        txid := tx.Hash()
73✔
200
        watchedInputs := make(inputsWithTx)
73✔
201

73✔
202
        // NOTE: we may have found multiple targeted inputs in the same tx.
73✔
203
        for i, input := range tx.MsgTx().TxIn {
148✔
204
                op := &input.PreviousOutPoint
75✔
205

75✔
206
                // Check whether this input is subscribed.
75✔
207
                _, loaded := m.subscribedInputs.Load(*op)
75✔
208
                if !loaded {
147✔
209
                        continue
72✔
210
                }
211

212
                // If found, save it to watchedInputs to notify the
213
                // subscriber later.
214
                Log.Infof("Found input %s, spent in %s", op, txid)
3✔
215

3✔
216
                // Construct the spend details.
3✔
217
                details := &SpendDetail{
3✔
218
                        SpentOutPoint:     op,
3✔
219
                        SpenderTxHash:     txid,
3✔
220
                        SpendingTx:        tx.MsgTx().Copy(),
3✔
221
                        SpenderInputIndex: uint32(i),
3✔
222
                        SpendingHeight:    0,
3✔
223
                }
3✔
224
                watchedInputs[*op] = details
3✔
225

3✔
226
                // Sanity check the witness stack. If it's not empty, continue
3✔
227
                // to next iteration.
3✔
228
                if details.HasSpenderWitness() {
5✔
229
                        continue
2✔
230
                }
231

232
                // Return an error if the witness data is not present in the
233
                // spending transaction.
234
                Log.Criticalf("Found spending tx for outpoint=%v in mempool, "+
1✔
235
                        "but the transaction %v does not have witness",
1✔
236
                        op, details.SpendingTx.TxHash())
1✔
237

1✔
238
                return nil, ErrEmptyWitnessStack
1✔
239
        }
240

241
        return watchedInputs, nil
72✔
242
}
243

244
// notifySpent iterates all the spentInputs and notifies the subscribers about
245
// the spent details.
246
func (m *MempoolNotifier) notifySpent(spentInputs inputsWithTx) {
43✔
247
        // notifySingle sends a notification to a single subscriber about the
43✔
248
        // spending event.
43✔
249
        //
43✔
250
        // NOTE: must be used inside a goroutine.
43✔
251
        notifySingle := func(id uint64, sub *MempoolSpendEvent,
43✔
252
                op wire.OutPoint, detail *SpendDetail) {
49✔
253

6✔
254
                defer m.wg.Done()
6✔
255

6✔
256
                // Send the spend details to the subscriber.
6✔
257
                select {
6✔
258
                case sub.event <- detail:
6✔
259
                        Log.Debugf("Notified(id=%v) mempool spent for input %s",
6✔
260
                                sub.id, op)
6✔
261

262
                case <-sub.cancel:
×
263
                        Log.Debugf("Subscription(id=%v) canceled, skipped "+
×
264
                                "notifying spent for input %s", sub.id, op)
×
265

266
                case <-m.quit:
×
267
                        Log.Debugf("Mempool notifier quit, skipped notifying "+
×
268
                                "mempool spent for input %s", op)
×
269
                }
270
        }
271

272
        // notifyAll is a helper closure that constructs a spend detail and
273
        // sends it to all the subscribers of that particular input.
274
        //
275
        // NOTE: must be used inside a goroutine.
276
        notifyAll := func(detail *SpendDetail, op wire.OutPoint) {
48✔
277
                defer m.wg.Done()
5✔
278

5✔
279
                txid := detail.SpendingTx.TxHash()
5✔
280
                Log.Debugf("Notifying all clients for the spend of %s in tx %s",
5✔
281
                        op, txid)
5✔
282

5✔
283
                // Load the subscriber.
5✔
284
                subs, loaded := m.subscribedInputs.Load(op)
5✔
285
                if !loaded {
5✔
UNCOV
286
                        Log.Errorf("Sub not found for %s", op)
×
UNCOV
287
                        return
×
UNCOV
288
                }
×
289

290
                // Iterate all the subscribers for this input and notify them.
291
                subs.ForEach(func(id uint64, sub *MempoolSpendEvent) error {
11✔
292
                        m.wg.Add(1)
6✔
293
                        go notifySingle(id, sub, op, detail)
6✔
294

6✔
295
                        return nil
6✔
296
                })
6✔
297
        }
298

299
        // Iterate the spent inputs to notify the subscribers concurrently.
300
        for op, tx := range spentInputs {
48✔
301
                op, tx := op, tx
5✔
302

5✔
303
                m.wg.Add(1)
5✔
304
                go notifyAll(tx, op)
5✔
305
        }
5✔
306
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc