• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11954082915

21 Nov 2024 01:20PM UTC coverage: 59.327% (+0.6%) from 58.776%
11954082915

Pull #8754

github

ViktorTigerstrom
itest: wrap deriveCustomScopeAccounts at 80 chars

This commit fixes that word wrapping for the deriveCustomScopeAccounts
function docs, and ensures that it wraps at 80 characters or less.
Pull Request #8754: Add `Outbound` Remote Signer implementation

1940 of 2984 new or added lines in 44 files covered. (65.01%)

226 existing lines in 37 files now uncovered.

135234 of 227947 relevant lines covered (59.33%)

19316.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.67
/lnwallet/rpcwallet/remote_signer_client.go
1
package rpcwallet
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "os"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        "github.com/lightningnetwork/lnd/fn"
13
        "github.com/lightningnetwork/lnd/lnrpc"
14
        "github.com/lightningnetwork/lnd/lnrpc/signrpc"
15
        "github.com/lightningnetwork/lnd/lnrpc/walletrpc"
16
        "github.com/lightningnetwork/lnd/macaroons"
17
        "google.golang.org/grpc"
18
        "google.golang.org/grpc/credentials"
19
        "google.golang.org/protobuf/reflect/protoreflect"
20
        "gopkg.in/macaroon.v2"
21
)
22

23
type signerResponse = walletrpc.SignCoordinatorResponse
24

25
var (
26
        // ErrShuttingDown indicates that the server is in the process of
27
        // gracefully exiting.
28
        ErrShuttingDown = errors.New("lnd is shutting down")
29

30
        // ErrRequestType is returned when the request type by the watch-only
31
        // node has not been implemented by remote signer.
32
        ErrRequestType = errors.New("unimplemented request by watch-only node")
33
)
34

35
const (
36
        // defaultRetryTimeout is the default timeout used when retrying to
37
        // connect to the watch-only node.
38
        defaultRetryTimeout = time.Second * 1
39

40
        // retryMultiplier is the multiplier used to increase the retry timeout
41
        // for every retry.
42
        retryMultiplier = 1.5
43

44
        // defaultMaxRetryTimeout is the default max value for the
45
        // maxRetryTimeout, which defines the maximum backoff period before
46
        // attempting to reconnect to the watch-only node.
47
        defaultMaxRetryTimeout = time.Minute * 1
48

49
        // handshakeRequestID is the request ID that is reversed for the
50
        // handshake with the watch-only node.
51
        handshakeRequestID = uint64(1)
52
)
53

54
// SignCoordinatorStreamFeeder is an interface that returns a newly created
55
// stream to the watch-only node. The stream is used to send and receive
56
// messages between the remote signer client and the watch-only node.
57
type SignCoordinatorStreamFeeder interface {
58
        // GetStream returns a new stream to the watch-only node. The function
59
        // also returns a cleanup function that should be called when the stream
60
        // is no longer needed.
61
        GetStream(streamCtx context.Context) (StreamClient, func(), error)
62

63
        // Stop stops the stream feeder.
64
        Stop()
65
}
66

67
// RemoteSignerClient is an interface that defines the methods that a remote
68
// signer client should implement.
69
type RemoteSignerClient interface {
70
        // Start starts the remote signer client.
71
        Start() error
72

73
        // Stop stops the remote signer client.
74
        Stop() error
75
}
76

77
// StreamFeeder is an implementation of the SignCoordinatorStreamFeeder
78
// interface that creates a new stream to the watch-only node, by making an
79
// outbound gRPC connection to the watch-only node.
80
type StreamFeeder struct {
81
        wg sync.WaitGroup
82

83
        rpcHost, macaroonPath, tlsCertPath string
84

85
        timeout time.Duration
86

87
        quit chan struct{}
88
}
89

90
// NewStreamFeeder creates a new StreamFeeder instance.
91
func NewStreamFeeder(rpcHost, macaroonPath, tlsCertPath string,
92
        timeout time.Duration) *StreamFeeder {
4✔
93

4✔
94
        return &StreamFeeder{
4✔
95
                quit:         make(chan struct{}),
4✔
96
                rpcHost:      rpcHost,
4✔
97
                macaroonPath: macaroonPath,
4✔
98
                tlsCertPath:  tlsCertPath,
4✔
99
                timeout:      timeout,
4✔
100
        }
4✔
101
}
4✔
102

103
// Stop stops the StreamFeeder and disables the StreamFeeder from creating any
104
// new connections.
105
//
106
// NOTE: This is part of the SignCoordinatorStreamFeeder interface.
107
func (s *StreamFeeder) Stop() {
4✔
108
        close(s.quit)
4✔
109

4✔
110
        s.wg.Wait()
4✔
111
}
4✔
112

113
// GetStream returns a new stream to the watch-only node, by making an
114
// outbound gRPC connection to the watch-only node. The function also returns a
115
// cleanup function that closes the connection, which should be called when the
116
// stream is no longer needed.
117
//
118
// NOTE: This is part of the SignCoordinatorStreamFeeder interface.
119
func (s *StreamFeeder) GetStream(streamCtx context.Context) (
120
        StreamClient, func(), error) {
4✔
121

4✔
122
        select {
4✔
123
        // Don't run if the StreamFeeder has already been shutdown.
NEW
124
        case <-s.quit:
×
NEW
125
                return nil, nil, ErrShuttingDown
×
126
        default:
4✔
127
        }
128

129
        // Create a new outbound gRPC connection to the watch-only node.
130
        conn, err := s.getClientConn()
4✔
131
        if err != nil {
8✔
132
                return nil, nil, err
4✔
133
        }
4✔
134

135
        cleanUp := func() {
8✔
136
                conn.Close()
4✔
137
        }
4✔
138

139
        // Wrap the connection in a WalletKitClient.
140
        walletKitClient := walletrpc.NewWalletKitClient(conn)
4✔
141

4✔
142
        // Create a new stream to the watch-only node.
4✔
143
        stream, err := walletKitClient.SignCoordinatorStreams(streamCtx)
4✔
144
        if err != nil {
4✔
NEW
145
                cleanUp()
×
NEW
146

×
NEW
147
                return nil, nil, err
×
NEW
148
        }
×
149

150
        return stream, cleanUp, nil
4✔
151
}
152

153
// getClientConn creates a new outbound gRPC connection to the watch-only node.
154
func (s *StreamFeeder) getClientConn() (*grpc.ClientConn, error) {
4✔
155
        // If we fail to connect to the watch-only node within the
4✔
156
        // configured timeout we should return an error.
4✔
157
        ctxt, cancel := context.WithTimeout(
4✔
158
                context.Background(), s.timeout,
4✔
159
        )
4✔
160
        defer cancel()
4✔
161

4✔
162
        // Load the specified macaroon file for the watch-only node.
4✔
163
        macBytes, err := os.ReadFile(s.macaroonPath)
4✔
164
        if err != nil {
8✔
165
                return nil, fmt.Errorf("could not read macaroon file: %w", err)
4✔
166
        }
4✔
167

168
        mac := &macaroon.Macaroon{}
4✔
169

4✔
170
        err = mac.UnmarshalBinary(macBytes)
4✔
171
        if err != nil {
4✔
NEW
172
                return nil, fmt.Errorf("could not unmarshal macaroon: %w", err)
×
NEW
173
        }
×
174

175
        macCred, err := macaroons.NewMacaroonCredential(mac)
4✔
176
        if err != nil {
4✔
NEW
177
                return nil, fmt.Errorf(
×
NEW
178
                        "could not create macaroon credential: %w", err)
×
NEW
179
        }
×
180

181
        // Load the specified TLS cert for the watch-only node.
182
        tlsCreds, err := credentials.NewClientTLSFromFile(s.tlsCertPath, "")
4✔
183
        if err != nil {
4✔
NEW
184
                return nil, fmt.Errorf("could not load TLS cert: %w", err)
×
NEW
185
        }
×
186

187
        opts := []grpc.DialOption{
4✔
188
                grpc.WithBlock(),
4✔
189
                grpc.WithTransportCredentials(tlsCreds),
4✔
190
                grpc.WithPerRPCCredentials(macCred),
4✔
191
        }
4✔
192

4✔
193
        var (
4✔
194
                // A channel to signal when has successfully been created.
4✔
195
                connDoneChan = make(chan *grpc.ClientConn)
4✔
196
                errChan      = make(chan error)
4✔
197
        )
4✔
198

4✔
199
        // Now let's try to connect to the watch-only node. We'll do this in a
4✔
200
        // goroutine to ensure we can exit if the quit channel is closed. If the
4✔
201
        // quit channel is closed, the context will also be canceled, hence
4✔
202
        // stopping the goroutine.
4✔
203
        s.wg.Add(1)
4✔
204
        go func() {
8✔
205
                defer s.wg.Done()
4✔
206

4✔
207
                log.Infof("Attempting to connect to the watch-only node on: %s",
4✔
208
                        s.rpcHost)
4✔
209

4✔
210
                conn, err := grpc.DialContext(ctxt, s.rpcHost, opts...)
4✔
211
                if err != nil {
4✔
NEW
212
                        select {
×
213
                        case errChan <- fmt.Errorf("could not connect to "+
NEW
214
                                "watch-only node: %v", err):
×
215

NEW
216
                        case <-ctxt.Done():
×
217
                        }
218

NEW
219
                        return
×
220
                }
221

222
                // Only send the connection if the getClientConn function hasn't
223
                // returned yet.
224
                select {
4✔
NEW
225
                case <-ctxt.Done():
×
NEW
226
                        return
×
227

228
                case connDoneChan <- conn:
4✔
229
                }
230
        }()
231

232
        select {
4✔
233
        case conn := <-connDoneChan:
4✔
234
                return conn, nil
4✔
235

NEW
236
        case err := <-errChan:
×
NEW
237
                return nil, err
×
238

NEW
239
        case <-s.quit:
×
NEW
240
                return nil, ErrShuttingDown
×
241

NEW
242
        case <-ctxt.Done():
×
NEW
243
                return nil, ctxt.Err()
×
244
        }
245
}
246

247
// A compile time assertion to ensure StreamFeeder meets the
248
// SignCoordinatorStreamFeeder interface.
249
var _ SignCoordinatorStreamFeeder = (*StreamFeeder)(nil)
250

251
// NoOpClient is a remote signer client that is a no op, and is used when the
252
// configuration doesn't enable the use of a remote signer client.
253
type NoOpClient struct{}
254

255
// Start implements RemoteSignerClient, and is a no op.
256
func (n *NoOpClient) Start() error {
4✔
257
        return nil
4✔
258
}
4✔
259

260
// Stop implements RemoteSignerClient, and is a no op.
261
func (n *NoOpClient) Stop() error {
4✔
262
        return nil
4✔
263
}
4✔
264

265
// A compile time assertion to ensure NoOpClient meets the
266
// RemoteSignerClient interface.
267
var _ RemoteSignerClient = (*NoOpClient)(nil)
268

269
// OutboundClient is a remote signer client which will process and respond to
270
// sign requests from the watch-only node, which are sent over a stream between
271
// the node and a watch-only node.
272
type OutboundClient struct {
273
        stopped atomic.Bool
274

275
        // walletServer is the WalletKitServer that the remote signer client
276
        // will use to process walletrpc requests.
277
        walletServer walletrpc.WalletKitServer
278

279
        // signerServer is the SignerServer that the remote signer client will
280
        // use to process signrpc requests.
281
        signerServer signrpc.SignerServer
282

283
        // streamFeeder is the stream feeder that will set up a stream to the
284
        // watch-only node when requested to do so by the remote signer client.
285
        streamFeeder SignCoordinatorStreamFeeder
286

287
        // stream is the stream between the node and the watch-only node.
288
        stream StreamClient
289

290
        // requestTimeout is the timeout used when sending responses to the
291
        // watch-only node.
292
        requestTimeout time.Duration
293

294
        // retryTimeout is the backoff timeout used when retrying to set up a
295
        // connection to the watch-only node, if the previous connection/attempt
296
        // failed.
297
        retryTimeout time.Duration
298

299
        // maxRetryTimeout is the max value for the retryTimeout, defining
300
        // the maximum backoff period before attempting to reconnect to the
301
        // watch-only node.
302
        maxRetryTimeout time.Duration
303

304
        quit chan struct{}
305

306
        gManager    *fn.GoroutineManager
307
        gmCtxCancel context.CancelFunc
308
}
309

310
// NewOutboundClient creates a new instance of the remote signer client.
311
// The passed subServers need to include a walletrpc.WalletKitServer and a
312
// signrpc.SignerServer, or the OutboundClient will be disabled.
313
// Note that the client will only fully start if the configuration
314
// enables an outbound remote signer.
315
func NewOutboundClient(walletServer walletrpc.WalletKitServer,
316
        signerServer signrpc.SignerServer,
317
        streamFeeder SignCoordinatorStreamFeeder,
318
        requestTimeout time.Duration) (*OutboundClient, error) {
9✔
319

9✔
320
        if walletServer == nil || signerServer == nil {
9✔
NEW
321
                return nil, errors.New("sub-servers cannot be nil when using " +
×
NEW
322
                        "an outbound remote signer")
×
NEW
323
        }
×
324

325
        if streamFeeder == nil {
9✔
NEW
326
                return nil, errors.New("streamFeeder cannot be nil")
×
NEW
327
        }
×
328

329
        ctxc, cancel := context.WithCancel(context.Background())
9✔
330

9✔
331
        return &OutboundClient{
9✔
332
                walletServer:    walletServer,
9✔
333
                signerServer:    signerServer,
9✔
334
                streamFeeder:    streamFeeder,
9✔
335
                requestTimeout:  requestTimeout,
9✔
336
                quit:            make(chan struct{}),
9✔
337
                retryTimeout:    defaultRetryTimeout,
9✔
338
                maxRetryTimeout: defaultMaxRetryTimeout,
9✔
339
                gManager:        fn.NewGoroutineManager(ctxc),
9✔
340
                gmCtxCancel:     cancel,
9✔
341
        }, nil
9✔
342
}
343

344
// Start starts the remote signer client. The function will continuously try to
345
// setup a connection to the configured watch-only node, and retry to connect if
346
// the connection fails until we Stop the remote signer client.
347
func (r *OutboundClient) Start() error {
9✔
348
        // We'll continuously try setup a connection to the watch-only node, and
9✔
349
        // retry to connect if the connection fails until we Stop the remote
9✔
350
        // signer client.
9✔
351
        err := r.gManager.Go(func(_ context.Context) {
18✔
352
                for {
24✔
353
                        err := r.run()
15✔
354
                        if err != nil {
30✔
355
                                log.Errorf("Remote signer client error: %v",
15✔
356
                                        err)
15✔
357
                        }
15✔
358

359
                        select {
15✔
360
                        case <-r.quit:
8✔
361
                                return
8✔
362
                        default:
10✔
363
                                log.Infof("Will retry to connect to "+
10✔
364
                                        "watch-only node in: %v",
10✔
365
                                        r.retryTimeout)
10✔
366

10✔
367
                                // Backoff before retrying to connect to the
10✔
368
                                // watch-only node.
10✔
369
                                select {
10✔
370
                                case <-r.quit:
4✔
371
                                        return
4✔
372
                                case <-time.After(r.retryTimeout):
10✔
373
                                }
374
                        }
375

376
                        log.Infof("Retrying to connect to watch-only node")
10✔
377

10✔
378
                        // Increase the retry timeout by 50% for every retry.
10✔
379
                        r.retryTimeout = time.Duration(float64(r.retryTimeout) *
10✔
380
                                retryMultiplier)
10✔
381

10✔
382
                        // But cap the retryTimeout at r.maxRetryTimeout
10✔
383
                        if r.retryTimeout > r.maxRetryTimeout {
12✔
384
                                r.retryTimeout = r.maxRetryTimeout
2✔
385
                        }
2✔
386
                }
387
        })
388

389
        return err
9✔
390
}
391

392
// Stop stops the remote signer client.
393
func (r *OutboundClient) Stop() error {
9✔
394
        if r.stopped.Swap(true) {
9✔
NEW
395
                return errors.New("remote signer client is already shut down")
×
NEW
396
        }
×
397

398
        log.Info("Remote signer client shutting down")
9✔
399

9✔
400
        close(r.quit)
9✔
401

9✔
402
        if r.streamFeeder != nil {
18✔
403
                r.streamFeeder.Stop()
9✔
404
        }
9✔
405

406
        r.gManager.Stop()
9✔
407

9✔
408
        r.gmCtxCancel()
9✔
409

9✔
410
        log.Debugf("Remote signer client shut down")
9✔
411

9✔
412
        return nil
9✔
413
}
414

415
// run creates a new stream to the watch-only node, and starts processing and
416
// responding to the sign requests that are sent over the stream. The function
417
// will continuously run until the remote signer client is either stopped or
418
// the stream errors.
419
func (r *OutboundClient) run() error {
15✔
420
        select {
15✔
NEW
421
        case <-r.quit:
×
NEW
422
                return ErrShuttingDown
×
423
        default:
15✔
424
        }
425

426
        streamCtx, cancel := context.WithCancel(context.Background())
15✔
427

15✔
428
        // Cancel the stream context whenever we return from this function.
15✔
429
        defer cancel()
15✔
430

15✔
431
        log.Infof("Attempting to setup the watch-only node connection")
15✔
432

15✔
433
        // Try to get a new stream to the watch-only node.
15✔
434
        stream, streamCleanUp, err := r.streamFeeder.GetStream(streamCtx)
15✔
435
        if err != nil {
23✔
436
                return err
8✔
437
        }
8✔
438

439
        r.stream = stream
11✔
440
        defer streamCleanUp()
11✔
441

11✔
442
        // Once the stream has been created, we'll need to perform the handshake
11✔
443
        // process with the watch-only node, before it will start sending us
11✔
444
        // requests.
11✔
445
        err = r.handshake(streamCtx)
11✔
446
        if err != nil {
14✔
447
                return err
3✔
448
        }
3✔
449

450
        log.Infof("Completed setup connection to watch-only node")
9✔
451

9✔
452
        // Reset the retry timeout after a successful connection.
9✔
453
        r.retryTimeout = defaultRetryTimeout
9✔
454

9✔
455
        return r.processSignRequests(streamCtx)
9✔
456
}
457

458
// handshake performs the handshake process with the watch-only node. As we are
459
// the initiator of the stream, we need to send the first message over the
460
// stream. The watch-only node will only proceed to sending us requests after
461
// the handshake has been completed.
462
func (r *OutboundClient) handshake(streamCtx context.Context) error {
11✔
463
        var (
11✔
464
                regSentChan  = make(chan struct{})
11✔
465
                completeChan = make(chan *walletrpc.RegistrationComplete)
11✔
466
                errChan      = make(chan error)
11✔
467

11✔
468
                // The returnedChan is used to signal that this function has
11✔
469
                // already returned, so that the goroutines don't remain blocked
11✔
470
                // indefinitely when trying to send to the above channels.
11✔
471
                returnedChan = make(chan struct{})
11✔
472
        )
11✔
473
        defer close(returnedChan)
11✔
474

11✔
475
        // completeType is a type alias for the registration complete type,
11✔
476
        // created to keep line length within 80 characters.
11✔
477
        type completeType = walletrpc.RegistrationResponse_RegistrationComplete
11✔
478

11✔
479
        // TODO(viktor): This could be extended to include info about the
11✔
480
        // version of the remote signer in the future.
11✔
481
        // The RegistrationChallenge should also be set to a randomized string.
11✔
482
        signReg := &walletrpc.SignerRegistration{
11✔
483
                RegistrationChallenge: "registrationChallenge",
11✔
484
                RegistrationInfo:      "outboundSigner",
11✔
485
        }
11✔
486

11✔
487
        regType := &walletrpc.SignCoordinatorResponse_SignerRegistration{
11✔
488
                SignerRegistration: signReg,
11✔
489
        }
11✔
490

11✔
491
        registrationMsg := &walletrpc.SignCoordinatorResponse{
11✔
492
                RefRequestId:     handshakeRequestID,
11✔
493
                SignResponseType: regType,
11✔
494
        }
11✔
495

11✔
496
        // Send the registration message to the watch-only node.
11✔
497
        err := r.gManager.Go(func(_ context.Context) {
22✔
498
                err := r.stream.Send(registrationMsg)
11✔
499
                if err != nil {
13✔
500
                        select {
2✔
NEW
501
                        case errChan <- err:
×
502
                        case <-returnedChan:
2✔
503
                        }
504

505
                        return
2✔
506
                }
507

508
                close(regSentChan)
9✔
509
        })
510
        if err != nil {
11✔
NEW
511
                return fmt.Errorf("error starting registration message "+
×
NEW
512
                        "sending function : %w", err)
×
NEW
513
        }
×
514

515
        select {
11✔
NEW
516
        case err := <-errChan:
×
NEW
517
                return fmt.Errorf("error sending registration message to "+
×
NEW
518
                        "watch-only node: %w", err)
×
519

NEW
520
        case <-streamCtx.Done():
×
NEW
521
                return streamCtx.Err()
×
522

523
        case <-r.quit:
2✔
524
                return ErrShuttingDown
2✔
525

NEW
526
        case <-time.After(r.requestTimeout):
×
NEW
527
                return errors.New("watch-only node handshake timeout")
×
528

529
        case <-regSentChan:
9✔
530
        }
531

532
        // After the registration message has been sent, the signer node will
533
        // respond with a message indicating that it has accepted the signer
534
        // registration request if the registration was successful.
535
        err = r.gManager.Go(func(_ context.Context) {
18✔
536
                msg, err := r.stream.Recv()
9✔
537
                if err != nil {
10✔
538
                        select {
1✔
539
                        case errChan <- err:
1✔
NEW
540
                        case <-returnedChan:
×
541
                        }
542

543
                        return
1✔
544
                }
545

546
                // Verify that the request ID of the response is the same as the
547
                // request ID of the registration message.
548
                if msg.GetRequestId() != handshakeRequestID {
9✔
NEW
549
                        err = fmt.Errorf("initial response request id must "+
×
NEW
550
                                "be %d, but is: %d", handshakeRequestID,
×
NEW
551
                                msg.GetRequestId())
×
NEW
552

×
NEW
553
                        select {
×
NEW
554
                        case errChan <- err:
×
NEW
555
                        case <-returnedChan:
×
556
                        }
557

NEW
558
                        return
×
559
                }
560

561
                // Check the type of the response message.
562
                switch reqType := msg.GetSignRequestType().(type) {
9✔
563
                case *walletrpc.SignCoordinatorRequest_RegistrationResponse:
9✔
564

9✔
565
                        switch rType := reqType.RegistrationResponse.
9✔
566
                                GetRegistrationResponseType().(type) {
9✔
567
                        // The registration was successful.
568
                        case *completeType:
9✔
569
                                select {
9✔
570
                                case completeChan <- rType.RegistrationComplete:
9✔
NEW
571
                                case <-returnedChan:
×
572
                                }
573

574
                        // An error occurred during the registration process.
NEW
575
                        case *walletrpc.RegistrationResponse_RegistrationError:
×
NEW
576
                                err := fmt.Errorf("registration error: %s",
×
NEW
577
                                        rType.RegistrationError)
×
NEW
578

×
NEW
579
                                select {
×
NEW
580
                                case errChan <- err:
×
NEW
581
                                case <-returnedChan:
×
582
                                }
583
                        }
584

585
                        return
9✔
586

NEW
587
                default:
×
NEW
588
                        err := fmt.Errorf("expected registration response, "+
×
NEW
589
                                "but got: %T", reqType)
×
NEW
590

×
NEW
591
                        select {
×
NEW
592
                        case errChan <- err:
×
NEW
593
                        case <-returnedChan:
×
594
                        }
595

NEW
596
                        return
×
597
                }
598
        })
599
        if err != nil {
9✔
NEW
600
                return fmt.Errorf("error starting registration completion "+
×
NEW
601
                        "checking function : %w", err)
×
NEW
602
        }
×
603

604
        // Wait for the watch-only node to respond that it has accepted the
605
        // signer has registered.
606
        select {
9✔
607
        case <-completeChan:
9✔
608
                // TODO(viktor): This should verify that the signature in the
609
                // complete message is valid.
610

611
        case err := <-errChan:
1✔
612
                return fmt.Errorf("watch-only node handshake error: %w", err)
1✔
613

NEW
614
        case <-r.quit:
×
NEW
615
                return ErrShuttingDown
×
616

NEW
617
        case <-streamCtx.Done():
×
NEW
618
                return streamCtx.Err()
×
619

NEW
620
        case <-time.After(r.requestTimeout):
×
NEW
621
                return errors.New("watch-only node handshake timeout")
×
622
        }
623

624
        return nil
9✔
625
}
626

627
// processSignRequests processes and responds to the sign requests that are
628
// sent over the stream. The function will continuously run until the remote
629
// signer client is either stopped or the stream errors.
630
func (r *OutboundClient) processSignRequests(streamCtx context.Context) error {
9✔
631
        var (
9✔
632
                reqChan = make(chan *walletrpc.SignCoordinatorRequest)
9✔
633
                errChan = make(chan error)
9✔
634
        )
9✔
635

9✔
636
        // We run the receive loop in a goroutine to ensure we can stop if the
9✔
637
        // remote signer client is shutting down (i.e. the quit channel is
9✔
638
        // closed). Closing the quit channel will make the processSignRequests
9✔
639
        // function return, which will cancel the stream context, which in turn
9✔
640
        // will stop the receive goroutine.
9✔
641
        err := r.gManager.Go(func(_ context.Context) {
18✔
642
                for {
22✔
643
                        req, err := r.stream.Recv()
13✔
644
                        if err != nil {
22✔
645
                                wrappedErr := fmt.Errorf("error receiving "+
9✔
646
                                        "request from watch-only node: %w", err)
9✔
647

9✔
648
                                // Send the error to the error channel, given
9✔
649
                                // that we're still listening on the channel.
9✔
650
                                select {
9✔
651
                                case errChan <- wrappedErr:
6✔
652
                                case <-streamCtx.Done():
1✔
653
                                case <-r.quit:
5✔
654
                                }
655

656
                                return
9✔
657
                        }
658

659
                        select {
8✔
NEW
660
                        case <-streamCtx.Done():
×
NEW
661
                                return
×
662

NEW
663
                        case <-r.quit:
×
NEW
664
                                return
×
665

666
                        case reqChan <- req:
8✔
667
                        }
668
                }
669
        })
670
        if err != nil {
9✔
NEW
671
                return fmt.Errorf("error starting receiving loop: %w", err)
×
NEW
672
        }
×
673

674
        for {
22✔
675
                log.Tracef("Waiting for a request from the watch-only node")
13✔
676

13✔
677
                select {
13✔
678
                case req := <-reqChan:
8✔
679
                        // Process the received request.
8✔
680
                        err := r.handleRequest(streamCtx, req)
8✔
681
                        if err != nil {
8✔
NEW
682
                                return err
×
NEW
683
                        }
×
684

685
                case <-r.quit:
6✔
686
                        return ErrShuttingDown
6✔
687

NEW
688
                case <-streamCtx.Done():
×
NEW
689
                        return streamCtx.Err()
×
690

691
                case err := <-errChan:
6✔
692
                        return err
6✔
693
                }
694
        }
695
}
696

697
// handleRequest processes the received request from the watch-only node, and
698
// sends the corresponding response back.
699
func (r *OutboundClient) handleRequest(streamCtx context.Context,
700
        req *walletrpc.SignCoordinatorRequest) error {
8✔
701

8✔
702
        log.Debugf("Processing a request from watch-only node of type: %T",
8✔
703
                req.GetSignRequestType())
8✔
704

8✔
705
        log.Tracef("Request content: %v", formatSignCoordinatorMsg(req))
8✔
706

8✔
707
        // Process the request.
8✔
708
        resp, err := r.process(streamCtx, req)
8✔
709
        if err != nil {
13✔
710
                errStr := "error processing the request in the remote " +
5✔
711
                        "signer: " + err.Error()
5✔
712

5✔
713
                log.Errorf(errStr)
5✔
714

5✔
715
                // If we fail to process the request, we will send a SignerError
5✔
716
                // back to the watch-only node, indicating the nature of the
5✔
717
                // error.
5✔
718
                eType := &walletrpc.SignCoordinatorResponse_SignerError{
5✔
719
                        SignerError: &walletrpc.SignerError{
5✔
720
                                Error: errStr,
5✔
721
                        },
5✔
722
                }
5✔
723

5✔
724
                resp = &signerResponse{
5✔
725
                        RefRequestId:     req.GetRequestId(),
5✔
726
                        SignResponseType: eType,
5✔
727
                }
5✔
728
        }
5✔
729

730
        // Send the response back to the watch-only node.
731
        err = r.sendResponse(streamCtx, resp)
8✔
732
        if err != nil {
8✔
NEW
733
                return fmt.Errorf("error sending response to watch-only "+
×
NEW
734
                        "node: %w", err)
×
NEW
735
        }
×
736

737
        log.Tracef("Sent the following response to watch-only node: %v",
8✔
738
                formatSignCoordinatorMsg(resp))
8✔
739

8✔
740
        return nil
8✔
741
}
742

743
// process sends the passed request on to the appropriate server for processing
744
// it, and returns the response.
745
func (r *OutboundClient) process(ctx context.Context,
746
        req *walletrpc.SignCoordinatorRequest) (*signerResponse, error) {
8✔
747

8✔
748
        var (
8✔
749
                requestID = req.GetRequestId()
8✔
750
                signResp  = &signerResponse{
8✔
751
                        RefRequestId: requestID,
8✔
752
                }
8✔
753
        )
8✔
754

8✔
755
        //nolint:lll
8✔
756
        switch reqType := req.GetSignRequestType().(type) {
8✔
757
        case *walletrpc.SignCoordinatorRequest_SharedKeyRequest:
4✔
758
                resp, err := r.signerServer.DeriveSharedKey(
4✔
759
                        ctx, reqType.SharedKeyRequest,
4✔
760
                )
4✔
761
                if err != nil {
4✔
NEW
762
                        return nil, err
×
NEW
763
                }
×
764

765
                rType := &walletrpc.SignCoordinatorResponse_SharedKeyResponse{
4✔
766
                        SharedKeyResponse: resp,
4✔
767
                }
4✔
768

4✔
769
                signResp.SignResponseType = rType
4✔
770

4✔
771
                return signResp, nil
4✔
772

773
        case *walletrpc.SignCoordinatorRequest_SignMessageReq:
5✔
774
                resp, err := r.signerServer.SignMessage(
5✔
775
                        ctx, reqType.SignMessageReq,
5✔
776
                )
5✔
777
                if err != nil {
6✔
778
                        return nil, err
1✔
779
                }
1✔
780

781
                rType := &walletrpc.SignCoordinatorResponse_SignMessageResp{
4✔
782
                        SignMessageResp: resp,
4✔
783
                }
4✔
784

4✔
785
                signResp.SignResponseType = rType
4✔
786

4✔
787
                return signResp, nil
4✔
788

789
        case *walletrpc.SignCoordinatorRequest_MuSig2SessionRequest:
4✔
790
                resp, err := r.signerServer.MuSig2CreateSession(
4✔
791
                        ctx, reqType.MuSig2SessionRequest,
4✔
792
                )
4✔
793
                if err != nil {
4✔
NEW
794
                        return nil, err
×
NEW
795
                }
×
796

797
                rType := &walletrpc.SignCoordinatorResponse_MuSig2SessionResponse{
4✔
798
                        MuSig2SessionResponse: resp,
4✔
799
                }
4✔
800

4✔
801
                signResp.SignResponseType = rType
4✔
802

4✔
803
                return signResp, nil
4✔
804

805
        case *walletrpc.SignCoordinatorRequest_MuSig2RegisterNoncesRequest:
4✔
806
                resp, err := r.signerServer.MuSig2RegisterNonces(
4✔
807
                        ctx, reqType.MuSig2RegisterNoncesRequest,
4✔
808
                )
4✔
809
                if err != nil {
4✔
NEW
810
                        return nil, err
×
NEW
811
                }
×
812

813
                rType := &walletrpc.SignCoordinatorResponse_MuSig2RegisterNoncesResponse{
4✔
814
                        MuSig2RegisterNoncesResponse: resp,
4✔
815
                }
4✔
816

4✔
817
                signResp.SignResponseType = rType
4✔
818

4✔
819
                return signResp, nil
4✔
820

821
        case *walletrpc.SignCoordinatorRequest_MuSig2SignRequest:
4✔
822
                resp, err := r.signerServer.MuSig2Sign(
4✔
823
                        ctx, reqType.MuSig2SignRequest,
4✔
824
                )
4✔
825
                if err != nil {
8✔
826
                        return nil, err
4✔
827
                }
4✔
828

829
                rType := &walletrpc.SignCoordinatorResponse_MuSig2SignResponse{
4✔
830
                        MuSig2SignResponse: resp,
4✔
831
                }
4✔
832

4✔
833
                signResp.SignResponseType = rType
4✔
834

4✔
835
                return signResp, nil
4✔
836

837
        case *walletrpc.SignCoordinatorRequest_MuSig2CombineSigRequest:
4✔
838
                resp, err := r.signerServer.MuSig2CombineSig(
4✔
839
                        ctx, reqType.MuSig2CombineSigRequest,
4✔
840
                )
4✔
841
                if err != nil {
4✔
NEW
842
                        return nil, err
×
NEW
843
                }
×
844

845
                rType := &walletrpc.SignCoordinatorResponse_MuSig2CombineSigResponse{
4✔
846
                        MuSig2CombineSigResponse: resp,
4✔
847
                }
4✔
848

4✔
849
                signResp.SignResponseType = rType
4✔
850

4✔
851
                return signResp, nil
4✔
852

853
        case *walletrpc.SignCoordinatorRequest_MuSig2CleanupRequest:
4✔
854
                resp, err := r.signerServer.MuSig2Cleanup(
4✔
855
                        ctx, reqType.MuSig2CleanupRequest,
4✔
856
                )
4✔
857
                if err != nil {
4✔
NEW
858
                        return nil, err
×
NEW
859
                }
×
860

861
                rType := &walletrpc.SignCoordinatorResponse_MuSig2CleanupResponse{
4✔
862
                        MuSig2CleanupResponse: resp,
4✔
863
                }
4✔
864

4✔
865
                signResp.SignResponseType = rType
4✔
866

4✔
867
                return signResp, nil
4✔
868

869
        case *walletrpc.SignCoordinatorRequest_SignPsbtRequest:
4✔
870
                resp, err := r.walletServer.SignPsbt(
4✔
871
                        ctx, reqType.SignPsbtRequest,
4✔
872
                )
4✔
873
                if err != nil {
4✔
NEW
874
                        return nil, err
×
NEW
875
                }
×
876

877
                rType := &walletrpc.SignCoordinatorResponse_SignPsbtResponse{
4✔
878
                        SignPsbtResponse: resp,
4✔
879
                }
4✔
880

4✔
881
                signResp.SignResponseType = rType
4✔
882

4✔
883
                return signResp, nil
4✔
884

885
        case *walletrpc.SignCoordinatorRequest_Ping:
7✔
886
                // If the received request is a ping, we don't need to pass the
7✔
887
                // request on to a server, but can respond with a pong directly.
7✔
888
                rType := &walletrpc.SignCoordinatorResponse_Pong{
7✔
889
                        Pong: true,
7✔
890
                }
7✔
891

7✔
892
                signResp.SignResponseType = rType
7✔
893

7✔
894
                return signResp, nil
7✔
895

NEW
896
        default:
×
NEW
897
                return nil, ErrRequestType
×
898
        }
899
}
900

901
// sendResponse sends the passed response back to the watch-only node over the
902
// stream.
903
func (r *OutboundClient) sendResponse(ctx context.Context,
904
        resp *signerResponse) error {
8✔
905

8✔
906
        // We send the response in a goroutine to ensure we can return an error
8✔
907
        // if the send times out or the context is canceled. This is done to
8✔
908
        // ensure that this function won't block indefinitely.
8✔
909
        var (
8✔
910
                sendDone = make(chan struct{})
8✔
911
                errChan  = make(chan error)
8✔
912

8✔
913
                // The returnedChan is used to signal that this function has
8✔
914
                // already returned, so that the goroutines don't remain blocked
8✔
915
                // indefinitely when trying to send to the above channels.
8✔
916
                returnedChan = make(chan struct{})
8✔
917
        )
8✔
918
        defer close(returnedChan)
8✔
919

8✔
920
        err := r.gManager.Go(func(_ context.Context) {
16✔
921
                err := r.stream.Send(resp)
8✔
922
                if err != nil {
8✔
NEW
923
                        select {
×
NEW
924
                        case errChan <- err:
×
NEW
925
                        case <-returnedChan:
×
926
                        }
927

NEW
928
                        return
×
929
                }
930

931
                close(sendDone)
8✔
932
        })
933
        if err != nil {
8✔
NEW
934
                return fmt.Errorf("error starting send function: %w", err)
×
NEW
935
        }
×
936

937
        select {
8✔
NEW
938
        case err := <-errChan:
×
NEW
939
                return fmt.Errorf("send response to watch-only node error: %w",
×
NEW
940
                        err)
×
941

NEW
942
        case <-time.After(r.requestTimeout):
×
NEW
943
                return errors.New("send response to watch-only node timeout")
×
944

NEW
945
        case <-r.quit:
×
NEW
946
                return ErrShuttingDown
×
947

NEW
948
        case <-ctx.Done():
×
NEW
949
                return ctx.Err()
×
950

951
        case <-sendDone:
8✔
952
                return nil
8✔
953
        }
954
}
955

956
// A compile time assertion to ensure OutboundClient meets the
957
// RemoteSignerClient interface.
958
var _ RemoteSignerClient = (*OutboundClient)(nil)
959

960
// formatSignCoordinatorMsg formats the passed proto message into a JSON string.
961
func formatSignCoordinatorMsg(msg protoreflect.ProtoMessage) string {
30✔
962
        jsonBytes, err := lnrpc.ProtoJSONMarshalOpts.Marshal(msg)
30✔
963
        if err != nil {
30✔
NEW
964
                return fmt.Sprintf("<err: %v>", err.Error())
×
NEW
965
        }
×
966

967
        return string(jsonBytes)
30✔
968
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc