• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.32
/discovery/message_store.go
1
package discovery
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8

9
        "github.com/lightningnetwork/lnd/kvdb"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
)
12

13
var (
	// messageStoreBucket names the top-level bucket of the gossiper
	// database in which messages destined for peers are kept. After a
	// restart, these messages are read back and resent to their
	// respective peers.
	//
	// maps:
	//   pubKey (33 bytes) + msgShortChanID (8 bytes) + msgType (2 bytes) -> msg
	messageStoreBucket = []byte("message-store")

	// ErrUnsupportedMessage is returned when a message of a type the
	// store does not support is handed to it.
	ErrUnsupportedMessage = errors.New("unsupported message type")

	// ErrCorruptedMessageStore signals that the on-disk bucketing
	// structure has been altered since the gossip message store instance
	// was initialized.
	ErrCorruptedMessageStore = errors.New(
		"gossip message store has been corrupted",
	)
)
33

34
// GossipMessageStore is a store responsible for storing gossip messages which
35
// we should reliably send to our peers.
36
type GossipMessageStore interface {
37
        // AddMessage adds a message to the store for this peer.
38
        AddMessage(lnwire.Message, [33]byte) error
39

40
        // DeleteMessage deletes a message from the store for this peer.
41
        DeleteMessage(lnwire.Message, [33]byte) error
42

43
        // Messages returns the total set of messages that exist within the
44
        // store for all peers.
45
        Messages() (map[[33]byte][]lnwire.Message, error)
46

47
        // Peers returns the public key of all peers with messages within the
48
        // store.
49
        Peers() (map[[33]byte]struct{}, error)
50

51
        // MessagesForPeer returns the set of messages that exists within the
52
        // store for the given peer.
53
        MessagesForPeer([33]byte) ([]lnwire.Message, error)
54
}
55

56
// MessageStore is an implementation of the GossipMessageStore interface backed
57
// by a channeldb instance. By design, this store will only keep the latest
58
// version of a message (like in the case of multiple ChannelUpdate's) for a
59
// channel with a peer.
60
type MessageStore struct {
61
        db kvdb.Backend
62
}
63

64
// A compile-time assertion to ensure messageStore implements the
65
// GossipMessageStore interface.
66
var _ GossipMessageStore = (*MessageStore)(nil)
67

68
// NewMessageStore creates a new message store backed by a channeldb instance.
69
func NewMessageStore(db kvdb.Backend) (*MessageStore, error) {
3✔
70
        err := kvdb.Batch(db, func(tx kvdb.RwTx) error {
6✔
71
                _, err := tx.CreateTopLevelBucket(messageStoreBucket)
3✔
72
                return err
3✔
73
        })
3✔
74
        if err != nil {
3✔
75
                return nil, fmt.Errorf("unable to create required buckets: %w",
×
76
                        err)
×
77
        }
×
78

79
        return &MessageStore{db}, nil
3✔
80
}
81

82
// msgShortChanID retrieves the short channel ID of the message.
83
func msgShortChanID(msg lnwire.Message) (lnwire.ShortChannelID, error) {
3✔
84
        var shortChanID lnwire.ShortChannelID
3✔
85
        switch msg := msg.(type) {
3✔
86
        case *lnwire.AnnounceSignatures1:
3✔
87
                shortChanID = msg.ShortChannelID
3✔
88
        case *lnwire.ChannelUpdate1:
3✔
89
                shortChanID = msg.ShortChannelID
3✔
UNCOV
90
        default:
×
UNCOV
91
                return shortChanID, ErrUnsupportedMessage
×
92
        }
93

94
        return shortChanID, nil
3✔
95
}
96

97
// messageStoreKey constructs the database key for the message to be stored.
98
func messageStoreKey(msg lnwire.Message, peerPubKey [33]byte) ([]byte, error) {
3✔
99
        shortChanID, err := msgShortChanID(msg)
3✔
100
        if err != nil {
3✔
UNCOV
101
                return nil, err
×
UNCOV
102
        }
×
103

104
        var k [33 + 8 + 2]byte
3✔
105
        copy(k[:33], peerPubKey[:])
3✔
106
        binary.BigEndian.PutUint64(k[33:41], shortChanID.ToUint64())
3✔
107
        binary.BigEndian.PutUint16(k[41:43], uint16(msg.MsgType()))
3✔
108

3✔
109
        return k[:], nil
3✔
110
}
111

112
// AddMessage adds a message to the store for this peer.
113
func (s *MessageStore) AddMessage(msg lnwire.Message, peerPubKey [33]byte) error {
3✔
114
        log.Tracef("Adding message of type %v to store for peer %x",
3✔
115
                msg.MsgType(), peerPubKey)
3✔
116

3✔
117
        // Construct the key for which we'll find this message with in the
3✔
118
        // store.
3✔
119
        msgKey, err := messageStoreKey(msg, peerPubKey)
3✔
120
        if err != nil {
3✔
UNCOV
121
                return err
×
UNCOV
122
        }
×
123

124
        // Serialize the message with its wire encoding.
125
        var b bytes.Buffer
3✔
126
        if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil {
3✔
127
                return err
×
128
        }
×
129

130
        return kvdb.Batch(s.db, func(tx kvdb.RwTx) error {
6✔
131
                messageStore := tx.ReadWriteBucket(messageStoreBucket)
3✔
132
                if messageStore == nil {
3✔
133
                        return ErrCorruptedMessageStore
×
134
                }
×
135

136
                return messageStore.Put(msgKey, b.Bytes())
3✔
137
        })
138
}
139

140
// DeleteMessage deletes a message from the store for this peer.
141
func (s *MessageStore) DeleteMessage(msg lnwire.Message,
142
        peerPubKey [33]byte) error {
3✔
143

3✔
144
        log.Tracef("Deleting message of type %v from store for peer %x",
3✔
145
                msg.MsgType(), peerPubKey)
3✔
146

3✔
147
        // Construct the key for which we'll find this message with in the
3✔
148
        // store.
3✔
149
        msgKey, err := messageStoreKey(msg, peerPubKey)
3✔
150
        if err != nil {
3✔
151
                return err
×
152
        }
×
153

154
        return kvdb.Batch(s.db, func(tx kvdb.RwTx) error {
6✔
155
                messageStore := tx.ReadWriteBucket(messageStoreBucket)
3✔
156
                if messageStore == nil {
3✔
157
                        return ErrCorruptedMessageStore
×
158
                }
×
159

160
                // In the event that we're attempting to delete a ChannelUpdate
161
                // from the store, we'll make sure that we're actually deleting
162
                // the correct one as it can be overwritten.
163
                if msg, ok := msg.(*lnwire.ChannelUpdate1); ok {
6✔
164
                        // Deleting a value from a bucket that doesn't exist
3✔
165
                        // acts as a NOP, so we'll return if a message doesn't
3✔
166
                        // exist under this key.
3✔
167
                        v := messageStore.Get(msgKey)
3✔
168
                        if v == nil {
3✔
169
                                return nil
×
170
                        }
×
171

172
                        dbMsg, err := lnwire.ReadMessage(bytes.NewReader(v), 0)
3✔
173
                        if err != nil {
3✔
174
                                return err
×
175
                        }
×
176

177
                        // If the timestamps don't match, then the update stored
178
                        // should be the latest one, so we'll avoid deleting it.
179
                        m, ok := dbMsg.(*lnwire.ChannelUpdate1)
3✔
180
                        if !ok {
3✔
181
                                return fmt.Errorf("expected "+
×
182
                                        "*lnwire.ChannelUpdate1, got: %T",
×
183
                                        dbMsg)
×
184
                        }
×
185
                        if msg.Timestamp != m.Timestamp {
3✔
UNCOV
186
                                return nil
×
UNCOV
187
                        }
×
188
                }
189

190
                return messageStore.Delete(msgKey)
3✔
191
        })
192
}
193

194
// readMessage reads a message from its serialized form and ensures its
195
// supported by the current version of the message store.
196
func readMessage(msgBytes []byte) (lnwire.Message, error) {
3✔
197
        msg, err := lnwire.ReadMessage(bytes.NewReader(msgBytes), 0)
3✔
198
        if err != nil {
3✔
199
                return nil, err
×
200
        }
×
201

202
        // Check if the message is supported by the store. We can reuse the
203
        // check for ShortChannelID as its a dependency on messages stored.
204
        if _, err := msgShortChanID(msg); err != nil {
3✔
UNCOV
205
                return nil, err
×
UNCOV
206
        }
×
207

208
        return msg, nil
3✔
209
}
210

211
// Messages returns the total set of messages that exist within the store for
212
// all peers.
UNCOV
213
func (s *MessageStore) Messages() (map[[33]byte][]lnwire.Message, error) {
×
UNCOV
214
        var msgs map[[33]byte][]lnwire.Message
×
UNCOV
215
        err := kvdb.View(s.db, func(tx kvdb.RTx) error {
×
UNCOV
216
                messageStore := tx.ReadBucket(messageStoreBucket)
×
UNCOV
217
                if messageStore == nil {
×
218
                        return ErrCorruptedMessageStore
×
219
                }
×
220

UNCOV
221
                return messageStore.ForEach(func(k, v []byte) error {
×
UNCOV
222
                        var pubKey [33]byte
×
UNCOV
223
                        copy(pubKey[:], k[:33])
×
UNCOV
224

×
UNCOV
225
                        // Deserialize the message from its raw bytes and filter
×
UNCOV
226
                        // out any which are not currently supported by the
×
UNCOV
227
                        // store.
×
UNCOV
228
                        msg, err := readMessage(v)
×
UNCOV
229
                        if err == ErrUnsupportedMessage {
×
UNCOV
230
                                return nil
×
UNCOV
231
                        }
×
UNCOV
232
                        if err != nil {
×
233
                                return err
×
234
                        }
×
235

UNCOV
236
                        msgs[pubKey] = append(msgs[pubKey], msg)
×
UNCOV
237
                        return nil
×
238
                })
UNCOV
239
        }, func() {
×
UNCOV
240
                msgs = make(map[[33]byte][]lnwire.Message)
×
UNCOV
241
        })
×
UNCOV
242
        if err != nil {
×
243
                return nil, err
×
244
        }
×
245

UNCOV
246
        return msgs, nil
×
247
}
248

249
// MessagesForPeer returns the set of messages that exists within the store for
250
// the given peer.
251
func (s *MessageStore) MessagesForPeer(
252
        peerPubKey [33]byte) ([]lnwire.Message, error) {
3✔
253

3✔
254
        var msgs []lnwire.Message
3✔
255
        err := kvdb.View(s.db, func(tx kvdb.RTx) error {
6✔
256
                messageStore := tx.ReadBucket(messageStoreBucket)
3✔
257
                if messageStore == nil {
3✔
258
                        return ErrCorruptedMessageStore
×
259
                }
×
260

261
                c := messageStore.ReadCursor()
3✔
262
                k, v := c.Seek(peerPubKey[:])
3✔
263
                for ; bytes.HasPrefix(k, peerPubKey[:]); k, v = c.Next() {
6✔
264
                        // Deserialize the message from its raw bytes and filter
3✔
265
                        // out any which are not currently supported by the
3✔
266
                        // store.
3✔
267
                        msg, err := readMessage(v)
3✔
268
                        if err == ErrUnsupportedMessage {
3✔
UNCOV
269
                                continue
×
270
                        }
271
                        if err != nil {
3✔
272
                                return err
×
273
                        }
×
274

275
                        msgs = append(msgs, msg)
3✔
276
                }
277

278
                return nil
3✔
279
        }, func() {
3✔
280
                msgs = nil
3✔
281
        })
3✔
282
        if err != nil {
3✔
283
                return nil, err
×
284
        }
×
285

286
        return msgs, nil
3✔
287
}
288

289
// Peers returns the public key of all peers with messages within the store.
290
func (s *MessageStore) Peers() (map[[33]byte]struct{}, error) {
3✔
291
        var peers map[[33]byte]struct{}
3✔
292
        err := kvdb.View(s.db, func(tx kvdb.RTx) error {
6✔
293
                messageStore := tx.ReadBucket(messageStoreBucket)
3✔
294
                if messageStore == nil {
3✔
295
                        return ErrCorruptedMessageStore
×
296
                }
×
297

298
                return messageStore.ForEach(func(k, _ []byte) error {
6✔
299
                        var pubKey [33]byte
3✔
300
                        copy(pubKey[:], k[:33])
3✔
301
                        peers[pubKey] = struct{}{}
3✔
302
                        return nil
3✔
303
                })
3✔
304
        }, func() {
3✔
305
                peers = make(map[[33]byte]struct{})
3✔
306
        })
3✔
307
        if err != nil {
3✔
308
                return nil, err
×
309
        }
×
310

311
        return peers, nil
3✔
312
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc