• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16911773184

12 Aug 2025 02:21PM UTC coverage: 57.471% (-9.4%) from 66.9%
16911773184

Pull #10103

github

web-flow
Merge d64a1234d into f3e1f2f35
Pull Request #10103: Rate limit outgoing gossip bandwidth by peer

57 of 77 new or added lines in 5 files covered. (74.03%)

28294 existing lines in 457 files now uncovered.

99110 of 172451 relevant lines covered (57.47%)

1.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.83
/peer/ping_manager.go
1
package peer
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/lightningnetwork/lnd/fn/v2"
11
        "github.com/lightningnetwork/lnd/lnwire"
12
)
13

14
// PingManagerConfig is a structure containing various parameters that govern
15
// how the PingManager behaves.
16
type PingManagerConfig struct {
17
        // NewPingPayload is a closure that returns the payload to be packaged
18
        // in the Ping message.
19
        NewPingPayload func() []byte
20

21
        // NewPongSize is a closure that returns a random value between
22
        // [0, lnwire.MaxPongBytes]. This random value helps to more effectively
23
        // pair Pong messages with Ping.
24
        NewPongSize func() uint16
25

26
        // IntervalDuration is the Duration between attempted pings.
27
        IntervalDuration time.Duration
28

29
        // TimeoutDuration is the Duration we wait before declaring a ping
30
        // attempt failed.
31
        TimeoutDuration time.Duration
32

33
        // SendPing is a closure that is responsible for sending the Ping
34
        // message out to our peer
35
        SendPing func(ping *lnwire.Ping)
36

37
        // OnPongFailure is a closure that is responsible for executing the
38
        // logic when a Pong message is either late or does not match our
39
        // expectations for that Pong
40
        OnPongFailure func(failureReason error, timeWaitedForPong time.Duration,
41
                lastKnownRTT time.Duration)
42
}
43

44
// PingManager is a structure that is designed to manage the internal state
45
// of the ping pong lifecycle with the remote peer. We assume there is only one
46
// ping outstanding at once.
47
//
48
// NOTE: This structure MUST be initialized with NewPingManager.
49
type PingManager struct {
50
        cfg *PingManagerConfig
51

52
        // pingTime is a rough estimate of the RTT (round-trip-time) between us
53
        // and the connected peer.
54
        // To be used atomically.
55
        // TODO(roasbeef): also use a WMA or EMA?
56
        pingTime atomic.Pointer[time.Duration]
57

58
        // pingLastSend is the time when we sent our last ping message.
59
        // To be used atomically.
60
        pingLastSend *time.Time
61

62
        // outstandingPongSize is the current size of the requested pong
63
        // payload.  This value can only validly range from [0,65531]. Any
64
        // value < 0 is interpreted as if there is no outstanding ping message.
65
        outstandingPongSize int32
66

67
        // pingTicker is a pointer to a Ticker that fires on every ping
68
        // interval.
69
        pingTicker *time.Ticker
70

71
        // pingTimeout is a Timer that will fire when we want to time out a
72
        // ping
73
        pingTimeout *time.Timer
74

75
        // pongChan is the channel on which the pingManager will write Pong
76
        // messages it is evaluating
77
        pongChan chan *lnwire.Pong
78

79
        started sync.Once
80
        stopped sync.Once
81

82
        quit chan struct{}
83
        wg   sync.WaitGroup
84
}
85

86
// NewPingManager constructs a pingManager in a valid state. It must be started
87
// before it does anything useful, though.
88
func NewPingManager(cfg *PingManagerConfig) *PingManager {
3✔
89
        m := PingManager{
3✔
90
                cfg:                 cfg,
3✔
91
                outstandingPongSize: -1,
3✔
92
                pongChan:            make(chan *lnwire.Pong, 1),
3✔
93
                quit:                make(chan struct{}),
3✔
94
        }
3✔
95

3✔
96
        return &m
3✔
97
}
3✔
98

99
// Start launches the primary goroutine that is owned by the pingManager.
100
func (m *PingManager) Start() error {
3✔
101
        var err error
3✔
102
        m.started.Do(func() {
6✔
103
                m.pingTicker = time.NewTicker(m.cfg.IntervalDuration)
3✔
104
                m.pingTimeout = time.NewTimer(0)
3✔
105

3✔
106
                m.wg.Add(1)
3✔
107
                go m.pingHandler()
3✔
108
        })
3✔
109

110
        return err
3✔
111
}
112

113
// getLastRTT safely retrieves the last known RTT, returning 0 if none exists.
UNCOV
114
func (m *PingManager) getLastRTT() time.Duration {
×
UNCOV
115
        rttPtr := m.pingTime.Load()
×
UNCOV
116
        if rttPtr == nil {
×
UNCOV
117
                return 0
×
UNCOV
118
        }
×
119

120
        return *rttPtr
×
121
}
122

123
// pendingPingWait calculates the time waited since the last ping was sent. If
124
// no ping time is reported, None is returned. defaultDuration.
UNCOV
125
func (m *PingManager) pendingPingWait() fn.Option[time.Duration] {
×
UNCOV
126
        if m.pingLastSend != nil {
×
UNCOV
127
                return fn.Some(time.Since(*m.pingLastSend))
×
UNCOV
128
        }
×
129

130
        return fn.None[time.Duration]()
×
131
}
132

133
// pingHandler is the main goroutine responsible for enforcing the ping/pong
134
// protocol.
135
func (m *PingManager) pingHandler() {
3✔
136
        defer m.wg.Done()
3✔
137
        defer m.pingTimeout.Stop()
3✔
138

3✔
139
        // Ensure that the pingTimeout channel is empty.
3✔
140
        if !m.pingTimeout.Stop() {
3✔
141
                <-m.pingTimeout.C
×
142
        }
×
143

144
        // Because we don't know if the OnPingFailure callback actually
145
        // disconnects a peer (dependent on user config), we should never return
146
        // from this loop unless the ping manager is stopped explicitly (which
147
        // happens on disconnect).
148
        for {
6✔
149
                select {
3✔
UNCOV
150
                case <-m.pingTicker.C:
×
UNCOV
151
                        // If this occurs it means that the new ping cycle has
×
UNCOV
152
                        // begun while there is still an outstanding ping
×
UNCOV
153
                        // awaiting a pong response.  This should never occur,
×
UNCOV
154
                        // but if it does, it implies a timeout.
×
UNCOV
155
                        if m.outstandingPongSize >= 0 {
×
156
                                // Ping was outstanding, meaning it timed out by
×
157
                                // the arrival of the next ping interval.
×
158
                                timeWaited := m.pendingPingWait().UnwrapOr(
×
159
                                        m.cfg.IntervalDuration,
×
160
                                )
×
161
                                lastRTT := m.getLastRTT()
×
162

×
163
                                m.cfg.OnPongFailure(
×
164
                                        errors.New("ping timed "+
×
165
                                                "out by next interval"),
×
166
                                        timeWaited, lastRTT,
×
167
                                )
×
168

×
169
                                m.resetPingState()
×
170
                        }
×
171

UNCOV
172
                        pongSize := m.cfg.NewPongSize()
×
UNCOV
173
                        ping := &lnwire.Ping{
×
UNCOV
174
                                NumPongBytes: pongSize,
×
UNCOV
175
                                PaddingBytes: m.cfg.NewPingPayload(),
×
UNCOV
176
                        }
×
UNCOV
177

×
UNCOV
178
                        // Set up our bookkeeping for the new Ping.
×
UNCOV
179
                        if err := m.setPingState(pongSize); err != nil {
×
180
                                // This is an internal error related to timer
×
181
                                // reset. Pass it to OnPongFailure as it's
×
182
                                // critical. Current and last RTT are not
×
183
                                // directly applicable here.
×
184
                                m.cfg.OnPongFailure(err, 0, 0)
×
185

×
186
                                m.resetPingState()
×
187

×
188
                                continue
×
189
                        }
190

UNCOV
191
                        m.cfg.SendPing(ping)
×
192

UNCOV
193
                case <-m.pingTimeout.C:
×
UNCOV
194
                        timeWaited := m.pendingPingWait().UnwrapOr(
×
UNCOV
195
                                m.cfg.TimeoutDuration,
×
UNCOV
196
                        )
×
UNCOV
197
                        lastRTT := m.getLastRTT()
×
UNCOV
198

×
UNCOV
199
                        m.cfg.OnPongFailure(
×
UNCOV
200
                                errors.New("timeout while waiting for "+
×
UNCOV
201
                                        "pong response"),
×
UNCOV
202
                                timeWaited, lastRTT,
×
UNCOV
203
                        )
×
UNCOV
204

×
UNCOV
205
                        m.resetPingState()
×
206

UNCOV
207
                case pong := <-m.pongChan:
×
UNCOV
208
                        pongSize := int32(len(pong.PongBytes))
×
UNCOV
209

×
UNCOV
210
                        // Save off values we are about to override when we call
×
UNCOV
211
                        // resetPingState.
×
UNCOV
212
                        expected := m.outstandingPongSize
×
UNCOV
213
                        lastPingTime := m.pingLastSend
×
UNCOV
214

×
UNCOV
215
                        // This is an unexpected pong, we'll continue.
×
UNCOV
216
                        if lastPingTime == nil {
×
217
                                continue
×
218
                        }
219

UNCOV
220
                        actualRTT := time.Since(*lastPingTime)
×
UNCOV
221

×
UNCOV
222
                        // If the pong we receive doesn't match the ping we sent
×
UNCOV
223
                        // out, then we fail out.
×
UNCOV
224
                        if pongSize != expected {
×
UNCOV
225
                                e := fmt.Errorf("pong response does not match "+
×
UNCOV
226
                                        "expected size. Expected: %d, Got: %d",
×
UNCOV
227
                                        expected, pongSize)
×
UNCOV
228

×
UNCOV
229
                                lastRTT := m.getLastRTT()
×
UNCOV
230
                                m.cfg.OnPongFailure(e, actualRTT, lastRTT)
×
UNCOV
231

×
UNCOV
232
                                m.resetPingState()
×
UNCOV
233

×
UNCOV
234
                                continue
×
235
                        }
236

237
                        // Pong is good, update RTT and reset state.
UNCOV
238
                        m.pingTime.Store(&actualRTT)
×
UNCOV
239
                        m.resetPingState()
×
240

241
                case <-m.quit:
3✔
242
                        return
3✔
243
                }
244
        }
245
}
246

247
// Stop interrupts the goroutines that the PingManager owns.
248
func (m *PingManager) Stop() {
3✔
249
        if m.pingTicker == nil {
6✔
250
                return
3✔
251
        }
3✔
252

253
        m.stopped.Do(func() {
6✔
254
                close(m.quit)
3✔
255
                m.wg.Wait()
3✔
256

3✔
257
                m.pingTicker.Stop()
3✔
258
                m.pingTimeout.Stop()
3✔
259
        })
3✔
260
}
261

262
// setPingState is a private method to keep track of all of the fields we need
263
// to set when we send out a Ping.
UNCOV
264
func (m *PingManager) setPingState(pongSize uint16) error {
×
UNCOV
265
        t := time.Now()
×
UNCOV
266
        m.pingLastSend = &t
×
UNCOV
267
        m.outstandingPongSize = int32(pongSize)
×
UNCOV
268
        if m.pingTimeout.Reset(m.cfg.TimeoutDuration) {
×
269
                return fmt.Errorf(
×
270
                        "impossible: ping timeout reset when already active",
×
271
                )
×
272
        }
×
273

UNCOV
274
        return nil
×
275
}
276

277
// resetPingState is a private method that resets all of the bookkeeping that
278
// is tracking a currently outstanding Ping.
UNCOV
279
func (m *PingManager) resetPingState() {
×
UNCOV
280
        m.pingLastSend = nil
×
UNCOV
281
        m.outstandingPongSize = -1
×
UNCOV
282

×
UNCOV
283
        if !m.pingTimeout.Stop() {
×
UNCOV
284
                select {
×
285
                case <-m.pingTimeout.C:
×
UNCOV
286
                default:
×
287
                }
288
        }
289
}
290

291
// GetPingTimeMicroSeconds reports back the RTT calculated by the pingManager.
292
func (m *PingManager) GetPingTimeMicroSeconds() int64 {
3✔
293
        rtt := m.pingTime.Load()
3✔
294

3✔
295
        if rtt == nil {
6✔
296
                return -1
3✔
297
        }
3✔
298

299
        return rtt.Microseconds()
×
300
}
301

302
// ReceivedPong is called to evaluate a Pong message against the expectations
303
// we have for it. It will cause the PingManager to invoke the supplied
304
// OnPongFailure function if the Pong argument supplied violates expectations.
UNCOV
305
func (m *PingManager) ReceivedPong(msg *lnwire.Pong) {
×
UNCOV
306
        select {
×
UNCOV
307
        case m.pongChan <- msg:
×
308
        case <-m.quit:
×
309
        }
310
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc