• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15574102646

11 Jun 2025 01:44AM UTC coverage: 68.554% (+9.9%) from 58.637%
15574102646

Pull #9652

github

web-flow
Merge eb863e46a into 92a5d35cf
Pull Request #9652: lnwallet/chancloser: fix flake in TestRbfCloseClosingNegotiationLocal

11 of 12 new or added lines in 1 file covered. (91.67%)

7276 existing lines in 84 files now uncovered.

134508 of 196208 relevant lines covered (68.55%)

44569.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.5
/discovery/reliable_sender.go
1
package discovery
2

3
import (
4
        "context"
5
        "sync"
6

7
        "github.com/lightningnetwork/lnd/fn/v2"
8
        "github.com/lightningnetwork/lnd/lnpeer"
9
        "github.com/lightningnetwork/lnd/lnwire"
10
)
11

12
// reliableSenderCfg contains all of necessary items for the reliableSender to
13
// carry out its duties.
14
type reliableSenderCfg struct {
15
        // NotifyWhenOnline is a function that allows the gossiper to be
16
        // notified when a certain peer comes online, allowing it to
17
        // retry sending a peer message.
18
        //
19
        // NOTE: The peerChan channel must be buffered.
20
        NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
21

22
        // NotifyWhenOffline is a function that allows the gossiper to be
23
        // notified when a certain peer disconnects, allowing it to request a
24
        // notification for when it reconnects.
25
        NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
26

27
        // MessageStore is a persistent storage of gossip messages which we will
28
        // use to determine which messages need to be resent for a given peer.
29
        MessageStore GossipMessageStore
30

31
        // IsMsgStale determines whether a message retrieved from the backing
32
        // MessageStore is seen as stale by the current graph.
33
        IsMsgStale func(context.Context, lnwire.Message) bool
34
}
35

36
// peerManager contains the set of channels required for the peerHandler to
37
// properly carry out its duties.
38
type peerManager struct {
39
        // msgs is the channel through which messages will be streamed to the
40
        // handler in order to send the message to the peer while they're
41
        // online.
42
        msgs chan lnwire.Message
43

44
        // done is a channel that will be closed to signal that the handler for
45
        // the given peer has been torn down for whatever reason.
46
        done chan struct{}
47
}
48

49
// reliableSender is a small subsystem of the gossiper used to reliably send
50
// gossip messages to peers.
51
type reliableSender struct {
52
        start sync.Once
53
        stop  sync.Once
54

55
        cfg reliableSenderCfg
56

57
        // activePeers keeps track of whether a peerHandler exists for a given
58
        // peer. A peerHandler is tasked with handling requests for messages
59
        // that should be reliably sent to peers while also taking into account
60
        // the peer's connection lifecycle.
61
        activePeers    map[[33]byte]peerManager
62
        activePeersMtx sync.Mutex
63

64
        wg     sync.WaitGroup
65
        quit   chan struct{}
66
        cancel fn.Option[context.CancelFunc]
67
}
68

69
// newReliableSender returns a new reliableSender backed by the given config.
70
func newReliableSender(cfg *reliableSenderCfg) *reliableSender {
70✔
71
        return &reliableSender{
70✔
72
                cfg:         *cfg,
70✔
73
                activePeers: make(map[[33]byte]peerManager),
70✔
74
                quit:        make(chan struct{}),
70✔
75
        }
70✔
76
}
70✔
77

78
// Start spawns message handlers for any peers with pending messages.
79
func (s *reliableSender) Start() error {
66✔
80
        var err error
66✔
81
        s.start.Do(func() {
132✔
82
                ctx, cancel := context.WithCancel(context.Background())
66✔
83
                s.cancel = fn.Some(cancel)
66✔
84

66✔
85
                err = s.resendPendingMsgs(ctx)
66✔
86
        })
66✔
87
        return err
66✔
88
}
89

90
// Stop halts the reliable sender from sending messages to peers.
91
func (s *reliableSender) Stop() {
66✔
92
        s.stop.Do(func() {
132✔
93
                log.Debugf("reliableSender is stopping")
66✔
94
                defer log.Debugf("reliableSender stopped")
66✔
95

66✔
96
                s.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
132✔
97
                close(s.quit)
66✔
98
                s.wg.Wait()
66✔
99
        })
100
}
101

102
// sendMessage constructs a request to send a message reliably to a peer. In the
103
// event that the peer is currently offline, this will only write the message to
104
// disk. Once the peer reconnects, this message, along with any others pending,
105
// will be sent to the peer.
106
func (s *reliableSender) sendMessage(ctx context.Context, msg lnwire.Message,
107
        peerPubKey [33]byte) error {
60✔
108

60✔
109
        // We'll start by persisting the message to disk. This allows us to
60✔
110
        // resend the message upon restarts and peer reconnections.
60✔
111
        if err := s.cfg.MessageStore.AddMessage(msg, peerPubKey); err != nil {
60✔
UNCOV
112
                return err
×
UNCOV
113
        }
×
114

115
        // Then, we'll spawn a peerHandler for this peer to handle resending its
116
        // pending messages while taking into account its connection lifecycle.
117
spawnHandler:
118
        msgHandler, ok := s.spawnPeerHandler(ctx, peerPubKey)
60✔
119

60✔
120
        // If the handler wasn't previously active, we can exit now as we know
60✔
121
        // that the message will be sent once the peer online notification is
60✔
122
        // received. This prevents us from potentially sending the message
60✔
123
        // twice.
60✔
124
        if !ok {
90✔
125
                return nil
30✔
126
        }
30✔
127

128
        // Otherwise, we'll attempt to stream the message to the handler.
129
        // There's a subtle race condition where the handler can be torn down
130
        // due to all of the messages sent being stale, so we'll handle this
131
        // gracefully by spawning another one to prevent blocking.
132
        select {
36✔
133
        case msgHandler.msgs <- msg:
36✔
UNCOV
134
        case <-msgHandler.done:
×
UNCOV
135
                goto spawnHandler
×
UNCOV
136
        case <-s.quit:
×
UNCOV
137
                return ErrGossiperShuttingDown
×
138
        }
139

140
        return nil
36✔
141
}
142

143
// spawnPeerMsgHandler spawns a peerHandler for the given peer if there isn't
144
// one already active. The boolean returned signals whether there was already
145
// one active or not.
146
func (s *reliableSender) spawnPeerHandler(ctx context.Context,
147
        peerPubKey [33]byte) (peerManager, bool) {
62✔
148

62✔
149
        s.activePeersMtx.Lock()
62✔
150
        msgHandler, ok := s.activePeers[peerPubKey]
62✔
151
        if !ok {
94✔
152
                msgHandler = peerManager{
32✔
153
                        msgs: make(chan lnwire.Message),
32✔
154
                        done: make(chan struct{}),
32✔
155
                }
32✔
156
                s.activePeers[peerPubKey] = msgHandler
32✔
157
        }
32✔
158
        s.activePeersMtx.Unlock()
62✔
159

62✔
160
        // If this is a newly initiated peerManager, we will create a
62✔
161
        // peerHandler.
62✔
162
        if !ok {
94✔
163
                s.wg.Add(1)
32✔
164
                go s.peerHandler(ctx, msgHandler, peerPubKey)
32✔
165
        }
32✔
166

167
        return msgHandler, ok
62✔
168
}
169

170
// peerHandler is responsible for handling our reliable message send requests
171
// for a given peer while also taking into account the peer's connection
172
// lifecycle. Any messages that are attempted to be sent while the peer is
173
// offline will be queued and sent once the peer reconnects.
174
//
175
// NOTE: This must be run as a goroutine.
176
func (s *reliableSender) peerHandler(ctx context.Context, peerMgr peerManager,
177
        peerPubKey [33]byte) {
32✔
178

32✔
179
        defer s.wg.Done()
32✔
180

32✔
181
        // We'll start by requesting a notification for when the peer
32✔
182
        // reconnects.
32✔
183
        peerChan := make(chan lnpeer.Peer, 1)
32✔
184

32✔
185
waitUntilOnline:
32✔
186
        log.Debugf("Requesting online notification for peer=%x", peerPubKey)
38✔
187

38✔
188
        s.cfg.NotifyWhenOnline(peerPubKey, peerChan)
38✔
189

38✔
190
        var peer lnpeer.Peer
38✔
191
out:
38✔
192
        for {
80✔
193
                select {
42✔
194
                // While we're waiting, we'll also consume any messages that
195
                // must be sent to prevent blocking the caller. These can be
196
                // ignored for now since the peer is currently offline. Once
197
                // they reconnect, the messages will be sent since they should
198
                // have been persisted to disk.
199
                case msg := <-peerMgr.msgs:
10✔
200
                        // Retrieve the short channel ID for which this message
10✔
201
                        // applies for logging purposes. The error can be
10✔
202
                        // ignored as the store can only contain messages which
10✔
203
                        // have a ShortChannelID field.
10✔
204
                        shortChanID, _ := msgShortChanID(msg)
10✔
205
                        log.Debugf("Received request to send %v message for "+
10✔
206
                                "channel=%v while peer=%x is offline",
10✔
207
                                msg.MsgType(), shortChanID, peerPubKey)
10✔
208

209
                case peer = <-peerChan:
36✔
210
                        break out
36✔
211

212
                case <-s.quit:
8✔
213
                        return
8✔
214
                }
215
        }
216

217
        log.Debugf("Peer=%x is now online, proceeding to send pending messages",
36✔
218
                peerPubKey)
36✔
219

36✔
220
        // Once we detect the peer has reconnected, we'll also request a
36✔
221
        // notification for when they disconnect. We'll use this to make sure
36✔
222
        // they haven't disconnected (in the case of a flappy peer, etc.) by the
36✔
223
        // time we attempt to send them the pending messages.
36✔
224
        log.Debugf("Requesting offline notification for peer=%x", peerPubKey)
36✔
225

36✔
226
        offlineChan := s.cfg.NotifyWhenOffline(peerPubKey)
36✔
227

36✔
228
        pendingMsgs, err := s.cfg.MessageStore.MessagesForPeer(peerPubKey)
36✔
229
        if err != nil {
36✔
UNCOV
230
                log.Errorf("Unable to retrieve pending messages for peer %x: %v",
×
UNCOV
231
                        peerPubKey, err)
×
UNCOV
232
                return
×
UNCOV
233
        }
×
234

235
        // With the peer online, we can now proceed to send our pending messages
236
        // for them.
237
        for _, msg := range pendingMsgs {
80✔
238
                // Retrieve the short channel ID for which this message applies
44✔
239
                // for logging purposes. The error can be ignored as the store
44✔
240
                // can only contain messages which have a ShortChannelID field.
44✔
241
                shortChanID, _ := msgShortChanID(msg)
44✔
242

44✔
243
                // Ensure the peer is still online right before sending the
44✔
244
                // message.
44✔
245
                select {
44✔
UNCOV
246
                case <-offlineChan:
×
UNCOV
247
                        goto waitUntilOnline
×
248
                default:
44✔
249
                }
250

251
                if err := peer.SendMessage(false, msg); err != nil {
44✔
UNCOV
252
                        log.Errorf("Unable to send %v message for channel=%v "+
×
UNCOV
253
                                "to %x: %v", msg.MsgType(), shortChanID,
×
UNCOV
254
                                peerPubKey, err)
×
UNCOV
255
                        goto waitUntilOnline
×
256
                }
257

258
                log.Debugf("Successfully sent %v message for channel=%v with "+
44✔
259
                        "peer=%x upon reconnection", msg.MsgType(), shortChanID,
44✔
260
                        peerPubKey)
44✔
261

44✔
262
                // Now that the message has at least been sent once, we can
44✔
263
                // check whether it's stale. This guarantees that
44✔
264
                // AnnounceSignatures are sent at least once if we happen to
44✔
265
                // already have signatures for both parties.
44✔
266
                if s.cfg.IsMsgStale(ctx, msg) {
56✔
267
                        err := s.cfg.MessageStore.DeleteMessage(msg, peerPubKey)
12✔
268
                        if err != nil {
12✔
UNCOV
269
                                log.Errorf("Unable to remove stale %v message "+
×
UNCOV
270
                                        "for channel=%v with peer %x: %v",
×
UNCOV
271
                                        msg.MsgType(), shortChanID, peerPubKey,
×
UNCOV
272
                                        err)
×
UNCOV
273
                                continue
×
274
                        }
275

276
                        log.Debugf("Removed stale %v message for channel=%v "+
12✔
277
                                "with peer=%x", msg.MsgType(), shortChanID,
12✔
278
                                peerPubKey)
12✔
279
                }
280
        }
281

282
        // If all of our messages were stale, then there's no need for this
283
        // handler to continue running, so we can exit now.
284
        pendingMsgs, err = s.cfg.MessageStore.MessagesForPeer(peerPubKey)
36✔
285
        if err != nil {
36✔
UNCOV
286
                log.Errorf("Unable to retrieve pending messages for peer %x: %v",
×
UNCOV
287
                        peerPubKey, err)
×
UNCOV
288
                return
×
UNCOV
289
        }
×
290

291
        if len(pendingMsgs) == 0 {
46✔
292
                log.Debugf("No pending messages left for peer=%x", peerPubKey)
10✔
293

10✔
294
                s.activePeersMtx.Lock()
10✔
295
                delete(s.activePeers, peerPubKey)
10✔
296
                s.activePeersMtx.Unlock()
10✔
297

10✔
298
                close(peerMgr.done)
10✔
299

10✔
300
                return
10✔
301
        }
10✔
302

303
        // Once the pending messages are sent, we can continue to send any
304
        // future messages while the peer remains connected.
305
        for {
90✔
306
                select {
58✔
307
                case msg := <-peerMgr.msgs:
32✔
308
                        // Retrieve the short channel ID for which this message
32✔
309
                        // applies for logging purposes. The error can be
32✔
310
                        // ignored as the store can only contain messages which
32✔
311
                        // have a ShortChannelID field.
32✔
312
                        shortChanID, _ := msgShortChanID(msg)
32✔
313

32✔
314
                        if err := peer.SendMessage(false, msg); err != nil {
32✔
UNCOV
315
                                log.Errorf("Unable to send %v message for "+
×
UNCOV
316
                                        "channel=%v to %x: %v", msg.MsgType(),
×
UNCOV
317
                                        shortChanID, peerPubKey, err)
×
UNCOV
318
                        }
×
319

320
                        log.Debugf("Successfully sent %v message for "+
32✔
321
                                "channel=%v with peer=%x", msg.MsgType(),
32✔
322
                                shortChanID, peerPubKey)
32✔
323

324
                case <-offlineChan:
12✔
325
                        goto waitUntilOnline
12✔
326

327
                case <-s.quit:
22✔
328
                        return
22✔
329
                }
330
        }
331
}
332

333
// resendPendingMsgs retrieves and sends all of the messages within the message
334
// store that should be reliably sent to their respective peers.
335
func (s *reliableSender) resendPendingMsgs(ctx context.Context) error {
66✔
336
        // Fetch all of the peers for which we have pending messages for and
66✔
337
        // spawn a peerMsgHandler for each. Once the peer is seen as online, all
66✔
338
        // of the pending messages will be sent.
66✔
339
        peers, err := s.cfg.MessageStore.Peers()
66✔
340
        if err != nil {
66✔
UNCOV
341
                return err
×
UNCOV
342
        }
×
343

344
        for peer := range peers {
74✔
345
                s.spawnPeerHandler(ctx, peer)
8✔
346
        }
8✔
347

348
        return nil
66✔
349
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc