• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12583319996

02 Jan 2025 01:38PM UTC coverage: 57.522% (-1.1%) from 58.598%
12583319996

Pull #9361

github

starius
fn/ContextGuard: use context.AfterFunc to wait

Simplifies context cancellation handling by using context.AfterFunc instead of a
goroutine to wait for context cancellation. This approach avoids the overhead of
a goroutine during the waiting period.

For ctxQuitUnsafe, since g.quit is closed only in the Quit method (which also
cancels all associated contexts), waiting on context cancellation ensures the
same behavior without unnecessary dependency on g.quit.

Added a test to ensure that the Create method does not launch any goroutines.
Pull Request #9361: fn: optimize context guard

102587 of 178344 relevant lines covered (57.52%)

24734.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.27
/watchtower/wtserver/server.go
1
package wtserver
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "net"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btcd/chaincfg/chainhash"
13
        "github.com/btcsuite/btcd/connmgr"
14
        "github.com/lightningnetwork/lnd/keychain"
15
        "github.com/lightningnetwork/lnd/lnwire"
16
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
17
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
18
)
19

20
var (
21
        // ErrPeerAlreadyConnected signals that a peer with the same session id
22
        // is already active within the server.
23
        ErrPeerAlreadyConnected = errors.New("peer already connected")
24

25
        // ErrServerExiting signals that a request could not be processed
26
        // because the server has been requested to shut down.
27
        ErrServerExiting = errors.New("server shutting down")
28
)
29

30
// Config abstracts the primary components and dependencies of the server.
31
type Config struct {
32
        // DB provides persistent access to the server's sessions and for
33
        // storing state updates.
34
        DB DB
35

36
        // NodeKeyECDH is the ECDH capable wrapper of the key to be used in
37
        // accepting new brontide connections.
38
        NodeKeyECDH keychain.SingleKeyECDH
39

40
        // Listeners specifies which address to which clients may connect.
41
        Listeners []net.Listener
42

43
        // ReadTimeout specifies how long a client may go without sending a
44
        // message.
45
        ReadTimeout time.Duration
46

47
        // WriteTimeout specifies how long a client may go without reading a
48
        // message from the other end, if the connection has stopped buffering
49
        // the server's replies.
50
        WriteTimeout time.Duration
51

52
        // NewAddress is used to generate reward addresses, where a cut of
53
        // successfully sent funds can be received.
54
        NewAddress func() (btcutil.Address, error)
55

56
        // ChainHash identifies the network that the server is watching.
57
        ChainHash chainhash.Hash
58

59
        // NoAckCreateSession causes the server to not reply to create session
60
        // requests, this should only be used for testing.
61
        NoAckCreateSession bool
62

63
        // NoAckUpdates causes the server to not acknowledge state updates, this
64
        // should only be used for testing.
65
        NoAckUpdates bool
66

67
        // DisableReward causes the server to reject any session creation
68
        // attempts that request rewards.
69
        DisableReward bool
70
}
71

72
// Server houses the state required to handle watchtower peers. It's primary job
73
// is to accept incoming connections, and dispatch processing of the client
74
// message streams.
75
type Server struct {
76
        started sync.Once
77
        stopped sync.Once
78

79
        cfg *Config
80

81
        connMgr *connmgr.ConnManager
82

83
        clientMtx sync.RWMutex
84
        clients   map[wtdb.SessionID]Peer
85

86
        newPeers chan Peer
87

88
        localInit *wtwire.Init
89

90
        wg   sync.WaitGroup
91
        quit chan struct{}
92
}
93

94
// New creates a new server to handle watchtower clients. The server will accept
95
// clients connecting to the listener addresses, and allows them to open
96
// sessions and send state updates.
97
func New(cfg *Config) (*Server, error) {
96✔
98
        localInit := wtwire.NewInitMessage(
96✔
99
                lnwire.NewRawFeatureVector(
96✔
100
                        wtwire.AltruistSessionsOptional,
96✔
101
                        wtwire.AnchorCommitOptional,
96✔
102
                ),
96✔
103
                cfg.ChainHash,
96✔
104
        )
96✔
105

96✔
106
        s := &Server{
96✔
107
                cfg:       cfg,
96✔
108
                clients:   make(map[wtdb.SessionID]Peer),
96✔
109
                newPeers:  make(chan Peer),
96✔
110
                localInit: localInit,
96✔
111
                quit:      make(chan struct{}),
96✔
112
        }
96✔
113

96✔
114
        connMgr, err := connmgr.New(&connmgr.Config{
96✔
115
                Listeners: cfg.Listeners,
96✔
116
                OnAccept:  s.inboundPeerConnected,
96✔
117
                Dial:      noDial,
96✔
118
        })
96✔
119
        if err != nil {
96✔
120
                return nil, err
×
121
        }
×
122

123
        s.connMgr = connMgr
96✔
124

96✔
125
        return s, nil
96✔
126
}
127

128
// Start begins listening on the server's listeners.
129
func (s *Server) Start() error {
64✔
130
        s.started.Do(func() {
128✔
131
                log.Infof("Starting watchtower server")
64✔
132

64✔
133
                s.wg.Add(1)
64✔
134
                go s.peerHandler()
64✔
135

64✔
136
                s.connMgr.Start()
64✔
137

64✔
138
                log.Infof("Watchtower server started successfully")
64✔
139
        })
64✔
140
        return nil
64✔
141
}
142

143
// Stop shutdowns down the server's listeners and any active requests.
144
func (s *Server) Stop() error {
58✔
145
        s.stopped.Do(func() {
114✔
146
                log.Infof("Stopping watchtower server")
56✔
147

56✔
148
                s.connMgr.Stop()
56✔
149

56✔
150
                close(s.quit)
56✔
151
                s.wg.Wait()
56✔
152

56✔
153
                log.Infof("Watchtower server stopped successfully")
56✔
154
        })
56✔
155
        return nil
58✔
156
}
157

158
// inboundPeerConnected is the callback given to the connection manager, and is
159
// called each time a new connection is made to the watchtower. This method
160
// proxies the new peers by filtering out those that do not satisfy the
161
// server.Peer interface, and closes their connection. Successful connections
162
// will be passed on to the public InboundPeerConnected method.
163
func (s *Server) inboundPeerConnected(c net.Conn) {
×
164
        peer, ok := c.(Peer)
×
165
        if !ok {
×
166
                log.Warnf("incoming connection %T does not satisfy "+
×
167
                        "server.Peer interface", c)
×
168
                c.Close()
×
169
                return
×
170
        }
×
171

172
        s.InboundPeerConnected(peer)
×
173
}
174

175
// InboundPeerConnected accepts a server.Peer, and handles the request submitted
176
// by the client. This method serves also as a public endpoint for locally
177
// registering new clients with the server.
178
func (s *Server) InboundPeerConnected(peer Peer) {
466✔
179
        select {
466✔
180
        case s.newPeers <- peer:
466✔
181
        case <-s.quit:
×
182
        }
183
}
184

185
// peerHandler processes newly accepted peers and spawns a client handler for
186
// each. The peerHandler is used to ensure that waitgrouped client handlers are
187
// spawned from a waitgrouped goroutine.
188
func (s *Server) peerHandler() {
64✔
189
        defer s.wg.Done()
64✔
190
        defer s.removeAllPeers()
64✔
191

64✔
192
        for {
594✔
193
                select {
530✔
194
                case peer := <-s.newPeers:
466✔
195
                        s.wg.Add(1)
466✔
196
                        go s.handleClient(peer)
466✔
197

198
                case <-s.quit:
56✔
199
                        return
56✔
200
                }
201
        }
202
}
203

204
// handleClient processes a series watchtower messages sent by a client. The
205
// client may either send:
206
//   - a single CreateSession message.
207
//   - a series of StateUpdate messages.
208
//
209
// This method uses the server's peer map to ensure at most one peer using the
210
// same session id can enter the main event loop. The connection will be
211
// dropped by the watchtower if no messages are sent or received by the
212
// configured Read/WriteTimeouts.
213
//
214
// NOTE: This method MUST be run as a goroutine.
215
func (s *Server) handleClient(peer Peer) {
466✔
216
        defer s.wg.Done()
466✔
217

466✔
218
        // Use the connection's remote pubkey as the client's session id.
466✔
219
        id := wtdb.NewSessionIDFromPubKey(peer.RemotePub())
466✔
220

466✔
221
        // Register this peer in the server's client map, and defer the
466✔
222
        // connection's cleanup. If the peer already exists, we will close the
466✔
223
        // connection and exit immediately.
466✔
224
        err := s.addPeer(&id, peer)
466✔
225
        if err != nil {
467✔
226
                peer.Close()
1✔
227
                return
1✔
228
        }
1✔
229
        defer s.removePeer(&id, peer.RemoteAddr())
465✔
230

465✔
231
        msg, err := s.readMessage(peer)
465✔
232
        if err != nil {
465✔
233
                log.Errorf("Unable to read message from client %s@%s: %v",
×
234
                        id, peer.RemoteAddr(), err)
×
235
                return
×
236
        }
×
237

238
        remoteInit, ok := msg.(*wtwire.Init)
465✔
239
        if !ok {
465✔
240
                log.Errorf("Client %s@%s did not send Init msg as first "+
×
241
                        "message", id, peer.RemoteAddr())
×
242
                return
×
243
        }
×
244

245
        err = s.sendMessage(peer, s.localInit)
465✔
246
        if err != nil {
465✔
247
                log.Errorf("Unable to send Init msg to %s: %v", id, err)
×
248
                return
×
249
        }
×
250

251
        err = s.localInit.CheckRemoteInit(remoteInit, wtwire.FeatureNames)
465✔
252
        if err != nil {
465✔
253
                log.Errorf("Cannot support client %s: %v", id, err)
×
254
                return
×
255
        }
×
256

257
        nextMsg, err := s.readMessage(peer)
465✔
258
        if err != nil {
466✔
259
                log.Errorf("Unable to read watchtower msg from %s: %v",
1✔
260
                        id, err)
1✔
261
                return
1✔
262
        }
1✔
263

264
        switch msg := nextMsg.(type) {
464✔
265
        case *wtwire.CreateSession:
128✔
266
                // Attempt to open a new session for this client.
128✔
267
                err = s.handleCreateSession(peer, &id, msg)
128✔
268
                if err != nil {
164✔
269
                        log.Errorf("Unable to handle CreateSession "+
36✔
270
                                "from %s: %v", id, err)
36✔
271
                }
36✔
272

273
        case *wtwire.DeleteSession:
6✔
274
                err = s.handleDeleteSession(peer, &id)
6✔
275
                if err != nil {
7✔
276
                        log.Errorf("Unable to handle DeleteSession "+
1✔
277
                                "from %s: %v", id, err)
1✔
278
                }
1✔
279

280
        case *wtwire.StateUpdate:
330✔
281
                err = s.handleStateUpdates(peer, &id, msg)
330✔
282
                if err != nil {
561✔
283
                        log.Errorf("Unable to handle StateUpdate "+
231✔
284
                                "from %s: %v", id, err)
231✔
285
                }
231✔
286

287
        default:
×
288
                log.Errorf("Received unsupported message type: %T "+
×
289
                        "from %s", nextMsg, id)
×
290
        }
291
}
292

293
// connFailure is a default error used when a request failed with a non-zero
294
// error code.
295
type connFailure struct {
296
        ID   wtdb.SessionID
297
        Code wtwire.ErrorCode
298
}
299

300
// Error displays the SessionID and Code that caused the connection failure.
301
func (f *connFailure) Error() string {
×
302
        return fmt.Sprintf("connection with %s failed with code=%s",
×
303
                f.ID, f.Code,
×
304
        )
×
305
}
×
306

307
// readMessage receives and parses the next message from the given Peer. An
308
// error is returned if a message is not received before the server's read
309
// timeout, the read off the wire failed, or the message could not be
310
// deserialized.
311
func (s *Server) readMessage(peer Peer) (wtwire.Message, error) {
1,319✔
312
        // Set a read timeout to ensure we drop the client if not sent in a
1,319✔
313
        // timely manner.
1,319✔
314
        err := peer.SetReadDeadline(time.Now().Add(s.cfg.ReadTimeout))
1,319✔
315
        if err != nil {
1,319✔
316
                err = fmt.Errorf("unable to set read deadline: %w", err)
×
317
                return nil, err
×
318
        }
×
319

320
        // Pull the next message off the wire, and parse it according to the
321
        // watchtower wire specification.
322
        rawMsg, err := peer.ReadNextMessage()
1,319✔
323
        if err != nil {
1,326✔
324
                err = fmt.Errorf("unable to read message: %w", err)
7✔
325
                return nil, err
7✔
326
        }
7✔
327

328
        msgReader := bytes.NewReader(rawMsg)
1,312✔
329
        msg, err := wtwire.ReadMessage(msgReader, 0)
1,312✔
330
        if err != nil {
1,312✔
331
                err = fmt.Errorf("unable to parse message: %w", err)
×
332
                return nil, err
×
333
        }
×
334

335
        logMessage(peer, msg, true)
1,312✔
336

1,312✔
337
        return msg, nil
1,312✔
338
}
339

340
// sendMessage sends a watchtower wire message to the target peer.
341
func (s *Server) sendMessage(peer Peer, msg wtwire.Message) error {
1,062✔
342
        // TODO(conner): use buffer pool?
1,062✔
343

1,062✔
344
        var b bytes.Buffer
1,062✔
345
        _, err := wtwire.WriteMessage(&b, msg, 0)
1,062✔
346
        if err != nil {
1,062✔
347
                err = fmt.Errorf("unable to encode msg: %w", err)
×
348
                return err
×
349
        }
×
350

351
        err = peer.SetWriteDeadline(time.Now().Add(s.cfg.WriteTimeout))
1,062✔
352
        if err != nil {
1,062✔
353
                err = fmt.Errorf("unable to set write deadline: %w", err)
×
354
                return err
×
355
        }
×
356

357
        logMessage(peer, msg, false)
1,062✔
358

1,062✔
359
        _, err = peer.Write(b.Bytes())
1,062✔
360
        return err
1,062✔
361
}
362

363
// addPeer stores a client in the server's client map. An error is returned if a
364
// client with the same session id already exists.
365
func (s *Server) addPeer(id *wtdb.SessionID, peer Peer) error {
466✔
366
        s.clientMtx.Lock()
466✔
367
        defer s.clientMtx.Unlock()
466✔
368

466✔
369
        if existingPeer, ok := s.clients[*id]; ok {
467✔
370
                log.Infof("Already connected to peer %s@%s, disconnecting %s",
1✔
371
                        id, existingPeer.RemoteAddr(), peer.RemoteAddr())
1✔
372
                return ErrPeerAlreadyConnected
1✔
373
        }
1✔
374
        s.clients[*id] = peer
465✔
375

465✔
376
        log.Infof("Accepted incoming peer %s@%s",
465✔
377
                id, peer.RemoteAddr())
465✔
378

465✔
379
        return nil
465✔
380
}
381

382
// removePeer deletes a client from the server's client map. If a peer is found,
383
// this method will close the peer's connection.
384
func (s *Server) removePeer(id *wtdb.SessionID, addr net.Addr) {
465✔
385
        log.Infof("Releasing incoming peer %s@%s", id, addr)
465✔
386

465✔
387
        s.clientMtx.Lock()
465✔
388
        peer, ok := s.clients[*id]
465✔
389
        delete(s.clients, *id)
465✔
390
        s.clientMtx.Unlock()
465✔
391

465✔
392
        if ok {
930✔
393
                peer.Close()
465✔
394
        }
465✔
395
}
396

397
// removeAllPeers iterates through the server's current set of peers and closes
398
// all open connections.
399
func (s *Server) removeAllPeers() {
56✔
400
        s.clientMtx.Lock()
56✔
401
        defer s.clientMtx.Unlock()
56✔
402

56✔
403
        for id, peer := range s.clients {
56✔
404
                log.Infof("Releasing incoming peer %s@%s", id,
×
405
                        peer.RemoteAddr())
×
406

×
407
                delete(s.clients, id)
×
408
                peer.Close()
×
409
        }
×
410
}
411

412
// logMessage writes information about a message exchanged with a remote peer,
413
// using directional prepositions to signal whether the message was sent or
414
// received.
415
func logMessage(peer Peer, msg wtwire.Message, read bool) {
2,374✔
416
        var action = "Received"
2,374✔
417
        var preposition = "from"
2,374✔
418
        if !read {
3,436✔
419
                action = "Sending"
1,062✔
420
                preposition = "to"
1,062✔
421
        }
1,062✔
422

423
        summary := wtwire.MessageSummary(msg)
2,374✔
424
        if len(summary) > 0 {
3,806✔
425
                summary = "(" + summary + ")"
1,432✔
426
        }
1,432✔
427

428
        log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary,
2,374✔
429
                preposition, peer.RemotePub().SerializeCompressed(),
2,374✔
430
                peer.RemoteAddr())
2,374✔
431
}
432

433
// noDial is a dummy dial method passed to the server's connmgr.
434
func noDial(_ net.Addr) (net.Conn, error) {
×
435
        return nil, fmt.Errorf("watchtower cannot make outgoing conns")
×
436
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc