• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14939628844

09 May 2025 11:53PM UTC coverage: 58.588% (-0.002%) from 58.59%
14939628844

Pull #9801

github

web-flow
Merge 8c23404a8 into ee25c228e
Pull Request #9801: peer+lnd: add new CLI option to control if we D/C on slow pongs

6 of 80 new or added lines in 4 files covered. (7.5%)

38 existing lines in 10 files now uncovered.

97437 of 166310 relevant lines covered (58.59%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.01
/peer/ping_manager.go
1
package peer
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/lightningnetwork/lnd/lnwire"
11
)
12

13
// PingManagerConfig is a structure containing various parameters that govern
14
// how the PingManager behaves.
15
type PingManagerConfig struct {
16
        // NewPingPayload is a closure that returns the payload to be packaged
17
        // in the Ping message.
18
        NewPingPayload func() []byte
19

20
        // NewPongSize is a closure that returns a random value between
21
        // [0, lnwire.MaxPongBytes]. This random value helps to more effectively
22
        // pair Pong messages with Ping.
23
        NewPongSize func() uint16
24

25
        // IntervalDuration is the Duration between attempted pings.
26
        IntervalDuration time.Duration
27

28
        // TimeoutDuration is the Duration we wait before declaring a ping
29
        // attempt failed.
30
        TimeoutDuration time.Duration
31

32
        // SendPing is a closure that is responsible for sending the Ping
33
        // message out to our peer
34
        SendPing func(ping *lnwire.Ping)
35

36
        // OnPongFailure is a closure that is responsible for executing the
37
        // logic when a Pong message is either late or does not match our
38
        // expectations for that Pong
39
        OnPongFailure func(failureReason error, timeWaitedForPong time.Duration,
40
                lastKnownRTT time.Duration)
41
}
42

43
// PingManager is a structure that is designed to manage the internal state
44
// of the ping pong lifecycle with the remote peer. We assume there is only one
45
// ping outstanding at once.
46
//
47
// NOTE: This structure MUST be initialized with NewPingManager.
48
type PingManager struct {
49
        cfg *PingManagerConfig
50

51
        // pingTime is a rough estimate of the RTT (round-trip-time) between us
52
        // and the connected peer.
53
        // To be used atomically.
54
        // TODO(roasbeef): also use a WMA or EMA?
55
        pingTime atomic.Pointer[time.Duration]
56

57
        // pingLastSend is the time when we sent our last ping message.
58
        // To be used atomically.
59
        pingLastSend *time.Time
60

61
        // outstandingPongSize is the current size of the requested pong
62
        // payload.  This value can only validly range from [0,65531]. Any
63
        // value < 0 is interpreted as if there is no outstanding ping message.
64
        outstandingPongSize int32
65

66
        // pingTicker is a pointer to a Ticker that fires on every ping
67
        // interval.
68
        pingTicker *time.Ticker
69

70
        // pingTimeout is a Timer that will fire when we want to time out a
71
        // ping
72
        pingTimeout *time.Timer
73

74
        // pongChan is the channel on which the pingManager will write Pong
75
        // messages it is evaluating
76
        pongChan chan *lnwire.Pong
77

78
        started sync.Once
79
        stopped sync.Once
80

81
        quit chan struct{}
82
        wg   sync.WaitGroup
83
}
84

85
// NewPingManager constructs a pingManager in a valid state. It must be started
86
// before it does anything useful, though.
87
func NewPingManager(cfg *PingManagerConfig) *PingManager {
3✔
88
        m := PingManager{
3✔
89
                cfg:                 cfg,
3✔
90
                outstandingPongSize: -1,
3✔
91
                pongChan:            make(chan *lnwire.Pong, 1),
3✔
92
                quit:                make(chan struct{}),
3✔
93
        }
3✔
94

3✔
95
        return &m
3✔
96
}
3✔
97

98
// Start launches the primary goroutine that is owned by the pingManager.
99
func (m *PingManager) Start() error {
3✔
100
        var err error
3✔
101
        m.started.Do(func() {
6✔
102
                m.pingTicker = time.NewTicker(m.cfg.IntervalDuration)
3✔
103
                m.pingTimeout = time.NewTimer(0)
3✔
104

3✔
105
                m.wg.Add(1)
3✔
106
                go m.pingHandler()
3✔
107
        })
3✔
108

109
        return err
3✔
110
}
111

112
// getLastRTT safely retrieves the last known RTT, returning 0 if none exists.
NEW
113
func (m *PingManager) getLastRTT() time.Duration {
×
NEW
114
        rttPtr := m.pingTime.Load()
×
NEW
115
        if rttPtr == nil {
×
NEW
116
                return 0
×
NEW
117
        }
×
118

NEW
119
        return *rttPtr
×
120
}
121

122
// getTimeWaitedOrDefault calculates the time waited since the last ping was
123
// sent. If no ping was sent (pingLastSend is nil), it returns the provided
124
// defaultDuration.
125
func (m *PingManager) getTimeWaitedOrDefault(defaultDuration time.Duration,
NEW
126
) time.Duration {
×
NEW
127

×
NEW
128
        if m.pingLastSend != nil {
×
NEW
129
                return time.Since(*m.pingLastSend)
×
NEW
130
        }
×
131

NEW
132
        return defaultDuration
×
133
}
134

135
// pingHandler is the main goroutine responsible for enforcing the ping/pong
136
// protocol.
137
func (m *PingManager) pingHandler() {
3✔
138
        defer m.wg.Done()
3✔
139
        defer m.pingTimeout.Stop()
3✔
140

3✔
141
        // Ensure that the pingTimeout channel is empty.
3✔
142
        if !m.pingTimeout.Stop() {
3✔
143
                <-m.pingTimeout.C
×
144
        }
×
145

146
        for {
6✔
147
                select {
3✔
148
                case <-m.pingTicker.C:
×
149
                        // If this occurs it means that the new ping cycle has
×
150
                        // begun while there is still an outstanding ping
×
151
                        // awaiting a pong response.  This should never occur,
×
152
                        // but if it does, it implies a timeout.
×
153
                        if m.outstandingPongSize >= 0 {
×
NEW
154
                                // Ping was outstanding, meaning it timed out by
×
NEW
155
                                // the arrival of the next ping interval.
×
NEW
156
                                timeWaited := m.getTimeWaitedOrDefault(
×
NEW
157
                                        m.cfg.IntervalDuration,
×
158
                                )
×
NEW
159
                                lastRTT := m.getLastRTT()
×
160

×
NEW
161
                                m.cfg.OnPongFailure(
×
NEW
162
                                        errors.New("ping timed "+
×
NEW
163
                                                "out by next interval"),
×
NEW
164
                                        timeWaited, lastRTT,
×
NEW
165
                                )
×
NEW
166

×
NEW
167
                                m.resetPingState()
×
UNCOV
168
                        }
×
169

170
                        pongSize := m.cfg.NewPongSize()
×
171
                        ping := &lnwire.Ping{
×
172
                                NumPongBytes: pongSize,
×
173
                                PaddingBytes: m.cfg.NewPingPayload(),
×
174
                        }
×
175

×
176
                        // Set up our bookkeeping for the new Ping.
×
177
                        if err := m.setPingState(pongSize); err != nil {
×
NEW
178
                                // This is an internal error related to timer
×
NEW
179
                                // reset. Pass it to OnPongFailure as it's
×
NEW
180
                                // critical. Current and last RTT are not
×
NEW
181
                                // directly applicable here.
×
NEW
182
                                m.cfg.OnPongFailure(err, 0, 0)
×
183
                                return
×
184
                        }
×
185

186
                        m.cfg.SendPing(ping)
×
187

188
                case <-m.pingTimeout.C:
×
NEW
189
                        timeWaited := m.getTimeWaitedOrDefault(
×
NEW
190
                                m.cfg.TimeoutDuration,
×
191
                        )
×
NEW
192
                        lastRTT := m.getLastRTT()
×
193

×
NEW
194
                        m.cfg.OnPongFailure(
×
NEW
195
                                errors.New("timeout while waiting for "+
×
NEW
196
                                        "pong response"),
×
NEW
197
                                timeWaited, lastRTT,
×
NEW
198
                        )
×
199

×
NEW
200
                        m.resetPingState()
×
201

202
                case pong := <-m.pongChan:
×
203
                        pongSize := int32(len(pong.PongBytes))
×
204

×
NEW
205
                        // Save off values we are about to override when we call
×
NEW
206
                        // resetPingState.
×
207
                        expected := m.outstandingPongSize
×
NEW
208
                        lastPingTime := m.pingLastSend
×
209

×
NEW
210
                        if lastPingTime == nil {
×
NEW
211
                                // This is an unexpected pong, we'll continue.
×
NEW
212
                                continue
×
213
                        }
214

NEW
215
                        actualRTT := time.Since(*lastPingTime)
×
216

×
NEW
217
                        // If the pong we receive doesn't match the ping we sent
×
NEW
218
                        // out, then we fail out.
×
219
                        if pongSize != expected {
×
NEW
220
                                e := fmt.Errorf("pong response does not match "+
×
NEW
221
                                        "expected size. Expected: %d, Got: %d",
×
NEW
222
                                        expected, pongSize)
×
223

×
NEW
224
                                lastRTT := m.getLastRTT()
×
NEW
225
                                m.cfg.OnPongFailure(e, actualRTT, lastRTT)
×
226

×
NEW
227
                                m.resetPingState()
×
228

×
NEW
229
                                continue
×
230
                        }
231

232
                        // Pong is good, update RTT and reset state.
NEW
233
                        m.pingTime.Store(&actualRTT)
×
NEW
234
                        m.resetPingState()
×
235

236
                case <-m.quit:
3✔
237
                        return
3✔
238
                }
239
        }
240
}
241

242
// Stop interrupts the goroutines that the PingManager owns.
243
func (m *PingManager) Stop() {
3✔
244
        if m.pingTicker == nil {
6✔
245
                return
3✔
246
        }
3✔
247

248
        m.stopped.Do(func() {
6✔
249
                close(m.quit)
3✔
250
                m.wg.Wait()
3✔
251

3✔
252
                m.pingTicker.Stop()
3✔
253
                m.pingTimeout.Stop()
3✔
254
        })
3✔
255
}
256

257
// setPingState is a private method to keep track of all of the fields we need
258
// to set when we send out a Ping.
259
func (m *PingManager) setPingState(pongSize uint16) error {
×
260
        t := time.Now()
×
261
        m.pingLastSend = &t
×
262
        m.outstandingPongSize = int32(pongSize)
×
263
        if m.pingTimeout.Reset(m.cfg.TimeoutDuration) {
×
264
                return fmt.Errorf(
×
265
                        "impossible: ping timeout reset when already active",
×
266
                )
×
267
        }
×
268

269
        return nil
×
270
}
271

272
// resetPingState is a private method that resets all of the bookkeeping that
273
// is tracking a currently outstanding Ping.
274
func (m *PingManager) resetPingState() {
×
275
        m.pingLastSend = nil
×
276
        m.outstandingPongSize = -1
×
NEW
277

×
278
        if !m.pingTimeout.Stop() {
×
279
                select {
×
280
                case <-m.pingTimeout.C:
×
281
                default:
×
282
                }
283
        }
284
}
285

286
// GetPingTimeMicroSeconds reports back the RTT calculated by the pingManager.
287
func (m *PingManager) GetPingTimeMicroSeconds() int64 {
3✔
288
        rtt := m.pingTime.Load()
3✔
289

3✔
290
        if rtt == nil {
6✔
291
                return -1
3✔
292
        }
3✔
293

294
        return rtt.Microseconds()
×
295
}
296

297
// ReceivedPong is called to evaluate a Pong message against the expectations
298
// we have for it. It will cause the PingManager to invoke the supplied
299
// OnPongFailure function if the Pong argument supplied violates expectations.
300
func (m *PingManager) ReceivedPong(msg *lnwire.Pong) {
×
301
        select {
×
302
        case m.pongChan <- msg:
×
303
        case <-m.quit:
×
304
        }
305
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc