• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12199391122

06 Dec 2024 01:10PM UTC coverage: 49.807% (-9.1%) from 58.933%
12199391122

push

github

web-flow
Merge pull request #9337 from Guayaba221/patch-1

chore: fix typo in ruby.md

100137 of 201051 relevant lines covered (49.81%)

2.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.42
/watchtower/wtserver/server.go
1
package wtserver
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "net"
8
        "sync"
9
        "time"
10

11
        "github.com/btcsuite/btcd/btcutil"
12
        "github.com/btcsuite/btcd/chaincfg/chainhash"
13
        "github.com/btcsuite/btcd/connmgr"
14
        "github.com/lightningnetwork/lnd/keychain"
15
        "github.com/lightningnetwork/lnd/lnwire"
16
        "github.com/lightningnetwork/lnd/watchtower/wtdb"
17
        "github.com/lightningnetwork/lnd/watchtower/wtwire"
18
)
19

20
var (
21
        // ErrPeerAlreadyConnected signals that a peer with the same session id
22
        // is already active within the server.
23
        ErrPeerAlreadyConnected = errors.New("peer already connected")
24

25
        // ErrServerExiting signals that a request could not be processed
26
        // because the server has been requested to shut down.
27
        ErrServerExiting = errors.New("server shutting down")
28
)
29

30
// Config abstracts the primary components and dependencies of the server.
31
type Config struct {
32
        // DB provides persistent access to the server's sessions and for
33
        // storing state updates.
34
        DB DB
35

36
        // NodeKeyECDH is the ECDH capable wrapper of the key to be used in
37
        // accepting new brontide connections.
38
        NodeKeyECDH keychain.SingleKeyECDH
39

40
        // Listeners specifies which address to which clients may connect.
41
        Listeners []net.Listener
42

43
        // ReadTimeout specifies how long a client may go without sending a
44
        // message.
45
        ReadTimeout time.Duration
46

47
        // WriteTimeout specifies how long a client may go without reading a
48
        // message from the other end, if the connection has stopped buffering
49
        // the server's replies.
50
        WriteTimeout time.Duration
51

52
        // NewAddress is used to generate reward addresses, where a cut of
53
        // successfully sent funds can be received.
54
        NewAddress func() (btcutil.Address, error)
55

56
        // ChainHash identifies the network that the server is watching.
57
        ChainHash chainhash.Hash
58

59
        // NoAckCreateSession causes the server to not reply to create session
60
        // requests, this should only be used for testing.
61
        NoAckCreateSession bool
62

63
        // NoAckUpdates causes the server to not acknowledge state updates, this
64
        // should only be used for testing.
65
        NoAckUpdates bool
66

67
        // DisableReward causes the server to reject any session creation
68
        // attempts that request rewards.
69
        DisableReward bool
70
}
71

72
// Server houses the state required to handle watchtower peers. It's primary job
73
// is to accept incoming connections, and dispatch processing of the client
74
// message streams.
75
type Server struct {
76
        started sync.Once
77
        stopped sync.Once
78

79
        cfg *Config
80

81
        connMgr *connmgr.ConnManager
82

83
        clientMtx sync.RWMutex
84
        clients   map[wtdb.SessionID]Peer
85

86
        newPeers chan Peer
87

88
        localInit *wtwire.Init
89

90
        wg   sync.WaitGroup
91
        quit chan struct{}
92
}
93

94
// New creates a new server to handle watchtower clients. The server will accept
95
// clients connecting to the listener addresses, and allows them to open
96
// sessions and send state updates.
97
func New(cfg *Config) (*Server, error) {
4✔
98
        localInit := wtwire.NewInitMessage(
4✔
99
                lnwire.NewRawFeatureVector(
4✔
100
                        wtwire.AltruistSessionsOptional,
4✔
101
                        wtwire.AnchorCommitOptional,
4✔
102
                ),
4✔
103
                cfg.ChainHash,
4✔
104
        )
4✔
105

4✔
106
        s := &Server{
4✔
107
                cfg:       cfg,
4✔
108
                clients:   make(map[wtdb.SessionID]Peer),
4✔
109
                newPeers:  make(chan Peer),
4✔
110
                localInit: localInit,
4✔
111
                quit:      make(chan struct{}),
4✔
112
        }
4✔
113

4✔
114
        connMgr, err := connmgr.New(&connmgr.Config{
4✔
115
                Listeners: cfg.Listeners,
4✔
116
                OnAccept:  s.inboundPeerConnected,
4✔
117
                Dial:      noDial,
4✔
118
        })
4✔
119
        if err != nil {
4✔
120
                return nil, err
×
121
        }
×
122

123
        s.connMgr = connMgr
4✔
124

4✔
125
        return s, nil
4✔
126
}
127

128
// Start begins listening on the server's listeners.
129
func (s *Server) Start() error {
4✔
130
        s.started.Do(func() {
8✔
131
                log.Infof("Starting watchtower server")
4✔
132

4✔
133
                s.wg.Add(1)
4✔
134
                go s.peerHandler()
4✔
135

4✔
136
                s.connMgr.Start()
4✔
137

4✔
138
                log.Infof("Watchtower server started successfully")
4✔
139
        })
4✔
140
        return nil
4✔
141
}
142

143
// Stop shutdowns down the server's listeners and any active requests.
144
func (s *Server) Stop() error {
4✔
145
        s.stopped.Do(func() {
8✔
146
                log.Infof("Stopping watchtower server")
4✔
147

4✔
148
                s.connMgr.Stop()
4✔
149

4✔
150
                close(s.quit)
4✔
151
                s.wg.Wait()
4✔
152

4✔
153
                log.Infof("Watchtower server stopped successfully")
4✔
154
        })
4✔
155
        return nil
4✔
156
}
157

158
// inboundPeerConnected is the callback given to the connection manager, and is
159
// called each time a new connection is made to the watchtower. This method
160
// proxies the new peers by filtering out those that do not satisfy the
161
// server.Peer interface, and closes their connection. Successful connections
162
// will be passed on to the public InboundPeerConnected method.
163
func (s *Server) inboundPeerConnected(c net.Conn) {
4✔
164
        peer, ok := c.(Peer)
4✔
165
        if !ok {
4✔
166
                log.Warnf("incoming connection %T does not satisfy "+
×
167
                        "server.Peer interface", c)
×
168
                c.Close()
×
169
                return
×
170
        }
×
171

172
        s.InboundPeerConnected(peer)
4✔
173
}
174

175
// InboundPeerConnected accepts a server.Peer, and handles the request submitted
176
// by the client. This method serves also as a public endpoint for locally
177
// registering new clients with the server.
178
func (s *Server) InboundPeerConnected(peer Peer) {
4✔
179
        select {
4✔
180
        case s.newPeers <- peer:
4✔
181
        case <-s.quit:
×
182
        }
183
}
184

185
// peerHandler processes newly accepted peers and spawns a client handler for
186
// each. The peerHandler is used to ensure that waitgrouped client handlers are
187
// spawned from a waitgrouped goroutine.
188
func (s *Server) peerHandler() {
4✔
189
        defer s.wg.Done()
4✔
190
        defer s.removeAllPeers()
4✔
191

4✔
192
        for {
8✔
193
                select {
4✔
194
                case peer := <-s.newPeers:
4✔
195
                        s.wg.Add(1)
4✔
196
                        go s.handleClient(peer)
4✔
197

198
                case <-s.quit:
4✔
199
                        return
4✔
200
                }
201
        }
202
}
203

204
// handleClient processes a series watchtower messages sent by a client. The
205
// client may either send:
206
//   - a single CreateSession message.
207
//   - a series of StateUpdate messages.
208
//
209
// This method uses the server's peer map to ensure at most one peer using the
210
// same session id can enter the main event loop. The connection will be
211
// dropped by the watchtower if no messages are sent or received by the
212
// configured Read/WriteTimeouts.
213
//
214
// NOTE: This method MUST be run as a goroutine.
215
func (s *Server) handleClient(peer Peer) {
4✔
216
        defer s.wg.Done()
4✔
217

4✔
218
        // Use the connection's remote pubkey as the client's session id.
4✔
219
        id := wtdb.NewSessionIDFromPubKey(peer.RemotePub())
4✔
220

4✔
221
        // Register this peer in the server's client map, and defer the
4✔
222
        // connection's cleanup. If the peer already exists, we will close the
4✔
223
        // connection and exit immediately.
4✔
224
        err := s.addPeer(&id, peer)
4✔
225
        if err != nil {
4✔
226
                peer.Close()
×
227
                return
×
228
        }
×
229
        defer s.removePeer(&id, peer.RemoteAddr())
4✔
230

4✔
231
        msg, err := s.readMessage(peer)
4✔
232
        if err != nil {
4✔
233
                log.Errorf("Unable to read message from client %s@%s: %v",
×
234
                        id, peer.RemoteAddr(), err)
×
235
                return
×
236
        }
×
237

238
        remoteInit, ok := msg.(*wtwire.Init)
4✔
239
        if !ok {
4✔
240
                log.Errorf("Client %s@%s did not send Init msg as first "+
×
241
                        "message", id, peer.RemoteAddr())
×
242
                return
×
243
        }
×
244

245
        err = s.sendMessage(peer, s.localInit)
4✔
246
        if err != nil {
4✔
247
                log.Errorf("Unable to send Init msg to %s: %v", id, err)
×
248
                return
×
249
        }
×
250

251
        err = s.localInit.CheckRemoteInit(remoteInit, wtwire.FeatureNames)
4✔
252
        if err != nil {
4✔
253
                log.Errorf("Cannot support client %s: %v", id, err)
×
254
                return
×
255
        }
×
256

257
        nextMsg, err := s.readMessage(peer)
4✔
258
        if err != nil {
4✔
259
                log.Errorf("Unable to read watchtower msg from %s: %v",
×
260
                        id, err)
×
261
                return
×
262
        }
×
263

264
        switch msg := nextMsg.(type) {
4✔
265
        case *wtwire.CreateSession:
4✔
266
                // Attempt to open a new session for this client.
4✔
267
                err = s.handleCreateSession(peer, &id, msg)
4✔
268
                if err != nil {
4✔
269
                        log.Errorf("Unable to handle CreateSession "+
×
270
                                "from %s: %v", id, err)
×
271
                }
×
272

273
        case *wtwire.DeleteSession:
4✔
274
                err = s.handleDeleteSession(peer, &id)
4✔
275
                if err != nil {
4✔
276
                        log.Errorf("Unable to handle DeleteSession "+
×
277
                                "from %s: %v", id, err)
×
278
                }
×
279

280
        case *wtwire.StateUpdate:
4✔
281
                err = s.handleStateUpdates(peer, &id, msg)
4✔
282
                if err != nil {
4✔
283
                        log.Errorf("Unable to handle StateUpdate "+
×
284
                                "from %s: %v", id, err)
×
285
                }
×
286

287
        default:
×
288
                log.Errorf("Received unsupported message type: %T "+
×
289
                        "from %s", nextMsg, id)
×
290
        }
291
}
292

293
// connFailure is a default error used when a request failed with a non-zero
294
// error code.
295
type connFailure struct {
296
        ID   wtdb.SessionID
297
        Code wtwire.ErrorCode
298
}
299

300
// Error displays the SessionID and Code that caused the connection failure.
301
func (f *connFailure) Error() string {
×
302
        return fmt.Sprintf("connection with %s failed with code=%s",
×
303
                f.ID, f.Code,
×
304
        )
×
305
}
×
306

307
// readMessage receives and parses the next message from the given Peer. An
308
// error is returned if a message is not received before the server's read
309
// timeout, the read off the wire failed, or the message could not be
310
// deserialized.
311
func (s *Server) readMessage(peer Peer) (wtwire.Message, error) {
4✔
312
        // Set a read timeout to ensure we drop the client if not sent in a
4✔
313
        // timely manner.
4✔
314
        err := peer.SetReadDeadline(time.Now().Add(s.cfg.ReadTimeout))
4✔
315
        if err != nil {
4✔
316
                err = fmt.Errorf("unable to set read deadline: %w", err)
×
317
                return nil, err
×
318
        }
×
319

320
        // Pull the next message off the wire, and parse it according to the
321
        // watchtower wire specification.
322
        rawMsg, err := peer.ReadNextMessage()
4✔
323
        if err != nil {
4✔
324
                err = fmt.Errorf("unable to read message: %w", err)
×
325
                return nil, err
×
326
        }
×
327

328
        msgReader := bytes.NewReader(rawMsg)
4✔
329
        msg, err := wtwire.ReadMessage(msgReader, 0)
4✔
330
        if err != nil {
4✔
331
                err = fmt.Errorf("unable to parse message: %w", err)
×
332
                return nil, err
×
333
        }
×
334

335
        logMessage(peer, msg, true)
4✔
336

4✔
337
        return msg, nil
4✔
338
}
339

340
// sendMessage sends a watchtower wire message to the target peer.
341
func (s *Server) sendMessage(peer Peer, msg wtwire.Message) error {
4✔
342
        // TODO(conner): use buffer pool?
4✔
343

4✔
344
        var b bytes.Buffer
4✔
345
        _, err := wtwire.WriteMessage(&b, msg, 0)
4✔
346
        if err != nil {
4✔
347
                err = fmt.Errorf("unable to encode msg: %w", err)
×
348
                return err
×
349
        }
×
350

351
        err = peer.SetWriteDeadline(time.Now().Add(s.cfg.WriteTimeout))
4✔
352
        if err != nil {
4✔
353
                err = fmt.Errorf("unable to set write deadline: %w", err)
×
354
                return err
×
355
        }
×
356

357
        logMessage(peer, msg, false)
4✔
358

4✔
359
        _, err = peer.Write(b.Bytes())
4✔
360
        return err
4✔
361
}
362

363
// addPeer stores a client in the server's client map. An error is returned if a
364
// client with the same session id already exists.
365
func (s *Server) addPeer(id *wtdb.SessionID, peer Peer) error {
4✔
366
        s.clientMtx.Lock()
4✔
367
        defer s.clientMtx.Unlock()
4✔
368

4✔
369
        if existingPeer, ok := s.clients[*id]; ok {
4✔
370
                log.Infof("Already connected to peer %s@%s, disconnecting %s",
×
371
                        id, existingPeer.RemoteAddr(), peer.RemoteAddr())
×
372
                return ErrPeerAlreadyConnected
×
373
        }
×
374
        s.clients[*id] = peer
4✔
375

4✔
376
        log.Infof("Accepted incoming peer %s@%s",
4✔
377
                id, peer.RemoteAddr())
4✔
378

4✔
379
        return nil
4✔
380
}
381

382
// removePeer deletes a client from the server's client map. If a peer is found,
383
// this method will close the peer's connection.
384
func (s *Server) removePeer(id *wtdb.SessionID, addr net.Addr) {
4✔
385
        log.Infof("Releasing incoming peer %s@%s", id, addr)
4✔
386

4✔
387
        s.clientMtx.Lock()
4✔
388
        peer, ok := s.clients[*id]
4✔
389
        delete(s.clients, *id)
4✔
390
        s.clientMtx.Unlock()
4✔
391

4✔
392
        if ok {
8✔
393
                peer.Close()
4✔
394
        }
4✔
395
}
396

397
// removeAllPeers iterates through the server's current set of peers and closes
398
// all open connections.
399
func (s *Server) removeAllPeers() {
4✔
400
        s.clientMtx.Lock()
4✔
401
        defer s.clientMtx.Unlock()
4✔
402

4✔
403
        for id, peer := range s.clients {
4✔
404
                log.Infof("Releasing incoming peer %s@%s", id,
×
405
                        peer.RemoteAddr())
×
406

×
407
                delete(s.clients, id)
×
408
                peer.Close()
×
409
        }
×
410
}
411

412
// logMessage writes information about a message exchanged with a remote peer,
413
// using directional prepositions to signal whether the message was sent or
414
// received.
415
func logMessage(peer Peer, msg wtwire.Message, read bool) {
4✔
416
        var action = "Received"
4✔
417
        var preposition = "from"
4✔
418
        if !read {
8✔
419
                action = "Sending"
4✔
420
                preposition = "to"
4✔
421
        }
4✔
422

423
        summary := wtwire.MessageSummary(msg)
4✔
424
        if len(summary) > 0 {
8✔
425
                summary = "(" + summary + ")"
4✔
426
        }
4✔
427

428
        log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary,
4✔
429
                preposition, peer.RemotePub().SerializeCompressed(),
4✔
430
                peer.RemoteAddr())
4✔
431
}
432

433
// noDial is a dummy dial method passed to the server's connmgr.
434
func noDial(_ net.Addr) (net.Conn, error) {
×
435
        return nil, fmt.Errorf("watchtower cannot make outgoing conns")
×
436
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc