• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14984362616

12 May 2025 11:04PM UTC coverage: 58.554% (-0.005%) from 58.559%
14984362616

Pull #9801

github

web-flow
Merge 5a2fbedfa into 1db6c31e2
Pull Request #9801: peer+lnd: add new CLI option to control if we D/C on slow pongs

8 of 84 new or added lines in 4 files covered. (9.52%)

33 existing lines in 8 files now uncovered.

97386 of 166318 relevant lines covered (58.55%)

1.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.66
/peer/ping_manager.go
1
package peer
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9
        "time"
10

11
        "github.com/lightningnetwork/lnd/fn/v2"
12
        "github.com/lightningnetwork/lnd/lnwire"
13
)
14

15
// PingManagerConfig is a structure containing various parameters that govern
16
// how the PingManager behaves.
17
type PingManagerConfig struct {
18
        // NewPingPayload is a closure that returns the payload to be packaged
19
        // in the Ping message.
20
        NewPingPayload func() []byte
21

22
        // NewPongSize is a closure that returns a random value between
23
        // [0, lnwire.MaxPongBytes]. This random value helps to more effectively
24
        // pair Pong messages with Ping.
25
        NewPongSize func() uint16
26

27
        // IntervalDuration is the Duration between attempted pings.
28
        IntervalDuration time.Duration
29

30
        // TimeoutDuration is the Duration we wait before declaring a ping
31
        // attempt failed.
32
        TimeoutDuration time.Duration
33

34
        // SendPing is a closure that is responsible for sending the Ping
35
        // message out to our peer
36
        SendPing func(ping *lnwire.Ping)
37

38
        // OnPongFailure is a closure that is responsible for executing the
39
        // logic when a Pong message is either late or does not match our
40
        // expectations for that Pong
41
        OnPongFailure func(failureReason error, timeWaitedForPong time.Duration,
42
                lastKnownRTT time.Duration)
43

44
        // PeerCtx is the context of the peer that owns this ping manager. We'll
45
        // use this to ensure that if the peer is shutting down, then we will as
46
        // well.
47
        PeerCtx context.Context
48
}
49

50
// PingManager is a structure that is designed to manage the internal state
51
// of the ping pong lifecycle with the remote peer. We assume there is only one
52
// ping outstanding at once.
53
//
54
// NOTE: This structure MUST be initialized with NewPingManager.
55
type PingManager struct {
56
        cfg *PingManagerConfig
57

58
        // pingTime is a rough estimate of the RTT (round-trip-time) between us
59
        // and the connected peer.
60
        // To be used atomically.
61
        // TODO(roasbeef): also use a WMA or EMA?
62
        pingTime atomic.Pointer[time.Duration]
63

64
        // pingLastSend is the time when we sent our last ping message.
65
        // To be used atomically.
66
        pingLastSend *time.Time
67

68
        // outstandingPongSize is the current size of the requested pong
69
        // payload.  This value can only validly range from [0,65531]. Any
70
        // value < 0 is interpreted as if there is no outstanding ping message.
71
        outstandingPongSize int32
72

73
        // pingTicker is a pointer to a Ticker that fires on every ping
74
        // interval.
75
        pingTicker *time.Ticker
76

77
        // pingTimeout is a Timer that will fire when we want to time out a
78
        // ping
79
        pingTimeout *time.Timer
80

81
        // pongChan is the channel on which the pingManager will write Pong
82
        // messages it is evaluating
83
        pongChan chan *lnwire.Pong
84

85
        started sync.Once
86
        stopped sync.Once
87

88
        quit chan struct{}
89
        wg   sync.WaitGroup
90
}
91

92
// NewPingManager constructs a pingManager in a valid state. It must be started
93
// before it does anything useful, though.
94
func NewPingManager(cfg *PingManagerConfig) *PingManager {
3✔
95
        m := PingManager{
3✔
96
                cfg:                 cfg,
3✔
97
                outstandingPongSize: -1,
3✔
98
                pongChan:            make(chan *lnwire.Pong, 1),
3✔
99
                quit:                make(chan struct{}),
3✔
100
        }
3✔
101

3✔
102
        return &m
3✔
103
}
3✔
104

105
// Start launches the primary goroutine that is owned by the pingManager.
106
func (m *PingManager) Start() error {
3✔
107
        var err error
3✔
108
        m.started.Do(func() {
6✔
109
                m.pingTicker = time.NewTicker(m.cfg.IntervalDuration)
3✔
110
                m.pingTimeout = time.NewTimer(0)
3✔
111

3✔
112
                m.wg.Add(1)
3✔
113
                go m.pingHandler()
3✔
114
        })
3✔
115

116
        return err
3✔
117
}
118

119
// getLastRTT safely retrieves the last known RTT, returning 0 if none exists.
NEW
120
func (m *PingManager) getLastRTT() time.Duration {
×
NEW
121
        rttPtr := m.pingTime.Load()
×
NEW
122
        if rttPtr == nil {
×
NEW
123
                return 0
×
NEW
124
        }
×
125

NEW
126
        return *rttPtr
×
127
}
128

129
// pendingPingWait calculates the time waited since the last ping was sent. If
130
// no ping time is reported, None is returned. defaultDuration.
NEW
131
func (m *PingManager) pendingPingWait() fn.Option[time.Duration] {
×
NEW
132
        if m.pingLastSend != nil {
×
NEW
133
                return fn.Some(time.Since(*m.pingLastSend))
×
NEW
134
        }
×
135

NEW
136
        return fn.None[time.Duration]()
×
137
}
138

139
// pingHandler is the main goroutine responsible for enforcing the ping/pong
140
// protocol.
141
func (m *PingManager) pingHandler() {
3✔
142
        defer m.wg.Done()
3✔
143
        defer m.pingTimeout.Stop()
3✔
144

3✔
145
        // Ensure that the pingTimeout channel is empty.
3✔
146
        if !m.pingTimeout.Stop() {
3✔
147
                <-m.pingTimeout.C
×
148
        }
×
149

150
        for {
6✔
151
                select {
3✔
152
                case <-m.pingTicker.C:
×
153
                        // If this occurs it means that the new ping cycle has
×
154
                        // begun while there is still an outstanding ping
×
155
                        // awaiting a pong response.  This should never occur,
×
156
                        // but if it does, it implies a timeout.
×
157
                        if m.outstandingPongSize >= 0 {
×
NEW
158
                                // Ping was outstanding, meaning it timed out by
×
NEW
159
                                timeWaited := m.pendingPingWait().UnwrapOr(
×
NEW
160
                                        // the arrival of the next ping interval.
×
NEW
161
                                        m.cfg.IntervalDuration,
×
162
                                )
×
NEW
163
                                lastRTT := m.getLastRTT()
×
164

×
NEW
165
                                m.cfg.OnPongFailure(
×
NEW
166
                                        errors.New("ping timed "+
×
NEW
167
                                                "out by next interval"),
×
NEW
168
                                        timeWaited, lastRTT,
×
NEW
169
                                )
×
NEW
170

×
NEW
171
                                m.resetPingState()
×
UNCOV
172
                        }
×
173

174
                        pongSize := m.cfg.NewPongSize()
×
175
                        ping := &lnwire.Ping{
×
176
                                NumPongBytes: pongSize,
×
177
                                PaddingBytes: m.cfg.NewPingPayload(),
×
178
                        }
×
179

×
180
                        // Set up our bookkeeping for the new Ping.
×
181
                        if err := m.setPingState(pongSize); err != nil {
×
NEW
182
                                // This is an internal error related to timer
×
NEW
183
                                // reset. Pass it to OnPongFailure as it's
×
NEW
184
                                // critical. Current and last RTT are not
×
NEW
185
                                // directly applicable here.
×
NEW
186
                                m.cfg.OnPongFailure(err, 0, 0)
×
187
                                return
×
188
                        }
×
189

190
                        m.cfg.SendPing(ping)
×
191

192
                case <-m.pingTimeout.C:
×
NEW
193
                        timeWaited := m.pendingPingWait().UnwrapOr(
×
NEW
194
                                m.cfg.TimeoutDuration,
×
195
                        )
×
NEW
196
                        lastRTT := m.getLastRTT()
×
197

×
NEW
198
                        m.cfg.OnPongFailure(
×
NEW
199
                                errors.New("timeout while waiting for "+
×
NEW
200
                                        "pong response"),
×
NEW
201
                                timeWaited, lastRTT,
×
NEW
202
                        )
×
203

×
NEW
204
                        m.resetPingState()
×
205

206
                case pong := <-m.pongChan:
×
207
                        pongSize := int32(len(pong.PongBytes))
×
208

×
NEW
209
                        // Save off values we are about to override when we call
×
210
                        expected := m.outstandingPongSize
×
NEW
211
                        // resetPingState.
×
NEW
212
                        lastPingTime := m.pingLastSend
×
213

×
NEW
214
                        // This is an unexpected pong, we'll continue.
×
NEW
215
                        if lastPingTime == nil {
×
NEW
216
                                continue
×
217
                        }
218

NEW
219
                        actualRTT := time.Since(*lastPingTime)
×
220

×
NEW
221
                        // If the pong we receive doesn't match the ping we sent
×
NEW
222
                        // out, then we fail out.
×
223
                        if pongSize != expected {
×
NEW
224
                                e := fmt.Errorf("pong response does not match "+
×
NEW
225
                                        "expected size. Expected: %d, Got: %d",
×
NEW
226
                                        expected, pongSize)
×
227

×
NEW
228
                                lastRTT := m.getLastRTT()
×
NEW
229
                                m.cfg.OnPongFailure(e, actualRTT, lastRTT)
×
230

×
NEW
231
                                m.resetPingState()
×
232

×
NEW
233
                                continue
×
234
                        }
235

236
                        // Pong is good, update RTT and reset state.
NEW
237
                        m.pingTime.Store(&actualRTT)
×
NEW
238
                        m.resetPingState()
×
239

NEW
240
                case <-m.cfg.PeerCtx.Done():
×
NEW
241
                        return
×
242

243
                case <-m.quit:
3✔
244
                        return
3✔
245
                }
246
        }
247
}
248

249
// Stop interrupts the goroutines that the PingManager owns.
250
func (m *PingManager) Stop() {
3✔
251
        if m.pingTicker == nil {
6✔
252
                return
3✔
253
        }
3✔
254

255
        m.stopped.Do(func() {
6✔
256
                close(m.quit)
3✔
257
                m.wg.Wait()
3✔
258

3✔
259
                m.pingTicker.Stop()
3✔
260
                m.pingTimeout.Stop()
3✔
261
        })
3✔
262
}
263

264
// setPingState is a private method to keep track of all of the fields we need
265
// to set when we send out a Ping.
266
func (m *PingManager) setPingState(pongSize uint16) error {
×
267
        t := time.Now()
×
268
        m.pingLastSend = &t
×
269
        m.outstandingPongSize = int32(pongSize)
×
270
        if m.pingTimeout.Reset(m.cfg.TimeoutDuration) {
×
271
                return fmt.Errorf(
×
272
                        "impossible: ping timeout reset when already active",
×
273
                )
×
274
        }
×
275

276
        return nil
×
277
}
278

279
// resetPingState is a private method that resets all of the bookkeeping that
280
// is tracking a currently outstanding Ping.
281
func (m *PingManager) resetPingState() {
×
282
        m.pingLastSend = nil
×
283
        m.outstandingPongSize = -1
×
NEW
284

×
285
        if !m.pingTimeout.Stop() {
×
286
                select {
×
287
                case <-m.pingTimeout.C:
×
288
                default:
×
289
                }
290
        }
291
}
292

293
// GetPingTimeMicroSeconds reports back the RTT calculated by the pingManager.
294
func (m *PingManager) GetPingTimeMicroSeconds() int64 {
3✔
295
        rtt := m.pingTime.Load()
3✔
296

3✔
297
        if rtt == nil {
6✔
298
                return -1
3✔
299
        }
3✔
300

301
        return rtt.Microseconds()
×
302
}
303

304
// ReceivedPong is called to evaluate a Pong message against the expectations
305
// we have for it. It will cause the PingManager to invoke the supplied
306
// OnPongFailure function if the Pong argument supplied violates expectations.
307
func (m *PingManager) ReceivedPong(msg *lnwire.Pong) {
×
308
        select {
×
309
        case m.pongChan <- msg:
×
310
        case <-m.quit:
×
NEW
311
        case <-m.cfg.PeerCtx.Done():
×
312
        }
313
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc