• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16911773184

12 Aug 2025 02:21PM UTC coverage: 57.471% (-9.4%) from 66.9%
16911773184

Pull #10103

github

web-flow
Merge d64a1234d into f3e1f2f35
Pull Request #10103: Rate limit outgoing gossip bandwidth by peer

57 of 77 new or added lines in 5 files covered. (74.03%)

28294 existing lines in 457 files now uncovered.

99110 of 172451 relevant lines covered (57.47%)

1.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.32
/discovery/message_store.go
1
package discovery
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8

9
        "github.com/lightningnetwork/lnd/kvdb"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
)
12

13
var (
	// messageStoreBucket names the top level bucket of the gossiper
	// database that holds the messages we still need to deliver to our
	// peers. The messages are read back and resent to their respective
	// peers after a restart.
	//
	// Layout of the bucket:
	//   pubKey (33 bytes) + msgShortChanID (8 bytes) + msgType (2 bytes) -> msg
	messageStoreBucket = []byte("message-store")

	// ErrUnsupportedMessage is returned when a message of a type the store
	// does not support is handed to it.
	ErrUnsupportedMessage = errors.New("unsupported message type")

	// ErrCorruptedMessageStore signals that the on-disk bucketing
	// structure changed after the gossip message store instance was
	// initialized.
	ErrCorruptedMessageStore = errors.New("gossip message store has been " +
		"corrupted")
)
33

34
// GossipMessageStore is a store responsible for storing gossip messages which
35
// we should reliably send to our peers.
36
type GossipMessageStore interface {
37
        // AddMessage adds a message to the store for this peer.
38
        AddMessage(lnwire.Message, [33]byte) error
39

40
        // DeleteMessage deletes a message from the store for this peer.
41
        DeleteMessage(lnwire.Message, [33]byte) error
42

43
        // Messages returns the total set of messages that exist within the
44
        // store for all peers.
45
        Messages() (map[[33]byte][]lnwire.Message, error)
46

47
        // Peers returns the public key of all peers with messages within the
48
        // store.
49
        Peers() (map[[33]byte]struct{}, error)
50

51
        // MessagesForPeer returns the set of messages that exists within the
52
        // store for the given peer.
53
        MessagesForPeer([33]byte) ([]lnwire.Message, error)
54
}
55

56
// MessageStore is an implementation of the GossipMessageStore interface backed
57
// by a channeldb instance. By design, this store will only keep the latest
58
// version of a message (like in the case of multiple ChannelUpdate's) for a
59
// channel with a peer.
60
type MessageStore struct {
61
        db kvdb.Backend
62
}
63

64
// A compile-time assertion to ensure messageStore implements the
65
// GossipMessageStore interface.
66
var _ GossipMessageStore = (*MessageStore)(nil)
67

68
// NewMessageStore creates a new message store backed by a channeldb instance.
69
func NewMessageStore(db kvdb.Backend) (*MessageStore, error) {
3✔
70
        err := kvdb.Batch(db, func(tx kvdb.RwTx) error {
6✔
71
                _, err := tx.CreateTopLevelBucket(messageStoreBucket)
3✔
72
                return err
3✔
73
        })
3✔
74
        if err != nil {
3✔
75
                return nil, fmt.Errorf("unable to create required buckets: %w",
×
76
                        err)
×
77
        }
×
78

79
        return &MessageStore{db}, nil
3✔
80
}
81

82
// msgShortChanID retrieves the short channel ID of the message.
83
func msgShortChanID(msg lnwire.Message) (lnwire.ShortChannelID, error) {
3✔
84
        var shortChanID lnwire.ShortChannelID
3✔
85
        switch msg := msg.(type) {
3✔
86
        case *lnwire.AnnounceSignatures1:
3✔
87
                shortChanID = msg.ShortChannelID
3✔
88
        case *lnwire.ChannelUpdate1:
3✔
89
                shortChanID = msg.ShortChannelID
3✔
UNCOV
90
        default:
×
UNCOV
91
                return shortChanID, ErrUnsupportedMessage
×
92
        }
93

94
        return shortChanID, nil
3✔
95
}
96

97
// messageStoreKey constructs the database key for the message to be stored.
98
func messageStoreKey(msg lnwire.Message, peerPubKey [33]byte) ([]byte, error) {
3✔
99
        shortChanID, err := msgShortChanID(msg)
3✔
100
        if err != nil {
3✔
UNCOV
101
                return nil, err
×
UNCOV
102
        }
×
103

104
        var k [33 + 8 + 2]byte
3✔
105
        copy(k[:33], peerPubKey[:])
3✔
106
        binary.BigEndian.PutUint64(k[33:41], shortChanID.ToUint64())
3✔
107
        binary.BigEndian.PutUint16(k[41:43], uint16(msg.MsgType()))
3✔
108

3✔
109
        return k[:], nil
3✔
110
}
111

112
// AddMessage adds a message to the store for this peer.
113
func (s *MessageStore) AddMessage(msg lnwire.Message, peerPubKey [33]byte) error {
3✔
114
        log.Tracef("Adding message of type %v to store for peer %x",
3✔
115
                msg.MsgType(), peerPubKey)
3✔
116

3✔
117
        // Construct the key for which we'll find this message with in the
3✔
118
        // store.
3✔
119
        msgKey, err := messageStoreKey(msg, peerPubKey)
3✔
120
        if err != nil {
3✔
UNCOV
121
                return err
×
UNCOV
122
        }
×
123

124
        // Serialize the message with its wire encoding.
125
        var b bytes.Buffer
3✔
126
        if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil {
3✔
127
                return err
×
128
        }
×
129

130
        return kvdb.Batch(s.db, func(tx kvdb.RwTx) error {
6✔
131
                messageStore := tx.ReadWriteBucket(messageStoreBucket)
3✔
132
                if messageStore == nil {
3✔
133
                        return ErrCorruptedMessageStore
×
134
                }
×
135

136
                return messageStore.Put(msgKey, b.Bytes())
3✔
137
        })
138
}
139

140
// DeleteMessage deletes a message from the store for this peer.
141
func (s *MessageStore) DeleteMessage(msg lnwire.Message,
142
        peerPubKey [33]byte) error {
3✔
143

3✔
144
        log.Tracef("Deleting message of type %v from store for peer %x",
3✔
145
                msg.MsgType(), peerPubKey)
3✔
146

3✔
147
        // Construct the key for which we'll find this message with in the
3✔
148
        // store.
3✔
149
        msgKey, err := messageStoreKey(msg, peerPubKey)
3✔
150
        if err != nil {
3✔
151
                return err
×
152
        }
×
153

154
        return kvdb.Batch(s.db, func(tx kvdb.RwTx) error {
6✔
155
                messageStore := tx.ReadWriteBucket(messageStoreBucket)
3✔
156
                if messageStore == nil {
3✔
157
                        return ErrCorruptedMessageStore
×
158
                }
×
159

160
                // In the event that we're attempting to delete a ChannelUpdate
161
                // from the store, we'll make sure that we're actually deleting
162
                // the correct one as it can be overwritten.
163
                if msg, ok := msg.(*lnwire.ChannelUpdate1); ok {
6✔
164
                        // Deleting a value from a bucket that doesn't exist
3✔
165
                        // acts as a NOP, so we'll return if a message doesn't
3✔
166
                        // exist under this key.
3✔
167
                        v := messageStore.Get(msgKey)
3✔
168
                        if v == nil {
3✔
169
                                return nil
×
170
                        }
×
171

172
                        dbMsg, err := lnwire.ReadMessage(bytes.NewReader(v), 0)
3✔
173
                        if err != nil {
3✔
174
                                return err
×
175
                        }
×
176

177
                        // If the timestamps don't match, then the update stored
178
                        // should be the latest one, so we'll avoid deleting it.
179
                        m, ok := dbMsg.(*lnwire.ChannelUpdate1)
3✔
180
                        if !ok {
3✔
181
                                return fmt.Errorf("expected "+
×
182
                                        "*lnwire.ChannelUpdate1, got: %T",
×
183
                                        dbMsg)
×
184
                        }
×
185
                        if msg.Timestamp != m.Timestamp {
3✔
UNCOV
186
                                return nil
×
UNCOV
187
                        }
×
188
                }
189

190
                return messageStore.Delete(msgKey)
3✔
191
        })
192
}
193

194
// readMessage reads a message from its serialized form and ensures its
195
// supported by the current version of the message store.
196
func readMessage(msgBytes []byte) (lnwire.Message, error) {
3✔
197
        msg, err := lnwire.ReadMessage(bytes.NewReader(msgBytes), 0)
3✔
198
        if err != nil {
3✔
199
                return nil, err
×
200
        }
×
201

202
        // Check if the message is supported by the store. We can reuse the
203
        // check for ShortChannelID as its a dependency on messages stored.
204
        if _, err := msgShortChanID(msg); err != nil {
3✔
UNCOV
205
                return nil, err
×
UNCOV
206
        }
×
207

208
        return msg, nil
3✔
209
}
210

211
// Messages returns the total set of messages that exist within the store for
212
// all peers.
UNCOV
213
func (s *MessageStore) Messages() (map[[33]byte][]lnwire.Message, error) {
×
UNCOV
214
        var msgs map[[33]byte][]lnwire.Message
×
UNCOV
215
        err := kvdb.View(s.db, func(tx kvdb.RTx) error {
×
UNCOV
216
                messageStore := tx.ReadBucket(messageStoreBucket)
×
UNCOV
217
                if messageStore == nil {
×
218
                        return ErrCorruptedMessageStore
×
219
                }
×
220

UNCOV
221
                return messageStore.ForEach(func(k, v []byte) error {
×
UNCOV
222
                        var pubKey [33]byte
×
UNCOV
223
                        copy(pubKey[:], k[:33])
×
UNCOV
224

×
UNCOV
225
                        // Deserialize the message from its raw bytes and filter
×
UNCOV
226
                        // out any which are not currently supported by the
×
UNCOV
227
                        // store.
×
UNCOV
228
                        msg, err := readMessage(v)
×
UNCOV
229
                        if err == ErrUnsupportedMessage {
×
UNCOV
230
                                return nil
×
UNCOV
231
                        }
×
UNCOV
232
                        if err != nil {
×
233
                                return err
×
234
                        }
×
235

UNCOV
236
                        msgs[pubKey] = append(msgs[pubKey], msg)
×
UNCOV
237
                        return nil
×
238
                })
UNCOV
239
        }, func() {
×
UNCOV
240
                msgs = make(map[[33]byte][]lnwire.Message)
×
UNCOV
241
        })
×
UNCOV
242
        if err != nil {
×
243
                return nil, err
×
244
        }
×
245

UNCOV
246
        return msgs, nil
×
247
}
248

249
// MessagesForPeer returns the set of messages that exists within the store for
250
// the given peer.
251
func (s *MessageStore) MessagesForPeer(
252
        peerPubKey [33]byte) ([]lnwire.Message, error) {
3✔
253

3✔
254
        var msgs []lnwire.Message
3✔
255
        err := kvdb.View(s.db, func(tx kvdb.RTx) error {
6✔
256
                messageStore := tx.ReadBucket(messageStoreBucket)
3✔
257
                if messageStore == nil {
3✔
258
                        return ErrCorruptedMessageStore
×
259
                }
×
260

261
                c := messageStore.ReadCursor()
3✔
262
                k, v := c.Seek(peerPubKey[:])
3✔
263
                for ; bytes.HasPrefix(k, peerPubKey[:]); k, v = c.Next() {
6✔
264
                        // Deserialize the message from its raw bytes and filter
3✔
265
                        // out any which are not currently supported by the
3✔
266
                        // store.
3✔
267
                        msg, err := readMessage(v)
3✔
268
                        if err == ErrUnsupportedMessage {
3✔
UNCOV
269
                                continue
×
270
                        }
271
                        if err != nil {
3✔
272
                                return err
×
273
                        }
×
274

275
                        msgs = append(msgs, msg)
3✔
276
                }
277

278
                return nil
3✔
279
        }, func() {
3✔
280
                msgs = nil
3✔
281
        })
3✔
282
        if err != nil {
3✔
283
                return nil, err
×
284
        }
×
285

286
        return msgs, nil
3✔
287
}
288

289
// Peers returns the public key of all peers with messages within the store.
290
func (s *MessageStore) Peers() (map[[33]byte]struct{}, error) {
3✔
291
        var peers map[[33]byte]struct{}
3✔
292
        err := kvdb.View(s.db, func(tx kvdb.RTx) error {
6✔
293
                messageStore := tx.ReadBucket(messageStoreBucket)
3✔
294
                if messageStore == nil {
3✔
295
                        return ErrCorruptedMessageStore
×
296
                }
×
297

298
                return messageStore.ForEach(func(k, _ []byte) error {
6✔
299
                        var pubKey [33]byte
3✔
300
                        copy(pubKey[:], k[:33])
3✔
301
                        peers[pubKey] = struct{}{}
3✔
302
                        return nil
3✔
303
                })
3✔
304
        }, func() {
3✔
305
                peers = make(map[[33]byte]struct{})
3✔
306
        })
3✔
307
        if err != nil {
3✔
308
                return nil, err
×
309
        }
×
310

311
        return peers, nil
3✔
312
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc