• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13523316608

25 Feb 2025 02:12PM UTC coverage: 49.351% (-9.5%) from 58.835%
13523316608

Pull #9549

github

yyforyongyu
routing/chainview: refactor `TestFilteredChainView`

So each test has its own miner and chainView.
Pull Request #9549: Fix unit test flake `TestHistoricalConfDetailsTxIndex`

0 of 120 new or added lines in 1 file covered. (0.0%)

27196 existing lines in 434 files now uncovered.

100945 of 204543 relevant lines covered (49.35%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.09
/peer/ping_manager.go
1
package peer
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/lightningnetwork/lnd/lnwire"
11
)
12

13
// PingManagerConfig is a structure containing various parameters that govern
14
// how the PingManager behaves.
15
type PingManagerConfig struct {
16
        // NewPingPayload is a closure that returns the payload to be packaged
17
        // in the Ping message.
18
        NewPingPayload func() []byte
19

20
        // NewPongSize is a closure that returns a random value between
21
        // [0, lnwire.MaxPongBytes]. This random value helps to more effectively
22
        // pair Pong messages with Ping.
23
        NewPongSize func() uint16
24

25
        // IntervalDuration is the Duration between attempted pings.
26
        IntervalDuration time.Duration
27

28
        // TimeoutDuration is the Duration we wait before declaring a ping
29
        // attempt failed.
30
        TimeoutDuration time.Duration
31

32
        // SendPing is a closure that is responsible for sending the Ping
33
        // message out to our peer
34
        SendPing func(ping *lnwire.Ping)
35

36
        // OnPongFailure is a closure that is responsible for executing the
37
        // logic when a Pong message is either late or does not match our
38
        // expectations for that Pong
39
        OnPongFailure func(error)
40
}
41

42
// PingManager is a structure that is designed to manage the internal state
43
// of the ping pong lifecycle with the remote peer. We assume there is only one
44
// ping outstanding at once.
45
//
46
// NOTE: This structure MUST be initialized with NewPingManager.
47
type PingManager struct {
48
        cfg *PingManagerConfig
49

50
        // pingTime is a rough estimate of the RTT (round-trip-time) between us
51
        // and the connected peer.
52
        // To be used atomically.
53
        // TODO(roasbeef): also use a WMA or EMA?
54
        pingTime atomic.Pointer[time.Duration]
55

56
        // pingLastSend is the time when we sent our last ping message.
57
        // To be used atomically.
58
        pingLastSend *time.Time
59

60
        // outstandingPongSize is the current size of the requested pong
61
        // payload.  This value can only validly range from [0,65531]. Any
62
        // value < 0 is interpreted as if there is no outstanding ping message.
63
        outstandingPongSize int32
64

65
        // pingTicker is a pointer to a Ticker that fires on every ping
66
        // interval.
67
        pingTicker *time.Ticker
68

69
        // pingTimeout is a Timer that will fire when we want to time out a
70
        // ping
71
        pingTimeout *time.Timer
72

73
        // pongChan is the channel on which the pingManager will write Pong
74
        // messages it is evaluating
75
        pongChan chan *lnwire.Pong
76

77
        started sync.Once
78
        stopped sync.Once
79

80
        quit chan struct{}
81
        wg   sync.WaitGroup
82
}
83

84
// NewPingManager constructs a pingManager in a valid state. It must be started
85
// before it does anything useful, though.
86
func NewPingManager(cfg *PingManagerConfig) *PingManager {
3✔
87
        m := PingManager{
3✔
88
                cfg:                 cfg,
3✔
89
                outstandingPongSize: -1,
3✔
90
                pongChan:            make(chan *lnwire.Pong, 1),
3✔
91
                quit:                make(chan struct{}),
3✔
92
        }
3✔
93

3✔
94
        return &m
3✔
95
}
3✔
96

97
// Start launches the primary goroutine that is owned by the pingManager.
98
func (m *PingManager) Start() error {
3✔
99
        var err error
3✔
100
        m.started.Do(func() {
6✔
101
                m.pingTicker = time.NewTicker(m.cfg.IntervalDuration)
3✔
102
                m.pingTimeout = time.NewTimer(0)
3✔
103

3✔
104
                m.wg.Add(1)
3✔
105
                go m.pingHandler()
3✔
106
        })
3✔
107

108
        return err
3✔
109
}
110

111
// pingHandler is the main goroutine responsible for enforcing the ping/pong
112
// protocol.
113
func (m *PingManager) pingHandler() {
3✔
114
        defer m.wg.Done()
3✔
115
        defer m.pingTimeout.Stop()
3✔
116

3✔
117
        // Ensure that the pingTimeout channel is empty.
3✔
118
        if !m.pingTimeout.Stop() {
3✔
119
                <-m.pingTimeout.C
×
120
        }
×
121

122
        for {
6✔
123
                select {
3✔
UNCOV
124
                case <-m.pingTicker.C:
×
UNCOV
125
                        // If this occurs it means that the new ping cycle has
×
UNCOV
126
                        // begun while there is still an outstanding ping
×
UNCOV
127
                        // awaiting a pong response.  This should never occur,
×
UNCOV
128
                        // but if it does, it implies a timeout.
×
UNCOV
129
                        if m.outstandingPongSize >= 0 {
×
130
                                e := errors.New("impossible: new ping" +
×
131
                                        "in unclean state",
×
132
                                )
×
133
                                m.cfg.OnPongFailure(e)
×
134

×
135
                                return
×
136
                        }
×
137

UNCOV
138
                        pongSize := m.cfg.NewPongSize()
×
UNCOV
139
                        ping := &lnwire.Ping{
×
UNCOV
140
                                NumPongBytes: pongSize,
×
UNCOV
141
                                PaddingBytes: m.cfg.NewPingPayload(),
×
UNCOV
142
                        }
×
UNCOV
143

×
UNCOV
144
                        // Set up our bookkeeping for the new Ping.
×
UNCOV
145
                        if err := m.setPingState(pongSize); err != nil {
×
146
                                m.cfg.OnPongFailure(err)
×
147

×
148
                                return
×
149
                        }
×
150

UNCOV
151
                        m.cfg.SendPing(ping)
×
152

UNCOV
153
                case <-m.pingTimeout.C:
×
UNCOV
154
                        m.resetPingState()
×
UNCOV
155

×
UNCOV
156
                        e := errors.New("timeout while waiting for " +
×
UNCOV
157
                                "pong response",
×
UNCOV
158
                        )
×
UNCOV
159

×
UNCOV
160
                        m.cfg.OnPongFailure(e)
×
UNCOV
161

×
UNCOV
162
                        return
×
163

UNCOV
164
                case pong := <-m.pongChan:
×
UNCOV
165
                        pongSize := int32(len(pong.PongBytes))
×
UNCOV
166

×
UNCOV
167
                        // Save off values we are about to override when we
×
UNCOV
168
                        // call resetPingState.
×
UNCOV
169
                        expected := m.outstandingPongSize
×
UNCOV
170
                        lastPing := m.pingLastSend
×
UNCOV
171

×
UNCOV
172
                        m.resetPingState()
×
UNCOV
173

×
UNCOV
174
                        // If the pong we receive doesn't match the ping we
×
UNCOV
175
                        // sent out, then we fail out.
×
UNCOV
176
                        if pongSize != expected {
×
UNCOV
177
                                e := errors.New("pong response does " +
×
UNCOV
178
                                        "not match expected size",
×
UNCOV
179
                                )
×
UNCOV
180

×
UNCOV
181
                                m.cfg.OnPongFailure(e)
×
UNCOV
182

×
UNCOV
183
                                return
×
UNCOV
184
                        }
×
185

186
                        // Compute RTT of ping and save that for future
187
                        // querying.
UNCOV
188
                        if lastPing != nil {
×
UNCOV
189
                                rtt := time.Since(*lastPing)
×
UNCOV
190
                                m.pingTime.Store(&rtt)
×
UNCOV
191
                        }
×
192

193
                case <-m.quit:
3✔
194
                        return
3✔
195
                }
196
        }
197
}
198

199
// Stop interrupts the goroutines that the PingManager owns.
200
func (m *PingManager) Stop() {
3✔
201
        if m.pingTicker == nil {
3✔
202
                return
×
203
        }
×
204

205
        m.stopped.Do(func() {
6✔
206
                close(m.quit)
3✔
207
                m.wg.Wait()
3✔
208

3✔
209
                m.pingTicker.Stop()
3✔
210
                m.pingTimeout.Stop()
3✔
211
        })
3✔
212
}
213

214
// setPingState is a private method to keep track of all of the fields we need
215
// to set when we send out a Ping.
UNCOV
216
func (m *PingManager) setPingState(pongSize uint16) error {
×
UNCOV
217
        t := time.Now()
×
UNCOV
218
        m.pingLastSend = &t
×
UNCOV
219
        m.outstandingPongSize = int32(pongSize)
×
UNCOV
220
        if m.pingTimeout.Reset(m.cfg.TimeoutDuration) {
×
221
                return fmt.Errorf(
×
222
                        "impossible: ping timeout reset when already active",
×
223
                )
×
224
        }
×
225

UNCOV
226
        return nil
×
227
}
228

229
// resetPingState is a private method that resets all of the bookkeeping that
230
// is tracking a currently outstanding Ping.
UNCOV
231
func (m *PingManager) resetPingState() {
×
UNCOV
232
        m.pingLastSend = nil
×
UNCOV
233
        m.outstandingPongSize = -1
×
UNCOV
234
        if !m.pingTimeout.Stop() {
×
UNCOV
235
                select {
×
236
                case <-m.pingTimeout.C:
×
UNCOV
237
                default:
×
238
                }
239
        }
240
}
241

242
// GetPingTimeMicroSeconds reports back the RTT calculated by the pingManager.
243
func (m *PingManager) GetPingTimeMicroSeconds() int64 {
3✔
244
        rtt := m.pingTime.Load()
3✔
245

3✔
246
        if rtt == nil {
6✔
247
                return -1
3✔
248
        }
3✔
249

250
        return rtt.Microseconds()
×
251
}
252

253
// ReceivedPong is called to evaluate a Pong message against the expectations
254
// we have for it. It will cause the PingManager to invoke the supplied
255
// OnPongFailure function if the Pong argument supplied violates expectations.
UNCOV
256
func (m *PingManager) ReceivedPong(msg *lnwire.Pong) {
×
UNCOV
257
        select {
×
UNCOV
258
        case m.pongChan <- msg:
×
259
        case <-m.quit:
×
260
        }
261
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc