• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12986279612

27 Jan 2025 09:51AM UTC coverage: 57.652% (-1.1%) from 58.788%
12986279612

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19578 existing lines in 256 files now uncovered.

103448 of 179434 relevant lines covered (57.65%)

24884.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.3
/peer/ping_manager.go
1
package peer
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/lightningnetwork/lnd/lnwire"
11
)
12

13
// PingManagerConfig is a structure containing various parameters that govern
14
// how the PingManager behaves.
15
type PingManagerConfig struct {
16
        // NewPingPayload is a closure that returns the payload to be packaged
17
        // in the Ping message.
18
        NewPingPayload func() []byte
19

20
        // NewPongSize is a closure that returns a random value between
21
        // [0, lnwire.MaxPongBytes]. This random value helps to more effectively
22
        // pair Pong messages with Ping.
23
        NewPongSize func() uint16
24

25
        // IntervalDuration is the Duration between attempted pings.
26
        IntervalDuration time.Duration
27

28
        // TimeoutDuration is the Duration we wait before declaring a ping
29
        // attempt failed.
30
        TimeoutDuration time.Duration
31

32
        // SendPing is a closure that is responsible for sending the Ping
33
        // message out to our peer
34
        SendPing func(ping *lnwire.Ping)
35

36
        // OnPongFailure is a closure that is responsible for executing the
37
        // logic when a Pong message is either late or does not match our
38
        // expectations for that Pong
39
        OnPongFailure func(error)
40
}
41

42
// PingManager is a structure that is designed to manage the internal state
43
// of the ping pong lifecycle with the remote peer. We assume there is only one
44
// ping outstanding at once.
45
//
46
// NOTE: This structure MUST be initialized with NewPingManager.
47
type PingManager struct {
48
        cfg *PingManagerConfig
49

50
        // pingTime is a rough estimate of the RTT (round-trip-time) between us
51
        // and the connected peer.
52
        // To be used atomically.
53
        // TODO(roasbeef): also use a WMA or EMA?
54
        pingTime atomic.Pointer[time.Duration]
55

56
        // pingLastSend is the time when we sent our last ping message.
57
        // To be used atomically.
58
        pingLastSend *time.Time
59

60
        // outstandingPongSize is the current size of the requested pong
61
        // payload.  This value can only validly range from [0,65531]. Any
62
        // value < 0 is interpreted as if there is no outstanding ping message.
63
        outstandingPongSize int32
64

65
        // pingTicker is a pointer to a Ticker that fires on every ping
66
        // interval.
67
        pingTicker *time.Ticker
68

69
        // pingTimeout is a Timer that will fire when we want to time out a
70
        // ping
71
        pingTimeout *time.Timer
72

73
        // pongChan is the channel on which the pingManager will write Pong
74
        // messages it is evaluating
75
        pongChan chan *lnwire.Pong
76

77
        started sync.Once
78
        stopped sync.Once
79

80
        quit chan struct{}
81
        wg   sync.WaitGroup
82
}
83

84
// NewPingManager constructs a pingManager in a valid state. It must be started
85
// before it does anything useful, though.
86
func NewPingManager(cfg *PingManagerConfig) *PingManager {
28✔
87
        m := PingManager{
28✔
88
                cfg:                 cfg,
28✔
89
                outstandingPongSize: -1,
28✔
90
                pongChan:            make(chan *lnwire.Pong, 1),
28✔
91
                quit:                make(chan struct{}),
28✔
92
        }
28✔
93

28✔
94
        return &m
28✔
95
}
28✔
96

97
// Start launches the primary goroutine that is owned by the pingManager.
98
func (m *PingManager) Start() error {
6✔
99
        var err error
6✔
100
        m.started.Do(func() {
12✔
101
                m.pingTicker = time.NewTicker(m.cfg.IntervalDuration)
6✔
102
                m.pingTimeout = time.NewTimer(0)
6✔
103

6✔
104
                m.wg.Add(1)
6✔
105
                go m.pingHandler()
6✔
106
        })
6✔
107

108
        return err
6✔
109
}
110

111
// pingHandler is the main goroutine responsible for enforcing the ping/pong
112
// protocol.
113
func (m *PingManager) pingHandler() {
6✔
114
        defer m.wg.Done()
6✔
115
        defer m.pingTimeout.Stop()
6✔
116

6✔
117
        // Ensure that the pingTimeout channel is empty.
6✔
118
        if !m.pingTimeout.Stop() {
11✔
119
                <-m.pingTimeout.C
5✔
120
        }
5✔
121

122
        for {
16✔
123
                select {
10✔
124
                case <-m.pingTicker.C:
3✔
125
                        // If this occurs it means that the new ping cycle has
3✔
126
                        // begun while there is still an outstanding ping
3✔
127
                        // awaiting a pong response.  This should never occur,
3✔
128
                        // but if it does, it implies a timeout.
3✔
129
                        if m.outstandingPongSize >= 0 {
3✔
130
                                e := errors.New("impossible: new ping" +
×
131
                                        "in unclean state",
×
132
                                )
×
133
                                m.cfg.OnPongFailure(e)
×
134

×
135
                                return
×
136
                        }
×
137

138
                        pongSize := m.cfg.NewPongSize()
3✔
139
                        ping := &lnwire.Ping{
3✔
140
                                NumPongBytes: pongSize,
3✔
141
                                PaddingBytes: m.cfg.NewPingPayload(),
3✔
142
                        }
3✔
143

3✔
144
                        // Set up our bookkeeping for the new Ping.
3✔
145
                        if err := m.setPingState(pongSize); err != nil {
3✔
146
                                m.cfg.OnPongFailure(err)
×
147

×
148
                                return
×
149
                        }
×
150

151
                        m.cfg.SendPing(ping)
3✔
152

153
                case <-m.pingTimeout.C:
1✔
154
                        m.resetPingState()
1✔
155

1✔
156
                        e := errors.New("timeout while waiting for " +
1✔
157
                                "pong response",
1✔
158
                        )
1✔
159

1✔
160
                        m.cfg.OnPongFailure(e)
1✔
161

1✔
162
                        return
1✔
163

164
                case pong := <-m.pongChan:
2✔
165
                        pongSize := int32(len(pong.PongBytes))
2✔
166

2✔
167
                        // Save off values we are about to override when we
2✔
168
                        // call resetPingState.
2✔
169
                        expected := m.outstandingPongSize
2✔
170
                        lastPing := m.pingLastSend
2✔
171

2✔
172
                        m.resetPingState()
2✔
173

2✔
174
                        // If the pong we receive doesn't match the ping we
2✔
175
                        // sent out, then we fail out.
2✔
176
                        if pongSize != expected {
3✔
177
                                e := errors.New("pong response does " +
1✔
178
                                        "not match expected size",
1✔
179
                                )
1✔
180

1✔
181
                                m.cfg.OnPongFailure(e)
1✔
182

1✔
183
                                return
1✔
184
                        }
1✔
185

186
                        // Compute RTT of ping and save that for future
187
                        // querying.
188
                        if lastPing != nil {
2✔
189
                                rtt := time.Since(*lastPing)
1✔
190
                                m.pingTime.Store(&rtt)
1✔
191
                        }
1✔
192

193
                case <-m.quit:
1✔
194
                        return
1✔
195
                }
196
        }
197
}
198

199
// Stop interrupts the goroutines that the PingManager owns.
200
func (m *PingManager) Stop() {
3✔
201
        if m.pingTicker == nil {
3✔
202
                return
×
203
        }
×
204

205
        m.stopped.Do(func() {
6✔
206
                close(m.quit)
3✔
207
                m.wg.Wait()
3✔
208

3✔
209
                m.pingTicker.Stop()
3✔
210
                m.pingTimeout.Stop()
3✔
211
        })
3✔
212
}
213

214
// setPingState is a private method to keep track of all of the fields we need
215
// to set when we send out a Ping.
216
func (m *PingManager) setPingState(pongSize uint16) error {
3✔
217
        t := time.Now()
3✔
218
        m.pingLastSend = &t
3✔
219
        m.outstandingPongSize = int32(pongSize)
3✔
220
        if m.pingTimeout.Reset(m.cfg.TimeoutDuration) {
3✔
221
                return fmt.Errorf(
×
222
                        "impossible: ping timeout reset when already active",
×
223
                )
×
224
        }
×
225

226
        return nil
3✔
227
}
228

229
// resetPingState is a private method that resets all of the bookkeeping that
230
// is tracking a currently outstanding Ping.
231
func (m *PingManager) resetPingState() {
3✔
232
        m.pingLastSend = nil
3✔
233
        m.outstandingPongSize = -1
3✔
234
        if !m.pingTimeout.Stop() {
4✔
235
                select {
1✔
236
                case <-m.pingTimeout.C:
×
237
                default:
1✔
238
                }
239
        }
240
}
241

242
// GetPingTimeMicroSeconds reports back the RTT calculated by the pingManager.
UNCOV
243
func (m *PingManager) GetPingTimeMicroSeconds() int64 {
×
UNCOV
244
        rtt := m.pingTime.Load()
×
UNCOV
245

×
UNCOV
246
        if rtt == nil {
×
UNCOV
247
                return -1
×
UNCOV
248
        }
×
249

250
        return rtt.Microseconds()
×
251
}
252

253
// ReceivedPong is called to evaluate a Pong message against the expectations
254
// we have for it. It will cause the PingManager to invoke the supplied
255
// OnPongFailure function if the Pong argument supplied violates expectations.
256
func (m *PingManager) ReceivedPong(msg *lnwire.Pong) {
3✔
257
        select {
3✔
258
        case m.pongChan <- msg:
3✔
259
        case <-m.quit:
×
260
        }
261
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc